#ifdef __unix__ # include <cxxabi.h> #endif #include <iostream> #include <thread> #include <future> #include "restc-cpp/restc-cpp.h" #include "restc-cpp/logging.h" #include "restc-cpp/ConnectionPool.h" #include "restc-cpp/RequestBody.h" #include "restc-cpp/internals/helpers.h" #ifdef RESTC_CPP_WITH_TLS # include "boost/asio/ssl.hpp" # include "boost/asio/ssl/context.hpp" #endif using namespace std; namespace restc_cpp { class RestClientImpl : public RestClient { public: /*! Proper shutdown handling * We remove the waiter in CloseWhenReady(), but if we have * a connection pool, the timer there will still keep the * ioservice busy. So we have to manually shut down the * ioservice if the waiter has been removed, and we have * no more coroutines in flight. */ struct DoneHandlerImpl : public DoneHandler { DoneHandlerImpl(RestClientImpl& parent) : parent_{parent} { RESTC_CPP_LOG_TRACE_("Done-handler is created"); ++parent_.current_tasks_; } ~DoneHandlerImpl() override { RESTC_CPP_LOG_TRACE_("Done-handler is destroyed"); if (--parent_.current_tasks_ == 0) { parent_.OnNoMoreWork(); } } DoneHandlerImpl(const DoneHandlerImpl&) = delete; DoneHandlerImpl(DoneHandlerImpl&&) = delete; DoneHandlerImpl& operator = (const DoneHandlerImpl&) = delete; DoneHandlerImpl& operator = (DoneHandlerImpl&&) = delete; private: RestClientImpl& parent_; }; class ContextImpl : public Context { public: ContextImpl(boost::asio::yield_context& yield, RestClient& rc) : yield_{yield} , rc_{rc} {} RestClient& GetClient() override { return rc_; } boost::asio::yield_context& GetYield() override { return yield_; } unique_ptr<Reply> Get(string url) override { auto req = Request::Create(url, restc_cpp::Request::Type::GET, rc_); return Request(*req); } unique_ptr< Reply > Post(string url, string body) override { auto req = Request::Create(url, restc_cpp::Request::Type::POST, rc_, {RequestBody::CreateStringBody(move(body))}); return Request(*req); } unique_ptr< Reply > Put(string url, string body) override { auto req = Request::Create(url, restc_cpp::Request::Type::PUT, rc_, {RequestBody::CreateStringBody(move(body))}); return Request(*req); } unique_ptr< Reply > Delete(string url) override { auto req = Request::Create(url, restc_cpp::Request::Type::DELETE, rc_); return Request(*req); } unique_ptr< Reply > Options(string url) override { auto req = Request::Create(url, restc_cpp::Request::Type::OPTIONS, rc_); return Request(*req); } unique_ptr< Reply > Head(string url) override { auto req = Request::Create(url, restc_cpp::Request::Type::HEAD, rc_); return Request(*req); } unique_ptr< Reply > Patch(string url) override { auto req = Request::Create(url, restc_cpp::Request::Type::PATCH, rc_); return Request(*req); } unique_ptr<Reply> Request(restc_cpp::Request& req) override { return req.Execute(*this); } void Sleep(const boost::posix_time::microseconds& ms) override { boost::asio::deadline_timer timer( GetClient().GetIoService(), boost::posix_time::microseconds(ms)); timer.async_wait(GetYield()); } private: boost::asio::yield_context& yield_; RestClient& rc_; }; RestClientImpl(const boost::optional<Request::Properties>& properties, bool useMainThread) : ioservice_instance_{make_unique<boost::asio::io_service>()} { #ifdef RESTC_CPP_WITH_TLS setDefaultSSLContext(); #endif io_service_ = ioservice_instance_.get(); Init(properties, useMainThread); } #ifdef RESTC_CPP_WITH_TLS RestClientImpl(const boost::optional<Request::Properties>& properties, bool useMainThread, shared_ptr<boost::asio::ssl::context> ctx) : ioservice_instance_{ make_unique<boost::asio::io_service>() } { tls_context_ = move(ctx); io_service_ = ioservice_instance_.get(); Init(properties, useMainThread); } RestClientImpl(const boost::optional<Request::Properties>& properties, bool useMainThread, shared_ptr<boost::asio::ssl::context> ctx, boost::asio::io_service& ioservice) : io_service_{ &ioservice } { tls_context_ = move(ctx); io_service_ = ioservice_instance_.get(); Init(properties, useMainThread); } #endif RestClientImpl(const boost::optional<Request::Properties>& properties, boost::asio::io_service& ioservice) : io_service_{&ioservice} { #ifdef RESTC_CPP_WITH_TLS setDefaultSSLContext(); #endif Init(properties, true); } ~RestClientImpl() override { CloseWhenReady(false); for(auto &thread : threads_) { if (thread) { thread->join(); } } } RestClientImpl(const RestClientImpl&) = delete; RestClientImpl(RestClientImpl&&) = delete; RestClientImpl& operator = (const RestClientImpl&) = delete; RestClientImpl& operator = (RestClientImpl&&) = delete; void Init(const boost::optional<Request::Properties>& properties, bool useMainThread) { static const string content_type{"Content-Type"}; static const string json_type{"application/json; charset=utf-8"}; if (properties) *default_connection_properties_ = *properties; if (default_connection_properties_->headers.find(content_type) == default_connection_properties_->headers.end()) { default_connection_properties_->headers[content_type] = json_type; } pool_ = ConnectionPool::Create(*this); if (useMainThread) { return; } std::vector<promise<void>> wait; #ifndef RESTC_CPP_THREADED_CTX if (default_connection_properties_->threads != 1) { RESTC_CPP_LOG_WARN_("Init: Compiled without RESTC_CPP_THREADED_CTX: Using only one thread!"); default_connection_properties_->threads = 1; } #endif // Make sure we don't re-arrange the vectors while some thread is reading from it done_mutexes_.reserve(default_connection_properties_->threads); threads_.reserve(default_connection_properties_->threads); for(size_t i = 0; i < default_connection_properties_->threads; ++i) { wait.emplace_back(); done_mutexes_.push_back(make_unique<recursive_mutex>()); } work_ = make_unique<boost::asio::io_service::work>(*io_service_); RESTC_CPP_LOG_TRACE_("Starting " <<default_connection_properties_->threads << " worker thread(s)"); for(size_t i = 0; i < default_connection_properties_->threads; ++i) { threads_.emplace_back(make_unique<thread>([i, this, &wait]() { lock_guard<recursive_mutex> lock(*done_mutexes_.at(i)); try { wait.at(i).set_value(); RESTC_CPP_LOG_DEBUG_("Worker " << i << " is starting."); io_service_->run(); RESTC_CPP_LOG_DEBUG_("Worker " << i << " is done."); } catch (const exception& ex) { RESTC_CPP_LOG_ERROR_("Worker " << i << " caught exception: " << ex.what()); } })); } // Wait for the therads to be started for(auto& w : wait) { w.get_future().get(); } RESTC_CPP_LOG_TRACE_("All worker threads have started"); } Request::Properties::ptr_t GetConnectionProperties() const override { return default_connection_properties_; } bool IsClosing() const noexcept override { return closed_; } void CloseWhenReady(bool wait) override { ClearWork(); if (!io_service_->stopped()) { io_service_->dispatch([this](){ if (current_tasks_ == 0) { OnNoMoreWork(); } }); } if (wait) { RESTC_CPP_LOG_TRACE_("CloseWhenReady: Waiting for work to end."); // We have to lock/unlock each of them to make sure that all the threads are done for(auto& dm : done_mutexes_) { if (dm) { lock_guard<recursive_mutex> lock(*dm); } } RESTC_CPP_LOG_TRACE_("CloseWhenReady: Done waiting for work to end."); } } void ClearWork() { if (closed_) { return; } if (!io_service_->stopped()) { call_once(close_once_, [&] { auto promise = make_shared<std::promise<void>>(); io_service_->dispatch([this, promise]() { LOCK_; if (work_) { work_.reset(); } closed_ = true; promise->set_value(); }); // Wait for the lambda to finish; promise->get_future().get(); }); } } void ProcessInWorker(boost::asio::yield_context yield, const prc_fn_t& fn, const shared_ptr<promise<void>>& promise) { ContextImpl ctx(yield, *this); DoneHandlerImpl handler(*this); try { fn(ctx); } catch (boost::coroutines::detail::forced_unwind const&) { throw; // required for Boost Coroutine! } catch(const exception& ex) { RESTC_CPP_LOG_ERROR_("ProcessInWorker: Caught exception: " << ex.what()); if (promise) { promise->set_exception(current_exception()); return; } terminate(); } catch (const boost::exception& ex) { RESTC_CPP_LOG_ERROR_("ProcessInWorker: Caught boost exception: " << boost::diagnostic_information(ex)); terminate(); } catch(...) { ostringstream estr; #ifdef __unix__ estr << " of type : " << __cxxabiv1::__cxa_current_exception_type()->name(); #endif RESTC_CPP_LOG_ERROR_("*** ProcessInWorker: Caught unknown exception " << estr.str()); if (promise) { promise->set_exception(current_exception()); return; } terminate(); } if (promise) { promise->set_value(); } } void Process(const prc_fn_t& fn) override { boost::asio::spawn(*io_service_, bind(&RestClientImpl::ProcessInWorker, this, placeholders::_1, fn, nullptr)); } future< void > ProcessWithPromise(const prc_fn_t& fn) override { auto promise = make_shared<std::promise<void>>(); auto future = promise->get_future(); boost::asio::spawn(*io_service_, bind(&RestClientImpl::ProcessInWorker, this, placeholders::_1, fn, promise)); return future; } std::shared_ptr<ConnectionPool> GetConnectionPool() override { assert(pool_); return pool_; } boost::asio::io_service& GetIoService() override { return *io_service_; } #ifdef RESTC_CPP_WITH_TLS shared_ptr<boost::asio::ssl::context> GetTLSContext() override { return tls_context_; } #endif void OnNoMoreWork() { RESTC_CPP_LOG_TRACE_("OnNoMoreWork: enter"); LOCK_; if (current_tasks_ > 0) { // Cannot close down quite yet. RESTC_CPP_LOG_TRACE_("OnNoMoreWork: leaving - we have active tasks"); return; } if (closed_ && pool_) { call_once(close_pool_once_, [&] { RESTC_CPP_LOG_TRACE_("OnNoMoreWork: closing pool"); pool_->Close(); pool_.reset(); }); } if (closed_ && ioservice_instance_) { if (!work_ && !io_service_->stopped()) { call_once(close_ioservice_once_, [&] { RESTC_CPP_LOG_TRACE_("OnNoMoreWork: Stopping ioservice"); io_service_->stop(); }); } } RESTC_CPP_LOG_TRACE_("OnNoMoreWork: leave"); } protected: unique_ptr<DoneHandler> GetDoneHandler() override { return make_unique<DoneHandlerImpl>(*this); } private: Request::Properties::ptr_t default_connection_properties_ = make_shared<Request::Properties>(); unique_ptr<boost::asio::io_service> ioservice_instance_; boost::asio::io_service *io_service_ = nullptr; ConnectionPool::ptr_t pool_; unique_ptr<boost::asio::io_service::work> work_; #ifdef RESTC_CPP_THREADED_CTX atomic_size_t current_tasks_{0}; #else size_t current_tasks_{0}; #endif bool closed_ = false; once_flag close_once_; std::vector<unique_ptr<thread>> threads_; std::vector<std::unique_ptr<recursive_mutex>> done_mutexes_; std::once_flag close_pool_once_; std::once_flag close_ioservice_once_; #ifdef RESTC_CPP_WITH_TLS shared_ptr<boost::asio::ssl::context> tls_context_; void setDefaultSSLContext() { tls_context_ = make_shared<boost::asio::ssl::context>(boost::asio::ssl::context{ boost::asio::ssl::context::sslv23_client }); tls_context_->set_options(boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::no_sslv2 | boost::asio::ssl::context::no_sslv3); } #endif #ifdef RESTC_CPP_THREADED_CTX mutable std::mutex mutex_; #endif }; unique_ptr<RestClient> RestClient::Create() { boost::optional<Request::Properties> properties; return make_unique<RestClientImpl>(properties, false); } #ifdef RESTC_CPP_WITH_TLS unique_ptr<RestClient> RestClient::Create(std::shared_ptr<boost::asio::ssl::context> ctx) { boost::optional<Request::Properties> properties; return make_unique<RestClientImpl>(properties, false, move(ctx)); } std::unique_ptr<RestClient> RestClient::Create(std::shared_ptr<boost::asio::ssl::context> ctx, const boost::optional<Request::Properties> &properties) { return make_unique<RestClientImpl>(properties, false, move(ctx)); } std::unique_ptr<RestClient> RestClient::Create(std::shared_ptr<boost::asio::ssl::context> ctx, const boost::optional<Request::Properties> &properties, boost::asio::io_service &ioservice) { return make_unique<RestClientImpl>(properties, false, move(ctx), ioservice); } #endif unique_ptr<RestClient> RestClient::Create( const boost::optional<Request::Properties>& properties) { return make_unique<RestClientImpl>(properties, false); } unique_ptr<RestClient> RestClient::CreateUseOwnThread(const boost::optional<Request::Properties>& properties) { return make_unique<RestClientImpl>(properties, true); } unique_ptr<RestClient> RestClient::CreateUseOwnThread() { return make_unique<RestClientImpl>(boost::optional<Request::Properties>{}, true); } std::unique_ptr<RestClient> RestClient::Create(const boost::optional<Request::Properties>& properties, boost::asio::io_service& ioservice) { return make_unique<RestClientImpl>(properties, ioservice); } std::unique_ptr<RestClient> RestClient::Create(boost::asio::io_service& ioservice) { return make_unique<RestClientImpl>(boost::optional<Request::Properties>{}, ioservice); } unique_ptr<Context> Context::Create(boost::asio::yield_context& yield, RestClient& rc) { return make_unique<RestClientImpl::ContextImpl>(yield, rc); } } // restc_cpp