Skip to content

Commit 7a02a7b

Browse files
authored
Merge pull request #97 from mujin/silence_zmq_bullseye
Silence ZMQ related warnings
2 parents 650598e + 4df2ec3 commit 7a02a7b

4 files changed

Lines changed: 78 additions & 106 deletions

File tree

include/mujincontrollerclient/mujinjson.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ inline void ParseJson(rapidjson::Document& d, const std::string& str) {
164164
inline void ParseJson(rapidjson::Document& d, std::istream& is) {
165165
rapidjson::IStreamWrapper isw(is);
166166
// see note in: void ParseJson(rapidjson::Document& d, const std::string& str)
167-
rapidjson::Document(tempDoc);
167+
rapidjson::Document tempDoc;
168168
tempDoc.ParseStream<rapidjson::kParseFullPrecisionFlag>(isw); // parse float in full precision mode
169169
if (tempDoc.HasParseError()) {
170170
throw MujinJSONException(boost::str(boost::format("Json stream is invalid (offset %u) %s")%((unsigned)d.GetErrorOffset())%GetParseError_En(d.GetParseError())), MJE_Failed);

src/binpickingtask.cpp

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,14 +1785,12 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti
17851785
socket.reset();
17861786
}
17871787
socket.reset(new zmq::socket_t((*_zmqcontext.get()),ZMQ_SUB));
1788-
socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect
1789-
socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further
1790-
socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
1791-
socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer
1792-
std::stringstream ss; ss << std::setprecision(std::numeric_limits<double>::digits10+1);
1793-
ss << _heartbeatPort;
1794-
socket->connect(("tcp://"+ _mujinControllerIp+":"+ss.str()).c_str());
1795-
socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
1788+
socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect
1789+
socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further
1790+
socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
1791+
socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer
1792+
socket->connect("tcp://" + _mujinControllerIp + ":" + std::to_string(_heartbeatPort));
1793+
socket->set(zmq::sockopt::subscribe, "");
17961794

17971795
zmq::pollitem_t pollitem;
17981796
memset(&pollitem, 0, sizeof(zmq::pollitem_t));
@@ -1801,13 +1799,12 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti
18011799

18021800
unsigned long long lastheartbeat = GetMilliTime();
18031801
while (!_bShutdownHeartbeatMonitor && (GetMilliTime() - lastheartbeat) / 1000.0f < reinitializetimeout) {
1804-
zmq::poll(&pollitem,1, 50); // wait 50 ms for message
1802+
zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message
18051803
if (pollitem.revents & ZMQ_POLLIN) {
18061804
zmq::message_t reply;
1807-
socket->recv(&reply);
1808-
std::string replystring((char *)reply.data (), (size_t)reply.size());
1805+
socket->recv(reply);
18091806
//if ((size_t)reply.size() == 1 && ((char *)reply.data())[0]==255) {
1810-
if (replystring == "255") {
1807+
if (reply.to_string() == "255") {
18111808
lastheartbeat = GetMilliTime();
18121809
}
18131810
}
@@ -1827,37 +1824,36 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti
18271824
std::string utils::GetHeartbeat(const std::string& endpoint) {
18281825
zmq::context_t zmqcontext(1);
18291826
zmq::socket_t socket(zmqcontext, ZMQ_SUB);
1830-
socket.connect(endpoint.c_str());
1831-
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
1827+
socket.connect(endpoint);
1828+
socket.set(zmq::sockopt::subscribe, "");
18321829

18331830
zmq::pollitem_t pollitem;
18341831
memset(&pollitem, 0, sizeof(zmq::pollitem_t));
18351832
pollitem.socket = socket;
18361833
pollitem.events = ZMQ_POLLIN;
18371834

1838-
zmq::poll(&pollitem,1, 50); // wait 50 ms for message
1835+
zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message
18391836
if (!(pollitem.revents & ZMQ_POLLIN)) {
18401837
return "";
18411838
}
18421839

18431840
zmq::message_t reply;
1844-
socket.recv(&reply);
1845-
const std::string received((char *)reply.data (), (size_t)reply.size());
1841+
socket.recv(reply);
18461842
#ifndef _WIN32
1847-
return received;
1843+
return reply.to_string();
18481844
#else
18491845
// sometimes buffer can container \n or \\, which windows does not like
18501846
std::string newbuffer;
18511847
std::vector< std::pair<std::string, std::string> > serachpairs(2);
18521848
serachpairs[0].first = "\n"; serachpairs[0].second = "";
18531849
serachpairs[1].first = "\\"; serachpairs[1].second = "";
1854-
SearchAndReplace(newbuffer, received, serachpairs);
1850+
SearchAndReplace(newbuffer, reply.to_string(), serachpairs);
18551851
return newbuffer;
18561852
#endif
18571853
}
18581854

1859-
18601855
namespace {
1856+
18611857
std::string FindSmallestSlaveRequestId(const rapidjson::Value& pt) {
18621858
// get all slave request ids
18631859
std::vector<std::string> slavereqids;
@@ -1898,13 +1894,11 @@ std::string FindSmallestSlaveRequestId(const rapidjson::Value& pt) {
18981894
return slavereqids[smallest_suffix_index];
18991895
}
19001896

1901-
std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat,
1902-
const std::string& key)
1897+
std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, const std::string& key)
19031898
{
19041899

19051900
rapidjson::Document pt(rapidjson::kObjectType);
1906-
std::stringstream ss(heartbeat);
1907-
ParseJson(pt, ss.str());
1901+
ParseJson(pt, heartbeat);
19081902
try {
19091903
const std::string slavereqid = FindSmallestSlaveRequestId(pt);
19101904
std::string result;
@@ -1916,8 +1910,8 @@ std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat,
19161910
}
19171911

19181912
}
1919-
}
19201913

1914+
} // anonymous namespace
19211915

19221916
std::string mujinclient::utils::GetScenePkFromHeartbeat(const std::string& heartbeat) {
19231917
static const std::string prefix("mujin:/");
@@ -1926,8 +1920,7 @@ std::string mujinclient::utils::GetScenePkFromHeartbeat(const std::string& heart
19261920

19271921
std::string utils::GetSlaveRequestIdFromHeartbeat(const std::string& heartbeat) {
19281922
rapidjson::Document pt;
1929-
std::stringstream ss(heartbeat);
1930-
ParseJson(pt, ss.str());
1923+
ParseJson(pt, heartbeat);
19311924
try {
19321925
static const std::string prefix("slaverequestid-");
19331926
return FindSmallestSlaveRequestId(pt).substr(prefix.length());

src/binpickingtaskzmq.cpp

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -267,30 +267,26 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ
267267
socket.reset();
268268
}
269269
socket.reset(new zmq::socket_t((*_zmqcontext.get()),ZMQ_SUB));
270-
socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect
271-
socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further
272-
socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
273-
socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer
274-
std::stringstream ss; ss << std::setprecision(std::numeric_limits<double>::digits10+1);
275-
ss << _heartbeatPort;
276-
socket->connect (("tcp://"+ _mujinControllerIp+":"+ss.str()).c_str());
277-
socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
270+
socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect
271+
socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further
272+
socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
273+
socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer
274+
socket->connect("tcp://" + _mujinControllerIp + ":" + std::to_string(_heartbeatPort));
275+
socket->set(zmq::sockopt::subscribe, "");
278276

279277
zmq::pollitem_t pollitem;
280278
memset(&pollitem, 0, sizeof(zmq::pollitem_t));
281279
pollitem.socket = socket->operator void*();
282280
pollitem.events = ZMQ_POLLIN;
283281
unsigned long long lastheartbeat = GetMilliTime();
284282
while (!_bShutdownHeartbeatMonitor && (GetMilliTime() - lastheartbeat) / 1000.0f < reinitializetimeout) {
285-
zmq::poll(&pollitem,1, 50); // wait 50 ms for message
283+
zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message
286284
if (pollitem.revents & ZMQ_POLLIN) {
287285
zmq::message_t reply;
288-
socket->recv(&reply);
289-
std::string replystring((char *)reply.data (), (size_t)reply.size());
286+
socket->recv(reply);
290287
rapidjson::Document pt(rapidjson::kObjectType);
291288
try{
292-
std::stringstream replystring_ss(replystring);
293-
ParseJson(pt, replystring_ss.str());
289+
ParseJson(pt, reply.to_string());
294290
heartbeat.Parse(pt);
295291
{
296292
boost::mutex::scoped_lock lock(_mutexTaskState);
@@ -304,7 +300,7 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ
304300
}
305301
catch (std::exception const &e) {
306302
MUJIN_LOG_ERROR("HeartBeat reply is not JSON");
307-
MUJIN_LOG_ERROR(replystring);
303+
MUJIN_LOG_ERROR(reply.to_string());
308304
MUJIN_LOG_ERROR(e.what());
309305
continue;
310306
}
@@ -319,6 +315,4 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ
319315
MUJIN_LOG_DEBUG(str(boost::format("Stopped controller %s monitoring thread on port %d for slaverequestid=%s.")%_mujinControllerIp%_heartbeatPort%_slaverequestid));
320316
}
321317

322-
323-
324318
} // end namespace mujinclient

0 commit comments

Comments
 (0)