Skip to content

Commit b581fc4

Browse files
aggarw13sarenameas
andauthored
Bugfix: Fix interrupted network read operation (#120)
### Problem The `MQTT_ProcessLoop` and `MQTT_ReceiveLoop` read incoming MQTT packet payload over the network by calling the `recvExact` function. The `recvExact` function can be called multiple times to read the expected number of bytes for the MQTT packet but it also implements a timeout functionality of receiving the expected number of payload within the timeout value passed to the function. This causes problems when the `Transport_Recv` call returns less than requested number of bytes, and there is a timeout (for example, when calling `MQTT_ProcessLoop` with 0ms duration) which causes the function to assume failure instead of reading the remaining payload of the MQTT packet by calling `Transport_Recv` again. Thus, in such cases, the MQTT connection is severed prematurely even though there is a high probability of receiving the remaining bytes of the MQTT packet over the network. ### Solution Instead of implementing a timeout on the entire duration of receiving the expected number of remaining MQTT packet bytes in `recvExact`, the use of timeout is being changed to be relevant only on the total time of receiving 0 bytes over the network over multiple calls to `Transport_Recv`. As this modified meaning of the timeout duration is now unrelated to the timeout duration that the `MQTT_ProcessLoop` or `MQTT_ReceiveLoop` functions are called, a new configuration constant for the `recvExact` timeout value, `MQTT_RECV_POLLING_TIMEOUT_MS`, has been added to the library which will carry a default value of 10ms. Co-authored-by: Sarena Meas <[email protected]>
1 parent 77e7c01 commit b581fc4

File tree

12 files changed

+207
-38
lines changed

12 files changed

+207
-38
lines changed

docs/doxygen/pages.dox

+3
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ Some configuration settings are C pre-processor constants, and some are function
188188
@section MQTT_PINGRESP_TIMEOUT_MS
189189
@copydoc MQTT_PINGRESP_TIMEOUT_MS
190190

191+
@section MQTT_RECV_POLLING_TIMEOUT_MS
192+
@copydoc MQTT_RECV_POLLING_TIMEOUT_MS
193+
191194
@section MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT
192195
@copydoc MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT
193196

lexicon.txt

+2
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ mytcpsocketcontext
174174
mytlscontext
175175
networkbuffer
176176
networkcontext
177+
networkinterfacereceivestub
177178
networkinterfacesendstub
178179
networkrecv
179180
networksend
@@ -279,6 +280,7 @@ receivepacket
279280
recordcount
280281
recordindex
281282
recv
283+
recvexact
282284
recvfunc
283285
reestablishment
284286
remaininglength

source/core_mqtt.c

+36-28
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,25 @@ static uint32_t calculateElapsedTime( uint32_t later,
7171
static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType );
7272

7373
/**
74-
* @brief Receive bytes into the network buffer, with a timeout.
74+
* @brief Receive bytes into the network buffer.
7575
*
7676
* @param[in] pContext Initialized MQTT Context.
7777
* @param[in] bytesToRecv Number of bytes to receive.
78-
* @param[in] timeoutMs Time remaining to receive the packet.
78+
*
79+
* @note This operation calls the transport receive function
80+
* repeatedly to read bytes from the network until either:
81+
* 1. The requested number of bytes @a bytesToRecv are read.
82+
* OR
83+
* 2. No data is received from the network for MQTT_RECV_POLLING_TIMEOUT_MS duration.
84+
*
85+
* OR
86+
* 3. There is an error in reading from the network.
87+
*
7988
*
8089
* @return Number of bytes received, or negative number on network error.
8190
*/
8291
static int32_t recvExact( const MQTTContext_t * pContext,
83-
size_t bytesToRecv,
84-
uint32_t timeoutMs );
92+
size_t bytesToRecv );
8593

8694
/**
8795
* @brief Discard a packet from the transport interface.
@@ -683,13 +691,12 @@ static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType )
683691
/*-----------------------------------------------------------*/
684692

685693
static int32_t recvExact( const MQTTContext_t * pContext,
686-
size_t bytesToRecv,
687-
uint32_t timeoutMs )
694+
size_t bytesToRecv )
688695
{
689696
uint8_t * pIndex = NULL;
690697
size_t bytesRemaining = bytesToRecv;
691698
int32_t totalBytesRecvd = 0, bytesRecvd;
692-
uint32_t entryTimeMs = 0U, elapsedTimeMs = 0U;
699+
uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U;
693700
TransportRecv_t recvFunc = NULL;
694701
MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
695702
bool receiveError = false;
@@ -704,7 +711,8 @@ static int32_t recvExact( const MQTTContext_t * pContext,
704711
recvFunc = pContext->transportInterface.recv;
705712
getTimeStampMs = pContext->getTime;
706713

707-
entryTimeMs = getTimeStampMs();
714+
/* Part of the MQTT packet has been read before calling this function. */
715+
lastDataRecvTimeMs = getTimeStampMs();
708716

709717
while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
710718
{
@@ -719,8 +727,11 @@ static int32_t recvExact( const MQTTContext_t * pContext,
719727
totalBytesRecvd = bytesRecvd;
720728
receiveError = true;
721729
}
722-
else
730+
else if( bytesRecvd > 0 )
723731
{
732+
/* Reset the starting time as we have received some data from the network. */
733+
lastDataRecvTimeMs = getTimeStampMs();
734+
724735
/* It is a bug in the application's transport receive implementation
725736
* if more bytes than expected are received. To avoid a possible
726737
* overflow in converting bytesRemaining from unsigned to signed,
@@ -732,18 +743,20 @@ static int32_t recvExact( const MQTTContext_t * pContext,
732743
totalBytesRecvd += ( int32_t ) bytesRecvd;
733744
pIndex += bytesRecvd;
734745
LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, "
735-
"TotalBytesReceived=%ld.",
736746
( long int ) bytesRecvd,
737-
( unsigned long ) bytesRemaining,
738-
( long int ) totalBytesRecvd ) );
747+
( unsigned long ) bytesRemaining ) );
739748
}
740-
741-
elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
742-
743-
if( ( bytesRemaining > 0U ) && ( elapsedTimeMs >= timeoutMs ) )
749+
else
744750
{
745-
LogError( ( "Time expired while receiving packet." ) );
746-
receiveError = true;
751+
/* No bytes were read from the network. */
752+
timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs );
753+
754+
/* Check for timeout if we have been waiting to receive any byte on the network. */
755+
if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS )
756+
{
757+
LogError( ( "Time expired while receiving packet." ) );
758+
receiveError = true;
759+
}
747760
}
748761
}
749762

@@ -760,7 +773,6 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
760773
int32_t bytesReceived = 0;
761774
size_t bytesToReceive = 0U;
762775
uint32_t totalBytesReceived = 0U, entryTimeMs = 0U, elapsedTimeMs = 0U;
763-
uint32_t remainingTimeMs = timeoutMs;
764776
MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
765777
bool receiveError = false;
766778

@@ -779,7 +791,7 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
779791
bytesToReceive = remainingLength - totalBytesReceived;
780792
}
781793

782-
bytesReceived = recvExact( pContext, bytesToReceive, remainingTimeMs );
794+
bytesReceived = recvExact( pContext, bytesToReceive );
783795

784796
if( bytesReceived != ( int32_t ) bytesToReceive )
785797
{
@@ -795,12 +807,8 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
795807

796808
elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs );
797809

798-
/* Update remaining time and check for timeout. */
799-
if( elapsedTimeMs < timeoutMs )
800-
{
801-
remainingTimeMs = timeoutMs - elapsedTimeMs;
802-
}
803-
else
810+
/* Check for timeout. */
811+
if( elapsedTimeMs >= timeoutMs )
804812
{
805813
LogError( ( "Time expired while discarding packet." ) );
806814
receiveError = true;
@@ -846,7 +854,7 @@ static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
846854
else
847855
{
848856
bytesToReceive = incomingPacket.remainingLength;
849-
bytesReceived = recvExact( pContext, bytesToReceive, remainingTimeMs );
857+
bytesReceived = recvExact( pContext, bytesToReceive );
850858

851859
if( bytesReceived == ( int32_t ) bytesToReceive )
852860
{
@@ -2167,7 +2175,7 @@ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext,
21672175
elapsedTimeMs = calculateElapsedTime( pContext->getTime(),
21682176
entryTimeMs );
21692177

2170-
if( elapsedTimeMs > timeoutMs )
2178+
if( elapsedTimeMs >= timeoutMs )
21712179
{
21722180
break;
21732181
}

source/include/core_mqtt.h

+16
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,14 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext );
603603
* @param[in] timeoutMs Minimum time in milliseconds that the receive loop will
604604
* run, unless an error occurs.
605605
*
606+
* @note Calling this function blocks the calling context for a time period that
607+
* depends on the passed @p timeoutMs, the configuration macro, #MQTT_RECV_POLLING_TIMEOUT_MS,
608+
* and the underlying transport interface implementation timeouts, unless an error
609+
* occurs.
610+
* Blocking Time = Max( timeoutMs parameter,
611+
* MQTT_RECV_POLLING_TIMEOUT_MS,
612+
* Transport interface send/recv implementation timeout )
613+
*
606614
* @return #MQTTBadParameter if context is NULL;
607615
* #MQTTRecvFailed if a network error occurs during reception;
608616
* #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
@@ -655,6 +663,14 @@ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext,
655663
* @param[in] timeoutMs Minimum time in milliseconds that the receive loop will
656664
* run, unless an error occurs.
657665
*
666+
* @note Calling this function blocks the calling context for a time period that
667+
* depends on the passed @p timeoutMs, the configuration macro, #MQTT_RECV_POLLING_TIMEOUT_MS,
668+
* and the underlying transport interface implementation timeouts, unless an error
669+
* occurs.
670+
* Blocking Time = Max( timeoutMs parameter,
671+
* MQTT_RECV_POLLING_TIMEOUT_MS,
672+
* Transport interface send/recv implementation timeout )
673+
*
658674
* @return #MQTTBadParameter if context is NULL;
659675
* #MQTTRecvFailed if a network error occurs during reception;
660676
* #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;

source/include/core_mqtt_config_defaults.h

+21
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,27 @@
108108
#define MQTT_PINGRESP_TIMEOUT_MS ( 500U )
109109
#endif
110110

111+
/**
112+
* @brief The maximum duration between non-empty network reads while
113+
* receiving an MQTT packet via the #MQTT_ProcessLoop or #MQTT_ReceiveLoop
114+
* API functions.
115+
*
116+
* When an incoming MQTT packet is detected, the transport receive function
117+
* may be called multiple times until all of the expected number of bytes of the
118+
* packet are received. This timeout represents the maximum polling duration that
119+
* is allowed without any data reception from the network for the incoming packet.
120+
* If the timeout expires, the #MQTT_ProcessLoop and #MQTT_ReceiveLoop functions
121+
* return #MQTTRecvFailed.
122+
*
123+
* <b>Possible values:</b> Any positive integer up to SIZE_MAX. Recommended to
124+
* use a small timeout value. <br>
125+
* <b>Default value:</b> `10`
126+
*
127+
*/
128+
#ifndef MQTT_RECV_POLLING_TIMEOUT_MS
129+
#define MQTT_RECV_POLLING_TIMEOUT_MS ( 10U )
130+
#endif
131+
111132
/**
112133
* @brief Macro that is called in the MQTT library for logging "Error" level
113134
* messages.

source/interface/transport_interface.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,11 @@ typedef struct NetworkContext NetworkContext_t;
163163
* @param[in] pBuffer Buffer to receive the data into.
164164
* @param[in] bytesToRecv Number of bytes requested from the network.
165165
*
166-
* @return The number of bytes received or a negative error code.
166+
* @return The number of bytes received or a negative value to indicate
167+
* error.
168+
* @note If no data is available on the network to read and no error
169+
* has occurred, zero MUST be the return value. Zero MUST NOT be used
170+
* if a network disconnection has occurred.
167171
*/
168172
/* @[define_transportrecv] */
169173
typedef int32_t ( * TransportRecv_t )( NetworkContext_t * pNetworkContext,

test/cbmc/include/core_mqtt_config.h

+18
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,22 @@ struct NetworkContext
7070
*/
7171
#define MQTT_PINGRESP_TIMEOUT_MS ( 500U )
7272

73+
/**
74+
* @brief The maximum duration of receiving no data over network when
75+
* attempting to read an incoming MQTT packet by the #MQTT_ProcessLoop or
76+
* #MQTT_ReceiveLoop API functions.
77+
*
78+
* When an incoming MQTT packet is detected, the transport receive function
79+
* may be called multiple times until all the expected number of bytes for the
80+
* packet are received. This timeout represents the maximum duration of polling
81+
* for any data to be received over the network for the incoming.
82+
* If the timeout expires, the #MQTT_ProcessLoop or #MQTT_ReceiveLoop functions
83+
* return #MQTTRecvFailed.
84+
*
85+
* This is set to 1 to exit right away after a zero is received in the transport
86+
* receive stub. There is no added value, in proving memory safety, to repeat
87+
* the logic that checks if the polling timeout is reached.
88+
*/
89+
#define MQTT_RECV_POLLING_TIMEOUT_MS ( 1U )
90+
7391
#endif /* ifndef CORE_MQTT_CONFIG_H_ */

test/cbmc/proofs/MQTT_Connect/Makefile

+7-2
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,17 @@ MAX_NETWORK_SEND_TRIES=3
3232
# time out of 3 we can get coverage of the entire function. Another iteration
3333
# performed will unnecessarily duplicate the proof.
3434
MQTT_RECEIVE_TIMEOUT=3
35+
# The NetworkInterfaceReceiveStub is called once for getting the incoming packet
36+
# type with one byte of data, then it is called multiple times to reveive the
37+
# packet.
38+
MAX_NETWORK_RECV_TRIES=4
3539
# Please see test/cbmc/include/core_mqtt_config.h for more
3640
# information on these defines.
3741
MQTT_STATE_ARRAY_MAX_COUNT=11
3842
MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT=3
3943
DEFINES += -DMQTT_RECEIVE_TIMEOUT=$(MQTT_RECEIVE_TIMEOUT)
4044
DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
45+
DEFINES += -DMAX_NETWORK_RECV_TRIES=$(MAX_NETWORK_RECV_TRIES)
4146
INCLUDES +=
4247

4348
# These functions do not coincide with the call graph of MQTT_Connect, but are
@@ -54,10 +59,10 @@ REMOVE_FUNCTION_BODY += __CPROVER_file_local_core_mqtt_c_handleKeepAlive
5459
# function.
5560
REMOVE_FUNCTION_BODY += memcpy
5661

57-
# The loops below are unwound once more than the timeout. The loops below use
62+
# The loop below is unwound once more than the timeout. The loop below uses
5863
# the user passed in timeout to break the loop.
59-
UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MQTT_RECEIVE_TIMEOUT)
6064
UNWINDSET += __CPROVER_file_local_core_mqtt_c_discardPacket.0:$(MQTT_RECEIVE_TIMEOUT)
65+
UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MAX_NETWORK_RECV_TRIES)
6166
# If the user passed in timeout is zero, then the loop will run until the
6267
# MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT is reached.
6368
UNWINDSET += __CPROVER_file_local_core_mqtt_c_receiveConnack.0:$(MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT)

test/cbmc/proofs/MQTT_ProcessLoop/Makefile

+6-1
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,16 @@ MQTT_RECEIVE_TIMEOUT=3
3232
# Please see test/cbmc/stubs/network_interface_subs.c for
3333
# more information on MAX_NETWORK_SEND_TRIES.
3434
MAX_NETWORK_SEND_TRIES=3
35+
# The NetworkInterfaceReceiveStub is called once for getting the incoming packet
36+
# type with one byte of data, then it is called multiple times to reveive the
37+
# packet.
38+
MAX_NETWORK_RECV_TRIES=4
3539
# Please see test/cbmc/include/core_mqtt_config.h for more
3640
# information.
3741
MQTT_STATE_ARRAY_MAX_COUNT=11
3842
DEFINES += -DMQTT_RECEIVE_TIMEOUT=$(MQTT_RECEIVE_TIMEOUT)
3943
DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
44+
DEFINES += -DMAX_NETWORK_RECV_TRIES=$(MAX_NETWORK_RECV_TRIES)
4045
INCLUDES +=
4146

4247
# These functions have their memory saftey proven in other harnesses.
@@ -46,7 +51,7 @@ REMOVE_FUNCTION_BODY += MQTT_SerializeAck
4651

4752
UNWINDSET += MQTT_ProcessLoop.0:$(MQTT_RECEIVE_TIMEOUT)
4853
UNWINDSET += __CPROVER_file_local_core_mqtt_c_discardPacket.0:$(MQTT_RECEIVE_TIMEOUT)
49-
UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MQTT_RECEIVE_TIMEOUT)
54+
UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MAX_NETWORK_RECV_TRIES)
5055
# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
5156
# sendPacket will continue until all the bytes are sent or a network error
5257
# occurs. Please see NetworkInterfaceReceiveStub in

test/cbmc/proofs/MQTT_ReceiveLoop/Makefile

+6-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ PROOF_UID=MQTT_ReceiveLoop
1111
# out of 2 we can get coverage of the entire function. Another iteration will
1212
# performed unnecessarily duplicating of the proof.
1313
MQTT_RECEIVE_TIMEOUT=3
14+
# The NetworkInterfaceReceiveStub is called once for getting the incoming packet
15+
# type with one byte of data, then it is called multiple times to reveive the
16+
# packet.
17+
MAX_NETWORK_RECV_TRIES=4
1418
# Please see test/cbmc/stubs/network_interface_subs.c for
1519
# more information on MAX_NETWORK_SEND_TRIES.
1620
MAX_NETWORK_SEND_TRIES=3
@@ -19,6 +23,7 @@ MAX_NETWORK_SEND_TRIES=3
1923
MQTT_STATE_ARRAY_MAX_COUNT=11
2024
DEFINES += -DMQTT_RECEIVE_TIMEOUT=$(MQTT_RECEIVE_TIMEOUT)
2125
DEFINES += -DMAX_NETWORK_SEND_TRIES=$(MAX_NETWORK_SEND_TRIES)
26+
DEFINES += -DMAX_NETWORK_RECV_TRIES=$(MAX_NETWORK_RECV_TRIES)
2227
INCLUDES +=
2328

2429
# These functions have their memory saftey proven in other harnesses.
@@ -28,7 +33,7 @@ REMOVE_FUNCTION_BODY += MQTT_SerializeAck
2833
# The loops below are unwound once more than the exclusive timeout bound.
2934
UNWINDSET += MQTT_ReceiveLoop.0:$(MQTT_RECEIVE_TIMEOUT)
3035
UNWINDSET += __CPROVER_file_local_core_mqtt_c_discardPacket.0:$(MQTT_RECEIVE_TIMEOUT)
31-
UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MQTT_RECEIVE_TIMEOUT)
36+
UNWINDSET += __CPROVER_file_local_core_mqtt_c_recvExact.0:$(MAX_NETWORK_RECV_TRIES)
3237
# Unlike recvExact, sendPacket is not bounded by the timeout. The loop in
3338
# sendPacket will continue until all the bytes are sent or a network error
3439
# occurs. Please see NetworkInterfaceReceiveStub in

test/cbmc/stubs/network_interface_stubs.c

+17
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@
3535
#define MAX_NETWORK_SEND_TRIES 3
3636
#endif
3737

38+
/* An exclusive bound on the times that the NetworkInterfaceReceiveStub will
39+
* return an unbound value. At this value and beyond, the
40+
* NetworkInterfaceReceiveStub will return zero on every call. */
41+
#ifndef MAX_NETWORK_RECV_TRIES
42+
#define MAX_NETWORK_RECV_TRIES 4
43+
#endif
44+
3845
int32_t NetworkInterfaceReceiveStub( NetworkContext_t * pNetworkContext,
3946
void * pBuffer,
4047
size_t bytesToRecv )
@@ -48,11 +55,21 @@ int32_t NetworkInterfaceReceiveStub( NetworkContext_t * pNetworkContext,
4855
__CPROVER_havoc_object( pBuffer );
4956

5057
int32_t bytesOrError;
58+
static size_t tries = 0;
5159

5260
/* It is a bug for the application defined transport send function to return
5361
* more than bytesToRecv. */
5462
__CPROVER_assume( bytesOrError <= ( int32_t ) bytesToRecv );
5563

64+
if( tries < ( MAX_NETWORK_RECV_TRIES - 1 ) )
65+
{
66+
tries++;
67+
}
68+
else
69+
{
70+
bytesOrError = 0;
71+
}
72+
5673
return bytesOrError;
5774
}
5875

0 commit comments

Comments
 (0)