Skip to content

Commit b86c618

Browse files
committedMay 30, 2022
Fixed multithread issue in CTX. Closes jgaa#128
1 parent 0e09292 commit b86c618

15 files changed

+339
-159
lines changed
 

‎CMakeLists.txt

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ if (DEFINED ENV{RESTC_CPP_VERSION})
55
endif()
66

77
if (NOT DEFINED RESTC_CPP_VERSION)
8-
set(RESTC_CPP_VERSION 0.92.0)
8+
set(RESTC_CPP_VERSION 0.93.0)
99
endif()
1010

1111
if(NOT DEFINED RESTC_BOOST_VERSION)
@@ -56,9 +56,9 @@ if (NOT RESTC_CPP_LOG_WITH_INTERNAL_LOG AND NOT RESTC_CPP_LOG_WITH_LOGFAULT AND
5656
set(RESTC_CPP_LOG_WITH_INTERNAL_LOG ON)
5757
endif()
5858

59-
set(RESTC_CPP_LOG_LEVEL_STR "info" CACHE STRING "Limit logs to: none, error, warn, info, debug, trace")
59+
set(RESTC_CPP_LOG_LEVEL_STR "trace" CACHE STRING "Limit logs to: none, error, warn, info, debug, trace")
6060

61-
option(RESTC_CPP_LOG_JSON_SERIALIZATION "Enable trace logging for json serialization debugging")
61+
option(RESTC_CPP_LOG_JSON_SERIALIZATION "Enable trace logging for json serialization debugging" OFF)
6262

6363
option(RESTC_CPP_WITH_ZLIB "Use zlib" ON)
6464

‎include/restc-cpp/ConnectionPool.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class ConnectionPool
2121
const Connection::Type connectionType,
2222
bool new_connection_please = false) = 0;
2323

24-
virtual std::future<std::size_t> GetIdleConnections() const = 0;
24+
virtual size_t GetIdleConnections() const = 0;
2525
static std::shared_ptr<ConnectionPool> Create(RestClient& owner);
2626

2727
/*! Close the connection-pool

‎include/restc-cpp/Socket.h

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#include <boost/system/error_code.hpp>
1212

1313
#include "restc-cpp/typename.h"
14+
#include "restc-cpp/logging.h"
15+
1416
#include "error.h"
1517

1618
namespace restc_cpp {
@@ -68,6 +70,10 @@ class ExceptionWrapper {
6870
try {
6971
return fn();
7072
} catch (const boost::system::system_error& ex) {
73+
74+
RESTC_CPP_LOG_TRACE_("ExceptionWrapper: " << ex.what()
75+
<< ", value=" << ex.code());
76+
7177
if (ex.code().value() == boost::system::errc::operation_canceled) {
7278
if (reason_ == Socket::Reason::TIME_OUT) {
7379
throw RequestTimeOutException();

‎include/restc-cpp/internals/helpers.h

+2
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@
77
#else
88
# define LOCK_
99
#endif
10+
11+
#define LOCK_ALWAYS_ std::lock_guard<std::mutex> lock_{mutex_}

‎include/restc-cpp/logging.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ class Logger {
107107

108108
}
109109

110-
#define RESTC_CPP_TEST_LOGGING_SETUP(level) RestcCppTestStartLogger(level)
110+
//#define RESTC_CPP_TEST_LOGGING_SETUP(level) RestcCppTestStartLogger(level)
111+
#define RESTC_CPP_TEST_LOGGING_SETUP(level) RestcCppTestStartLogger("trace")
111112

112113
inline void RestcCppTestStartLogger(const std::string& level = "info") {
113114
auto llevel = restc_cpp::LogLevel::INFO;

‎include/restc-cpp/restc-cpp.h

+8
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ class Request {
174174
general_callback_t beforeWriteFn;
175175
general_callback_t afterWriteFn;
176176
std::string bindToLocalAddress; // host:port
177+
#ifdef RESTC_CPP_THREADED_CTX
178+
size_t threads = 4; // Threads created for the Client.
179+
#else
180+
size_t threads = 1;
181+
#endif
177182
};
178183

179184
virtual const Properties& GetProperties() const = 0;
@@ -419,6 +424,8 @@ class RestClient {
419424
*/
420425
virtual void CloseWhenReady(bool wait = true) = 0;
421426

427+
virtual bool IsClosed() const noexcept = 0;
428+
422429
/*! Factory */
423430
static std::unique_ptr<RestClient> Create();
424431

@@ -449,6 +456,7 @@ class RestClient {
449456
static std::unique_ptr<RestClient>
450457
Create(boost::asio::io_service& ioservice);
451458

459+
452460
protected:
453461
virtual std::unique_ptr<DoneHandler> GetDoneHandler() = 0;
454462
};

‎src/ConnectionPoolImpl.cpp

+91-90
Original file line numberDiff line numberDiff line change
@@ -87,18 +87,22 @@ class ConnectionPoolImpl
8787
Connection::ptr_t& GetConnection() noexcept { return connection; }
8888
int GetTtl() const noexcept { return ttl; }
8989
time_t GetCreated() const noexcept { return created;}
90-
timestamp_t GetLastUsed() const noexcept { return last_used; }
90+
timestamp_t GetLastUsed() const noexcept {
91+
LOCK_ALWAYS_;
92+
return last_used;
93+
}
94+
void SetLastUsed(timestamp_t ts) {
95+
LOCK_ALWAYS_;
96+
last_used = ts;
97+
}
9198

9299
private:
93100
const Key key;
94101
Connection::ptr_t connection;
95102
const int ttl = 60;
96103
const time_t created;
97-
#ifdef RESTC_CPP_THREADED_CTX
98-
atomic<timestamp_t> last_used = chrono::steady_clock::now();
99-
#else
104+
mutable std::mutex mutex_;
100105
timestamp_t last_used = chrono::steady_clock::now();
101-
#endif
102106
};
103107

104108
// Owns the connection
@@ -145,10 +149,6 @@ class ConnectionPoolImpl
145149
ConnectionPoolImpl(RestClient& owner)
146150
: owner_{owner}, properties_{owner.GetConnectionProperties()}
147151
, cache_cleanup_timer_{owner.GetIoService()}
148-
#ifdef RESTC_CPP_THREADED_CTX
149-
, strand_{owner_.GetIoService()}
150-
#endif
151-
152152
{
153153
on_release_ = [this](const Entry::ptr_t& entry) { OnRelease(entry); };
154154
}
@@ -175,32 +175,27 @@ class ConnectionPoolImpl
175175
}
176176

177177
// Get ctx for internal, syncronized operations;
178-
#ifdef RESTC_CPP_THREADED_CTX
179-
auto GetCtx() const {
180-
return strand_;
181-
}
182-
#else
183178
auto& GetCtx() const {
184179
return owner_.GetIoService();
185180
}
186-
#endif
187181

188-
std::future<std::size_t> GetIdleConnections() const override {
189-
auto my_promise = make_shared<promise<size_t>>() ;
190-
GetCtx().dispatch([my_promise, this]() {
191-
my_promise->set_value(idle_.size());
192-
});
193-
return my_promise->get_future();
182+
size_t GetIdleConnections() const override {
183+
LOCK_ALWAYS_;
184+
return idle_.size();
194185
}
195186

196187
void Close() override {
188+
RESTC_CPP_LOG_TRACE_("ConnectionPoolImpl::Close: enter");
197189
if (!closed_) {
198-
closed_ = true;
199-
GetCtx().dispatch([this]{
190+
call_once(close_once_, [this] {
191+
RESTC_CPP_LOG_TRACE_("ConnectionPoolImpl::Close: closing *once*.");
192+
LOCK_ALWAYS_;
193+
closed_ = true;
200194
cache_cleanup_timer_.cancel();
201195
idle_.clear();
202196
});
203197
}
198+
RESTC_CPP_LOG_TRACE_("ConnectionPoolImpl::Close: leave");
204199
}
205200

206201
void StartTimer() {
@@ -209,16 +204,17 @@ class ConnectionPoolImpl
209204

210205
private:
211206
void ScheduleNextCacheCleanup() {
212-
GetCtx().dispatch([this]{
213-
cache_cleanup_timer_.expires_from_now(
214-
boost::posix_time::seconds(properties_->cacheCleanupIntervalSeconds));
215-
cache_cleanup_timer_.async_wait(std::bind(&ConnectionPoolImpl::OnCacheCleanup,
216-
shared_from_this(), std::placeholders::_1));
217-
});
207+
LOCK_ALWAYS_;
208+
cache_cleanup_timer_.expires_from_now(
209+
boost::posix_time::seconds(properties_->cacheCleanupIntervalSeconds));
210+
cache_cleanup_timer_.async_wait(std::bind(&ConnectionPoolImpl::OnCacheCleanup,
211+
shared_from_this(), std::placeholders::_1));
218212
}
219213

220214
void OnCacheCleanup(const boost::system::error_code& error) {
215+
RESTC_CPP_LOG_TRACE_("OnCacheCleanup: enter");
221216
if (closed_) {
217+
RESTC_CPP_LOG_TRACE_("OnCacheCleanup: closed");
222218
return;
223219
}
224220

@@ -227,10 +223,11 @@ class ConnectionPoolImpl
227223
return;
228224
}
229225

230-
RESTC_CPP_LOG_TRACE_("Cleaning cache...");
226+
RESTC_CPP_LOG_TRACE_("OnCacheCleanup: Cleaning cache...");
231227

232228
const auto now = std::chrono::steady_clock::now();
233229
{
230+
LOCK_ALWAYS_;
234231
for(auto it = idle_.begin(); !closed_ && it != idle_.end();) {
235232

236233
auto current = it;
@@ -250,21 +247,27 @@ class ConnectionPoolImpl
250247
}
251248
}
252249

250+
RESTC_CPP_LOG_TRACE_("OnCacheCleanup: schedule next");
253251
ScheduleNextCacheCleanup();
252+
RESTC_CPP_LOG_TRACE_("OnCacheCleanup: leave");
254253
}
255254

256-
void OnRelease(const Entry::ptr_t& entry) {
257-
GetCtx().dispatch([this, entry]{
255+
void OnRelease(const Entry::ptr_t entry) {
256+
{
257+
LOCK_ALWAYS_;
258258
in_use_.erase(entry->GetKey());
259-
if (closed_ || !entry->GetConnection()->GetSocket().IsOpen()) {
260-
RESTC_CPP_LOG_TRACE_("Discarding " << *entry << " after use");
261-
return;
262-
}
259+
}
260+
if (closed_ || !entry->GetConnection()->GetSocket().IsOpen()) {
261+
RESTC_CPP_LOG_TRACE_("Discarding " << *entry << " after use");
262+
return;
263+
}
263264

264-
RESTC_CPP_LOG_TRACE_("Recycling " << *entry << " after use");
265-
entry->GetLastUsed() = chrono::steady_clock::now();
265+
RESTC_CPP_LOG_TRACE_("Recycling " << *entry << " after use");
266+
entry->SetLastUsed(chrono::steady_clock::now());
267+
{
268+
LOCK_ALWAYS_;
266269
idle_.insert({entry->GetKey(), entry});
267-
});
270+
}
268271
}
269272

270273
// Check the constraints to see if we can create a new connection
@@ -277,39 +280,42 @@ class ConnectionPoolImpl
277280
promise<bool> pr;
278281
auto result = pr.get_future();
279282

280-
GetCtx().dispatch([this, &ep, connectionType, &pr]() {
281-
{
282-
const auto key = Key{ep, connectionType};
283-
const size_t ep_cnt = idle_.count(key) + in_use_.count(key);
284-
if (ep_cnt >= properties_->cacheMaxConnectionsPerEndpoint) {
285-
RESTC_CPP_LOG_DEBUG_("No more available slots for " << key);
286-
pr.set_value(false);
287-
return;
288-
}
289-
}
283+
size_t cnt = 0;
284+
const auto key = Key{ep, connectionType};
285+
{
286+
LOCK_ALWAYS_;
287+
cnt = idle_.count(key) + in_use_.count(key);
288+
}
290289

291-
{
292-
const size_t all_cnt = idle_.size() + in_use_.size();
293-
if (all_cnt >= properties_->cacheMaxConnections) {
294-
295-
// See if we can release an idle connection.
296-
if (!PurgeOldestIdleEntry()) {
297-
RESTC_CPP_LOG_DEBUG_("No more available slots (max="
298-
<< properties_->cacheMaxConnections
299-
<< ", used=" << all_cnt << ')');
300-
pr.set_value(false);
301-
return;
302-
}
303-
}
304-
}
290+
if (cnt >= properties_->cacheMaxConnectionsPerEndpoint) {
291+
RESTC_CPP_LOG_DEBUG_("No more available slots for " << key);
292+
pr.set_value(false);
293+
return false;
294+
}
305295

306-
pr.set_value(true);
307-
});
308296

309-
return result.get();
297+
{
298+
LOCK_ALWAYS_;
299+
cnt = idle_.size() + in_use_.size();
300+
}
301+
if (cnt >= properties_->cacheMaxConnections) {
302+
303+
// See if we can release an idle connection.
304+
if (!PurgeOldestIdleEntry()) {
305+
RESTC_CPP_LOG_DEBUG_("No more available slots (max="
306+
<< properties_->cacheMaxConnections
307+
<< ", used=" << cnt << ')');
308+
pr.set_value(false);
309+
return false;
310+
}
311+
}
312+
313+
return true;
310314
}
311315

312316
bool PurgeOldestIdleEntry() {
317+
RESTC_CPP_LOG_TRACE_("PurgeOldestIdleEntry: enter");
318+
LOCK_ALWAYS_;
313319
auto oldest = idle_.begin();
314320
for (auto it = idle_.begin(); it != idle_.end(); ++it) {
315321
if (it->second->GetLastUsed() < oldest->second->GetLastUsed()) {
@@ -320,9 +326,11 @@ class ConnectionPoolImpl
320326
if (oldest != idle_.end()) {
321327
RESTC_CPP_LOG_TRACE_("LRU-Purging " << *oldest->second);
322328
idle_.erase(oldest);
329+
RESTC_CPP_LOG_TRACE_("PurgeOldestIdleEntry: success");
323330
return true;
324331
}
325332

333+
RESTC_CPP_LOG_TRACE_("PurgeOldestIdleEntry: failed");
326334
return false;
327335
}
328336

@@ -336,21 +344,17 @@ class ConnectionPoolImpl
336344
promise<Connection::ptr_t> pr;
337345
auto result = pr.get_future();
338346

339-
GetCtx().dispatch([this, &ep, connectionType, &pr]{
340-
const auto key = Key{ep, connectionType};
341-
auto it = idle_.find(key);
342-
if (it != idle_.end()) {
343-
auto wrapper = make_unique<ConnectionWrapper>(it->second, on_release_);
344-
in_use_.insert(*it);
345-
idle_.erase(it);
346-
pr.set_value(move(wrapper));
347-
return;
348-
}
349-
350-
pr.set_value({});
351-
});
347+
LOCK_ALWAYS_;
348+
const auto key = Key{ep, connectionType};
349+
auto it = idle_.find(key);
350+
if (it != idle_.end()) {
351+
auto wrapper = make_unique<ConnectionWrapper>(it->second, on_release_);
352+
in_use_.insert(*it);
353+
idle_.erase(it);
354+
return wrapper;
355+
}
352356

353-
return result.get();
357+
return {};
354358
}
355359

356360
Connection::ptr_t CreateNew(const boost::asio::ip::tcp::endpoint& ep,
@@ -377,31 +381,28 @@ class ConnectionPoolImpl
377381
promise<Connection::ptr_t> pr;
378382
auto result = pr.get_future();
379383

380-
GetCtx().dispatch([this, entry=move(entry), &pr]{
384+
{
385+
LOCK_ALWAYS_;
381386
in_use_.insert({entry->GetKey(), entry});
382-
pr.set_value(make_unique<ConnectionWrapper>(entry, on_release_));
383-
});
384-
385-
return result.get();
387+
}
388+
return make_unique<ConnectionWrapper>(entry, on_release_);
386389
}
387390

388391
#ifdef RESTC_CPP_THREADED_CTX
389-
std::atomic_bool closed_ = false;
392+
std::atomic_bool closed_{false};
390393
#else
391394
bool closed_ = false;
392395
#endif
396+
std::once_flag close_once_;
393397
RestClient& owner_;
394398
multimap<Key, Entry::ptr_t> idle_;
395399
multimap<Key, std::weak_ptr<Entry>> in_use_;
396-
std::queue<Entry> pending_;
400+
//std::queue<Entry> pending_;
397401
const Request::Properties::ptr_t properties_;
398402
ConnectionWrapper::release_callback_t on_release_;
399403
boost::asio::deadline_timer cache_cleanup_timer_;
400404

401-
#ifdef RESTC_CPP_THREADED_CTX
402405
mutable std::mutex mutex_;
403-
boost::asio::io_context::strand strand_;
404-
#endif
405406
}; // ConnectionPoolImpl
406407

407408

‎src/IoReaderImpl.cpp

+33-7
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,42 @@ class IoReaderImpl : public DataReader {
2929
auto timer = IoTimer::Create("IoReaderImpl",
3030
cfg_.msReadTimeout,
3131
conn);
32-
const auto bytes = conn->GetSocket().AsyncReadSome(
33-
{buffer_.data(), buffer_.size()}, ctx_.GetYield());
3432

35-
timer->Cancel();
36-
37-
RESTC_CPP_LOG_TRACE_("Read #" << bytes
38-
<< " bytes from " << conn);
39-
return {buffer_.data(), bytes};
33+
for(size_t retries = 0;; ++retries) {
34+
size_t bytes = 0;
35+
try {
36+
if (retries) {
37+
RESTC_CPP_LOG_DEBUG_("IoReaderImpl::ReadSome: taking a nap");
38+
ctx_.Sleep(50ms);
39+
RESTC_CPP_LOG_DEBUG_("IoReaderImpl::ReadSome: Waking up. Will try to read from the socket now.");
40+
}
41+
42+
bytes = conn->GetSocket().AsyncReadSome(
43+
{buffer_.data(), buffer_.size()}, ctx_.GetYield());
44+
} catch (const boost::system::system_error& ex) {
45+
if (ex.code() == boost::system::errc::resource_unavailable_try_again) {
46+
if ( retries < 16) {
47+
RESTC_CPP_LOG_DEBUG_("IoReaderImpl::ReadSome: Caught boost::system::system_error exception: " << ex.what()
48+
<< ". I will continue the retry loop.");
49+
continue;
50+
}
51+
}
52+
RESTC_CPP_LOG_DEBUG_("IoReaderImpl::ReadSome: Caught boost::system::system_error exception: " << ex.what());
53+
throw;
54+
} catch (const exception& ex) {
55+
RESTC_CPP_LOG_DEBUG_("IoReaderImpl::ReadSome: Caught exception: " << ex.what());
56+
throw;
57+
}
58+
59+
RESTC_CPP_LOG_TRACE_("Read #" << bytes
60+
<< " bytes from " << conn);
61+
62+
timer->Cancel();
63+
return {buffer_.data(), bytes};
64+
}
4065
}
4166

67+
RESTC_CPP_LOG_DEBUG_("IoReaderImpl::ReadSome: Reached outer scope. Timed out?");
4268
throw ObjectExpiredException("Connection expired");
4369
}
4470

‎src/RequestImpl.cpp

+62-13
Original file line numberDiff line numberDiff line change
@@ -394,16 +394,24 @@ class RequestImpl : public Request {
394394
const decltype(address_it) addr_end;
395395

396396
for(; address_it != addr_end; ++address_it) {
397+
if (owner_.IsClosed()) {
398+
RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect: The rest client is closed (at first lkoop). Aborting.");
399+
throw FailedToConnectException("Failed to connect (closed)");
400+
}
401+
397402
const auto endpoint = address_it->endpoint();
398403

399404
RESTC_CPP_LOG_TRACE_("Trying endpoint " << endpoint);
400405

401-
// Get a connection from the pool
402-
auto connection = owner_.GetConnectionPool()->GetConnection(
403-
endpoint, protocol_type);
406+
for(size_t retries = 0;; ++retries) {
407+
// Get a connection from the pool
408+
auto connection = owner_.GetConnectionPool()->GetConnection(
409+
endpoint, protocol_type);
404410

405-
// Connect if the connection is new.
406-
if (!connection->GetSocket().IsOpen()) {
411+
// Connect if the connection is new.
412+
if (connection->GetSocket().IsOpen()) {
413+
return connection;
414+
}
407415

408416
RESTC_CPP_LOG_DEBUG_("Connecting to " << endpoint);
409417

@@ -415,7 +423,7 @@ class RequestImpl : public Request {
415423
== prot_filter.end()) {
416424
RESTC_CPP_LOG_TRACE_("Filtered out (protocol mismatch) local address: "
417425
<< properties_->bindToLocalAddress);
418-
continue;
426+
break; // Break out of retry loop, re-enter endpoint loop
419427
}
420428
}
421429

@@ -440,13 +448,42 @@ class RequestImpl : public Request {
440448
}
441449
}
442450

451+
452+
if (owner_.IsClosed()) {
453+
RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect: The rest client is closed. Aborting.");
454+
throw FailedToConnectException("Failed to connect (closed)");
455+
}
456+
443457
auto timer = IoTimer::Create(timer_name,
444458
properties_->connectTimeoutMs, connection);
445459

446460
try {
461+
if (retries) {
462+
RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect: taking a nap");
463+
ctx.Sleep(50ms);
464+
RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect: Waking up. Will try to read from the socket now.");
465+
}
466+
467+
RESTC_CPP_LOG_TRACE_("RequestImpl::Connect: calling AsyncConnect --> " << endpoint);
447468
connection->GetSocket().AsyncConnect(
448469
endpoint, address_it->host_name(),
449470
properties_->tcpNodelay, ctx.GetYield());
471+
RESTC_CPP_LOG_TRACE_("RequestImpl::Connect: OK AsyncConnect --> " << endpoint);
472+
return connection;
473+
} catch (const boost::system::system_error& ex) {
474+
RESTC_CPP_LOG_TRACE_("RequestImpl::Connect:: Caught boost::system::system_error exception: " << ex.what()
475+
<< ". Will close connection " << *connection);
476+
connection->GetSocket().GetSocket().close();
477+
478+
if (ex.code() == boost::system::errc::resource_unavailable_try_again) {
479+
if ( retries < 16) {
480+
RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect:: Caught boost::system::system_error exception: " << ex.what()
481+
<< ". I will continue the retry loop.");
482+
continue;
483+
}
484+
}
485+
RESTC_CPP_LOG_WARN_("RequestImpl::Connect:: Caught boost::system::system_error exception: " << ex.what());
486+
throw FailedToConnectException("Failed to connect");
450487
} catch(const exception& ex) {
451488
RESTC_CPP_LOG_WARN_("Connect to "
452489
<< endpoint
@@ -455,14 +492,12 @@ class RequestImpl : public Request {
455492
<< ", message: " << ex.what());
456493

457494
connection->GetSocket().GetSocket().close();
458-
continue;
459495
}
460-
}
461496

462-
return connection;
463-
}
497+
} // retries
498+
} // endpoints
464499

465-
throw FailedToConnectException("Failed to connect");
500+
throw FailedToConnectException("Failed to connect (exhausted all options)");
466501
}
467502

468503
void SendRequestPayload(Context& /*ctx*/,
@@ -586,12 +621,23 @@ class RequestImpl : public Request {
586621
writer_->Finish();
587622
writer_.reset();
588623

624+
RESTC_CPP_LOG_TRACE_("GetReply: writer is reset.");
625+
589626
DataReader::ReadConfig cfg;
590627
cfg.msReadTimeout = properties_->recvTimeout;
591628
auto reply = ReplyImpl::Create(connection_, ctx, owner_, properties_,
592629
request_type_);
593-
reply->StartReceiveFromServer(
594-
DataReader::CreateIoReader(connection_, ctx, cfg));
630+
631+
RESTC_CPP_LOG_TRACE_("GetReply: Calling StartReceiveFromServer");
632+
try {
633+
reply->StartReceiveFromServer(
634+
DataReader::CreateIoReader(connection_, ctx, cfg));
635+
} catch (const exception& ex) {
636+
RESTC_CPP_LOG_DEBUG_("GetReply: exception from StartReceiveFromServer: " << ex.what());
637+
throw;
638+
}
639+
640+
RESTC_CPP_LOG_TRACE_("GetReply: Returned from StartReceiveFromServer. code=" << reply->GetResponseCode());
595641

596642
const auto http_code = reply->GetResponseCode();
597643
if (http_code == http_301 || http_code == http_302) {
@@ -600,10 +646,13 @@ class RequestImpl : public Request {
600646
throw ProtocolException(
601647
"No Location header in redirect reply");
602648
}
649+
RESTC_CPP_LOG_TRACE_("GetReply: RedirectException. location=" << *redirect_location);
603650
throw RedirectException(http_code, *redirect_location, move(reply));
604651
}
605652

653+
RESTC_CPP_LOG_TRACE_("GetReply: Calling ValidateReply");
606654
ValidateReply(*reply);
655+
RESTC_CPP_LOG_TRACE_("GetReply: returning from ValidateReply");
607656

608657
/* Return the reply. At this time the reply headers and body
609658
* is returned. However, the body may or may not be

‎src/RestClientImpl.cpp

+103-33
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "restc-cpp/logging.h"
99
#include "restc-cpp/ConnectionPool.h"
1010
#include "restc-cpp/RequestBody.h"
11+
#include "restc-cpp/internals/helpers.h"
1112

1213
#ifdef RESTC_CPP_WITH_TLS
1314
# include "boost/asio/ssl.hpp"
@@ -163,8 +164,11 @@ class RestClientImpl : public RestClient {
163164

164165
~RestClientImpl() override {
165166
CloseWhenReady(false);
166-
if (thread_) {
167-
thread_->join();
167+
168+
for(auto &thread : threads_) {
169+
if (thread) {
170+
thread->join();
171+
}
168172
}
169173
}
170174

@@ -194,27 +198,58 @@ class RestClientImpl : public RestClient {
194198
return;
195199
}
196200

197-
promise<void> wait;
198-
auto done = wait.get_future();
201+
std::vector<promise<void>> wait;
202+
203+
#ifndef RESTC_CPP_THREADED_CTX
204+
if (default_connection_properties_->threads != 1) {
205+
RESTC_CPP_LOG_WARN_("Init: Compiled without RESTC_CPP_THREADED_CTX: Using only one thread!");
206+
default_connection_properties_->threads = 1;
207+
}
208+
#endif
209+
210+
// Make sure we don't re-arrange the vectors while some thread is reading from it
211+
done_mutexes_.reserve(default_connection_properties_->threads);
212+
threads_.reserve(default_connection_properties_->threads);
199213

200-
thread_ = make_unique<thread>([&]() {
201-
lock_guard<decltype(done_mutex_)> lock(done_mutex_);
202-
work_ = make_unique<boost::asio::io_service::work>(*io_service_);
203-
wait.set_value();
204-
RESTC_CPP_LOG_DEBUG_("Worker is starting.");
205-
io_service_->run();
206-
RESTC_CPP_LOG_DEBUG_("Worker is done.");
207-
});
214+
for(size_t i = 0; i < default_connection_properties_->threads; ++i) {
215+
wait.emplace_back();
216+
done_mutexes_.push_back(make_unique<recursive_mutex>());
217+
}
208218

209-
// Wait for the ConnectionPool to be constructed
210-
done.get();
219+
work_ = make_unique<boost::asio::io_service::work>(*io_service_);
220+
221+
RESTC_CPP_LOG_TRACE_("Starting " <<default_connection_properties_->threads << " worker thread(s)");
222+
for(size_t i = 0; i < default_connection_properties_->threads; ++i) {
223+
threads_.emplace_back(make_unique<thread>([i, this, &wait]() {
224+
lock_guard<recursive_mutex> lock(*done_mutexes_.at(i));
225+
try {
226+
wait.at(i).set_value();
227+
RESTC_CPP_LOG_DEBUG_("Worker " << i << " is starting.");
228+
io_service_->run();
229+
RESTC_CPP_LOG_DEBUG_("Worker " << i << " is done.");
230+
} catch (const exception& ex) {
231+
RESTC_CPP_LOG_ERROR_("Worker " << i << " caught exception: " << ex.what());
232+
}
233+
}));
234+
}
235+
236+
// Wait for the therads to be started
237+
for(auto& w : wait) {
238+
w.get_future().get();
239+
}
240+
241+
RESTC_CPP_LOG_TRACE_("All worker threads have started");
211242
}
212243

213244

214245
Request::Properties::ptr_t GetConnectionProperties() const override {
215246
return default_connection_properties_;
216247
}
217248

249+
bool IsClosed() const noexcept override {
250+
return closed_;
251+
}
252+
218253
void CloseWhenReady(bool wait) override {
219254
ClearWork();
220255
if (!io_service_->stopped()) {
@@ -226,7 +261,13 @@ class RestClientImpl : public RestClient {
226261
}
227262
if (wait) {
228263
RESTC_CPP_LOG_TRACE_("CloseWhenReady: Waiting for work to end.");
229-
lock_guard<decltype(done_mutex_)> lock(done_mutex_);
264+
265+
// We have to lock/unlock each of them to make sure that all the threads are done
266+
for(auto& dm : done_mutexes_) {
267+
if (dm) {
268+
lock_guard<recursive_mutex> lock(*dm);
269+
}
270+
}
230271
RESTC_CPP_LOG_TRACE_("CloseWhenReady: Done waiting for work to end.");
231272
}
232273
}
@@ -237,18 +278,21 @@ class RestClientImpl : public RestClient {
237278
}
238279

239280
if (!io_service_->stopped()) {
240-
auto promise = make_shared<std::promise<void>>();
241-
242-
io_service_->dispatch([this, promise]() {
243-
if (work_) {
244-
work_.reset();
245-
}
246-
closed_ = true;
247-
promise->set_value();
281+
call_once(close_once_, [&] {
282+
auto promise = make_shared<std::promise<void>>();
283+
284+
io_service_->dispatch([this, promise]() {
285+
LOCK_;
286+
if (work_) {
287+
work_.reset();
288+
}
289+
closed_ = true;
290+
promise->set_value();
291+
});
292+
293+
// Wait for the lambda to finish;
294+
promise->get_future().get();
248295
});
249-
250-
// Wait for the lambda to finish;
251-
promise->get_future().get();
252296
}
253297
}
254298

@@ -312,15 +356,29 @@ class RestClientImpl : public RestClient {
312356
#endif
313357

314358
void OnNoMoreWork() {
359+
RESTC_CPP_LOG_TRACE_("OnNoMoreWork: enter");
360+
LOCK_;
361+
if (current_tasks_ > 0) {
362+
// Cannot close down quite yet.
363+
RESTC_CPP_LOG_TRACE_("OnNoMoreWork: leaving - we have active tasks");
364+
return;
365+
}
315366
if (closed_ && pool_) {
316-
pool_->Close();
317-
pool_.reset();
367+
call_once(close_pool_once_, [&] {
368+
RESTC_CPP_LOG_TRACE_("OnNoMoreWork: closing pool");
369+
pool_->Close();
370+
pool_.reset();
371+
});
318372
}
319373
if (closed_ && ioservice_instance_) {
320374
if (!work_ && !io_service_->stopped()) {
321-
io_service_->stop();
375+
call_once(close_ioservice_once_, [&] {
376+
RESTC_CPP_LOG_TRACE_("OnNoMoreWork: Stopping ioservice");
377+
io_service_->stop();
378+
});
322379
}
323380
}
381+
RESTC_CPP_LOG_TRACE_("OnNoMoreWork: leave");
324382
}
325383

326384
protected:
@@ -330,14 +388,22 @@ class RestClientImpl : public RestClient {
330388

331389
private:
332390
Request::Properties::ptr_t default_connection_properties_ = make_shared<Request::Properties>();
391+
unique_ptr<boost::asio::io_service> ioservice_instance_;
333392
boost::asio::io_service *io_service_ = nullptr;
334393
ConnectionPool::ptr_t pool_;
335394
unique_ptr<boost::asio::io_service::work> work_;
336-
size_t current_tasks_ = 0;
395+
#ifdef RESTC_CPP_THREADED_CTX
396+
atomic_size_t current_tasks_{0};
397+
#else
398+
size_t current_tasks_{0};
399+
#endif
337400
bool closed_ = false;
338-
unique_ptr<thread> thread_;
339-
recursive_mutex done_mutex_;
340-
unique_ptr<boost::asio::io_service> ioservice_instance_;
401+
once_flag close_once_;
402+
std::vector<unique_ptr<thread>> threads_;
403+
std::vector<std::unique_ptr<recursive_mutex>> done_mutexes_;
404+
std::once_flag close_pool_once_;
405+
std::once_flag close_ioservice_once_;
406+
341407

342408
#ifdef RESTC_CPP_WITH_TLS
343409
shared_ptr<boost::asio::ssl::context> tls_context_;
@@ -349,6 +415,10 @@ class RestClientImpl : public RestClient {
349415
| boost::asio::ssl::context::no_sslv3);
350416
}
351417
#endif
418+
419+
#ifdef RESTC_CPP_THREADED_CTX
420+
mutable std::mutex mutex_;
421+
#endif
352422
};
353423

354424
unique_ptr<RestClient> RestClient::Create() {

‎src/TlsSocketImpl.h

+11
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <future>
66

77
#include "restc-cpp/restc-cpp.h"
8+
#include "restc-cpp/logging.h"
89

910
#include <boost/asio/ssl.hpp>
1011
#include <boost/version.hpp>
@@ -72,12 +73,22 @@ class TlsSocketImpl : public Socket, protected ExceptionWrapper {
7273
//TLS-SNI (without this option, handshakes attempts with hosts behind CDNs will fail,
7374
//due to the fact that the CDN does not have enough information at the TLS layer
7475
//to decide where to forward the handshake attempt).
76+
77+
RESTC_CPP_LOG_TRACE_("AsyncConnect - Calling SSL_set_tlsext_host_name --> " << host);
7578
SSL_set_tlsext_host_name(ssl_socket_->native_handle(), host.c_str());
79+
80+
RESTC_CPP_LOG_TRACE_("AsyncConnect - Calling async_connect");
7681
GetSocket().async_connect(ep, yield);
82+
83+
RESTC_CPP_LOG_TRACE_("AsyncConnect - Calling lowest_layer().set_option");
7784
ssl_socket_->lowest_layer().set_option(
7885
boost::asio::ip::tcp::no_delay(tcpNodelay));
86+
87+
RESTC_CPP_LOG_TRACE_("AsyncConnect - Calling async_handshake");
7988
ssl_socket_->async_handshake(boost::asio::ssl::stream_base::client,
8089
yield);
90+
91+
RESTC_CPP_LOG_TRACE_("AsyncConnect - Done");
8192
});
8293
}
8394

‎tests/functional/CRUD_test.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,6 @@ STARTCASE(TestHEAD) {
124124

125125
int main( int argc, char * argv[] )
126126
{
127-
RESTC_CPP_TEST_LOGGING_SETUP("debug");
127+
RESTC_CPP_TEST_LOGGING_SETUP("trace");
128128
return lest::run( specification, argc, argv );
129129
}

‎tests/functional/ConnectionCacheTests.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ STARTCASE(TestConnectionClose) {
6363
}
6464

6565
CHECK_EQUAL(0, static_cast<int>(
66-
rest_client->GetConnectionPool()->GetIdleConnections().get()));
66+
rest_client->GetConnectionPool()->GetIdleConnections()));
6767

6868
}).get();
6969
} ENDCASE
@@ -125,11 +125,11 @@ STARTCASE(TestCleanupTimer) {
125125

126126
}).get();
127127

128-
CHECK_EQUAL(1, static_cast<int>(pool->GetIdleConnections().get()));
128+
CHECK_EQUAL(1, static_cast<int>(pool->GetIdleConnections()));
129129

130130
std::this_thread::sleep_for(std::chrono::seconds(4));
131131

132-
CHECK_EQUAL(0, static_cast<int>(pool->GetIdleConnections().get()));
132+
CHECK_EQUAL(0, static_cast<int>(pool->GetIdleConnections()));
133133
} ENDCASE
134134

135135
STARTCASE(TestPrematureCloseNotRecycled) {
@@ -143,7 +143,7 @@ STARTCASE(TestPrematureCloseNotRecycled) {
143143
repl_one.reset();
144144

145145
CHECK_EQUAL(0, static_cast<int>(
146-
rest_client->GetConnectionPool()->GetIdleConnections().get()));
146+
rest_client->GetConnectionPool()->GetIdleConnections()));
147147

148148
}).get();
149149
} ENDCASE

‎tests/functional/ConnectionPoolInstancesTest.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ STARTCASE(UseAfterDelete) {
2929
auto repl = ctx.Get(GetDockerUrl(http_url));
3030
repl->GetBodyAsString();
3131
return 0;
32-
});
32+
}).get();
3333

3434
RestClient::Create()->ProcessWithPromiseT<int>([&](Context& ctx) {
3535
auto repl = ctx.Get(GetDockerUrl(http_url));

‎tests/functional/HttpsTest.cpp

+11-5
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,26 @@ BOOST_FUSION_ADAPT_STRUCT(
4242
// library to use (boost::asio only support openssl and compatible
4343
// libraries out of the box).
4444

45-
string https_url = "https://jsonplaceholder.typicode.com/posts/1";
45+
//string https_url = "https://jsonplaceholder.typicode.com/posts/1";
4646

47-
//string https_url = "https://lastviking.eu/files/api";
47+
string https_url = "https://lastviking.eu/files/api";
4848

4949
const lest::test specification[] = {
5050

5151
TEST(TestHTTPS)
5252
{
53-
shared_ptr<boost::asio::ssl::context> tls_ctx = make_shared<boost::asio::ssl::context>(boost::asio::ssl::context{ boost::asio::ssl::context::sslv23 });
54-
tls_ctx->set_options(boost::asio::ssl::context::default_workarounds
53+
shared_ptr<boost::asio::ssl::context> tls_ctx = make_shared<boost::asio::ssl::context>(boost::asio::ssl::context{ boost::asio::ssl::context::tlsv12_client});
54+
55+
try {
56+
tls_ctx->set_options(boost::asio::ssl::context::default_workarounds
5557
| boost::asio::ssl::context::no_sslv2
5658
| boost::asio::ssl::context::no_sslv3
57-
| boost::asio::ssl::context::no_tlsv1_1
59+
// | boost::asio::ssl::context::no_tlsv1_1
5860
| boost::asio::ssl::context::single_dh_use);
61+
} catch (const exception& ex) {
62+
cout << " *** exception " << ex.what() << endl;
63+
throw;
64+
}
5965

6066
auto client = RestClient::Create(tls_ctx);
6167
client->ProcessWithPromise([&](Context& ctx) {

0 commit comments

Comments
 (0)
Please sign in to comment.