Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/dmn-async.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ class Dmn_Async : public Dmn_Pipe<std::function<void()>> {
-> std::shared_ptr<Dmn_Async_Wait>;

private:
using Dmn_Pipe::read;
using Dmn_Pipe::readAndProcess;
using Dmn_Pipe::write;

/**
* @brief Internal helper that schedules a task to run at a specific time in
* the future (expressed as nanoseconds since epoch).
Expand Down
2 changes: 2 additions & 0 deletions include/dmn-dmesg-pb-util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@

#define DMESG_PB_SET_MSG_TOPIC(pb, val) ((pb).set_topic((val)))

#define DMESG_PB_SET_MSG_CONFLICT(pb, val) ((pb).set_conflict((val)))

#define DMESG_PB_SET_MSG_RUNNINGCOUNTER(pb, val) \
((pb).set_runningcounter((val)))

Expand Down
57 changes: 20 additions & 37 deletions src/dmn-dmesgnet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include <cassert>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -113,7 +112,7 @@ Dmn_DMesgNet::~Dmn_DMesgNet() noexcept try {
void Dmn_DMesgNet::createInputHandlerProc() {
if (m_input_handler) {
m_write_handler = Dmn_DMesg::openHandler(
m_name, [this](const dmn::DMesgPb &dmesgPb) -> bool { return false; },
m_name, [this](const dmn::DMesgPb &) -> bool { return false; },
[](dmn::DMesgPb dmesgPbWrite) mutable -> void {});

m_input_proc =
Expand Down Expand Up @@ -143,40 +142,24 @@ void Dmn_DMesgNet::createInputHandlerProc() {
{ this->reconciliateDMesgPbSys(dmesgpb_read); }, this,
dmesgpb_read);
} else {
std::cout << "none sys data: "
<< dmesgpb_read.ShortDebugString() << "\n";
DMN_ASYNC_CALL_WITH_CAPTURE(
{
try {
DMESG_PB_SET_MSG_SOURCEWRITEHANDLERIDENTIFIER(
dmesgpb_read, this->m_name);

this->m_write_handler->write(dmesgpb_read);

m_topic_last_dmesgpb[dmesgpb_read.topic()] =
dmesgpb_read;
} catch (...) {
// The data from network is out of sync with data
// in the Dmn_DMesg, and a few should happen:
// - mark the topic as in conflict for local Dmn_DMesg
// - the local Dmn_DMesg will mark all openHandler in
// conflict but waiting for resolution with
// Dmn_DMesgNet master, so they will not allow any
// message on the same topic band.
// - the local Dmn_DMesgNet will broadcast a sys
// conflict message.
// - all networked DMesgNet(s) receives the sys conflict
// message will then put its local Dmn_DMesg in
// conflict state for the same topic.
// - master node will then send its last message for the
// to all nodes, and all nodes receives the message
// will use new message as its last valid message for
// the topic and clear it conflict state.

this->m_subscript_handler->resolveConflict();
}
},
this, dmesgpb_read);
DMESG_PB_SET_MSG_SOURCEWRITEHANDLERIDENTIFIER(dmesgpb_read,
this->m_name);

bool ok = m_write_handler->writeAndCheckConflict(dmesgpb_read);

if (ok) {
m_topic_last_dmesgpb[dmesgpb_read.topic()] = dmesgpb_read;
} else {
DMESG_PB_SET_MSG_CONFLICT(dmesgpb_read, true);

DMN_ASYNC_CALL_WITH_CAPTURE(
{
std::string serialized_string{};
dmesgpb_read.SerializeToString(&serialized_string);
m_output_handler->write(serialized_string);
},
this, dmesgpb_read);
}
} /* else (dmesgpb_read.type() == dmn::DMesgTypePb::sys) */
}
}
Expand Down Expand Up @@ -256,7 +239,7 @@ void Dmn_DMesgNet::createTimerProc() {
m_timer_proc = std::make_unique<dmn::Dmn_Timer<std::chrono::nanoseconds>>(
std::chrono::nanoseconds(DMN_DMESGNET_HEARTBEAT_IN_NS),
[this]() -> void {
this->write([this]() mutable -> void {
this->addExecTask([this]() mutable -> void {
if (this->m_sys.body().sys().self().state() ==
dmn::DMesgStatePb::MasterPending) {
this->m_master_pending_counter++;
Expand Down
14 changes: 12 additions & 2 deletions test/dmn-test-dmesgnet-5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ int main(int argc, char *argv[]) {
dmn::DMesgPb dmesgpb1{};
dmn::DMesgPb dmesgpb1_body{};
dmn::DMesgPb dmesgpb1_sys{};
dmn::DMesgPb dmesgpb1_body_conflict{};
dmn::Dmn_Proc dmesg1_to_dmesg2{
"dmesg1_to_dmesg2", [read_from_write_1, write_to_read_2, write_to_read_1,
&dmesgpb1, &dmesgpb1_body, &dmesgpb1_sys]() {
"dmesg1_to_dmesg2",
[read_from_write_1, write_to_read_2, write_to_read_1, &dmesgpb1,
&dmesgpb1_body, &dmesgpb1_body_conflict, &dmesgpb1_sys]() {
while (true) {
auto data = read_from_write_1->read();
if (!data) {
Expand All @@ -79,6 +81,10 @@ int main(int argc, char *argv[]) {
dmesgpb1.ParseFromString(*data);
if (dmesgpb1.type() == dmn::DMesgTypePb::sys) {
dmesgpb1_sys = dmesgpb1;
} else if (dmesgpb1.conflict()) {
std::cout << "********* conflict: " << dmesgpb1.ShortDebugString()
<< "\n";
dmesgpb1_body_conflict = dmesgpb1;
} else {
dmesgpb1_body = dmesgpb1;

Expand Down Expand Up @@ -143,5 +149,9 @@ int main(int argc, char *argv[]) {
std::cout << "after destroying 1\n";
std::this_thread::sleep_for(std::chrono::seconds(10));

EXPECT_TRUE((dmesgpb1_body_conflict.conflict()));
EXPECT_TRUE((dmesgpb1_body_conflict.topic() == "counter sync"));
EXPECT_TRUE((dmesgpb1_body_conflict.runningcounter() == 1));

return RUN_ALL_TESTS();
}