Skip to content

Commit 866e3bb

Browse files
committed
Make it work with newer versions of protobuf
1 parent 5a02fd3 commit 866e3bb

File tree

6 files changed

+55
-59
lines changed

6 files changed

+55
-59
lines changed

occ/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ find_package(Boost 1.74 REQUIRED COMPONENTS ${BOOST_COMPONENTS})
165165

166166
# Protobuf
167167
set(protobuf_MODULE_COMPATIBLE TRUE)
168-
find_package(protobuf 3.14.0 CONFIG REQUIRED)
168+
find_package(protobuf CONFIG REQUIRED)
169169
message(STATUS "Using protobuf ${protobuf_VERSION}")
170170

171171
# OpenSSL on Mac

occ/cmake/OccConfig.cmake.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ include(CMakeFindDependencyMacro)
3131
list(APPEND CMAKE_MODULE_PATH ${Occ_CMAKE_DIR})
3232
find_dependency(Boost 1.68 REQUIRED COMPONENTS program_options)
3333
set(protobuf_MODULE_COMPATIBLE TRUE)
34-
find_dependency(protobuf 3.7.1 CONFIG REQUIRED)
34+
find_dependency(protobuf CONFIG REQUIRED)
3535
find_dependency(gRPC 1.19.1 CONFIG REQUIRED)
3636

3737
list(REMOVE_AT CMAKE_MODULE_PATH -1)

occ/occlib/OccServer.cxx

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,22 @@ grpc::Status OccServer::EventStream(grpc::ServerContext* context,
6868
boost::uuids::basic_random_generator<boost::mt19937> gen;
6969
std::string id = boost::uuids::to_string(gen());
7070

71-
boost::lockfree::queue<pb::DeviceEvent*> eventQueue;
71+
boost::lockfree::queue<occ_pb::DeviceEvent*> eventQueue;
7272
m_eventQueues[id] = &eventQueue;
7373
DEFER({
7474
m_eventQueues.erase(id);
7575
});
7676

7777
bool isStreamOpen = true;
7878
while (!m_destroying && isStreamOpen && m_rco->getState() != t_State::done) {
79-
pb::DeviceEvent *newEvent;
79+
occ_pb::DeviceEvent *newEvent;
8080
bool ok = eventQueue.pop(newEvent);
8181
if (!ok) { // queue empty, sleep and retry
8282
std::this_thread::sleep_for(2ms);
8383
continue;
8484
}
8585

86-
pb::EventStreamReply response;
86+
occ_pb::EventStreamReply response;
8787
if (newEvent) {
8888
response.mutable_event()->CopyFrom(*newEvent);
8989
isStreamOpen = writer->Write(response);
@@ -94,8 +94,8 @@ grpc::Status OccServer::EventStream(grpc::ServerContext* context,
9494
}
9595

9696
grpc::Status OccServer::StateStream(grpc::ServerContext* context,
97-
const pb::StateStreamRequest* request,
98-
grpc::ServerWriter<pb::StateStreamReply>* writer)
97+
const occ_pb::StateStreamRequest* request,
98+
grpc::ServerWriter<occ_pb::StateStreamReply>* writer)
9999
{
100100
(void) context;
101101
(void) request;
@@ -118,8 +118,8 @@ grpc::Status OccServer::StateStream(grpc::ServerContext* context,
118118
continue;
119119
}
120120

121-
pb::StateStreamReply response;
122-
response.set_type(pb::STATE_STABLE);
121+
occ_pb::StateStreamReply response;
122+
response.set_type(occ_pb::STATE_STABLE);
123123
response.set_state(getStringFromState(newState));
124124

125125
if (newState != t_State::done) {
@@ -133,8 +133,8 @@ grpc::Status OccServer::StateStream(grpc::ServerContext* context,
133133
}
134134

135135
grpc::Status OccServer::GetState(grpc::ServerContext* context,
136-
const pb::GetStateRequest* request,
137-
pb::GetStateReply* response)
136+
const occ_pb::GetStateRequest* request,
137+
occ_pb::GetStateReply* response)
138138
{
139139
std::lock_guard<std::mutex> lock(m_mu);
140140

@@ -159,8 +159,8 @@ grpc::Status OccServer::GetState(grpc::ServerContext* context,
159159
* @return the status, either grpc::Status::OK or an error status
160160
*/
161161
grpc::Status OccServer::Transition(grpc::ServerContext* context,
162-
const pb::TransitionRequest* request,
163-
pb::TransitionReply* response)
162+
const occ_pb::TransitionRequest* request,
163+
occ_pb::TransitionReply* response)
164164
{
165165
std::lock_guard<std::mutex> lock(m_mu);
166166

@@ -213,11 +213,11 @@ grpc::Status OccServer::Transition(grpc::ServerContext* context,
213213
response->set_transitionevent(request->transitionevent());
214214
response->set_ok(newStateStr == finalState);
215215
if (newState == error) { // ERROR state
216-
response->set_trigger(pb::DEVICE_ERROR);
216+
response->set_trigger(occ_pb::DEVICE_ERROR);
217217
} else if (newStateStr == finalState) { // correct destination state
218-
response->set_trigger(pb::EXECUTOR);
218+
response->set_trigger(occ_pb::EXECUTOR);
219219
} else { // some other state, for whatever reason - we assume DEVICE_INTENTIONAL
220-
response->set_trigger(pb::DEVICE_INTENTIONAL);
220+
response->set_trigger(occ_pb::DEVICE_INTENTIONAL);
221221
}
222222

223223
std::cout << "[OCC] new state: " << newStateStr << std::endl;
@@ -394,14 +394,14 @@ void OccServer::publishState(t_State s)
394394
}
395395
}
396396

397-
void OccServer::pushEvent(pb::DeviceEvent* event)
397+
void OccServer::pushEvent(occ_pb::DeviceEvent* event)
398398
{
399399
for (auto item : m_eventQueues) {
400400
item.second->push(event);
401401
}
402402
printf("[OCC] Object: %s - pushing event = %s\n",
403403
m_rco->getName().c_str(),
404-
pb::DeviceEventType_Name(event->type()).c_str());
404+
occ_pb::DeviceEventType_Name(event->type()).c_str());
405405
}
406406

407407
bool OccServer::checkMachineDone()
@@ -428,8 +428,8 @@ void OccServer::runChecker()
428428
int err = m_rco->iterateRunning();
429429
if (err == 1) { // signal EndOfData event
430430
endOfData = true;
431-
auto eodEvent = new pb::DeviceEvent;
432-
eodEvent->set_type(pb::END_OF_STREAM);
431+
auto eodEvent = new occ_pb::DeviceEvent;
432+
eodEvent->set_type(occ_pb::END_OF_STREAM);
433433
pushEvent(eodEvent);
434434
}
435435
else if (err) {
@@ -446,8 +446,8 @@ void OccServer::runChecker()
446446

447447
// the above publishes a state change event to the StateStream, but we also push an exception event on the
448448
// EventStream because the transition was initiated by the task
449-
auto taskErrorEvent = new pb::DeviceEvent;
450-
taskErrorEvent->set_type(pb::TASK_INTERNAL_ERROR);
449+
auto taskErrorEvent = new occ_pb::DeviceEvent;
450+
taskErrorEvent->set_type(occ_pb::TASK_INTERNAL_ERROR);
451451
pushEvent(taskErrorEvent);
452452
}
453453
}

occ/occlib/OccServer.h

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
#include <mutex>
3737
#include <thread>
3838

39-
namespace pb = occ_pb;
40-
4139

4240
namespace boost {
4341
namespace property_tree
@@ -61,7 +59,7 @@ const std::unordered_map<std::string, std::string> EXPECTED_FINAL_STATE = {
6159
{"RECOVER", "STANDBY"},
6260
};
6361

64-
class OccServer final : public pb::Occ::Service
62+
class OccServer final : public occ_pb::Occ::Service
6563
{
6664
public:
6765
/**
@@ -82,20 +80,20 @@ class OccServer final : public pb::Occ::Service
8280
virtual ~OccServer();
8381

8482
grpc::Status EventStream(grpc::ServerContext* context,
85-
const pb::EventStreamRequest* request,
86-
grpc::ServerWriter<pb::EventStreamReply>* writer) override;
83+
const occ_pb::EventStreamRequest* request,
84+
grpc::ServerWriter<occ_pb::EventStreamReply>* writer) override;
8785

8886
grpc::Status StateStream(grpc::ServerContext* context,
89-
const pb::StateStreamRequest* request,
90-
grpc::ServerWriter<pb::StateStreamReply>* writer) override;
87+
const occ_pb::StateStreamRequest* request,
88+
grpc::ServerWriter<occ_pb::StateStreamReply>* writer) override;
9189

9290
grpc::Status GetState(grpc::ServerContext* context,
93-
const pb::GetStateRequest* request,
94-
pb::GetStateReply* response) override;
91+
const occ_pb::GetStateRequest* request,
92+
occ_pb::GetStateReply* response) override;
9593

9694
grpc::Status Transition(grpc::ServerContext* context,
97-
const pb::TransitionRequest* request,
98-
pb::TransitionReply* response) override;
95+
const occ_pb::TransitionRequest* request,
96+
occ_pb::TransitionReply* response) override;
9997

10098
bool checkMachineDone();
10199

@@ -104,7 +102,7 @@ class OccServer final : public pb::Occ::Service
104102
void updateState(t_State s);
105103

106104
void publishState(t_State s);
107-
void pushEvent(pb::DeviceEvent* event);
105+
void pushEvent(occ_pb::DeviceEvent* event);
108106

109107
void runChecker();
110108

@@ -116,7 +114,7 @@ class OccServer final : public pb::Occ::Service
116114
bool m_machineDone;
117115

118116
std::unordered_map<std::string, boost::lockfree::queue<t_State>* > m_stateQueues;
119-
std::unordered_map<std::string, boost::lockfree::queue<pb::DeviceEvent*>* > m_eventQueues;
117+
std::unordered_map<std::string, boost::lockfree::queue<occ_pb::DeviceEvent*>* > m_eventQueues;
120118
};
121119

122120

occ/plugin/OccPluginServer.cxx

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ OccPluginServer::EventStream(grpc::ServerContext* context,
7070
if (state == "EXITING") {
7171
std::unique_lock<std::mutex> finished_lk(finished_mu);
7272

73-
auto nilEvent = new pb::DeviceEvent();
74-
nilEvent->set_type(pb::NULL_DEVICE_EVENT);
75-
pb::EventStreamReply response;
73+
auto nilEvent = new occ_pb::DeviceEvent();
74+
nilEvent->set_type(occ_pb::NULL_DEVICE_EVENT);
75+
occ_pb::EventStreamReply response;
7676
response.mutable_event()->CopyFrom(*nilEvent);
7777

7878
writer->WriteLast(response, grpc::WriteOptions());
@@ -100,8 +100,8 @@ OccPluginServer::EventStream(grpc::ServerContext* context,
100100

101101
grpc::Status
102102
OccPluginServer::StateStream(grpc::ServerContext* context,
103-
const pb::StateStreamRequest* request,
104-
grpc::ServerWriter<pb::StateStreamReply>* writer)
103+
const occ_pb::StateStreamRequest* request,
104+
grpc::ServerWriter<occ_pb::StateStreamReply>* writer)
105105
{
106106

107107
(void) context;
@@ -116,14 +116,14 @@ OccPluginServer::StateStream(grpc::ServerContext* context,
116116
std::lock_guard<std::mutex> lock(writer_mu);
117117
auto state = fair::mq::PluginServices::ToStr(reachedState);
118118
last_known_state = state;
119-
pb::StateType sType = isIntermediateFMQState(state) ? pb::STATE_INTERMEDIATE : pb::STATE_STABLE;
119+
occ_pb::StateType sType = isIntermediateFMQState(state) ? occ_pb::STATE_INTERMEDIATE : occ_pb::STATE_STABLE;
120120

121-
pb::StateStreamReply response;
121+
occ_pb::StateStreamReply response;
122122
response.set_type(sType);
123123
response.set_state(state);
124124

125125
OLOG(debug) << "[StateStream] new state: " << state << "; type: "
126-
<< pb::StateType_Name(sType);
126+
<< occ_pb::StateType_Name(sType);
127127

128128
if (state != "EXITING") {
129129
writer->Write(response);
@@ -151,8 +151,8 @@ OccPluginServer::StateStream(grpc::ServerContext* context,
151151
}
152152

153153
grpc::Status OccPluginServer::GetState(grpc::ServerContext* context,
154-
const pb::GetStateRequest* request,
155-
pb::GetStateReply* response)
154+
const occ_pb::GetStateRequest* request,
155+
occ_pb::GetStateReply* response)
156156
{
157157
std::lock_guard<std::mutex> lock(m_mu);
158158

@@ -178,8 +178,8 @@ grpc::Status OccPluginServer::GetState(grpc::ServerContext* context,
178178
*/
179179
grpc::Status
180180
OccPluginServer::Transition(grpc::ServerContext* context,
181-
const pb::TransitionRequest* request,
182-
pb::TransitionReply* response)
181+
const occ_pb::TransitionRequest* request,
182+
occ_pb::TransitionReply* response)
183183
{
184184
// Valid FairMQ state machine transitions, mapped to DeviceStateTransition objects:
185185
// {DeviceStateTransition::Auto, "Auto"}, // ever needed?
@@ -238,7 +238,7 @@ OccPluginServer::Transition(grpc::ServerContext* context,
238238

239239
auto nopbResponse = std::get<0>(transitionOutcome);
240240
response->set_state(nopbResponse.state);
241-
response->set_trigger(static_cast<pb::StateChangeTrigger>(nopbResponse.trigger));
241+
response->set_trigger(static_cast<occ_pb::StateChangeTrigger>(nopbResponse.trigger));
242242
response->set_transitionevent(nopbResponse.transitionEvent);
243243
response->set_ok(nopbResponse.ok);
244244

occ/plugin/OccPluginServer.h

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232

3333
#include <mutex>
3434

35-
namespace pb = occ_pb;
36-
3735
namespace fair
3836
{
3937
namespace mq
@@ -42,7 +40,7 @@ class PluginServices;
4240
}
4341
}
4442

45-
class OccPluginServer final : public pb::Occ::Service
43+
class OccPluginServer final : public occ_pb::Occ::Service
4644
{
4745
public:
4846
explicit OccPluginServer(fair::mq::PluginServices*);
@@ -51,20 +49,20 @@ class OccPluginServer final : public pb::Occ::Service
5149
{}
5250

5351
grpc::Status EventStream(grpc::ServerContext* context,
54-
const pb::EventStreamRequest* request,
55-
grpc::ServerWriter<pb::EventStreamReply>* writer) override;
52+
const occ_pb::EventStreamRequest* request,
53+
grpc::ServerWriter<occ_pb::EventStreamReply>* writer) override;
5654

5755
grpc::Status StateStream(grpc::ServerContext* context,
58-
const pb::StateStreamRequest* request,
59-
grpc::ServerWriter<pb::StateStreamReply>* writer) override;
56+
const occ_pb::StateStreamRequest* request,
57+
grpc::ServerWriter<occ_pb::StateStreamReply>* writer) override;
6058

6159
grpc::Status GetState(grpc::ServerContext* context,
62-
const pb::GetStateRequest* request,
63-
pb::GetStateReply* response) override;
60+
const occ_pb::GetStateRequest* request,
61+
occ_pb::GetStateReply* response) override;
6462

6563
grpc::Status Transition(grpc::ServerContext* context,
66-
const pb::TransitionRequest* request,
67-
pb::TransitionReply* response) override;
64+
const occ_pb::TransitionRequest* request,
65+
occ_pb::TransitionReply* response) override;
6866

6967
private:
7068
fair::mq::PluginServices* m_pluginServices;

0 commit comments

Comments
 (0)