Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions Framework/Core/include/Framework/DataOutputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ struct DataOutputDirector {
void readSpecs(std::vector<InputSpec> inputs);

// fill the DataOutputDirector with information from a json file
std::tuple<std::string, std::string, std::string, float, int> readJson(std::string const& fnjson);
std::tuple<std::string, std::string, std::string, float, int> readJsonString(std::string const& stjson);
std::tuple<std::string, std::string, std::string, float, int, uint64_t> readJson(std::string const& fnjson);
std::tuple<std::string, std::string, std::string, float, int, uint64_t> readJsonString(std::string const& stjson);

// read/write private members
int getNumberTimeFramesToMerge() { return mnumberTimeFramesToMerge; }
void setNumberTimeFramesToMerge(int ntfmerge) { mnumberTimeFramesToMerge = ntfmerge > 0 ? ntfmerge : 1; }
std::string getFileMode() { return mfileMode; }
void setFileMode(std::string filemode) { mfileMode = filemode; }
uint64_t getDFOffset() { return dataFrameOffset; }
void setDFOffset(uint64_t offset) { offset > 0 ? dataFrameOffset = offset : 0; }

// get matching DataOutputDescriptors
std::vector<DataOutputDescriptor*> getDataOutputDescriptors(header::DataHeader dh);
Expand Down Expand Up @@ -111,10 +113,11 @@ struct DataOutputDirector {
int mfileCounter = 1;
float mmaxfilesize = -1.;
int mnumberTimeFramesToMerge = 1;
uint64_t dataFrameOffset = 0;
std::string mfileMode = "RECREATE";

std::tuple<std::string, std::string, std::string, float, int> readJsonDocument(Document* doc);
const std::tuple<std::string, std::string, std::string, float, int> memptyanswer = std::make_tuple(std::string(""), std::string(""), std::string(""), -1., -1);
std::tuple<std::string, std::string, std::string, float, int, uint64_t> readJsonDocument(Document* doc);
const std::tuple<std::string, std::string, std::string, float, int, uint64_t> memptyanswer = std::make_tuple(std::string(""), std::string(""), std::string(""), -1., -1, 0);
};

} // namespace o2::framework
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/AnalysisSupportHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ DataProcessorSpec
// get TF number from startTime
auto it = tfNumbers.find(startTime);
if (it != tfNumbers.end()) {
tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge();
tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge() + dod->getDFOffset();
} else {
LOGP(fatal, "No time frame number found for output with start time {}", startTime);
throw std::runtime_error("Processing is stopped!");
Expand Down
32 changes: 22 additions & 10 deletions Framework/Core/src/DataOutputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ void DataOutputDirector::readSpecs(std::vector<InputSpec> inputs)
}
}

std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJson(std::string const& fnjson)
std::tuple<std::string, std::string, std::string, float, int, uint64_t> DataOutputDirector::readJson(std::string const& fnjson)
{
// open the file
FILE* fjson = fopen(fnjson.c_str(), "r");
Expand All @@ -231,28 +231,26 @@ std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector
// parse the json file
Document jsonDocument;
jsonDocument.ParseStream(jsonStream);
auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument);
auto [rdn, dfn, fmode, mfs, ntfm, offset] = readJsonDocument(&jsonDocument);

// clean up
fclose(fjson);

return std::make_tuple(rdn, dfn, fmode, mfs, ntfm);
return std::make_tuple(rdn, dfn, fmode, mfs, ntfm, offset);
}

std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJsonString(std::string const& jsonString)
std::tuple<std::string, std::string, std::string, float, int, uint64_t> DataOutputDirector::readJsonString(std::string const& jsonString)
{
// parse the json string
Document jsonDocument;
jsonDocument.Parse(jsonString.c_str());
auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument);
auto [rdn, dfn, fmode, mfs, ntfm, offset] = readJsonDocument(&jsonDocument);

return std::make_tuple(rdn, dfn, fmode, mfs, ntfm);
return std::make_tuple(rdn, dfn, fmode, mfs, ntfm, offset);
}

std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJsonDocument(Document* jsonDocument)
std::tuple<std::string, std::string, std::string, float, int, uint64_t> DataOutputDirector::readJsonDocument(Document* jsonDocument)
{
std::string smc(":");
std::string slh("/");
const char* itemName;

// initialisations
Expand All @@ -261,6 +259,7 @@ std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector
std::string fmode("");
float maxfs = -1.;
int ntfm = -1;
uint64_t offset = 0;

// is it a proper json document?
if (jsonDocument->HasParseError()) {
Expand Down Expand Up @@ -351,8 +350,21 @@ std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector
}
}

itemName = "offset";
if (dodirItem.HasMember(itemName)) {
if (dodirItem[itemName].IsNumber()) {
offset = dodirItem[itemName].GetUint64();
setDFOffset(offset);
} else {
LOGP(error, "Check the JSON document! Item \"{}\" must be a number!", itemName);
return memptyanswer;
}
}

itemName = "OutputDescriptors";
if (dodirItem.HasMember(itemName)) {
std::string slh("/");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static / const, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole thing needs a refactoring pass, but it is not critical.

std::string smc(":");
if (!dodirItem[itemName].IsArray()) {
LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
return memptyanswer;
Expand Down Expand Up @@ -419,7 +431,7 @@ std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector
printOut();
}

return std::make_tuple(resdir, dfn, fmode, maxfs, ntfm);
return std::make_tuple(resdir, dfn, fmode, maxfs, ntfm, offset);
}

std::vector<DataOutputDescriptor*> DataOutputDirector::getDataOutputDescriptors(header::DataHeader dh)
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/WorkflowCustomizationHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ std::vector<ConfigParamSpec> WorkflowCustomizationHelpers::requiredWorkflowOptio
{"aod-writer-resmode", VariantType::String, "RECREATE", {"Creation mode of the result files: NEW, CREATE, RECREATE, UPDATE"}},
{"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}},
{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}},
{"aod-writer-df-offset", VariantType::UInt64, 0UL, {"Offset for dataframe numbering"}},

{"fairmq-rate-logging", VariantType::Int, 0, {"Rate logging for FairMQ channels"}},
{"fairmq-recv-buffer-size", VariantType::Int, 4, {"recvBufferSize option for FairMQ channels"}},
Expand Down
10 changes: 9 additions & 1 deletion Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -992,12 +992,13 @@ std::shared_ptr<DataOutputDirector> WorkflowHelpers::getDataOutputDirector(Confi
float mfs, maxfilesize(-1.);
std::string fmo, filemode("RECREATE");
int ntfm, ntfmerge = 1;
uint64_t offset = 0;

// values from json
if (options.isSet("aod-writer-json")) {
auto fnjson = options.get<std::string>("aod-writer-json");
if (!fnjson.empty()) {
std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson);
std::tie(rdn, fnb, fmo, mfs, ntfm, offset) = dod->readJson(fnjson);
if (!rdn.empty()) {
resdir = rdn;
}
Expand Down Expand Up @@ -1068,11 +1069,18 @@ std::shared_ptr<DataOutputDirector> WorkflowHelpers::getDataOutputDirector(Confi
}
}
}
if (options.isSet("aod-writer-df-offset")) {
auto off = options.get<uint64_t>("aod-writer-df-offset");
if (off > 0) {
offset = off;
}
}
dod->setResultDir(resdir);
dod->setFilenameBase(fnbase);
dod->setFileMode(filemode);
dod->setMaximumFileSize(maxfilesize);
dod->setNumberTimeFramesToMerge(ntfmerge);
dod->setDFOffset(offset);

return dod;
}
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2024,6 +2024,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
"--aod-writer-resmode",
"--aod-writer-maxfilesize",
"--aod-writer-keep",
"--aod-writer-df-offset",
"--aod-parent-access-level",
"--aod-parent-base-path-replacement",
"--driver-client-backend",
Expand Down
7 changes: 5 additions & 2 deletions Framework/Core/test/test_DataOutputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ TEST_CASE("TestDataOutputDirector")
std::string fmode("");
float mfs = -1.;
int ntf = -1;
uint64_t offset = 0;

dh = DataHeader(DataDescription{"DUE"},
DataOrigin{"AOD"},
DataHeader::SubSpecificationType{0});
std::string jsonString(R"({"OutputDirector": {"resfile": "defresults", "resfilemode": "RECREATE", "ntfmerge": 10, "OutputDescriptors": [{"table": "AOD/UNO/0", "columns": ["fEta1","fMom1"], "treename": "uno", "filename": "unoresults"}, {"table": "AOD/DUE/0", "columns": ["fPhi2"], "treename": "due"}]}})");

dod.reset();
std::tie(rdn, dfn, fmode, mfs, ntf) = dod.readJsonString(jsonString);
std::tie(rdn, dfn, fmode, mfs, ntf, offset) = dod.readJsonString(jsonString);
ds = dod.getDataOutputDescriptors(dh);

REQUIRE(ds.size() == 1);
Expand All @@ -78,6 +79,7 @@ TEST_CASE("TestDataOutputDirector")
jf << R"( "resfile": "defresults",)" << std::endl;
jf << R"( "resfilemode": "NEW",)" << std::endl;
jf << R"( "ntfmerge": 10,)" << std::endl;
jf << R"( "offset": 10,)" << std::endl;
jf << R"( "OutputDescriptors": [)" << std::endl;
jf << R"( {)" << std::endl;
jf << R"( "table": "AOD/DUE/0",)" << std::endl;
Expand All @@ -102,14 +104,15 @@ TEST_CASE("TestDataOutputDirector")
jf.close();

dod.reset();
std::tie(rdn, dfn, fmode, mfs, ntf) = dod.readJson(jsonFile);
std::tie(rdn, dfn, fmode, mfs, ntf, offset) = dod.readJson(jsonFile);
dod.setFilenameBase("AnalysisResults");
ds = dod.getDataOutputDescriptors(dh);

REQUIRE(ds.size() == 2);
REQUIRE(dfn == std::string("defresults"));
REQUIRE(fmode == std::string("NEW"));
REQUIRE(ntf == 10);
REQUIRE(offset == 10);

REQUIRE(ds[0]->getFilenameBase() == std::string("unoresults"));
REQUIRE(ds[0]->tablename == std::string("DUE"));
Expand Down