#include <iostream>
#include <fstream>
#include <thread>
#include <future>
#include <array>

#include <boost/utility/string_ref.hpp>

#include "restc-cpp/restc-cpp.h"
#include "restc-cpp/logging.h"
#include "restc-cpp/Url.h"
#include "restc-cpp/Socket.h"
#include "restc-cpp/ConnectionPool.h"
#include "restc-cpp/IoTimer.h"
#include "restc-cpp/error.h"
#include "restc-cpp/url_encode.h"
#include "restc-cpp/RequestBody.h"
#include "ReplyImpl.h"

using namespace std;
using namespace std::string_literals;

namespace {

// We support versions of boost prior to the introduction of this convenience function

boost::asio::ip::address_v6 make_address_v6(const char* str,
    boost::system::error_code& ec)
{
  boost::asio::ip::address_v6::bytes_type bytes;
  unsigned long scope_id = 0;
  if (boost::asio::detail::socket_ops::inet_pton(
        BOOST_ASIO_OS_DEF(AF_INET6), str, &bytes[0], &scope_id, ec) <= 0)
    return boost::asio::ip::address_v6();
  return boost::asio::ip::address_v6(bytes, scope_id);
}

boost::asio::ip::address_v4 make_address_v4(const char* str,
    boost::system::error_code& ec)
{
  boost::asio::ip::address_v4::bytes_type bytes;
  if (boost::asio::detail::socket_ops::inet_pton(
        BOOST_ASIO_OS_DEF(AF_INET), str, &bytes, 0, ec) <= 0)
    return boost::asio::ip::address_v4();
  return boost::asio::ip::address_v4(bytes);
}

boost::asio::ip::address make_address(const char* str,
    boost::system::error_code& ec)
{
  boost::asio::ip::address_v6 ipv6_address =
    make_address_v6(str, ec);
  if (!ec)
    return boost::asio::ip::address{ipv6_address};

  boost::asio::ip::address_v4 ipv4_address =
    make_address_v4(str, ec);
  if (!ec)
    return boost::asio::ip::address{ipv4_address};

  return boost::asio::ip::address{};
}

} // anon ns

namespace restc_cpp {

const std::string& Request::Proxy::GetName() {
    static const array<string, 3> names = {
      "NONE", "HTTP", "SOCKS5"
    };

    return names.at(static_cast<size_t>(type));
}

namespace {

constexpr char SOCKS5_VERSION = 0x05;
constexpr char SOCKS5_TCP_STREAM = 0x01;
constexpr size_t SOCKS5_MAX_HOSTNAME_LEN = 255;
constexpr char SOCKS5_IPV4_ADDR = 0x01;
constexpr char SOCKS5_IPV6_ADDR = 0x04;
constexpr char SOCKS5_HOSTNAME_ADDR = 0x03;

/* hostname: example.com:123                 -> "example.com", 123
 * ipv4:     1.2.3.4:123                     -> "1.2.3.4", 123
 * ipv6:     [fe80::4479:f6ff:fea3:aa23]:123 -> "fe80::4479:f6ff:fea3:aa23", 123
 */
pair<string, uint16_t> ParseAddress(const std::string addr) {
    auto pos = addr.find('['); // IPV6
    string host;
    string port;
    if (pos != string::npos) {
        auto host = addr.substr(1); // strip '['
        pos = host.find(']');
        if (pos == string::npos) {
            throw ParseException{"IPv6 address must have a ']'"};
        }
        port = host.substr(pos);
        host = host.substr(0, pos);

        if (port.size() < 3 || (host.at(1) != ':')) {
            throw ParseException{"Need `]:<port>` in "s + addr};
        }
        port = port.substr(2);
    } else {
        // IPv4 or address
        pos = addr.find(':');
        if (pos == string::npos || (addr.size() - pos) < 2) {
            throw ParseException{"Need `:<port>` in "s + addr};
        }
        port = addr.substr(pos +1);
        host = addr.substr(0, pos);
    }

    const uint16_t port_num = stoul(port);
    if (port_num == 0 || port_num > numeric_limits<uint16_t>::max()) {
        throw ParseException{"Port `:<port>` must be a valid IP port number in "s + addr};
    }

    return {host, port_num};
}

/*! Parse the address and write the socks5 connect request */
void ParseAddressIntoSocke5ConnectRequest(const std::string& addr,
                                          vector<uint8_t>& out) {

    out.push_back(SOCKS5_VERSION);
    out.push_back(SOCKS5_TCP_STREAM);
    out.push_back(0);

    string host;
    uint16_t port = 0;

    tie(host, port) = ParseAddress(addr);

    const auto final_port = htons(port);

    boost::system::error_code ec;
    const auto a = make_address(host.c_str(), ec);
    if (ec) {
        // Assume that it's a hostname.
        // TODO: Validate with a regex
        if (host.size() > SOCKS5_MAX_HOSTNAME_LEN) {
            throw ParseException{"SOCKS5 address must be <= 255 bytes"};
        }
        if (host.size() < 1) {
            throw ParseException{"SOCKS5 address must be > 1 byte"};
        }

        out.push_back(SOCKS5_HOSTNAME_ADDR);

        // Add string lenght (single byte)
        out.push_back(static_cast<uint8_t>(host.size()));

        // Copy the address, without trailing zero
        copy(host.begin(), host.end(), back_insert_iterator<vector<uint8_t>>(out));
    } else if (a.is_v4()) {
        const auto v4 = a.to_v4();
        const auto b = v4.to_bytes();
        assert(b.size() == 4);
        out.push_back(SOCKS5_IPV4_ADDR);
        copy(b.begin(), b.end(), back_insert_iterator<vector<uint8_t>>(out));
    } else if (a.is_v6()) {
        const auto v6 = a.to_v6();
        const auto b = v6.to_bytes();
        assert(b.size() == 16);
        out.push_back(SOCKS5_IPV6_ADDR);
        copy(b.begin(), b.end(), back_insert_iterator<vector<uint8_t>>(out));
    } else {
        throw ParseException{"Internal error. Failed to parse: "s + addr};
    }

    // Add 2 byte port number in network byte order
    assert(sizeof(final_port) >= 2);
    const unsigned char *p = reinterpret_cast<const unsigned char *>(&final_port);
    out.push_back(*p);
    out.push_back(*(p +1));
}

// Return 0 whene there is no more bytes to read
size_t ValidateCompleteSocks5ConnectReply(uint8_t *buf, size_t len) {
    if (len < 5) {
        throw RestcCppException{"SOCKS5 server connect reply must start at minimum 5 bytes"s};
    }

    if (buf[0] != SOCKS5_VERSION) {
        throw ProtocolException{"Wrong/unsupported SOCKS5 version in connect reply: "s
                                + to_string(buf[0])};
    }

    if (buf[1] != 0) { // Request failed
        throw ProtocolException{"Unexpected value in SOCKS5 header[1] "s
                                + to_string(buf[1])};
    }

    size_t hdr_len = 5; // Mandatory bytes

    switch(buf[3]) {
    case SOCKS5_IPV4_ADDR:
        hdr_len += 4 + 1;
        break;
    case SOCKS5_IPV6_ADDR:
        hdr_len += 16 + 1;
        break;
    case SOCKS5_HOSTNAME_ADDR:
        if (len < 4) {
            return false; // We need the length field...
        }
        hdr_len += buf[3] + 1 + 1;
    break;
    default:
        throw ProtocolException{"Wrong/unsupported SOCKS5 BINDADDR type: "s
                                + to_string(buf[3])};
    }

    if (len > hdr_len) {
        throw NotSupportedException{"SOCKS5: Received more data then the connect response."};
    }

    return hdr_len;
}

void DoSocks5Handshake(Connection& connection,
                       const Url& url,
                       const Request::Properties properties,
                       Context& ctx) {

    assert(properties.proxy.type == Request::Proxy::Type::SOCKS5);
    auto& sck = connection.GetSocket();

    // Send no-auth handshake
    {
        array<uint8_t, 3> hello = {SOCKS5_VERSION, 1, 0};
        RESTC_CPP_LOG_TRACE_("DoSocks5Handshake - saying hello");
        sck.AsyncWriteT(hello, ctx.GetYield());
    }
    {
        array<uint8_t, 2> reply = {};
        RESTC_CPP_LOG_TRACE_("DoSocks5Handshake - waiting for greeting");
        sck.AsyncRead({reply.data(), 2}, ctx.GetYield());

        if (reply[0] != SOCKS5_VERSION) {
            throw ProtocolException{"Wrong/unsupported SOCKS5 version: "s + to_string(reply[0])};
        }

        if (reply[1] != 0) {
            throw AccessDeniedException{"SOCKS5 Access denied: "s + to_string(reply[1])};
        }
    }

    // Send connect parameters
    {
        vector<uint8_t> params;

        auto addr = url.GetHost().to_string() + ":" + to_string(url.GetPort());

        ParseAddressIntoSocke5ConnectRequest(addr, params);
        RESTC_CPP_LOG_TRACE_("DoSocks5Handshake - saying connect to " <<  url.GetHost().to_string() << ":" << url.GetPort());
        sck.AsyncWriteT(params, ctx.GetYield());
    }

    {
        array<uint8_t, 255 + 6> reply;
        size_t remaining = 5; // Minimum length
        uint8_t *next = reply.data();

        RESTC_CPP_LOG_TRACE_("DoSocks5Handshake - waiting for connect confirmation - first segment");
        auto read = sck.AsyncRead({next, remaining}, ctx.GetYield());
        const auto hdr_len = ValidateCompleteSocks5ConnectReply(reply.data(), read);
        if (hdr_len > read) {
            remaining = hdr_len - read;
            next += read;
            if (hdr_len > reply.size()) {
                throw ProtocolException{"SOCKS5 Connect header from the server is too large"};
            }
            RESTC_CPP_LOG_TRACE_("DoSocks5Handshake - waiting for connect confirmation - second segment");
            read += sck.AsyncRead({next, remaining}, ctx.GetYield());
            ValidateCompleteSocks5ConnectReply(reply.data(), read);
        }

        assert(read == hdr_len);
    }
    RESTC_CPP_LOG_TRACE_("DoSocks5Handshake - done");
}
} // anonumous ns

class RequestImpl : public Request {
public:

    struct RedirectException{

        RedirectException(const RedirectException&) = delete;
        RedirectException(RedirectException &&) = default;

        RedirectException(int redirCode, string redirUrl, std::unique_ptr<Reply> reply)
        : code{redirCode}, url{move(redirUrl)}, redirectReply{move(reply)}
        {}

        RedirectException() = delete;
        ~RedirectException() = default;
        RedirectException& operator = (const RedirectException&) = delete;
        RedirectException& operator = (RedirectException&&) = delete;

        int GetCode() const noexcept { return code; };
        const std::string& GetUrl() const noexcept { return url; }
        Reply& GetRedirectReply() const { return *redirectReply; }

    private:
        const int code;
        std::string url;
        std::unique_ptr<Reply> redirectReply;
    };

    RequestImpl(std::string url,
                const Type requestType,
                RestClient& owner,
                std::unique_ptr<RequestBody> body,
                const boost::optional<args_t>& args,
                const boost::optional<headers_t>& headers,
                const boost::optional<auth_t>& auth = {})
    : url_{move(url)}, parsed_url_{url_.c_str()} , request_type_{requestType}
    , body_{move(body)}, owner_{owner}
    {
       if (args || headers || auth) {
            Properties::ptr_t props = owner_.GetConnectionProperties();
            assert(props);
            properties_ = make_shared<Properties>(*props);

            if (args) {
                properties_->args.insert(properties_->args.end(),
                                         args->begin(), args->end());
            }

            merge_map(headers, properties_->headers);

            if (auth) {
                SetAuth(*auth);
            }

        } else {
            properties_ = owner_.GetConnectionProperties();
        }
    }

    // modified from http://stackoverflow.com/questions/180947/base64-decode-snippet-in-c
    static std::string Base64Encode(const std::string &in) {
        // Silence the cursed clang-tidy...
        constexpr auto magic_4 = 4;
        constexpr auto magic_6 = 6;
        constexpr auto magic_8 = 8;
        constexpr auto magic_3f = 0x3F;

        static const string alphabeth {"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"};
        string out;

        int val = 0;
        int valb = -magic_6;
        for (const uint8_t c : in) {
            val = (val<<magic_8) + c;
            valb += magic_8;
            while (valb>=0) {
                out.push_back(alphabeth[(val>>valb)&magic_3f]);
                valb-=magic_6;
            }
        }
        if (valb>-magic_6) out.push_back(alphabeth[((val<<magic_8)>>(valb+magic_8))&magic_3f]);
        while (out.size()%magic_4) out.push_back('=');
        return out;
    }

    void SetAuth(const Auth& auth) {
        static const string authorization{"Authorization"};
        static const string basic_sp{"Basic "};

        std::string pre_base = auth.name + ':' + auth.passwd;
        properties_->headers[authorization]
            = basic_sp + Base64Encode(pre_base);
        std::memset(&pre_base[0], 0, pre_base.capacity());
        pre_base.clear();
    }

    const Properties& GetProperties() const override {
        return *properties_;
    }

    void SetProperties(Properties::ptr_t propreties) override {
        properties_ = move(propreties);
    }

    const std::string& Verb(const Type requestType) {
        static const std::array<std::string, 7> names =
            {{ "GET", "POST", "PUT", "DELETE", "OPTIONS",
                "HEAD", "PATCH"
            }};

        return names.at(static_cast<size_t>(requestType));
    }

    uint64_t GetContentBytesSent() const noexcept {
        return bytes_sent_ - header_size_;
    }

    unique_ptr<Reply> Execute(Context& ctx) override {
        int redirects = 0;
        while(true) {
            try {
                return DoExecute((ctx));
            } catch(const RedirectException& ex) {
                
                auto url = ex.GetUrl();
                
                if (properties_->redirectFn) {
                    properties_->redirectFn(ex.GetCode(), url, ex.GetRedirectReply());
                }
                
                if ((properties_->maxRedirects >= 0)
                    && (++redirects > properties_->maxRedirects)) {
                    throw ConstraintException("Too many redirects.");
                }

                RESTC_CPP_LOG_DEBUG_("Redirecting ("
                    << ex.GetCode()
                    << ") '" << url_
                    << "' --> '"
                    << url
                    << "') ");
                url_ = move(url);
                parsed_url_ = url_.c_str();
                add_url_args_ = false; // Use whatever arguments we got in the redirect
            }
        }
    }


private:
    void ValidateReply(const Reply& reply) {
        // Silence the cursed clang tidy!
        constexpr auto magic_2 = 2;
        constexpr auto magic_100 = 100;
        constexpr auto http_401 = 401;
        constexpr auto http_403 = 403;
        constexpr auto http_404 = 404;
        constexpr auto http_405 = 405;
        constexpr auto http_406 = 406;
        constexpr auto http_407 = 407;
        constexpr auto http_408 = 408;

        const auto& response = reply.GetHttpResponse();
        if ((response.status_code / magic_100) > magic_2) switch(response.status_code) {
            case http_401:
                throw HttpAuthenticationException(response);
            case http_403:
                throw HttpForbiddenException(response);
            case http_404:
                throw HttpNotFoundException(response);
            case http_405:
                throw HttpMethodNotAllowedException(response);
            case http_406:
                throw HttpNotAcceptableException(response);
            case http_407:
                throw HttpProxyAuthenticationRequiredException(response);
            case http_408:
                throw HttpRequestTimeOutException(response);
            default:
                throw RequestFailedWithErrorException(response);
        }
    }

    std::string BuildOutgoingRequest() {
        static const std::string crlf{"\r\n"};
        static const std::string column{": "};
        static const string host{"Host"};

        std::ostringstream request_buffer;

        // Build the request-path
        request_buffer << Verb(request_type_) << ' ';

        if (properties_->proxy.type == Request::Proxy::Type::HTTP) {
            request_buffer << parsed_url_.GetProtocolName() << parsed_url_.GetHost() << ":" << parsed_url_.GetPort();
        }

        // Add arguments to the path as ?name=value&name=value...
        bool first_arg = true;
        if (add_url_args_) {
            // Normal processing.
            request_buffer << url_encode(parsed_url_.GetPath());
            for(const auto& arg : properties_->args) {
                if (first_arg) {
                    first_arg = false;
                    request_buffer << '?';
                } else {
                    request_buffer << '&';
                }

                request_buffer
                    << url_encode(arg.name)
                    << '=' << url_encode(arg.value);
            }
        } else {
            // After a redirect. We The redirect-url in parsed_url_ should be encoded,
            // and may be exactly what the target expects - so we do nothing here.
            request_buffer << parsed_url_.GetPath();
        }

        request_buffer << " HTTP/1.1" << crlf;

        // Build the header buffers
        headers_t headers = properties_->headers;
        assert(writer_);

        // Let the writers set their individual headers.
        writer_->SetHeaders(headers);

        if (headers.find(host) == headers.end()) {
            request_buffer << host << ": " << parsed_url_.GetHost().to_string() << crlf;
        }

        for(const auto& it : headers) {
            request_buffer << it.first << column << it.second << crlf;
        }

        // End the header section.
        request_buffer << crlf;

        return request_buffer.str();
    }

    boost::asio::ip::tcp::resolver::query GetRequestEndpoint() {
        const auto proxy_type = properties_->proxy.type;

        if (proxy_type == Request::Proxy::Type::SOCKS5) {
            string host;
            uint16_t port = 0;
            tie(host, port) = ParseAddress(properties_->proxy.address);

            RESTC_CPP_LOG_TRACE_("Using " << properties_->proxy.GetName()
                                 << " Proxy at: "
                                 << host << ':' << port);

            return {host, to_string(port)};
        }

        if (proxy_type == Request::Proxy::Type::HTTP) {
            Url proxy {properties_->proxy.address.c_str()};

            RESTC_CPP_LOG_TRACE_("Using " << properties_->proxy.GetName()
                                 << " Proxy at: "
                                 << proxy.GetHost() << ':' << proxy.GetPort());

            return { proxy.GetHost().to_string(),
                proxy.GetPort().to_string()};
        }

        return { parsed_url_.GetHost().to_string(),
            parsed_url_.GetPort().to_string()};
    }

    /* If we are redirected, we need to reset the body
     * Some body implementations may not support that and throw in Reset()
     */
    void PrepareBody() {
        if (body_) {
            if (dirty_) {
                body_->Reset();
            }
        }
        dirty_ = true;
    }

    template <typename protocolT>
    boost::asio::ip::tcp::endpoint ToEp(const std::string& endp,
                                        const protocolT& protocol,
                                        Context& ctx) const {
        string host, port;

        auto endipv6 = endp.find(']');
        if (endipv6 == string::npos) {
            endipv6 = 0;
        }
        const auto pos = endp.find(':', endipv6);
        if (pos == string::npos) {
            host = endp;
        } else {
            host = endp.substr(0, pos);
            if (endp.size() > pos) {
                port = endp.substr(pos + 1);
            }
        }

        // Strip IPv6 brackets, asio can't resolve it
        if (endipv6 && !host.empty() && host.front() == '[') {
            host = host.substr(1, endipv6 -1);
        }

        RESTC_CPP_LOG_TRACE_("host=" << host << ", port=" << port);

        if (host.empty()) {
            const auto port_num = stoi(port);
            if (port_num < 1 || port_num > std::numeric_limits<uint16_t>::max()) {
                throw RestcCppException{"Port number out of range: "s + port};
            }
            return {protocol, static_cast<uint16_t>(port_num)};
        }

        boost::asio::ip::tcp::resolver::query q{host, port};
        boost::asio::ip::tcp::resolver resolver(owner_.GetIoService());

        auto ep = resolver.async_resolve(q, ctx.GetYield());
        const decltype(ep) addr_end;
        for(; ep != addr_end; ++ep)
        if (ep != addr_end) {

            RESTC_CPP_LOG_TRACE_("ep=" << ep->endpoint() << ", protocol=" << ep->endpoint().protocol().protocol());

            if (protocol == ep->endpoint().protocol()) {
                return ep->endpoint();
            }

            RESTC_CPP_LOG_TRACE_("Incorrect protocol, looping for next alternative");
        }

        RESTC_CPP_LOG_ERROR_("Failed to resolve endpoint " << endp
                             << " with protocol " << protocol.protocol());
        throw FailedToResolveEndpointException{"Failed to resolve endpoint: "s + endp};
    }


    auto GetBindProtocols(const std::string& endp, Context& ctx) {
        std::vector<decltype(boost::asio::ip::tcp::v4())> p;

        if (!endp.empty()) {
            try {
                ToEp(endp, boost::asio::ip::tcp::v4(), ctx);
                p.push_back(boost::asio::ip::tcp::v4());
            } catch (const FailedToResolveEndpointException&) {
                ;
            }

            try {
                ToEp(endp, boost::asio::ip::tcp::v6(), ctx);
                p.push_back(boost::asio::ip::tcp::v6());
            } catch (const FailedToResolveEndpointException&) {
                ;
            }
        }

        return p;
    }

    Connection::ptr_t Connect(Context& ctx) {

        static const auto timer_name = "Connect"s;

        auto prot_filter = GetBindProtocols(properties_->bindToLocalAddress, ctx);

        const Connection::Type protocol_type =
            (parsed_url_.GetProtocol() == Url::Protocol::HTTPS)
            ? Connection::Type::HTTPS
            : Connection::Type::HTTP;

        boost::asio::ip::tcp::resolver resolver(owner_.GetIoService());
        // Resolve the hostname
        const auto query = GetRequestEndpoint();

        RESTC_CPP_LOG_TRACE_("Resolving " << query.host_name() << ":"
            << query.service_name());

        auto address_it = resolver.async_resolve(query,
                                                 ctx.GetYield());
        const decltype(address_it) addr_end;

        for(; address_it != addr_end; ++address_it) {
//            if (owner_.IsClosing()) {
//                RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect: The rest client is closed (at first loop). Aborting.");
//                throw FailedToConnectException("Failed to connect (closed)");
//            }

            const auto endpoint = address_it->endpoint();

            RESTC_CPP_LOG_TRACE_("Trying endpoint " << endpoint);

            for(size_t retries = 0; retries < 8; ++retries) {
                // Get a connection from the pool
                auto connection = owner_.GetConnectionPool()->GetConnection(
                    endpoint, protocol_type);

                // Connect if the connection is new.
                if (connection->GetSocket().IsOpen()) {
                    return connection;
                }

                RESTC_CPP_LOG_DEBUG_("Connecting to " << endpoint);

                if (!properties_->bindToLocalAddress.empty()) {

                    // Only connect outwards to protocols we can bind to
                    if (!prot_filter.empty()) {
                        if (std::find(prot_filter.begin(), prot_filter.end(), endpoint.protocol())
                                == prot_filter.end()) {
                            RESTC_CPP_LOG_TRACE_("Filtered out (protocol mismatch) local address: "
                                << properties_->bindToLocalAddress);
                            break; // Break out of retry loop, re-enter endpoint loop
                        }
                    }

                    RESTC_CPP_LOG_TRACE_("Binding to local address: "
                        << properties_->bindToLocalAddress);

                    boost::system::error_code ec;
                    auto local_ep = ToEp(properties_->bindToLocalAddress, endpoint.protocol(), ctx);
                    auto& sck = connection->GetSocket().GetSocket();
                    sck.open(local_ep.protocol());
                    sck.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
                    sck.bind(local_ep, ec);

                    if (ec) {
                        RESTC_CPP_LOG_ERROR_("Failed to bind to local address '"
                            << local_ep
                            << "': " << ec.message());

                        sck.close();
                        throw RestcCppException{"Failed to bind to local address: "s
                                                + properties_->bindToLocalAddress};
                    }
                }


//                if (owner_.IsClosed()) {
//                    RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect: The rest client is closed. Aborting.");
//                    throw FailedToConnectException("Failed to connect (closed)");
//                }

                auto timer = IoTimer::Create(timer_name,
                    properties_->connectTimeoutMs, connection);

                try {
                    if (retries) {
                        RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect: taking a nap");
                        ctx.Sleep(retries * 20ms);
                        RESTC_CPP_LOG_DEBUG_("RequestImpl::Connect: Waking up. Will try to read from the socket now.");
                    }

                    if (properties_->proxy.type == Proxy::Type::SOCKS5) {
                        connection->GetSocket().SetAfterConnectCallback([&]() {
                            RESTC_CPP_LOG_TRACE_("RequestImpl::Connect: In Socks5 callback");

                            DoSocks5Handshake(*connection, parsed_url_, *properties_, ctx);

                            RESTC_CPP_LOG_TRACE_("RequestImpl::Connect: Leaving Socks5 callback");
                        });
                    }

                    RESTC_CPP_LOG_TRACE_("RequestImpl::Connect: calling AsyncConnect --> " << endpoint);
                    connection->GetSocket().AsyncConnect(
                        endpoint, address_it->host_name(),
                        properties_->tcpNodelay, ctx.GetYield());
                    RESTC_CPP_LOG_TRACE_("RequestImpl::Connect: OK AsyncConnect --> " << endpoint);
                    return connection;
                } catch (const boost::system::system_error& ex) {
                    RESTC_CPP_LOG_TRACE_("RequestImpl::Connect:: Caught boost::system::system_error exception: \"" << ex.what()
                                         << "\". Will close connection " << *connection);
                    connection->GetSocket().GetSocket().close();

                    if (ex.code() == boost::system::errc::resource_unavailable_try_again) {
                        if ( retries < 8) {
                            RESTC_CPP_LOG_DEBUG_(
                                "RequestImpl::Connect:: Caught boost::system::system_error exception: \""
                                    << ex.what()
                                    << "\" while connecting to " << endpoint
                                    << ". I will continue the retry loop.");
                            continue;
                        }
                    }
                    RESTC_CPP_LOG_DEBUG_(
                        "RequestImpl::Connect:: Caught boost::system::system_error exception: \""
                            << ex.what()
                            << "\" while connecting to " << endpoint);
                    break; // Go to the next endpoint
                } catch(const exception& ex) {
                    RESTC_CPP_LOG_WARN_("Connect to "
                        << endpoint
                        << " failed with exception type: "
                        << typeid(ex).name()
                        << ", message: " << ex.what());

                    connection->GetSocket().GetSocket().close();
                }

            } // retries
        } // endpoints

        throw FailedToConnectException("Failed to connect (exhausted all options)");
    }

    void SendRequestPayload(Context& /*ctx*/,
                      write_buffers_t write_buffer) {

        static const auto timer_name = "SendRequestPayload"s;
        bool have_sent_headers = false;

        if (properties_->beforeWriteFn) {
            properties_->beforeWriteFn();
        }

        while(boost::asio::buffer_size(write_buffer))
        {
            auto timer = IoTimer::Create(timer_name,
                properties_->sendTimeoutMs, connection_);

            try {

                if (!have_sent_headers) {

                    auto b = write_buffer[0];

                    writer_->WriteDirect(
                        {boost::asio::buffer_cast<const char *>(b),
                        boost::asio::buffer_size(b)});

                    have_sent_headers = true;

                    if (write_buffer.size() > 1) {
                        write_buffer.erase(write_buffer.begin());
                        writer_->Write(write_buffer);
                    }

                } else {
                    writer_->Write(write_buffer);
                }

                bytes_sent_ += boost::asio::buffer_size(write_buffer);

            } catch(const exception& ex) {
                RESTC_CPP_LOG_WARN_("Write failed with exception type: "
                    << typeid(ex).name()
                    << ", message: " << ex.what());
                throw;
            }

            write_buffer.clear();

            if (body_) {
                switch(body_->GetType()) {
                    case RequestBody::Type::FIXED_SIZE:
                    case RequestBody::Type::CHUNKED_LAZY_PULL:
                        if (!body_->GetData(write_buffer)) {
                            return;
                        }
                        break;
                    case RequestBody::Type::CHUNKED_LAZY_PUSH:
                        body_->PushData(*writer_);
                }
            } else {
                return; // No more data to send
            }
        }
    }

    DataWriter& SendRequest(Context& ctx) override {
        bytes_sent_ = 0;

        connection_ = Connect(ctx);
        DataWriter::WriteConfig cfg;
        cfg.msWriteTimeout = properties_->sendTimeoutMs;
        writer_ = DataWriter::CreateIoWriter(connection_, ctx, cfg);

        if (body_) {
            if (body_->GetType() == RequestBody::Type::FIXED_SIZE) {
                writer_ = DataWriter::CreatePlainWriter(
                    body_->GetFixedSize(), move(writer_));
            } else {
                writer_ = DataWriter::CreateChunkedWriter(nullptr, move(writer_));
            }
        } else {
            static const string transfer_encoding{"Transfer-Encoding"};
            static const string chunked{"chunked"};
            auto h = properties_->headers.find(transfer_encoding);
            if ((h != properties_->headers.end()) && ciEqLibC()(h->second, chunked)) {
                writer_ = DataWriter::CreateChunkedWriter(nullptr, move(writer_));
            } else {
                writer_ = DataWriter::CreatePlainWriter(0, move(writer_));
            }
        }

        // TODO: Add compression

        write_buffers_t write_buffer;
        ToBuffer headers(BuildOutgoingRequest());
        write_buffer.push_back(headers);
        header_size_ = boost::asio::buffer_size(write_buffer);

        RESTC_CPP_LOG_TRACE_("Request: " << (const boost::string_ref)headers
            << ' ' << *connection_);

        PrepareBody();
        SendRequestPayload(ctx, write_buffer);
        if (properties_->afterWriteFn) {
            properties_->afterWriteFn();
        }

        RESTC_CPP_LOG_DEBUG_("Sent " << Verb(request_type_) << " request to '" << url_ << "' "
            << *connection_);

        assert(writer_);
        return *writer_;
    }

    unique_ptr<Reply> GetReply(Context& ctx) override {
        constexpr auto http_301 = 301;
        constexpr auto http_302 = 302;

        // We will not send more data regarding the current request
        writer_->Finish();
        writer_.reset();

        RESTC_CPP_LOG_TRACE_("GetReply: writer is reset.");

        DataReader::ReadConfig cfg;
        cfg.msReadTimeout = properties_->recvTimeout;
        auto reply = ReplyImpl::Create(connection_, ctx, owner_, properties_,
                                       request_type_);

        RESTC_CPP_LOG_TRACE_("GetReply: Calling StartReceiveFromServer");
        try {
            reply->StartReceiveFromServer(
                DataReader::CreateIoReader(connection_, ctx, cfg));
        } catch (const exception& ex) {
            RESTC_CPP_LOG_DEBUG_("GetReply: exception from StartReceiveFromServer: " << ex.what());
            throw;
        }

        RESTC_CPP_LOG_TRACE_("GetReply: Returned from StartReceiveFromServer. code=" << reply->GetResponseCode());

        const auto http_code = reply->GetResponseCode();
        if (http_code == http_301 || http_code == http_302) {
            auto redirect_location = reply->GetHeader("Location");
            if (!redirect_location) {
                throw ProtocolException(
                    "No Location header in redirect reply");
            }
            RESTC_CPP_LOG_TRACE_("GetReply: RedirectException. location=" << *redirect_location);
            throw RedirectException(http_code, *redirect_location, move(reply));
        }

        if (properties_->throwOnHttpError) {
            RESTC_CPP_LOG_TRACE_("GetReply: Calling ValidateReply");
            ValidateReply(*reply);
            RESTC_CPP_LOG_TRACE_("GetReply: returning from ValidateReply");
        }

        /* Return the reply. At this time the reply headers and body
            * is returned. However, the body may or may not be
            * received.
            */

        return move(reply);
    }



    unique_ptr<Reply> DoExecute(Context& ctx) {
        SendRequest(ctx);
        return GetReply(ctx);
    }

    std::string url_;
    Url parsed_url_;
    const Type request_type_;
    std::unique_ptr<RequestBody> body_;
    Connection::ptr_t connection_;
    std::unique_ptr<DataWriter> writer_;
    Properties::ptr_t properties_;
    RestClient &owner_;
    size_t header_size_ = 0;
    std::uint64_t bytes_sent_ = 0;
    bool dirty_ = false;
    bool add_url_args_ = true;
};


std::unique_ptr<Request>
Request::Create(const std::string& url,
                const Type requestType,
                RestClient& owner,
                std::unique_ptr<RequestBody> body,
                const boost::optional<args_t>& args,
                const boost::optional<headers_t>& headers,
                const boost::optional<auth_t>& auth) {

    return make_unique<RequestImpl>(url, requestType, owner, move(body), args, headers, auth);
}

} // restc_cpp