Skip to content
This repository was archived by the owner on Apr 24, 2022. It is now read-only.

Reconnect #1135

Merged
merged 13 commits into from
May 25, 2018
Next Next commit
Async Disconnect and Reconnect
Attempt to solve problems related to disconnect/reconnect when a timeout
occurs while waiting for a response or for new work.
On PoolManager the IO_SERVICE is now persistent and never exits

On Linux
* Disconnect/Reconnect without issues for plain TCP connections
* SSL connections get shut down properly, but the first reconnect attempt
times out. After that, reconnection is immediate.

On Windows
* Feedback needed.
AndreaLanfranchi authored and chfast committed May 22, 2018

Partially verified

This commit is signed with the committer’s verified signature. The key has expired.
chfast’s contribution has been verified via GPG key.
We cannot verify signatures from co-authors, and some of the co-authors attributed to this commit require their commits to be signed.
commit 4ccc26a526a03699635fa1898a629e3941b707b7
7 changes: 3 additions & 4 deletions ethminer/MinerAux.h
Original file line number Diff line number Diff line change
@@ -197,7 +197,7 @@ class MinerCLI
{
string url = argv[++i];
if (url == "exit") // add fake scheme and port to 'exit' url
url = "stratum://exit:1";
url = "stratum+tcp://-:x@exit:0";
URI uri;
try {
uri = url;
@@ -809,8 +809,7 @@ class MinerCLI
Farm f;
f.setSealers(sealers);

PoolManager mgr(client, f, m_minerType);
mgr.setReconnectTries(m_maxFarmRetries);
PoolManager mgr(client, f, m_minerType, m_maxFarmRetries);

// If we are in simulation mode we add a fake connection
if (m_mode == OperationMode::Simulation) {
@@ -848,7 +847,7 @@ class MinerCLI
}

mgr.stop();

cnote << "Terminated !";
exit(0);
}

1 change: 1 addition & 0 deletions libethcore/Farm.h
Original file line number Diff line number Diff line change
@@ -178,6 +178,7 @@ class Farm: public FarmFace

m_hashrateTimer.cancel();
m_io_service.stop();
m_serviceThread.join();

m_lastProgresses.clear();
}
3 changes: 2 additions & 1 deletion libpoolprotocols/PoolClient.h
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ namespace dev
virtual void submitHashrate(string const & rate) = 0;
virtual void submitSolution(Solution solution) = 0;
virtual bool isConnected() = 0;
virtual bool isPendingState() = 0;
virtual string ActiveEndPoint() = 0;

using SolutionAccepted = std::function<void(bool const&)>;
@@ -46,7 +47,7 @@ namespace dev
bool m_authorized = false;
bool m_connected = false;
bool m_connection_changed = false;
boost::asio::ip::tcp::endpoint m_endpoint;
boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> m_endpoint;

URI m_conn;

153 changes: 74 additions & 79 deletions libpoolprotocols/PoolManager.cpp
Original file line number Diff line number Diff line change
@@ -19,26 +19,17 @@ static string diffToDisplay(double diff)
return ss.str();
}

PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType) : Worker("main"), m_farm(farm), m_minerType(minerType)
PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType, unsigned maxTries) : Worker("main"), m_farm(farm), m_minerType(minerType)
{
p_client = client;

m_maxConnectionAttempts = maxTries;

p_client->onConnected([&]()
{
m_connectionAttempt = 0;
cnote << "Connected to " << m_connections[m_activeConnectionIdx].Host() << p_client->ActiveEndPoint();
if (!m_farm.isMining())
{
cnote << "Spinning up miners...";
if (m_minerType == MinerType::CL)
m_farm.start("opencl", false);
else if (m_minerType == MinerType::CUDA)
m_farm.start("cuda", false);
else if (m_minerType == MinerType::Mixed) {
m_farm.start("cuda", false);
m_farm.start("opencl", true);
}
}
});

p_client->onDisconnected([&]()
{
cnote << "Disconnected from " + m_connections[m_activeConnectionIdx].Host() << p_client->ActiveEndPoint();
@@ -48,13 +39,12 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine
m_farm.stop();
}

if (m_running)
tryReconnect();
});

p_client->onWorkReceived([&](WorkPackage const& wp)
{
m_reconnectTry = 0;
m_farm.setWork(wp);

cnote << "New job" << wp.header << " " + m_connections[m_activeConnectionIdx].Host() + p_client->ActiveEndPoint();
if (wp.boundary != m_lastBoundary)
{
using namespace boost::multiprecision;
@@ -64,8 +54,24 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine
const uint256_t divisor(string("0x") + m_lastBoundary.hex());
cnote << "New pool difficulty:" << EthWhite << diffToDisplay(double(dividend / divisor)) << EthReset;
}
cnote << "New job" << wp.header << " " + m_connections[m_activeConnectionIdx].Host() + p_client->ActiveEndPoint();

if (!m_farm.isMining())
{
cnote << "Spinning up miners...";
if (m_minerType == MinerType::CL)
m_farm.start("opencl", false);
else if (m_minerType == MinerType::CUDA)
m_farm.start("cuda", false);
else if (m_minerType == MinerType::Mixed) {
m_farm.start("cuda", false);
m_farm.start("opencl", true);
}
}

m_farm.setWork(wp);

});

p_client->onSolutionAccepted([&](bool const& stale)
{
using namespace std::chrono;
@@ -76,6 +82,7 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine
cnote << EthLime "**Accepted" EthReset << (stale ? "(stale)" : "") << ss.str();
m_farm.acceptedSolution(stale);
});

p_client->onSolutionRejected([&](bool const& stale)
{
using namespace std::chrono;
@@ -139,9 +146,8 @@ void PoolManager::stop()
if (m_running) {
cnote << "Shutting down...";
m_running = false;

if (p_client->isConnected())
p_client->disconnect();
if (p_client->isConnected()) { p_client->disconnect(); }
stopWorking();

if (m_farm.isMining())
{
@@ -155,9 +161,50 @@ void PoolManager::workLoop()
{
while (m_running)
{
this_thread::sleep_for(chrono::seconds(1));
m_hashrateReportingTimePassed++;

// Take action only if not pending state (connecting/disconnecting)
// Otherwise do nothing and wait until connection state is NOT pending
if (!p_client->isPendingState()) {

if (!p_client->isConnected()) {

// Rotate connections if above max attempts threshold
if (m_connectionAttempt >= m_maxConnectionAttempts) {

m_connectionAttempt = 0;
m_activeConnectionIdx++;
if (m_activeConnectionIdx == m_connections.size()) {
m_activeConnectionIdx = 0;
}

}

if (m_connections[m_activeConnectionIdx].Host() != "exit") {

// Count connectionAttempts
m_connectionAttempt++;

// Invoke connections
p_client->setConnection(m_connections[m_activeConnectionIdx]);
m_farm.set_pool_addresses(m_connections[m_activeConnectionIdx].Host(), m_connections[m_activeConnectionIdx].Port());
cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port()));
p_client->connect();

}
else {

dev::setThreadName("main");
cnote << "No more failover connections.";
m_running = false;
}

}

}

// Hashrate reporting
m_hashrateReportingTimePassed++;

if (m_hashrateReportingTimePassed > m_hashrateReportingTime) {
auto mp = m_farm.miningProgress();
std::string h = toHex(toCompactBigEndian(mp.rate(), 1));
@@ -171,17 +218,16 @@ void PoolManager::workLoop()
p_client->submitHashrate("0x" + ss.str());
m_hashrateReportingTimePassed = 0;
}

this_thread::sleep_for(chrono::seconds(1));

}
}

// Appends `conn` to the list of known pool connections. When it is the very
// first connection registered, it is also handed to the client and its
// host/port are published to the farm.
void PoolManager::addConnection(URI &conn)
{
    const bool isFirstConnection = m_connections.empty();
    m_connections.push_back(conn);

    // Only the first registered connection is applied right away.
    if (!isFirstConnection)
        return;

    p_client->setConnection(conn);
    m_farm.set_pool_addresses(conn.Host(), conn.Port());
}

void PoolManager::clearConnections()
@@ -197,60 +243,9 @@ void PoolManager::start()
if (m_connections.size() > 0) {
m_running = true;
startWorking();

// Try to connect to pool
cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port()));
p_client->connect();
}
else {
cwarn << "Manager has no connections defined!";
}
}

void PoolManager::tryReconnect()
{
// No connections available, so why bother trying to reconnect
if (m_connections.size() <= 0) {
cwarn << "Manager has no connections defined!";
return;
}

for (auto i = 4; --i; this_thread::sleep_for(chrono::seconds(1))) {
cnote << "Retrying in " << i << "... \r";
}

// We do not need awesome logic here, we just have one connection anyway
if (m_connections.size() == 1) {

cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port()));
p_client->connect();
return;
}

// Fallback logic, tries current connection multiple times and then switches to
// one of the other connections.
if (m_reconnectTries > m_reconnectTry) {

m_reconnectTry++;
cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port()));
p_client->connect();
}
else {
m_reconnectTry = 0;
m_activeConnectionIdx++;
if (m_activeConnectionIdx >= m_connections.size()) {
m_activeConnectionIdx = 0;
}
if (m_connections[m_activeConnectionIdx].Host() == "exit") {
dev::setThreadName("main");
cnote << "Exiting because reconnecting is not possible.";
stop();
}
else {
p_client->setConnection(m_connections[m_activeConnectionIdx]);
m_farm.set_pool_addresses(m_connections[m_activeConnectionIdx].Host(), m_connections[m_activeConnectionIdx].Port());
cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port()));
p_client->connect();
}
}
}
14 changes: 8 additions & 6 deletions libpoolprotocols/PoolManager.h
Original file line number Diff line number Diff line change
@@ -19,12 +19,11 @@ namespace dev
class PoolManager : public Worker
{
public:
PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType);
PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType, unsigned maxTries);
void addConnection(URI &conn);
void clearConnections();
void start();
void stop();
void setReconnectTries(unsigned const & reconnectTries) { m_reconnectTries = reconnectTries; };
bool isConnected() { return p_client->isConnected(); };
bool isRunning() { return m_running; };

@@ -34,17 +33,20 @@ namespace dev

bool m_running = false;
void workLoop() override;
unsigned m_reconnectTries = 3;
unsigned m_reconnectTry = 0;
std::vector <URI> m_connections;

unsigned m_connectionAttempt = 0;
unsigned m_maxConnectionAttempts = 0;
unsigned m_activeConnectionIdx = 0;

std::vector <URI> m_connections;

h256 m_lastBoundary = h256();

PoolClient *p_client;
Farm &m_farm;
MinerType m_minerType;
std::chrono::steady_clock::time_point m_submit_time;
void tryReconnect();

};
}
}
2 changes: 2 additions & 0 deletions libpoolprotocols/getwork/EthGetworkClient.h
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@ class EthGetworkClient : public PoolClient, Worker
void disconnect() override;

bool isConnected() override { return m_connected; }
bool isPendingState() override { return false; }

string ActiveEndPoint() override { return ""; };

void submitHashrate(string const & rate) override;
345 changes: 270 additions & 75 deletions libpoolprotocols/stratum/EthStratumClient.cpp

Large diffs are not rendered by default.

25 changes: 15 additions & 10 deletions libpoolprotocols/stratum/EthStratumClient.h
Original file line number Diff line number Diff line change
@@ -31,11 +31,9 @@ class EthStratumClient : public PoolClient
void disconnect();

// Connected and Connection Statuses
bool isConnected()
{
return m_connected.load(std::memory_order_relaxed) &&
!m_disconnecting.load(std::memory_order_relaxed);
}
bool isConnected() override { return m_connected.load(std::memory_order_relaxed) && !isPendingState(); }
bool isPendingState() override { return (m_connecting.load(std::memory_order_relaxed) || m_disconnecting.load(std::memory_order_relaxed));}

bool isSubscribed() { return m_subscribed.load(std::memory_order_relaxed); }
bool isAuthorized() { return m_authorized.load(std::memory_order_relaxed); }
string ActiveEndPoint() { return " [" + toString(m_endpoint) + "]"; };
@@ -48,10 +46,13 @@ class EthStratumClient : public PoolClient

private:

void disconnect_finalize();

void resolve_handler(const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator i);
void start_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iter);
void start_connect();
void check_connect_timeout(const boost::system::error_code& ec);
void connect_handler(const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator i);
void connect_handler(const boost::system::error_code& ec);
void io_work_timer_handler(const boost::system::error_code& ec);
void work_timeout_handler(const boost::system::error_code& ec);
void response_timeout_handler(const boost::system::error_code& ec);

@@ -64,14 +65,15 @@ class EthStratumClient : public PoolClient
void onRecvSocketDataCompleted(const boost::system::error_code& ec, std::size_t bytes_transferred);
void sendSocketData(Json::Value const & jReq);
void onSendSocketDataCompleted(const boost::system::error_code& ec);

void onSSLShutdownCompleted(const boost::system::error_code& ec);

string m_worker; // eth-proxy only; No ! It's for all !!!

std::atomic<bool> m_subscribed = { false };
std::atomic<bool> m_authorized = { false };
std::atomic<bool> m_connected = { false };
std::atomic<bool> m_disconnecting = { false };
std::atomic<bool> m_connecting = { false };

// seconds to trigger a work_timeout (overwritten in constructor)
int m_worktimeout;
@@ -83,8 +85,10 @@ class EthStratumClient : public PoolClient

bool m_stale = false;

std::thread m_serviceThread; ///< The IO service thread.
boost::asio::io_service m_io_service;
std::thread m_serviceThread; // The IO service thread.
boost::asio::io_service m_io_service; // The IO service itself
boost::asio::io_service::work m_io_work; // The IO work which prevents io_service.run() to return on no work thus terminating thread
boost::asio::deadline_timer m_io_work_timer; // A dummy timer to keep io_service with something to do
boost::asio::ip::tcp::socket *m_socket;

// Use shared ptrs to avoid crashes due to async_writes
@@ -104,6 +108,7 @@ class EthStratumClient : public PoolClient
bool m_response_pending = false;

boost::asio::ip::tcp::resolver m_resolver;
std::queue<boost::asio::ip::basic_endpoint<boost::asio::ip::tcp>> m_endpoints;

string m_email;
string m_rate;
1 change: 1 addition & 0 deletions libpoolprotocols/testing/SimulateClient.h
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ class SimulateClient : public PoolClient, Worker
void disconnect() override;

bool isConnected() override { return m_connected; }
bool isPendingState() override { return false; }
string ActiveEndPoint() override { return ""; };

void submitHashrate(string const & rate) override;