Skip to content

Commit d8f3e6c

Browse files
committed
feat: add ZMQ transport to concore.hpp/concore_base.hpp (fixes #474)
1 parent c3b6f84 commit d8f3e6c

2 files changed

Lines changed: 231 additions & 0 deletions

File tree

concore.hpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ class Concore{
4949
int communication_iport = 0; // iport refers to input port
5050
int communication_oport = 0; // oport refers to input port
5151

52+
#ifdef CONCORE_USE_ZMQ
53+
map<string, concore_base::ZeroMQPort*> zmq_ports;
54+
#endif
55+
5256
public:
5357
double delay = 1;
5458
int retrycount = 0;
@@ -107,6 +111,11 @@ class Concore{
107111
*/
108112
~Concore()
109113
{
114+
#ifdef CONCORE_USE_ZMQ
115+
for (auto& kv : zmq_ports)
116+
delete kv.second;
117+
zmq_ports.clear();
118+
#endif
110119
#ifdef __linux__
111120
// Detach the shared memory segment from the process
112121
if (communication_oport == 1 && sharedData_create != nullptr) {
@@ -549,6 +558,117 @@ class Concore{
549558
}
550559
}
551560

561+
#ifdef CONCORE_USE_ZMQ
562+
/**
563+
* @brief Registers a ZMQ port for use with read()/write().
564+
* @param port_name The ZMQ port name.
565+
* @param port_type "bind" or "connect".
566+
* @param address The ZMQ address.
567+
* @param socket_type_str The socket type string.
568+
*/
569+
void init_zmq_port(string port_name, string port_type, string address, string socket_type_str) {
570+
if (zmq_ports.count(port_name)) return;
571+
int sock_type = concore_base::zmq_socket_type_from_string(socket_type_str);
572+
if (sock_type == -1) {
573+
cerr << "init_zmq_port: unknown socket type '" << socket_type_str << "'" << endl;
574+
return;
575+
}
576+
zmq_ports[port_name] = new concore_base::ZeroMQPort(port_type, address, sock_type);
577+
}
578+
579+
/**
580+
* @brief Reads data from a ZMQ port. Strips simtime prefix, updates simtime.
581+
* @param port_name The ZMQ port name.
582+
* @param name The name of the file.
583+
* @param initstr The initial string.
584+
* @return a vector of double values
585+
*/
586+
vector<double> read_ZMQ(string port_name, string name, string initstr) {
587+
auto it = zmq_ports.find(port_name);
588+
if (it == zmq_ports.end()) {
589+
cerr << "read_ZMQ: port '" << port_name << "' not initialized" << endl;
590+
return parser(initstr);
591+
}
592+
vector<double> inval = it->second->recv_with_retry();
593+
if (inval.empty())
594+
inval = parser(initstr);
595+
if (inval.empty()) return inval;
596+
simtime = simtime > inval[0] ? simtime : inval[0];
597+
s += port_name;
598+
inval.erase(inval.begin());
599+
return inval;
600+
}
601+
602+
/**
603+
* @brief Writes a vector of double values to a ZMQ port. Prepends simtime+delta.
604+
* @param port_name The ZMQ port name.
605+
* @param name The name of the file.
606+
* @param val The vector of double values to write.
607+
* @param delta The delta value (default: 0).
608+
*/
609+
void write_ZMQ(string port_name, string name, vector<double> val, int delta=0) {
610+
auto it = zmq_ports.find(port_name);
611+
if (it == zmq_ports.end()) {
612+
cerr << "write_ZMQ: port '" << port_name << "' not initialized" << endl;
613+
return;
614+
}
615+
val.insert(val.begin(), simtime + delta);
616+
it->second->send_with_retry(val);
617+
// simtime must not be mutated here (issue #385).
618+
}
619+
620+
/**
621+
* @brief Writes a string to a ZMQ port.
622+
* @param port_name The ZMQ port name.
623+
* @param name The name of the file.
624+
* @param val The string to write.
625+
* @param delta The delta value (default: 0).
626+
*/
627+
void write_ZMQ(string port_name, string name, string val, int delta=0) {
628+
auto it = zmq_ports.find(port_name);
629+
if (it == zmq_ports.end()) {
630+
cerr << "write_ZMQ: port '" << port_name << "' not initialized" << endl;
631+
return;
632+
}
633+
chrono::milliseconds timespan((int)(2000*delay));
634+
this_thread::sleep_for(timespan);
635+
it->second->send_string_with_retry(val);
636+
}
637+
638+
/**
639+
* @brief deviate the read to ZMQ communication protocol when port identifier is a string key.
640+
* @param port_name The ZMQ port name.
641+
* @param name The name of the file.
642+
* @param initstr The initial string.
643+
* @return
644+
*/
645+
vector<double> read(string port_name, string name, string initstr) {
646+
return read_ZMQ(port_name, name, initstr);
647+
}
648+
649+
/**
650+
* @brief deviate the write to ZMQ communication protocol when port identifier is a string key.
651+
* @param port_name The ZMQ port name.
652+
* @param name The name of the file.
653+
* @param val The vector of double values to write.
654+
* @param delta The delta value (default: 0).
655+
*/
656+
void write(string port_name, string name, vector<double> val, int delta=0) {
657+
return write_ZMQ(port_name, name, val, delta);
658+
}
659+
660+
/**
661+
* @brief deviate the write to ZMQ communication protocol when port identifier is a string key.
662+
* @param port_name The ZMQ port name.
663+
* @param name The name of the file.
664+
* @param val The string to write.
665+
* @param delta The delta value (default: 0).
666+
*/
667+
void write(string port_name, string name, string val, int delta=0) {
668+
return write_ZMQ(port_name, name, val, delta);
669+
}
670+
#endif // CONCORE_USE_ZMQ
671+
552672
/**
553673
* @brief Strips leading and trailing whitespace from a string.
554674
* @param str The input string.

concore_base.hpp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,117 @@ inline std::string tryparam(
178178
return (it != params.end()) ? it->second : defaultValue;
179179
}
180180

181+
182+
// ===================================================================
183+
// ZeroMQ Transport (opt-in: compile with -DCONCORE_USE_ZMQ)
184+
// ===================================================================
185+
#ifdef CONCORE_USE_ZMQ
186+
#include <zmq.hpp>
187+
188+
/**
189+
* ZMQ socket wrapper with bind/connect, timeouts, and retry.
190+
*/
191+
class ZeroMQPort {
192+
public:
193+
zmq::context_t context;
194+
zmq::socket_t socket;
195+
std::string port_type;
196+
std::string address;
197+
198+
ZeroMQPort(const std::string& port_type_, const std::string& address_, int socket_type)
199+
: context(1), socket(context, socket_type),
200+
port_type(port_type_), address(address_)
201+
{
202+
socket.setsockopt(ZMQ_RCVTIMEO, 2000);
203+
socket.setsockopt(ZMQ_SNDTIMEO, 2000);
204+
socket.setsockopt(ZMQ_LINGER, 0);
205+
206+
if (port_type == "bind")
207+
socket.bind(address);
208+
else
209+
socket.connect(address);
210+
}
211+
212+
ZeroMQPort(const ZeroMQPort&) = delete;
213+
ZeroMQPort& operator=(const ZeroMQPort&) = delete;
214+
215+
/**
216+
* Sends a vector<double> as "[v0, v1, ...]" with retry on timeout.
217+
*/
218+
void send_with_retry(const std::vector<double>& payload) {
219+
std::ostringstream ss;
220+
ss << "[";
221+
for (size_t i = 0; i < payload.size(); ++i) {
222+
if (i) ss << ", ";
223+
ss << payload[i];
224+
}
225+
ss << "]";
226+
std::string msg = ss.str();
227+
for (int attempt = 0; attempt < 5; ++attempt) {
228+
try {
229+
zmq::message_t zmsg(msg.begin(), msg.end());
230+
socket.send(zmsg, zmq::send_flags::none);
231+
return;
232+
} catch (const zmq::error_t&) {
233+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
234+
}
235+
}
236+
std::cerr << "ZMQ send failed after retries." << std::endl;
237+
}
238+
239+
/**
240+
* Sends a raw string with retry on timeout.
241+
*/
242+
void send_string_with_retry(const std::string& msg) {
243+
for (int attempt = 0; attempt < 5; ++attempt) {
244+
try {
245+
zmq::message_t zmsg(msg.begin(), msg.end());
246+
socket.send(zmsg, zmq::send_flags::none);
247+
return;
248+
} catch (const zmq::error_t&) {
249+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
250+
}
251+
}
252+
std::cerr << "ZMQ send failed after retries." << std::endl;
253+
}
254+
255+
/**
256+
* Receives and parses "[v0, v1, ...]" back to vector<double>.
257+
*/
258+
std::vector<double> recv_with_retry() {
259+
for (int attempt = 0; attempt < 5; ++attempt) {
260+
try {
261+
zmq::message_t zmsg;
262+
auto res = socket.recv(zmsg, zmq::recv_flags::none);
263+
if (res) {
264+
std::string data(static_cast<char*>(zmsg.data()), zmsg.size());
265+
return parselist_double(data);
266+
}
267+
} catch (const zmq::error_t&) {
268+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
269+
}
270+
}
271+
std::cerr << "ZMQ recv failed after retries." << std::endl;
272+
return {};
273+
}
274+
};
275+
276+
/**
277+
* Maps socket type string ("REQ", "REP", etc.) to ZMQ constant.
278+
* Returns -1 on unknown type.
279+
*/
280+
inline int zmq_socket_type_from_string(const std::string& s) {
281+
if (s == "REQ") return ZMQ_REQ;
282+
if (s == "REP") return ZMQ_REP;
283+
if (s == "PUB") return ZMQ_PUB;
284+
if (s == "SUB") return ZMQ_SUB;
285+
if (s == "PUSH") return ZMQ_PUSH;
286+
if (s == "PULL") return ZMQ_PULL;
287+
if (s == "PAIR") return ZMQ_PAIR;
288+
return -1;
289+
}
290+
#endif // CONCORE_USE_ZMQ
291+
181292
} // namespace concore_base
182293

183294
#endif // CONCORE_BASE_HPP

0 commit comments

Comments
 (0)