88// In applying this license CERN does not waive the privileges and immunities
99// granted to it by virtue of its status as an Intergovernmental Organization
1010// or submit itself to any jurisdiction.
11- #include " Framework/ ExpressionJSONHelpers.h"
11+ #include " ExpressionJSONHelpers.h"
1212
1313#include < rapidjson/reader.h>
1414#include < rapidjson/prettywriter.h>
@@ -105,7 +105,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
105105
106106 bool StartArray ()
107107 {
108- debug << " Starting array " << std::endl;
108+ debug << " StartArray() " << std::endl;
109109 if (states.top () == State::IN_START) {
110110 states.push (State::IN_LIST);
111111 return true ;
@@ -116,7 +116,7 @@ struct ExpressionReader : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>,
116116
117117 bool EndArray (SizeType)
118118 {
119- debug << " Ending array " << std::endl;
119+ debug << " EndArray() " << std::endl;
120120 if (states.top () == State::IN_LIST) {
121121 states.pop ();
122122 return true ;
@@ -513,9 +513,7 @@ std::vector<expressions::Projector> o2::framework::ExpressionJSONHelpers::read(s
513513 bool ok = reader.Parse (isw, ereader);
514514
515515 if (!ok) {
516- std::stringstream error;
517- error << " Cannot parse serialized Expression, error: " << rapidjson::GetParseError_En (reader.GetParseErrorCode ()) << " at offset: " << reader.GetErrorOffset ();
518- throw std::runtime_error (error.str ());
516+ throw framework::runtime_error_f (" Cannot parse serialized Expression, error: %s at offset: %d" , rapidjson::GetParseError_En (reader.GetParseErrorCode ()), reader.GetErrorOffset ());
519517 }
520518 return std::move (ereader.result );
521519}
@@ -637,4 +635,190 @@ void o2::framework::ExpressionJSONHelpers::write(std::ostream& o, std::vector<o2
637635 w.EndObject ();
638636}
639637
638+ namespace {
639+ struct SchemaReader : public rapidjson ::BaseReaderHandler<rapidjson::UTF8<>, SchemaReader> {
640+ using Ch = rapidjson::UTF8<>::Ch;
641+ using SizeType = rapidjson::SizeType;
642+
643+ enum struct State {
644+ IN_START,
645+ IN_LIST,
646+ IN_FIELD,
647+ IN_ERROR
648+ };
649+
650+ std::stack<State> states;
651+ std::ostringstream debug;
652+
653+ std::shared_ptr<arrow::Schema> schema = nullptr ;
654+ std::vector<std::shared_ptr<arrow::Field>> fields;
655+
656+ std::string currentKey;
657+
658+ std::string name;
659+ atype::type type;
660+
661+ SchemaReader ()
662+ {
663+ debug << " >>> Start" << std::endl;
664+ states.push (State::IN_START);
665+ }
666+
667+ bool StartArray ()
668+ {
669+ debug << " Starting array" << std::endl;
670+ if (states.top () == State::IN_START && currentKey.compare (" fields" ) == 0 ) {
671+ states.push (State::IN_LIST);
672+ return true ;
673+ }
674+ states.push (State::IN_ERROR);
675+ return false ;
676+ }
677+
678+ bool EndArray (SizeType)
679+ {
680+ debug << " Ending array" << std::endl;
681+ if (states.top () == State::IN_LIST) {
682+ // finalize schema
683+ schema = std::make_shared<arrow::Schema>(fields);
684+ states.pop ();
685+ return true ;
686+ }
687+ states.push (State::IN_ERROR);
688+ return false ;
689+ }
690+
691+ bool Key (const Ch* str, SizeType, bool )
692+ {
693+ debug << " Key(" << str << " )" << std::endl;
694+ currentKey = str;
695+ if (states.top () == State::IN_START) {
696+ if (currentKey.compare (" fields" ) == 0 ) {
697+ return true ;
698+ }
699+ }
700+
701+ if (states.top () == State::IN_FIELD) {
702+ if (currentKey.compare (" name" ) == 0 ) {
703+ return true ;
704+ }
705+ if (currentKey.compare (" type" ) == 0 ) {
706+ return true ;
707+ }
708+ }
709+
710+ states.push (State::IN_ERROR);
711+ return false ;
712+ }
713+
714+ bool StartObject ()
715+ {
716+ debug << " StartObject()" << std::endl;
717+ if (states.top () == State::IN_START) {
718+ return true ;
719+ }
720+
721+ if (states.top () == State::IN_LIST) {
722+ states.push (State::IN_FIELD);
723+ return true ;
724+ }
725+
726+ states.push (State::IN_ERROR);
727+ return false ;
728+ }
729+
730+ bool EndObject (SizeType)
731+ {
732+ debug << " EndObject()" << std::endl;
733+ if (states.top () == State::IN_FIELD) {
734+ states.pop ();
735+ // add a field
736+ fields.emplace_back (std::make_shared<arrow::Field>(name, expressions::concreteArrowType (type)));
737+ return true ;
738+ }
739+
740+ if (states.top () == State::IN_START) {
741+ return true ;
742+ }
743+
744+ states.push (State::IN_ERROR);
745+ return false ;
746+ }
747+
748+ bool Uint (unsigned i)
749+ {
750+ debug << " Uint(" << i << " )" << std::endl;
751+ if (states.top () == State::IN_FIELD) {
752+ if (currentKey.compare (" type" ) == 0 ) {
753+ type = (atype::type)i;
754+ return true ;
755+ }
756+ }
757+
758+ states.push (State::IN_ERROR);
759+ return false ;
760+ }
761+
762+ bool String (const Ch* str, SizeType, bool )
763+ {
764+ debug << " String(" << str << " )" << std::endl;
765+ if (states.top () == State::IN_FIELD) {
766+ if (currentKey.compare (" name" ) == 0 ) {
767+ name = str;
768+ return true ;
769+ }
770+ }
771+
772+ states.push (State::IN_ERROR);
773+ return false ;
774+ }
775+
776+ bool Int (int i) {
777+ debug << " Int(" << i << " )" << std::endl;
778+ return Uint (i);
779+ }
780+
781+ };
782+ }
783+
784+ std::shared_ptr<arrow::Schema> o2::framework::ArrowJSONHelpers::read (std::istream& s)
785+ {
786+ rapidjson::Reader reader;
787+ rapidjson::IStreamWrapper isw (s);
788+ SchemaReader sreader;
789+
790+ bool ok = reader.Parse (isw, sreader);
791+
792+ if (!ok) {
793+ throw framework::runtime_error_f (" Cannot parse serialized Expression, error: %s at offset: %d" , rapidjson::GetParseError_En (reader.GetParseErrorCode ()), reader.GetErrorOffset ());
794+ }
795+ return sreader.schema ;
796+ }
797+
798+ namespace {
799+ void writeSchema (rapidjson::Writer<rapidjson::OStreamWrapper>& w, arrow::Schema* schema)
800+ {
801+ for (auto & f : schema->fields ()) {
802+ w.StartObject ();
803+ w.Key (" name" );
804+ w.String (f->name ().c_str ());
805+ w.Key (" type" );
806+ w.Int (f->type ()->id ());
807+ w.EndObject ();
808+ }
809+ }
810+ }
811+
812+ void o2::framework::ArrowJSONHelpers::write (std::ostream& o, std::shared_ptr<arrow::Schema>& schema)
813+ {
814+ rapidjson::OStreamWrapper osw (o);
815+ rapidjson::Writer<rapidjson::OStreamWrapper> w (osw);
816+ w.StartObject ();
817+ w.Key (" fields" );
818+ w.StartArray ();
819+ writeSchema (w, schema.get ());
820+ w.EndArray ();
821+ w.EndObject ();
822+ }
823+
640824} // namespace o2::framework
0 commit comments