Skip to content

Commit 63b7f38

Browse files
committed
update serialization
1 parent 45f97cf commit 63b7f38

File tree

5 files changed

+221
-11
lines changed

5 files changed

+221
-11
lines changed

Framework/Core/include/Framework/Expressions.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ struct Filter {
627627
Filter(std::unique_ptr<Node>&& ptr)
628628
{
629629
node = std::move(ptr);
630+
(void)designateSubtrees(node.get());
630631
}
631632

632633
Filter(Node&& node_) : node{std::make_unique<Node>(std::forward<Node>(node_))}
@@ -636,7 +637,6 @@ struct Filter {
636637

637638
Filter(Filter&& other) : node{std::forward<std::unique_ptr<Node>>(other.node)}
638639
{
639-
(void)designateSubtrees(node.get());
640640
}
641641

642642
Filter(std::string const& input_) : input{input_} {}

Framework/Core/src/AnalysisHelpers.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,11 @@ std::string serializeProjectors(std::vector<framework::expressions::Projector>&
3434
ExpressionJSONHelpers::write(osm, projectors);
3535
return osm.str();
3636
}
37+
38+
std::string serializeSchema(std::shared_ptr<arrow::Schema>& schema)
39+
{
40+
std::stringstream osm;
41+
ArrowJSONHelpers::write(osm, schema);
42+
return osm.str();
43+
}
3744
} // namespace o2::framework

Framework/Core/src/ExpressionJSONHelpers.cxx

Lines changed: 190 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
// In applying this license CERN does not waive the privileges and immunities
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
11-
#include "Framework/ExpressionJSONHelpers.h"
11+
#include "ExpressionJSONHelpers.h"
1212

1313
#include <rapidjson/reader.h>
1414
#include <rapidjson/prettywriter.h>
@@ -105,7 +105,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
105105

106106
bool StartArray()
107107
{
108-
debug << "Starting array" << std::endl;
108+
debug << "StartArray()" << std::endl;
109109
if (states.top() == State::IN_START) {
110110
states.push(State::IN_LIST);
111111
return true;
@@ -116,7 +116,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
116116

117117
bool EndArray(SizeType)
118118
{
119-
debug << "Ending array" << std::endl;
119+
debug << "EndArray()" << std::endl;
120120
if (states.top() == State::IN_LIST) {
121121
states.pop();
122122
return true;
@@ -513,9 +513,7 @@ std::vector<expressions::Projector> o2::framework::ExpressionJSONHelpers::read(s
513513
bool ok = reader.Parse(isw, ereader);
514514

515515
if (!ok) {
516-
std::stringstream error;
517-
error << "Cannot parse serialized Expression, error: " << rapidjson::GetParseError_En(reader.GetParseErrorCode()) << " at offset: " << reader.GetErrorOffset();
518-
throw std::runtime_error(error.str());
516+
throw framework::runtime_error_f("Cannot parse serialized Expression, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset());
519517
}
520518
return std::move(ereader.result);
521519
}
@@ -637,4 +635,190 @@ void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, std::vector<o2
637635
w.EndObject();
638636
}
639637

638+
namespace {
639+
struct SchemaReader : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, SchemaReader> {
640+
using Ch = rapidjson::UTF8<>::Ch;
641+
using SizeType = rapidjson::SizeType;
642+
643+
enum struct State {
644+
IN_START,
645+
IN_LIST,
646+
IN_FIELD,
647+
IN_ERROR
648+
};
649+
650+
std::stack<State> states;
651+
std::ostringstream debug;
652+
653+
std::shared_ptr<arrow::Schema> schema = nullptr;
654+
std::vector<std::shared_ptr<arrow::Field>> fields;
655+
656+
std::string currentKey;
657+
658+
std::string name;
659+
atype::type type;
660+
661+
SchemaReader()
662+
{
663+
debug << ">>> Start" << std::endl;
664+
states.push(State::IN_START);
665+
}
666+
667+
bool StartArray()
668+
{
669+
debug << "Starting array" << std::endl;
670+
if (states.top() == State::IN_START && currentKey.compare("fields") == 0) {
671+
states.push(State::IN_LIST);
672+
return true;
673+
}
674+
states.push(State::IN_ERROR);
675+
return false;
676+
}
677+
678+
bool EndArray(SizeType)
679+
{
680+
debug << "Ending array" << std::endl;
681+
if (states.top() == State::IN_LIST) {
682+
//finalize schema
683+
schema = std::make_shared<arrow::Schema>(fields);
684+
states.pop();
685+
return true;
686+
}
687+
states.push(State::IN_ERROR);
688+
return false;
689+
}
690+
691+
bool Key(const Ch* str, SizeType, bool)
692+
{
693+
debug << "Key(" << str << ")" << std::endl;
694+
currentKey = str;
695+
if (states.top() == State::IN_START) {
696+
if (currentKey.compare("fields") == 0) {
697+
return true;
698+
}
699+
}
700+
701+
if (states.top() == State::IN_FIELD) {
702+
if (currentKey.compare("name") == 0) {
703+
return true;
704+
}
705+
if (currentKey.compare("type") == 0) {
706+
return true;
707+
}
708+
}
709+
710+
states.push(State::IN_ERROR);
711+
return false;
712+
}
713+
714+
bool StartObject()
715+
{
716+
debug << "StartObject()" << std::endl;
717+
if (states.top() == State::IN_START) {
718+
return true;
719+
}
720+
721+
if (states.top() == State::IN_LIST) {
722+
states.push(State::IN_FIELD);
723+
return true;
724+
}
725+
726+
states.push(State::IN_ERROR);
727+
return false;
728+
}
729+
730+
bool EndObject(SizeType)
731+
{
732+
debug << "EndObject()" << std::endl;
733+
if (states.top() == State::IN_FIELD) {
734+
states.pop();
735+
// add a field
736+
fields.emplace_back(std::make_shared<arrow::Field>(name, expressions::concreteArrowType(type)));
737+
return true;
738+
}
739+
740+
if (states.top() == State::IN_START) {
741+
return true;
742+
}
743+
744+
states.push(State::IN_ERROR);
745+
return false;
746+
}
747+
748+
bool Uint(unsigned i)
749+
{
750+
debug << "Uint(" << i << ")" << std::endl;
751+
if (states.top() == State::IN_FIELD) {
752+
if (currentKey.compare("type") == 0) {
753+
type = (atype::type)i;
754+
return true;
755+
}
756+
}
757+
758+
states.push(State::IN_ERROR);
759+
return false;
760+
}
761+
762+
bool String(const Ch* str, SizeType, bool)
763+
{
764+
debug << "String(" << str << ")" << std::endl;
765+
if (states.top() == State::IN_FIELD) {
766+
if (currentKey.compare("name") == 0) {
767+
name = str;
768+
return true;
769+
}
770+
}
771+
772+
states.push(State::IN_ERROR);
773+
return false;
774+
}
775+
776+
bool Int(int i) {
777+
debug << "Int(" << i << ")" << std::endl;
778+
return Uint(i);
779+
}
780+
781+
};
782+
}
783+
784+
std::shared_ptr<arrow::Schema> o2::framework::ArrowJSONHelpers::read(std::istream& s)
785+
{
786+
rapidjson::Reader reader;
787+
rapidjson::IStreamWrapper isw(s);
788+
SchemaReader sreader;
789+
790+
bool ok = reader.Parse(isw, sreader);
791+
792+
if(!ok) {
793+
throw framework::runtime_error_f("Cannot parse serialized Expression, error: %s at offset: %d", rapidjson::GetParseError_En(reader.GetParseErrorCode()), reader.GetErrorOffset());
794+
}
795+
return sreader.schema;
796+
}
797+
798+
namespace {
799+
void writeSchema(rapidjson::Writer<rapidjson::OStreamWrapper>& w, arrow::Schema* schema)
800+
{
801+
for (auto& f : schema->fields()) {
802+
w.StartObject();
803+
w.Key("name");
804+
w.String(f->name().c_str());
805+
w.Key("type");
806+
w.Int(f->type()->id());
807+
w.EndObject();
808+
}
809+
}
810+
}
811+
812+
void o2::framework::ArrowJSONHelpers::write(std::ostream& o, std::shared_ptr<arrow::Schema>& schema)
813+
{
814+
rapidjson::OStreamWrapper osw(o);
815+
rapidjson::Writer<rapidjson::OStreamWrapper> w(osw);
816+
w.StartObject();
817+
w.Key("fields");
818+
w.StartArray();
819+
writeSchema(w, schema.get());
820+
w.EndArray();
821+
w.EndObject();
822+
}
823+
640824
} // namespace o2::framework

Framework/Core/include/Framework/ExpressionJSONHelpers.h renamed to Framework/Core/src/ExpressionJSONHelpers.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
namespace o2::framework
1717
{
1818
struct ExpressionJSONHelpers {
19-
// static std::unique_ptr<expressions::Node> read(std::istream& s);
2019
static std::vector<expressions::Projector> read(std::istream& s);
2120
static void write(std::ostream& o, std::vector<expressions::Projector>& projectors);
2221
};
22+
23+
struct ArrowJSONHelpers {
24+
static std::shared_ptr<arrow::Schema> read(std::istream& s);
25+
static void write(std::ostream& o, std::shared_ptr<arrow::Schema>& schema);
26+
};
2327
} // namespace o2::framework
2428

2529
#endif // FRAMEWORK_EXPRESSIONJSONHELPERS_H

Framework/Core/test/test_Expressions.cxx

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
#include "Framework/Configurable.h"
1313
#include "Framework/ExpressionHelpers.h"
1414
#include "Framework/AnalysisDataModel.h"
15-
#include "Framework/ExpressionJSONHelpers.h"
15+
#include "../src/ExpressionJSONHelpers.h"
1616
#include <catch_amalgamated.hpp>
1717
#include <arrow/util/config.h>
1818
#include <iostream>
@@ -423,6 +423,21 @@ TEST_CASE("TestExpressionSerialization")
423423
auto t22 = createExpressionTree(s22, schemap);
424424
REQUIRE(t12->ToString() == t22->ToString());
425425

426-
std::cout << schemaf->ToString() << std::endl;
427-
std::cout << schemap->ToString() << std::endl;
426+
osm.clear();
427+
osm.str("");
428+
ArrowJSONHelpers::write(osm, schemaf);
429+
430+
ism.clear();
431+
ism.str(osm.str());
432+
auto newSchemaf = ArrowJSONHelpers::read(ism);
433+
REQUIRE(schemaf->ToString() == newSchemaf->ToString());
434+
435+
osm.clear();
436+
osm.str("");
437+
ArrowJSONHelpers::write(osm, schemap);
438+
439+
ism.clear();
440+
ism.str(osm.str());
441+
auto newSchemap = ArrowJSONHelpers::read(ism);
442+
REQUIRE(schemap->ToString() == newSchemap->ToString());
428443
}

0 commit comments

Comments
 (0)