Skip to content

Commit 8ef239b

Browse files
authoredApr 20, 2020
Improve protocol-level receiving code (#9617)
1 parent c2ac7b1 commit 8ef239b

File tree

3 files changed

+126
-130
lines changed

3 files changed

+126
-130
lines changed
 

‎src/network/connection.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -1173,7 +1173,9 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
11731173
m_bc_peerhandler(peerhandler)
11741174

11751175
{
1176-
m_udpSocket.setTimeoutMs(5);
1176+
/* Amount of time Receive() will wait for data, this is entirely different
1177+
* from the connection timeout */
1178+
m_udpSocket.setTimeoutMs(500);
11771179

11781180
m_sendThread->setParent(this);
11791181
m_receiveThread->setParent(this);

‎src/network/connectionthreads.cpp

+122-128
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,14 @@ void *ConnectionReceiveThread::run()
812812
ThreadIdentifier);
813813
PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
814814

815+
// use IPv6 minimum allowed MTU as receive buffer size as this is
816+
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled
817+
// infrastructure
818+
const unsigned int packet_maxsize = 1500;
819+
SharedBuffer<u8> packetdata(packet_maxsize);
820+
821+
bool packet_queued = true;
822+
815823
#ifdef DEBUG_CONNECTION_KBPS
816824
u64 curtime = porting::getTimeMs();
817825
u64 lasttime = curtime;
@@ -830,7 +838,7 @@ void *ConnectionReceiveThread::run()
830838
#endif
831839

832840
/* receive packets */
833-
receive();
841+
receive(packetdata, packet_queued);
834842

835843
#ifdef DEBUG_CONNECTION_KBPS
836844
debug_print_timer += dtime;
@@ -892,157 +900,142 @@ void *ConnectionReceiveThread::run()
892900
}
893901

894902
// Receive packets from the network and buffers and create ConnectionEvents
895-
void ConnectionReceiveThread::receive()
903+
void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
904+
bool &packet_queued)
896905
{
897-
// use IPv6 minimum allowed MTU as receive buffer size as this is
898-
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled
899-
// infrastructure
900-
unsigned int packet_maxsize = 1500;
901-
SharedBuffer<u8> packetdata(packet_maxsize);
902-
903-
bool packet_queued = true;
904-
905-
unsigned int loop_count = 0;
906-
907-
/* first of all read packets from socket */
908-
/* check for incoming data available */
909-
while ((loop_count < 10) &&
910-
(m_connection->m_udpSocket.WaitData(50))) {
911-
loop_count++;
912-
try {
913-
if (packet_queued) {
914-
bool data_left = true;
915-
session_t peer_id;
916-
SharedBuffer<u8> resultdata;
917-
while (data_left) {
918-
try {
919-
data_left = getFromBuffers(peer_id, resultdata);
920-
if (data_left) {
921-
ConnectionEvent e;
922-
e.dataReceived(peer_id, resultdata);
923-
m_connection->putEvent(e);
924-
}
925-
}
926-
catch (ProcessedSilentlyException &e) {
927-
/* try reading again */
906+
try {
907+
// First, see if there any buffered packets we can process now
908+
if (packet_queued) {
909+
bool data_left = true;
910+
session_t peer_id;
911+
SharedBuffer<u8> resultdata;
912+
while (data_left) {
913+
try {
914+
data_left = getFromBuffers(peer_id, resultdata);
915+
if (data_left) {
916+
ConnectionEvent e;
917+
e.dataReceived(peer_id, resultdata);
918+
m_connection->putEvent(e);
928919
}
929920
}
930-
packet_queued = false;
931-
}
932-
933-
Address sender;
934-
s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
935-
packet_maxsize);
936-
937-
if ((received_size < BASE_HEADER_SIZE) ||
938-
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
939-
LOG(derr_con << m_connection->getDesc()
940-
<< "Receive(): Invalid incoming packet, "
941-
<< "size: " << received_size
942-
<< ", protocol: "
943-
<< ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
944-
<< std::endl);
945-
continue;
921+
catch (ProcessedSilentlyException &e) {
922+
/* try reading again */
923+
}
946924
}
925+
packet_queued = false;
926+
}
947927

948-
session_t peer_id = readPeerId(*packetdata);
949-
u8 channelnum = readChannel(*packetdata);
928+
// Call Receive() to wait for incoming data
929+
Address sender;
930+
s32 received_size = m_connection->m_udpSocket.Receive(sender,
931+
*packetdata, packetdata.getSize());
932+
if (received_size < 0)
933+
return;
950934

951-
if (channelnum > CHANNEL_COUNT - 1) {
952-
LOG(derr_con << m_connection->getDesc()
953-
<< "Receive(): Invalid channel " << (u32)channelnum << std::endl);
954-
throw InvalidIncomingDataException("Channel doesn't exist");
955-
}
935+
if ((received_size < BASE_HEADER_SIZE) ||
936+
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
937+
LOG(derr_con << m_connection->getDesc()
938+
<< "Receive(): Invalid incoming packet, "
939+
<< "size: " << received_size
940+
<< ", protocol: "
941+
<< ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
942+
<< std::endl);
943+
return;
944+
}
956945

957-
/* Try to identify peer by sender address (may happen on join) */
958-
if (peer_id == PEER_ID_INEXISTENT) {
959-
peer_id = m_connection->lookupPeer(sender);
960-
// We do not have to remind the peer of its
961-
// peer id as the CONTROLTYPE_SET_PEER_ID
962-
// command was sent reliably.
963-
}
946+
session_t peer_id = readPeerId(*packetdata);
947+
u8 channelnum = readChannel(*packetdata);
964948

965-
/* The peer was not found in our lists. Add it. */
966-
if (peer_id == PEER_ID_INEXISTENT) {
967-
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
968-
}
949+
if (channelnum > CHANNEL_COUNT - 1) {
950+
LOG(derr_con << m_connection->getDesc()
951+
<< "Receive(): Invalid channel " << (u32)channelnum << std::endl);
952+
return;
953+
}
969954

970-
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
955+
/* Try to identify peer by sender address (may happen on join) */
956+
if (peer_id == PEER_ID_INEXISTENT) {
957+
peer_id = m_connection->lookupPeer(sender);
958+
// We do not have to remind the peer of its
959+
// peer id as the CONTROLTYPE_SET_PEER_ID
960+
// command was sent reliably.
961+
}
971962

972-
if (!peer) {
973-
LOG(dout_con << m_connection->getDesc()
974-
<< " got packet from unknown peer_id: "
975-
<< peer_id << " Ignoring." << std::endl);
976-
continue;
977-
}
963+
/* The peer was not found in our lists. Add it. */
964+
if (peer_id == PEER_ID_INEXISTENT) {
965+
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
966+
}
978967

979-
// Validate peer address
968+
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
969+
if (!peer) {
970+
LOG(dout_con << m_connection->getDesc()
971+
<< " got packet from unknown peer_id: "
972+
<< peer_id << " Ignoring." << std::endl);
973+
return;
974+
}
980975

981-
Address peer_address;
976+
// Validate peer address
982977

983-
if (peer->getAddress(MTP_UDP, peer_address)) {
984-
if (peer_address != sender) {
985-
LOG(derr_con << m_connection->getDesc()
986-
<< m_connection->getDesc()
987-
<< " Peer " << peer_id << " sending from different address."
988-
" Ignoring." << std::endl);
989-
continue;
990-
}
991-
} else {
992-
993-
bool invalid_address = true;
994-
if (invalid_address) {
995-
LOG(derr_con << m_connection->getDesc()
996-
<< m_connection->getDesc()
997-
<< " Peer " << peer_id << " unknown."
998-
" Ignoring." << std::endl);
999-
continue;
1000-
}
978+
Address peer_address;
979+
if (peer->getAddress(MTP_UDP, peer_address)) {
980+
if (peer_address != sender) {
981+
LOG(derr_con << m_connection->getDesc()
982+
<< " Peer " << peer_id << " sending from different address."
983+
" Ignoring." << std::endl);
984+
return;
1001985
}
986+
} else {
987+
LOG(derr_con << m_connection->getDesc()
988+
<< " Peer " << peer_id << " doesn't have an address?!"
989+
" Ignoring." << std::endl);
990+
return;
991+
}
1002992

1003-
peer->ResetTimeout();
1004-
1005-
Channel *channel = 0;
993+
peer->ResetTimeout();
1006994

1007-
if (dynamic_cast<UDPPeer *>(&peer) != 0) {
1008-
channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
1009-
}
995+
Channel *channel = nullptr;
996+
if (dynamic_cast<UDPPeer *>(&peer)) {
997+
channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
998+
} else {
999+
LOG(derr_con << m_connection->getDesc()
1000+
<< "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
1001+
" Ignoring." << std::endl);
1002+
return;
1003+
}
10101004

1011-
if (channel != 0) {
1012-
channel->UpdateBytesReceived(received_size);
1013-
}
1005+
channel->UpdateBytesReceived(received_size);
10141006

1015-
// Throw the received packet to channel->processPacket()
1007+
// Throw the received packet to channel->processPacket()
10161008

1017-
// Make a new SharedBuffer from the data without the base headers
1018-
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1019-
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1020-
strippeddata.getSize());
1009+
// Make a new SharedBuffer from the data without the base headers
1010+
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1011+
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1012+
strippeddata.getSize());
10211013

1022-
try {
1023-
// Process it (the result is some data with no headers made by us)
1024-
SharedBuffer<u8> resultdata = processPacket
1025-
(channel, strippeddata, peer_id, channelnum, false);
1014+
try {
1015+
// Process it (the result is some data with no headers made by us)
1016+
SharedBuffer<u8> resultdata = processPacket
1017+
(channel, strippeddata, peer_id, channelnum, false);
10261018

1027-
LOG(dout_con << m_connection->getDesc()
1028-
<< " ProcessPacket from peer_id: " << peer_id
1029-
<< ", channel: " << (u32)channelnum << ", returned "
1030-
<< resultdata.getSize() << " bytes" << std::endl);
1019+
LOG(dout_con << m_connection->getDesc()
1020+
<< " ProcessPacket from peer_id: " << peer_id
1021+
<< ", channel: " << (u32)channelnum << ", returned "
1022+
<< resultdata.getSize() << " bytes" << std::endl);
10311023

1032-
ConnectionEvent e;
1033-
e.dataReceived(peer_id, resultdata);
1034-
m_connection->putEvent(e);
1035-
}
1036-
catch (ProcessedSilentlyException &e) {
1037-
}
1038-
catch (ProcessedQueued &e) {
1039-
packet_queued = true;
1040-
}
1041-
}
1042-
catch (InvalidIncomingDataException &e) {
1024+
ConnectionEvent e;
1025+
e.dataReceived(peer_id, resultdata);
1026+
m_connection->putEvent(e);
10431027
}
10441028
catch (ProcessedSilentlyException &e) {
10451029
}
1030+
catch (ProcessedQueued &e) {
1031+
// we set it to true anyway (see below)
1032+
}
1033+
1034+
/* Every time we receive a packet it can happen that a previously
1035+
* buffered packet is now ready to process. */
1036+
packet_queued = true;
1037+
}
1038+
catch (InvalidIncomingDataException &e) {
10461039
}
10471040
}
10481041

@@ -1189,7 +1182,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
11891182
m_connection->TriggerSend();
11901183
} catch (NotFoundException &e) {
11911184
LOG(derr_con << m_connection->getDesc()
1192-
<< "WARNING: ACKed packet not in outgoing queue" << std::endl);
1185+
<< "WARNING: ACKed packet not in outgoing queue"
1186+
<< " seqnum=" << seqnum << std::endl);
11931187
channel->UpdatePacketTooLateCounter();
11941188
}
11951189

‎src/network/connectionthreads.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class ConnectionReceiveThread : public Thread
101101
}
102102

103103
private:
104-
void receive();
104+
void receive(SharedBuffer<u8> &packetdata, bool &packet_queued);
105105

106106
// Returns next data from a buffer if possible
107107
// If found, returns true; if not, false.

0 commit comments

Comments
 (0)
Please sign in to comment.