@@ -812,6 +812,14 @@ void *ConnectionReceiveThread::run()
812
812
ThreadIdentifier);
813
813
PROFILE (ThreadIdentifier << " ConnectionReceive: [" << m_connection->getDesc () << " ]" );
814
814
815
+ // use IPv6 minimum allowed MTU as receive buffer size as this is
816
+ // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
817
+ // infrastructure
818
+ const unsigned int packet_maxsize = 1500 ;
819
+ SharedBuffer<u8> packetdata (packet_maxsize);
820
+
821
+ bool packet_queued = true ;
822
+
815
823
#ifdef DEBUG_CONNECTION_KBPS
816
824
u64 curtime = porting::getTimeMs ();
817
825
u64 lasttime = curtime;
@@ -830,7 +838,7 @@ void *ConnectionReceiveThread::run()
830
838
#endif
831
839
832
840
/* receive packets */
833
- receive ();
841
+ receive (packetdata, packet_queued );
834
842
835
843
#ifdef DEBUG_CONNECTION_KBPS
836
844
debug_print_timer += dtime;
@@ -892,157 +900,142 @@ void *ConnectionReceiveThread::run()
892
900
}
893
901
894
902
// Receive packets from the network and buffers and create ConnectionEvents
895
- void ConnectionReceiveThread::receive ()
903
+ void ConnectionReceiveThread::receive (SharedBuffer<u8> &packetdata,
904
+ bool &packet_queued)
896
905
{
897
- // use IPv6 minimum allowed MTU as receive buffer size as this is
898
- // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
899
- // infrastructure
900
- unsigned int packet_maxsize = 1500 ;
901
- SharedBuffer<u8> packetdata (packet_maxsize);
902
-
903
- bool packet_queued = true ;
904
-
905
- unsigned int loop_count = 0 ;
906
-
907
- /* first of all read packets from socket */
908
- /* check for incoming data available */
909
- while ((loop_count < 10 ) &&
910
- (m_connection->m_udpSocket .WaitData (50 ))) {
911
- loop_count++;
912
- try {
913
- if (packet_queued) {
914
- bool data_left = true ;
915
- session_t peer_id;
916
- SharedBuffer<u8> resultdata;
917
- while (data_left) {
918
- try {
919
- data_left = getFromBuffers (peer_id, resultdata);
920
- if (data_left) {
921
- ConnectionEvent e;
922
- e.dataReceived (peer_id, resultdata);
923
- m_connection->putEvent (e);
924
- }
925
- }
926
- catch (ProcessedSilentlyException &e) {
927
- /* try reading again */
906
+ try {
907
+ // First, see if there are any buffered packets we can process now
908
+ if (packet_queued) {
909
+ bool data_left = true ;
910
+ session_t peer_id;
911
+ SharedBuffer<u8> resultdata;
912
+ while (data_left) {
913
+ try {
914
+ data_left = getFromBuffers (peer_id, resultdata);
915
+ if (data_left) {
916
+ ConnectionEvent e;
917
+ e.dataReceived (peer_id, resultdata);
918
+ m_connection->putEvent (e);
928
919
}
929
920
}
930
- packet_queued = false ;
931
- }
932
-
933
- Address sender;
934
- s32 received_size = m_connection->m_udpSocket .Receive (sender, *packetdata,
935
- packet_maxsize);
936
-
937
- if ((received_size < BASE_HEADER_SIZE) ||
938
- (readU32 (&packetdata[0 ]) != m_connection->GetProtocolID ())) {
939
- LOG (derr_con << m_connection->getDesc ()
940
- << " Receive(): Invalid incoming packet, "
941
- << " size: " << received_size
942
- << " , protocol: "
943
- << ((received_size >= 4 ) ? readU32 (&packetdata[0 ]) : -1 )
944
- << std::endl);
945
- continue ;
921
+ catch (ProcessedSilentlyException &e) {
922
+ /* try reading again */
923
+ }
946
924
}
925
+ packet_queued = false ;
926
+ }
947
927
948
- session_t peer_id = readPeerId (*packetdata);
949
- u8 channelnum = readChannel (*packetdata);
928
+ // Call Receive() to wait for incoming data
929
+ Address sender;
930
+ s32 received_size = m_connection->m_udpSocket .Receive (sender,
931
+ *packetdata, packetdata.getSize ());
932
+ if (received_size < 0 )
933
+ return ;
950
934
951
- if (channelnum > CHANNEL_COUNT - 1 ) {
952
- LOG (derr_con << m_connection->getDesc ()
953
- << " Receive(): Invalid channel " << (u32)channelnum << std::endl);
954
- throw InvalidIncomingDataException (" Channel doesn't exist" );
955
- }
935
+ if ((received_size < BASE_HEADER_SIZE) ||
936
+ (readU32 (&packetdata[0 ]) != m_connection->GetProtocolID ())) {
937
+ LOG (derr_con << m_connection->getDesc ()
938
+ << " Receive(): Invalid incoming packet, "
939
+ << " size: " << received_size
940
+ << " , protocol: "
941
+ << ((received_size >= 4 ) ? readU32 (&packetdata[0 ]) : -1 )
942
+ << std::endl);
943
+ return ;
944
+ }
956
945
957
- /* Try to identify peer by sender address (may happen on join) */
958
- if (peer_id == PEER_ID_INEXISTENT) {
959
- peer_id = m_connection->lookupPeer (sender);
960
- // We do not have to remind the peer of its
961
- // peer id as the CONTROLTYPE_SET_PEER_ID
962
- // command was sent reliably.
963
- }
946
+ session_t peer_id = readPeerId (*packetdata);
947
+ u8 channelnum = readChannel (*packetdata);
964
948
965
- /* The peer was not found in our lists. Add it. */
966
- if (peer_id == PEER_ID_INEXISTENT) {
967
- peer_id = m_connection->createPeer (sender, MTP_MINETEST_RELIABLE_UDP, 0 );
968
- }
949
+ if (channelnum > CHANNEL_COUNT - 1 ) {
950
+ LOG (derr_con << m_connection->getDesc ()
951
+ << " Receive(): Invalid channel " << (u32)channelnum << std::endl);
952
+ return ;
953
+ }
969
954
970
- PeerHelper peer = m_connection->getPeerNoEx (peer_id);
955
+ /* Try to identify peer by sender address (may happen on join) */
956
+ if (peer_id == PEER_ID_INEXISTENT) {
957
+ peer_id = m_connection->lookupPeer (sender);
958
+ // We do not have to remind the peer of its
959
+ // peer id as the CONTROLTYPE_SET_PEER_ID
960
+ // command was sent reliably.
961
+ }
971
962
972
- if (!peer) {
973
- LOG (dout_con << m_connection->getDesc ()
974
- << " got packet from unknown peer_id: "
975
- << peer_id << " Ignoring." << std::endl);
976
- continue ;
977
- }
963
+ /* The peer was not found in our lists. Add it. */
964
+ if (peer_id == PEER_ID_INEXISTENT) {
965
+ peer_id = m_connection->createPeer (sender, MTP_MINETEST_RELIABLE_UDP, 0 );
966
+ }
978
967
979
- // Validate peer address
968
+ PeerHelper peer = m_connection->getPeerNoEx (peer_id);
969
+ if (!peer) {
970
+ LOG (dout_con << m_connection->getDesc ()
971
+ << " got packet from unknown peer_id: "
972
+ << peer_id << " Ignoring." << std::endl);
973
+ return ;
974
+ }
980
975
981
- Address peer_address;
976
+ // Validate peer address
982
977
983
- if (peer->getAddress (MTP_UDP, peer_address)) {
984
- if (peer_address != sender) {
985
- LOG (derr_con << m_connection->getDesc ()
986
- << m_connection->getDesc ()
987
- << " Peer " << peer_id << " sending from different address."
988
- " Ignoring." << std::endl);
989
- continue ;
990
- }
991
- } else {
992
-
993
- bool invalid_address = true ;
994
- if (invalid_address) {
995
- LOG (derr_con << m_connection->getDesc ()
996
- << m_connection->getDesc ()
997
- << " Peer " << peer_id << " unknown."
998
- " Ignoring." << std::endl);
999
- continue ;
1000
- }
978
+ Address peer_address;
979
+ if (peer->getAddress (MTP_UDP, peer_address)) {
980
+ if (peer_address != sender) {
981
+ LOG (derr_con << m_connection->getDesc ()
982
+ << " Peer " << peer_id << " sending from different address."
983
+ " Ignoring." << std::endl);
984
+ return ;
1001
985
}
986
+ } else {
987
+ LOG (derr_con << m_connection->getDesc ()
988
+ << " Peer " << peer_id << " doesn't have an address?!"
989
+ " Ignoring." << std::endl);
990
+ return ;
991
+ }
1002
992
1003
- peer->ResetTimeout ();
1004
-
1005
- Channel *channel = 0 ;
993
+ peer->ResetTimeout ();
1006
994
1007
- if (dynamic_cast <UDPPeer *>(&peer) != 0 ) {
1008
- channel = &(dynamic_cast <UDPPeer *>(&peer)->channels [channelnum]);
1009
- }
995
+ Channel *channel = nullptr ;
996
+ if (dynamic_cast <UDPPeer *>(&peer)) {
997
+ channel = &dynamic_cast <UDPPeer *>(&peer)->channels [channelnum];
998
+ } else {
999
+ LOG (derr_con << m_connection->getDesc ()
1000
+ << " Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
1001
+ " Ignoring." << std::endl);
1002
+ return ;
1003
+ }
1010
1004
1011
- if (channel != 0 ) {
1012
- channel->UpdateBytesReceived (received_size);
1013
- }
1005
+ channel->UpdateBytesReceived (received_size);
1014
1006
1015
- // Throw the received packet to channel->processPacket()
1007
+ // Throw the received packet to channel->processPacket()
1016
1008
1017
- // Make a new SharedBuffer from the data without the base headers
1018
- SharedBuffer<u8> strippeddata (received_size - BASE_HEADER_SIZE);
1019
- memcpy (*strippeddata, &packetdata[BASE_HEADER_SIZE],
1020
- strippeddata.getSize ());
1009
+ // Make a new SharedBuffer from the data without the base headers
1010
+ SharedBuffer<u8> strippeddata (received_size - BASE_HEADER_SIZE);
1011
+ memcpy (*strippeddata, &packetdata[BASE_HEADER_SIZE],
1012
+ strippeddata.getSize ());
1021
1013
1022
- try {
1023
- // Process it (the result is some data with no headers made by us)
1024
- SharedBuffer<u8> resultdata = processPacket
1025
- (channel, strippeddata, peer_id, channelnum, false );
1014
+ try {
1015
+ // Process it (the result is some data with no headers made by us)
1016
+ SharedBuffer<u8> resultdata = processPacket
1017
+ (channel, strippeddata, peer_id, channelnum, false );
1026
1018
1027
- LOG (dout_con << m_connection->getDesc ()
1028
- << " ProcessPacket from peer_id: " << peer_id
1029
- << " , channel: " << (u32)channelnum << " , returned "
1030
- << resultdata.getSize () << " bytes" << std::endl);
1019
+ LOG (dout_con << m_connection->getDesc ()
1020
+ << " ProcessPacket from peer_id: " << peer_id
1021
+ << " , channel: " << (u32)channelnum << " , returned "
1022
+ << resultdata.getSize () << " bytes" << std::endl);
1031
1023
1032
- ConnectionEvent e;
1033
- e.dataReceived (peer_id, resultdata);
1034
- m_connection->putEvent (e);
1035
- }
1036
- catch (ProcessedSilentlyException &e) {
1037
- }
1038
- catch (ProcessedQueued &e) {
1039
- packet_queued = true ;
1040
- }
1041
- }
1042
- catch (InvalidIncomingDataException &e) {
1024
+ ConnectionEvent e;
1025
+ e.dataReceived (peer_id, resultdata);
1026
+ m_connection->putEvent (e);
1043
1027
}
1044
1028
catch (ProcessedSilentlyException &e) {
1045
1029
}
1030
+ catch (ProcessedQueued &e) {
1031
+ // we set it to true anyway (see below)
1032
+ }
1033
+
1034
+ /* Every time we receive a packet it can happen that a previously
1035
+ * buffered packet is now ready to process. */
1036
+ packet_queued = true ;
1037
+ }
1038
+ catch (InvalidIncomingDataException &e) {
1046
1039
}
1047
1040
}
1048
1041
@@ -1189,7 +1182,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
1189
1182
m_connection->TriggerSend ();
1190
1183
} catch (NotFoundException &e) {
1191
1184
LOG (derr_con << m_connection->getDesc ()
1192
- << " WARNING: ACKed packet not in outgoing queue" << std::endl);
1185
+ << " WARNING: ACKed packet not in outgoing queue"
1186
+ << " seqnum=" << seqnum << std::endl);
1193
1187
channel->UpdatePacketTooLateCounter ();
1194
1188
}
1195
1189
0 commit comments