-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bugfix: Fix interrupted network read operation #120
Changes from all commits
5ec1c2f
4205109
0704c72
adce766
f5aa1b3
76166d0
1d73bb7
0c5a9e3
34adc86
2a3cc9a
70b181a
68fcb16
4ba8c53
b710790
f8586dc
c59d39b
af1ca02
86798bf
e1de841
9a792bc
ebcfb92
fe0a071
32f0291
dd91637
a4bf796
bf6f94d
092e7ff
f0a1808
fcaf4bb
b923ea5
b1eba61
25e74d6
b4b0c78
95abb93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,17 +71,25 @@ static uint32_t calculateElapsedTime( uint32_t later, | |
static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ); | ||
|
||
/** | ||
* @brief Receive bytes into the network buffer, with a timeout. | ||
* @brief Receive bytes into the network buffer. | ||
* | ||
* @param[in] pContext Initialized MQTT Context. | ||
* @param[in] bytesToRecv Number of bytes to receive. | ||
* @param[in] timeoutMs Time remaining to receive the packet. | ||
* | ||
* @note This operation calls the transport receive function | ||
* repeatedly to read bytes from the network until either: | ||
* 1. The requested number of bytes @a bytesToRecv are read. | ||
* OR | ||
* 2. No data is received from the network for MQTT_RECV_POLLING_TIMEOUT_MS duration. | ||
* | ||
* OR | ||
* 3. There is an error in reading from the network. | ||
* | ||
* | ||
* @return Number of bytes received, or negative number on network error. | ||
*/ | ||
static int32_t recvExact( const MQTTContext_t * pContext, | ||
size_t bytesToRecv, | ||
uint32_t timeoutMs ); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Due to the removal of this parameter, some more timeouts could probably be removed due to similar reasoning. I don't think it's critical though and definitely shouldn't be done in this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I agree that the timeout passed to |
||
size_t bytesToRecv ); | ||
|
||
/** | ||
* @brief Discard a packet from the transport interface. | ||
|
@@ -683,13 +691,12 @@ static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ) | |
/*-----------------------------------------------------------*/ | ||
|
||
static int32_t recvExact( const MQTTContext_t * pContext, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. static int32_t recvExact( const MQTTContext_t * pContext,
size_t bytesToRecv )
{
uint8_t * pIndex = NULL;
size_t bytesRemaining = bytesToRecv;
int32_t totalBytesRecvd = 0, bytesRecvd;
uint32_t timestampBeforeRecvMs = 0U, timeSpentInRecvMs = 0U;
TransportRecv_t recvFunc = NULL;
MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
bool receiveError = false;
assert( pContext != NULL );
assert( bytesToRecv <= pContext->networkBuffer.size );
assert( pContext->getTime != NULL );
assert( pContext->transportInterface.recv != NULL );
assert( pContext->networkBuffer.pBuffer != NULL );
pIndex = pContext->networkBuffer.pBuffer;
recvFunc = pContext->transportInterface.recv;
getTimeStampMs = pContext->getTime;
while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
{
timestampBeforeRecvMs = getTimeStampMs();
bytesRecvd = recvFunc( pContext->transportInterface.pNetworkContext,
pIndex,
bytesRemaining );
timeSpentInRecvMs = calculateElapsedTime( getTimeStampMs(), timestampBeforeRecvMs );
if( bytesRecvd < 0 )
{
LogError( ( "Network error while receiving packet: ReturnCode=%ld.",
( long int ) bytesRecvd ) );
totalBytesRecvd = bytesRecvd;
receiveError = true;
}
else if( bytesRecvd > 0 )
{
/* It is a bug in the application's transport receive implementation
* if more bytes than expected are received. To avoid a possible
* overflow in converting bytesRemaining from unsigned to signed,
* this assert must exist after the check for bytesRecvd being
* negative. */
assert( ( size_t ) bytesRecvd <= bytesRemaining );
bytesRemaining -= ( size_t ) bytesRecvd;
totalBytesRecvd += ( int32_t ) bytesRecvd;
pIndex += bytesRecvd;
LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, "
"TotalBytesReceived=%ld.",
( long int ) bytesRecvd,
( unsigned long ) bytesRemaining,
( long int ) totalBytesRecvd ) );
}
else
{
/* No bytes were read from the network. */
}
/* If there is more data to be received and nothing was received
* for MQTT_RECV_POLLING_TIMEOUT_MS, treat this as error. */
if( ( bytesRemaining > 0U ) &&
( timeSpentInRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS ) &&
( bytesRecvd == 0 ) )
{
LogError( ( "Time expired while receiving packet." ) );
receiveError = true;
}
}
return totalBytesRecvd;
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will only calculate the time spent for each call of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In your code suggestion, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. In that case, changing the variable names would make it a bit more readable: entryTimeMs --> lastDataRecvdTimestampMs
noDataRecvdTimeMs --> timeSinceLastDataWasRecvd |
||
size_t bytesToRecv, | ||
uint32_t timeoutMs ) | ||
size_t bytesToRecv ) | ||
{ | ||
uint8_t * pIndex = NULL; | ||
size_t bytesRemaining = bytesToRecv; | ||
int32_t totalBytesRecvd = 0, bytesRecvd; | ||
uint32_t entryTimeMs = 0U, elapsedTimeMs = 0U; | ||
uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U; | ||
TransportRecv_t recvFunc = NULL; | ||
MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL; | ||
bool receiveError = false; | ||
|
@@ -704,7 +711,8 @@ static int32_t recvExact( const MQTTContext_t * pContext, | |
recvFunc = pContext->transportInterface.recv; | ||
getTimeStampMs = pContext->getTime; | ||
|
||
entryTimeMs = getTimeStampMs(); | ||
/* Part of the MQTT packet has been read before calling this function. */ | ||
lastDataRecvTimeMs = getTimeStampMs(); | ||
|
||
while( ( bytesRemaining > 0U ) && ( receiveError == false ) ) | ||
{ | ||
|
@@ -719,8 +727,11 @@ static int32_t recvExact( const MQTTContext_t * pContext, | |
totalBytesRecvd = bytesRecvd; | ||
receiveError = true; | ||
} | ||
else | ||
else if( bytesRecvd > 0 ) | ||
{ | ||
/* Reset the starting time as we have received some data from the network. */ | ||
lastDataRecvTimeMs = getTimeStampMs(); | ||
|
||
/* It is a bug in the application's transport receive implementation | ||
* if more bytes than expected are received. To avoid a possible | ||
* overflow in converting bytesRemaining from unsigned to signed, | ||
|
@@ -732,18 +743,20 @@ static int32_t recvExact( const MQTTContext_t * pContext, | |
totalBytesRecvd += ( int32_t ) bytesRecvd; | ||
pIndex += bytesRecvd; | ||
LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, " | ||
"TotalBytesReceived=%ld.", | ||
( long int ) bytesRecvd, | ||
( unsigned long ) bytesRemaining, | ||
( long int ) totalBytesRecvd ) ); | ||
( unsigned long ) bytesRemaining ) ); | ||
} | ||
|
||
elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs ); | ||
|
||
if( ( bytesRemaining > 0U ) && ( elapsedTimeMs >= timeoutMs ) ) | ||
else | ||
{ | ||
LogError( ( "Time expired while receiving packet." ) ); | ||
receiveError = true; | ||
/* No bytes were read from the network. */ | ||
timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs ); | ||
|
||
/* Check for timeout if we have been waiting to receive any byte on the network. */ | ||
if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS ) | ||
{ | ||
LogError( ( "Time expired while receiving packet." ) ); | ||
receiveError = true; | ||
} | ||
} | ||
} | ||
|
||
|
@@ -760,7 +773,6 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, | |
int32_t bytesReceived = 0; | ||
size_t bytesToReceive = 0U; | ||
uint32_t totalBytesReceived = 0U, entryTimeMs = 0U, elapsedTimeMs = 0U; | ||
uint32_t remainingTimeMs = timeoutMs; | ||
MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL; | ||
bool receiveError = false; | ||
|
||
|
@@ -779,7 +791,7 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, | |
bytesToReceive = remainingLength - totalBytesReceived; | ||
} | ||
|
||
bytesReceived = recvExact( pContext, bytesToReceive, remainingTimeMs ); | ||
bytesReceived = recvExact( pContext, bytesToReceive ); | ||
|
||
if( bytesReceived != ( int32_t ) bytesToReceive ) | ||
{ | ||
|
@@ -795,12 +807,8 @@ static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, | |
|
||
elapsedTimeMs = calculateElapsedTime( getTimeStampMs(), entryTimeMs ); | ||
|
||
/* Update remaining time and check for timeout. */ | ||
if( elapsedTimeMs < timeoutMs ) | ||
{ | ||
remainingTimeMs = timeoutMs - elapsedTimeMs; | ||
} | ||
else | ||
/* Check for timeout. */ | ||
if( elapsedTimeMs >= timeoutMs ) | ||
{ | ||
LogError( ( "Time expired while discarding packet." ) ); | ||
receiveError = true; | ||
|
@@ -846,7 +854,7 @@ static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, | |
else | ||
{ | ||
bytesToReceive = incomingPacket.remainingLength; | ||
bytesReceived = recvExact( pContext, bytesToReceive, remainingTimeMs ); | ||
bytesReceived = recvExact( pContext, bytesToReceive ); | ||
|
||
if( bytesReceived == ( int32_t ) bytesToReceive ) | ||
{ | ||
|
@@ -2167,7 +2175,7 @@ MQTTStatus_t MQTT_ProcessLoop( MQTTContext_t * pContext, | |
elapsedTimeMs = calculateElapsedTime( pContext->getTime(), | ||
entryTimeMs ); | ||
|
||
if( elapsedTimeMs > timeoutMs ) | ||
if( elapsedTimeMs >= timeoutMs ) | ||
muneebahmed10 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
break; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the doc for
MQTT_Init
, we state the following:We need to update this to say that
MQTT_RECV_POLLING_TIMEOUT_MS
must also be set to 0 to prevent an infinite loop.However, this is now insufficient as one iteration of
MQTT_ReceiveLoop
orMQTT_ProcessLoop
may not be enough to receive the entire payload. The way around that is to setMQTT_RECV_POLLING_TIMEOUT_MS
to a desired number of receive iterations, then pass a timer query function that increments on each call. However, that contradicts labeling the unit of time as milliseconds, but why does the timeout need to be in milliseconds here anyway?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that setting the timer function to a value that returns 0 can result in an infinite loop for the
MQTT_ProcessLoop
function as this is the condition used for determining that the timeout period has expired:We should change that condition to be
>=
instead...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal is to allow unbound number of retries for partial reads of packet data over the network as long as the transport recv function is able to successfully read data over the network. (Reason is that the
recvExact
function is called after some part of the MQTT packet is already read, and thus, we want to attempt to read the remaining packet data as long as the transport recv function does not return an error).Thus, even if a dummy timer function that always returns 0 is used, as long as the transport receive function is able to perform partial reads, the transport recv calls will be retried.
If a dummy timer that always returns 0 is used and the network read returns 0 bytes, there still wouldn't be an infinite loop inside the
recvExact
function as it uses the>=
check for the timeout.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That definitely looks like a typo as it is now. MQTT_ReceiveLoop uses
>=
, which looks correctThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have fixed it for
MQTT_ProcessLoop
in 32f0291