Skip to content
This repository was archived by the owner on Apr 24, 2022. It is now read-only.

Commit d4229f5

Browse files
committedJun 3, 2018
New Api server based on Boost
- Got rid of jsonrpccpp server which hammers socket (#1059) - Some cnotes pushed in log to detect API operations - Socket(s) are persistent till disconnection (a client can issue multiple requests) - Added miner_shuffle method to re-rand nonce scrambler at runtime (it may give you the chance to abandon a low performance batch)
1 parent 3657924 commit d4229f5

File tree

7 files changed

+390
-105
lines changed

7 files changed

+390
-105
lines changed
 

‎ethminer/main.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
#endif
4141

4242
#if API_CORE
43-
#include <libapicore/Api.h>
43+
#include <libapicore/ApiServer.h>
4444
#include <libapicore/httpServer.h>
4545
#endif
4646

@@ -710,8 +710,12 @@ class MinerCLI
710710
}
711711

712712
#if API_CORE
713-
Api api(this->m_api_port, f);
713+
714+
ApiServer api(m_io_service, abs(m_api_port), (m_api_port < 0) ? true : false, f);
715+
api.start();
716+
714717
http_server.run(m_http_port, &f, m_show_hwmonitors, m_show_power);
718+
715719
#endif
716720

717721
// Start PoolManager

‎libapicore/Api.cpp

-20
This file was deleted.

‎libapicore/Api.h

-20
This file was deleted.

‎libapicore/ApiServer.cpp

+279-45
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,260 @@
22

33
#include <ethminer-buildinfo.h>
44

5-
ApiServer::ApiServer(AbstractServerConnector *conn, serverVersion_t type, Farm &farm, bool &readonly) : AbstractServer(*conn, type), m_farm(farm)
5+
ApiServer::ApiServer(boost::asio::io_service& io_service, int portnum, bool readonly, Farm& f) :
6+
m_acceptor(io_service, tcp::endpoint(tcp::v4(), portnum)),
7+
m_io_strand(io_service),
8+
m_farm(f)
69
{
7-
this->bindAndAddMethod(Procedure("miner_getstat1", PARAMS_BY_NAME, JSON_OBJECT, NULL), &ApiServer::getMinerStat1);
8-
this->bindAndAddMethod(Procedure("miner_getstathr", PARAMS_BY_NAME, JSON_OBJECT, NULL), &ApiServer::getMinerStatHR);
9-
if (!readonly) {
10-
this->bindAndAddMethod(Procedure("miner_restart", PARAMS_BY_NAME, JSON_OBJECT, NULL), &ApiServer::doMinerRestart);
11-
this->bindAndAddMethod(Procedure("miner_reboot", PARAMS_BY_NAME, JSON_OBJECT, NULL), &ApiServer::doMinerReboot);
10+
m_readonly.store(readonly, std::memory_order_relaxed);
11+
}
12+
13+
void ApiServer::start()
14+
{
15+
// cnote << "ApiServer::start";
16+
if (m_acceptor.local_endpoint().port() == 0) return;
17+
m_running.store(true, std::memory_order_relaxed);
18+
19+
cnote << "Api server listening for connections on port " + to_string(m_acceptor.local_endpoint().port());
20+
21+
m_workThread = std::thread{ boost::bind(&ApiServer::begin_accept, this) };
22+
23+
}
24+
25+
void ApiServer::stop()
26+
{
27+
m_acceptor.cancel();
28+
m_running.store(false, std::memory_order_relaxed);
29+
30+
// Dispose all sessions (if any)
31+
m_sessions.clear();
32+
33+
}
34+
35+
void ApiServer::begin_accept()
36+
{
37+
if (!isRunning()) return;
38+
39+
dev::setThreadName("Api");
40+
std::shared_ptr<ApiConnection> session = std::make_shared<ApiConnection>(m_acceptor.get_io_service(), ++lastSessionId, m_readonly, m_farm);
41+
m_acceptor.async_accept(session->socket(), m_io_strand.wrap(boost::bind(&ApiServer::handle_accept, this, session, boost::asio::placeholders::error)));
42+
}
43+
44+
void ApiServer::handle_accept(std::shared_ptr<ApiConnection> session, boost::system::error_code ec)
45+
{
46+
// Start new connection
47+
// cnote << "ApiServer::handle_accept";
48+
if (!ec) {
49+
session->onDisconnected([&](int id)
50+
{
51+
// Destroy pointer to session
52+
auto it = find_if(m_sessions.begin(), m_sessions.end(), [&id](const std::shared_ptr<ApiConnection> session) {return session->getId() == id; });
53+
if (it != m_sessions.end()) {
54+
auto index = std::distance(m_sessions.begin(), it);
55+
m_sessions.erase(m_sessions.begin() + index);
56+
}
57+
58+
});
59+
session->start();
60+
m_sessions.push_back(session);
61+
cnote << "New api session from" << session->socket().remote_endpoint();
62+
63+
}
64+
else {
65+
session.reset();
66+
}
67+
68+
// Resubmit new accept
69+
begin_accept();
70+
71+
}
72+
73+
void ApiConnection::disconnect()
74+
{
75+
// cnote << "ApiConnection::disconnect";
76+
77+
// Cancel pending operations
78+
m_socket.cancel();
79+
80+
if (m_socket.is_open()) {
81+
82+
boost::system::error_code ec;
83+
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
84+
m_socket.close(ec);
85+
}
86+
87+
if (m_onDisconnected) { m_onDisconnected(this->getId()); }
88+
89+
}
90+
91+
void ApiConnection::start()
92+
{
93+
// cnote << "ApiConnection::start";
94+
recvSocketData();
95+
}
96+
97+
void ApiConnection::processRequest(Json::Value& requestObject)
98+
{
99+
Json::Value jRes;
100+
jRes["jsonrpc"] = "2.0";
101+
102+
// Strict sanity checks over jsonrpc v2
103+
if (
104+
(!requestObject.isMember("jsonrpc") || requestObject["jsonrpc"].empty() || !requestObject["jsonrpc"].isString() || requestObject.get("jsonrpc", ".") != "2.0") ||
105+
(!requestObject.isMember("method") || requestObject["method"].empty() || !requestObject["method"].isString()) ||
106+
(!requestObject.isMember("id") || requestObject["id"].empty() || !requestObject["id"].isUInt())
107+
)
108+
{
109+
jRes["id"] = Json::nullValue;
110+
jRes["error"]["code"] = -32600;
111+
jRes["error"]["message"] = "Invalid Request";
112+
sendSocketData(jRes);
113+
return;
114+
}
115+
116+
117+
// Process messages
118+
std::string _method = requestObject.get("method", "").asString();
119+
jRes["id"] = requestObject.get("id", 0).asInt();
120+
121+
122+
if (_method == "miner_getstat1")
123+
{
124+
jRes["result"] = getMinerStat1();
125+
}
126+
else if (_method == "miner_getstathr")
127+
{
128+
jRes["result"] = getMinerStatHR();
129+
}
130+
else if (_method == "miner_shuffle")
131+
{
132+
133+
// Gives nonce scrambler a new range
134+
cnote << "Miner Shuffle requested";
135+
jRes["result"] = true;
136+
m_farm.shuffle();
137+
12138
}
139+
else if (_method == "miner_restart")
140+
{
141+
// Send response to client of success
142+
// and invoke an async restart
143+
// to prevent locking
144+
if (m_readonly.load(std::memory_order_relaxed))
145+
{
146+
jRes["error"]["code"] = -32601;
147+
jRes["error"]["message"] = "Method not available";
148+
}
149+
else
150+
{
151+
cnote << "Miner Restart requested";
152+
jRes["result"] = true;
153+
m_farm.restart_async();
154+
}
155+
156+
}
157+
else if (_method == "miner_reboot")
158+
{
159+
160+
// Not implemented yet
161+
jRes["error"]["code"] = -32601;
162+
jRes["error"]["message"] = "Method not implemented";
163+
164+
}
165+
else
166+
{
167+
168+
// Any other method not found
169+
jRes["error"]["code"] = -32601;
170+
jRes["error"]["message"] = "Method not found";
171+
}
172+
173+
// Send response
174+
sendSocketData(jRes);
175+
13176
}
14177

15-
void ApiServer::getMinerStat1(const Json::Value& request, Json::Value& response)
178+
void ApiConnection::recvSocketData()
179+
{
180+
// cnote << "ApiConnection::recvSocketData";
181+
boost::asio::async_read_until(m_socket, m_recvBuffer, "\n",
182+
m_io_strand.wrap(boost::bind(&ApiConnection::onRecvSocketDataCompleted, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
183+
184+
}
185+
186+
void ApiConnection::onRecvSocketDataCompleted(const boost::system::error_code& ec, std::size_t bytes_transferred)
187+
{
188+
// cnote << "ApiConnection::onRecvSocketDataCompleted";
189+
// Due to the nature of io_service's queue and
190+
// the implementation of the loop this event may trigger
191+
// late after clean disconnection. Check status of connection
192+
// before triggering all stack of calls
193+
194+
if (!ec && bytes_transferred > 0) {
195+
196+
// Extract received message
197+
std::istream is(&m_recvBuffer);
198+
std::string message;
199+
getline(is, message);
200+
201+
if (m_socket.is_open()) {
202+
203+
if (!message.empty()) {
204+
205+
// Test validity of chunk and process
206+
Json::Value jMsg;
207+
Json::Reader jRdr;
208+
if (jRdr.parse(message, jMsg)) {
209+
processRequest(jMsg);
210+
}
211+
else {
212+
Json::Value jRes;
213+
jRes["jsonrpc"] = "2.0";
214+
jRes["id"] = Json::nullValue;
215+
jRes["error"]["code"] = -32700;
216+
jRes["error"]["message"] = "Parse Error";
217+
sendSocketData(jRes);
218+
}
219+
220+
}
221+
222+
// Eventually keep reading from socket
223+
recvSocketData();
224+
225+
}
226+
227+
228+
}
229+
else
230+
{
231+
if (m_socket.is_open()) {
232+
disconnect();
233+
}
234+
}
235+
236+
}
237+
238+
void ApiConnection::sendSocketData(Json::Value const & jReq) {
239+
240+
if (!m_socket.is_open())
241+
return;
242+
243+
std::ostream os(&m_sendBuffer);
244+
os << m_jWriter.write(jReq); // Do not add lf. It's added by writer.
245+
246+
async_write(m_socket, m_sendBuffer,
247+
m_io_strand.wrap(boost::bind(&ApiConnection::onSendSocketDataCompleted, this, boost::asio::placeholders::error)));
248+
249+
}
250+
251+
void ApiConnection::onSendSocketDataCompleted(const boost::system::error_code& ec) {
252+
253+
if (ec) disconnect();
254+
255+
}
256+
257+
Json::Value ApiConnection::getMinerStat1()
16258
{
17-
(void) request; // unused
18259

19260
auto runningTime = std::chrono::duration_cast<std::chrono::minutes>(steady_clock::now() - this->m_farm.farmLaunched());
20261

@@ -52,20 +293,23 @@ void ApiServer::getMinerStat1(const Json::Value& request, Json::Value& response)
52293
gpuIndex++;
53294
}
54295

55-
response[0] = ethminer_get_buildinfo()->project_version; //miner version.
56-
response[1] = toString(runningTime.count()); // running time, in minutes.
57-
response[2] = totalMhEth.str(); // total ETH hashrate in MH/s, number of ETH shares, number of ETH rejected shares.
58-
response[3] = detailedMhEth.str(); // detailed ETH hashrate for all GPUs.
59-
response[4] = totalMhDcr.str(); // total DCR hashrate in MH/s, number of DCR shares, number of DCR rejected shares.
60-
response[5] = detailedMhDcr.str(); // detailed DCR hashrate for all GPUs.
61-
response[6] = tempAndFans.str(); // Temperature and Fan speed(%) pairs for all GPUs.
62-
response[7] = poolAddresses.str(); // current mining pool. For dual mode, there will be two pools here.
63-
response[8] = invalidStats.str(); // number of ETH invalid shares, number of ETH pool switches, number of DCR invalid shares, number of DCR pool switches.
296+
Json::Value jRes;
297+
298+
jRes[0] = ethminer_get_buildinfo()->project_version; //miner version.
299+
jRes[1] = toString(runningTime.count()); // running time, in minutes.
300+
jRes[2] = totalMhEth.str(); // total ETH hashrate in MH/s, number of ETH shares, number of ETH rejected shares.
301+
jRes[3] = detailedMhEth.str(); // detailed ETH hashrate for all GPUs.
302+
jRes[4] = totalMhDcr.str(); // total DCR hashrate in MH/s, number of DCR shares, number of DCR rejected shares.
303+
jRes[5] = detailedMhDcr.str(); // detailed DCR hashrate for all GPUs.
304+
jRes[6] = tempAndFans.str(); // Temperature and Fan speed(%) pairs for all GPUs.
305+
jRes[7] = poolAddresses.str(); // current mining pool. For dual mode, there will be two pools here.
306+
jRes[8] = invalidStats.str(); // number of ETH invalid shares, number of ETH pool switches, number of DCR invalid shares, number of DCR pool switches.
307+
308+
return jRes;
64309
}
65310

66-
void ApiServer::getMinerStatHR(const Json::Value& request, Json::Value& response)
311+
Json::Value ApiConnection::getMinerStatHR()
67312
{
68-
(void) request; // unused
69313

70314
//TODO:give key-value format
71315
auto runningTime = std::chrono::duration_cast<std::chrono::minutes>(steady_clock::now() - this->m_farm.farmLaunched());
@@ -103,34 +347,24 @@ void ApiServer::getMinerStatHR(const Json::Value& request, Json::Value& response
103347
gpuIndex++;
104348
}
105349

106-
response["version"] = version.str(); // miner version.
107-
response["runtime"] = runtime.str(); // running time, in minutes.
350+
Json::Value jRes;
351+
352+
jRes["version"] = version.str(); // miner version.
353+
jRes["runtime"] = runtime.str(); // running time, in minutes.
108354
// total ETH hashrate in MH/s, number of ETH shares, number of ETH rejected shares.
109-
response["ethhashrate"] = (p.rate());
110-
response["ethhashrates"] = detailedMhEth;
111-
response["ethshares"] = s.getAccepts();
112-
response["ethrejected"] = s.getRejects();
113-
response["ethinvalid"] = s.getFailures();
114-
response["ethpoolsw"] = 0;
355+
jRes["ethhashrate"] = (p.rate());
356+
jRes["ethhashrates"] = detailedMhEth;
357+
jRes["ethshares"] = s.getAccepts();
358+
jRes["ethrejected"] = s.getRejects();
359+
jRes["ethinvalid"] = s.getFailures();
360+
jRes["ethpoolsw"] = 0;
115361
// Hardware Info
116-
response["temperatures"] = temps; // Temperatures(C) for all GPUs
117-
response["fanpercentages"] = fans; // Fans speed(%) for all GPUs
118-
response["powerusages"] = powers; // Power Usages(W) for all GPUs
119-
response["pooladdrs"] = poolAddresses.str(); // current mining pool. For dual mode, there will be two pools here.
120-
}
362+
jRes["temperatures"] = temps; // Temperatures(C) for all GPUs
363+
jRes["fanpercentages"] = fans; // Fans speed(%) for all GPUs
364+
jRes["powerusages"] = powers; // Power Usages(W) for all GPUs
365+
jRes["pooladdrs"] = poolAddresses.str(); // current mining pool. For dual mode, there will be two pools here.
121366

122-
void ApiServer::doMinerRestart(const Json::Value& request, Json::Value& response)
123-
{
124-
(void) request; // unused
125-
(void) response; // unused
126-
127-
this->m_farm.restart();
128-
}
367+
return jRes;
129368

130-
void ApiServer::doMinerReboot(const Json::Value& request, Json::Value& response)
131-
{
132-
(void) request; // unused
133-
(void) response; // unused
134-
135-
// Not supported
136369
}
370+

‎libapicore/ApiServer.h

+77-9
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,90 @@
22

33
#include <libethcore/Farm.h>
44
#include <libethcore/Miner.h>
5-
#include <jsonrpccpp/server.h>
5+
#include <json/json.h>
6+
#include <boost/bind.hpp>
7+
#include <boost/shared_ptr.hpp>
8+
#include <boost/asio.hpp>
69

7-
using namespace jsonrpc;
810
using namespace dev;
911
using namespace dev::eth;
1012
using namespace std::chrono;
1113

12-
class ApiServer : public AbstractServer<ApiServer>
14+
using boost::asio::ip::tcp;
15+
16+
class ApiConnection
1317
{
1418
public:
15-
ApiServer(AbstractServerConnector *conn, serverVersion_t type, Farm &farm, bool &readonly);
19+
20+
ApiConnection(boost::asio::io_service& io_service, int id, bool readonly, Farm& f) :
21+
m_sessionId(id),
22+
m_socket(io_service),
23+
m_io_strand(io_service),
24+
m_readonly(readonly),
25+
m_farm(f) {}
26+
27+
~ApiConnection() {}
28+
29+
void start();
30+
31+
Json::Value getMinerStat1();
32+
Json::Value getMinerStatHR();
33+
34+
using Disconnected = std::function<void(int const&)>;
35+
void onDisconnected(Disconnected const& _handler) { m_onDisconnected = _handler; }
36+
37+
int getId() { return m_sessionId; }
38+
39+
tcp::socket& socket() { return m_socket; }
40+
1641
private:
17-
Farm &m_farm;
18-
void getMinerStat1(const Json::Value& request, Json::Value& response);
19-
void getMinerStatHR(const Json::Value& request, Json::Value& response);
20-
void doMinerRestart(const Json::Value& request, Json::Value& response);
21-
void doMinerReboot(const Json::Value& request, Json::Value& response);
42+
43+
void disconnect();
44+
void processRequest(Json::Value& requestObject);
45+
void recvSocketData();
46+
void onRecvSocketDataCompleted(const boost::system::error_code& ec, std::size_t bytes_transferred);
47+
void sendSocketData(Json::Value const & jReq);
48+
void onSendSocketDataCompleted(const boost::system::error_code& ec);
49+
50+
Disconnected m_onDisconnected;
51+
52+
int m_sessionId;
53+
54+
tcp::socket m_socket;
55+
boost::asio::io_service::strand m_io_strand;
56+
boost::asio::streambuf m_sendBuffer;
57+
boost::asio::streambuf m_recvBuffer;
58+
Json::FastWriter m_jWriter;
59+
60+
std::atomic<bool> m_readonly = { false };
61+
Farm& m_farm;
62+
2263
};
2364

65+
66+
class ApiServer
67+
{
68+
public:
69+
70+
ApiServer(boost::asio::io_service& io_service, int portnum, bool readonly, Farm& f);
71+
bool isRunning() { return m_running.load(std::memory_order_relaxed); };
72+
void start();
73+
void stop();
74+
75+
private:
76+
77+
void begin_accept();
78+
void handle_accept(std::shared_ptr<ApiConnection> session, boost::system::error_code ec);
79+
80+
int lastSessionId = 0;
81+
82+
std::thread m_workThread;
83+
std::atomic<bool> m_readonly = { false };
84+
std::atomic<bool> m_running = { false };
85+
tcp::acceptor m_acceptor;
86+
boost::asio::io_service::strand m_io_strand;
87+
std::vector<std::shared_ptr<ApiConnection>> m_sessions;
88+
89+
Farm& m_farm;
90+
91+
};

‎libapicore/CMakeLists.txt

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
set(SOURCES
2-
Api.h Api.cpp
32
ApiServer.h ApiServer.cpp
43
httpServer.cpp httpServer.h
54
)
@@ -8,5 +7,5 @@ hunter_add_package(mongoose)
87
find_package(mongoose CONFIG REQUIRED)
98

109
add_library(apicore ${SOURCES})
11-
target_link_libraries(apicore PRIVATE ethcore devcore ethminer-buildinfo libjson-rpc-cpp::server mongoose::mongoose)
10+
target_link_libraries(apicore PRIVATE ethcore devcore ethminer-buildinfo mongoose::mongoose)
1211
target_include_directories(apicore PRIVATE ..)

‎libethcore/Farm.h

+27-7
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,17 @@ class Farm: public FarmFace
6262
m_io_strand(io_service),
6363
m_hashrateTimer(io_service)
6464
{
65-
// Given that all nonces are equally likely to solve the problem
66-
// we could reasonably always start the nonce search ranges
67-
// at a fixed place, but that would be boring. Provide a once
68-
// per run randomized start place, without creating much overhead.
69-
random_device engine;
70-
m_nonce_scrambler = uniform_int_distribution<uint64_t>()(engine);
7165

7266
// Init HWMON
7367
adlh = wrap_adl_create();
7468
#if defined(__linux)
7569
sysfsh = wrap_amdsysfs_create();
7670
#endif
7771
nvmlh = wrap_nvml_create();
72+
73+
// Initialize nonce_scrambler
74+
shuffle();
75+
7876
}
7977

8078
~Farm()
@@ -93,6 +91,20 @@ class Farm: public FarmFace
9391
stop();
9492
}
9593

94+
/**
95+
* @brief Randomizes the nonce scrambler
96+
*/
97+
void shuffle() {
98+
99+
// Given that all nonces are equally likely to solve the problem
100+
// we could reasonably always start the nonce search ranges
101+
// at a fixed place, but that would be boring. Provide a once
102+
// per run randomized start place, without creating much overhead.
103+
random_device engine;
104+
m_nonce_scrambler = uniform_int_distribution<uint64_t>()(engine);
105+
106+
}
107+
96108
/**
97109
* @brief Sets the current mining mission.
98110
* @param _wp The work package we wish to be mining.
@@ -243,7 +255,15 @@ class Farm: public FarmFace
243255
m_onMinerRestart();
244256
}
245257
}
246-
258+
259+
/**
260+
* @brief Stop all mining activities and Starts them again (async post)
261+
*/
262+
void restart_async()
263+
{
264+
m_io_strand.get_io_service().post(m_io_strand.wrap(boost::bind(&Farm::restart, this)));
265+
}
266+
247267
bool isMining() const
248268
{
249269
return m_isMining.load(std::memory_order_relaxed);

0 commit comments

Comments
 (0)
This repository has been archived.