Skip to content

Commit baed16b

Browse files
committed
DPL Analysis: introduce aod-writer-df-offset option
1 parent 05e0d45 commit baed16b

File tree

7 files changed

+46
-18
lines changed

7 files changed

+46
-18
lines changed

Framework/Core/include/Framework/DataOutputDirector.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,16 @@ struct DataOutputDirector {
7070
void readSpecs(std::vector<InputSpec> inputs);
7171

7272
// fill the DataOutputDirector with information from a json file
73-
std::tuple<std::string, std::string, std::string, float, int> readJson(std::string const& fnjson);
74-
std::tuple<std::string, std::string, std::string, float, int> readJsonString(std::string const& stjson);
73+
std::tuple<std::string, std::string, std::string, float, int, uint64_t> readJson(std::string const& fnjson);
74+
std::tuple<std::string, std::string, std::string, float, int, uint64_t> readJsonString(std::string const& stjson);
7575

7676
// read/write private members
7777
int getNumberTimeFramesToMerge() { return mnumberTimeFramesToMerge; }
7878
void setNumberTimeFramesToMerge(int ntfmerge) { mnumberTimeFramesToMerge = ntfmerge > 0 ? ntfmerge : 1; }
7979
std::string getFileMode() { return mfileMode; }
8080
void setFileMode(std::string filemode) { mfileMode = filemode; }
81+
uint64_t getDFOffset() { return dataFrameOffset; }
82+
void setDFOffset(uint64_t offset) { offset > 0 ? dataFrameOffset = offset : 0; }
8183

8284
// get matching DataOutputDescriptors
8385
std::vector<DataOutputDescriptor*> getDataOutputDescriptors(header::DataHeader dh);
@@ -111,10 +113,11 @@ struct DataOutputDirector {
111113
int mfileCounter = 1;
112114
float mmaxfilesize = -1.;
113115
int mnumberTimeFramesToMerge = 1;
116+
uint64_t dataFrameOffset = 0;
114117
std::string mfileMode = "RECREATE";
115118

116-
std::tuple<std::string, std::string, std::string, float, int> readJsonDocument(Document* doc);
117-
const std::tuple<std::string, std::string, std::string, float, int> memptyanswer = std::make_tuple(std::string(""), std::string(""), std::string(""), -1., -1);
119+
std::tuple<std::string, std::string, std::string, float, int, uint64_t> readJsonDocument(Document* doc);
120+
const std::tuple<std::string, std::string, std::string, float, int, uint64_t> memptyanswer = std::make_tuple(std::string(""), std::string(""), std::string(""), -1., -1, 0);
118121
};
119122

120123
} // namespace o2::framework

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ DataProcessorSpec
426426
// get TF number from startTime
427427
auto it = tfNumbers.find(startTime);
428428
if (it != tfNumbers.end()) {
429-
tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge();
429+
tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge() + dod->getDFOffset();
430430
} else {
431431
LOGP(fatal, "No time frame number found for output with start time {}", startTime);
432432
throw std::runtime_error("Processing is stopped!");

Framework/Core/src/DataOutputDirector.cxx

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ void DataOutputDirector::readSpecs(std::vector<InputSpec> inputs)
215215
}
216216
}
217217

218-
std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJson(std::string const& fnjson)
218+
std::tuple<std::string, std::string, std::string, float, int, uint64_t> DataOutputDirector::readJson(std::string const& fnjson)
219219
{
220220
// open the file
221221
FILE* fjson = fopen(fnjson.c_str(), "r");
@@ -231,28 +231,26 @@ std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector
231231
// parse the json file
232232
Document jsonDocument;
233233
jsonDocument.ParseStream(jsonStream);
234-
auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument);
234+
auto [rdn, dfn, fmode, mfs, ntfm, offset] = readJsonDocument(&jsonDocument);
235235

236236
// clean up
237237
fclose(fjson);
238238

239-
return std::make_tuple(rdn, dfn, fmode, mfs, ntfm);
239+
return std::make_tuple(rdn, dfn, fmode, mfs, ntfm, offset);
240240
}
241241

242-
std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJsonString(std::string const& jsonString)
242+
std::tuple<std::string, std::string, std::string, float, int, uint64_t> DataOutputDirector::readJsonString(std::string const& jsonString)
243243
{
244244
// parse the json string
245245
Document jsonDocument;
246246
jsonDocument.Parse(jsonString.c_str());
247-
auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument);
247+
auto [rdn, dfn, fmode, mfs, ntfm, offset] = readJsonDocument(&jsonDocument);
248248

249-
return std::make_tuple(rdn, dfn, fmode, mfs, ntfm);
249+
return std::make_tuple(rdn, dfn, fmode, mfs, ntfm, offset);
250250
}
251251

252-
std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJsonDocument(Document* jsonDocument)
252+
std::tuple<std::string, std::string, std::string, float, int, uint64_t> DataOutputDirector::readJsonDocument(Document* jsonDocument)
253253
{
254-
std::string smc(":");
255-
std::string slh("/");
256254
const char* itemName;
257255

258256
// initialisations
@@ -261,6 +259,7 @@ std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector
261259
std::string fmode("");
262260
float maxfs = -1.;
263261
int ntfm = -1;
262+
uint64_t offset = 0;
264263

265264
// is it a proper json document?
266265
if (jsonDocument->HasParseError()) {
@@ -351,8 +350,21 @@ std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector
351350
}
352351
}
353352

353+
itemName = "offset";
354+
if (dodirItem.HasMember(itemName)) {
355+
if (dodirItem[itemName].IsNumber()) {
356+
offset = dodirItem[itemName].GetUint64();
357+
setDFOffset(offset);
358+
} else {
359+
LOGP(error, "Check the JSON document! Item \"{}\" must be a number!", itemName);
360+
return memptyanswer;
361+
}
362+
}
363+
354364
itemName = "OutputDescriptors";
355365
if (dodirItem.HasMember(itemName)) {
366+
std::string slh("/");
367+
std::string smc(":");
356368
if (!dodirItem[itemName].IsArray()) {
357369
LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
358370
return memptyanswer;
@@ -419,7 +431,7 @@ std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector
419431
printOut();
420432
}
421433

422-
return std::make_tuple(resdir, dfn, fmode, maxfs, ntfm);
434+
return std::make_tuple(resdir, dfn, fmode, maxfs, ntfm, offset);
423435
}
424436

425437
std::vector<DataOutputDescriptor*> DataOutputDirector::getDataOutputDescriptors(header::DataHeader dh)

Framework/Core/src/WorkflowCustomizationHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ std::vector<ConfigParamSpec> WorkflowCustomizationHelpers::requiredWorkflowOptio
6666
{"aod-writer-resmode", VariantType::String, "RECREATE", {"Creation mode of the result files: NEW, CREATE, RECREATE, UPDATE"}},
6767
{"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}},
6868
{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}},
69+
{"aod-writer-df-offset", VariantType::UInt64, 0UL, {"Offset for dataframe numbering"}},
6970

7071
{"fairmq-rate-logging", VariantType::Int, 0, {"Rate logging for FairMQ channels"}},
7172
{"fairmq-recv-buffer-size", VariantType::Int, 4, {"recvBufferSize option for FairMQ channels"}},

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,12 +992,13 @@ std::shared_ptr<DataOutputDirector> WorkflowHelpers::getDataOutputDirector(Confi
992992
float mfs, maxfilesize(-1.);
993993
std::string fmo, filemode("RECREATE");
994994
int ntfm, ntfmerge = 1;
995+
uint64_t offset = 0;
995996

996997
// values from json
997998
if (options.isSet("aod-writer-json")) {
998999
auto fnjson = options.get<std::string>("aod-writer-json");
9991000
if (!fnjson.empty()) {
1000-
std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson);
1001+
std::tie(rdn, fnb, fmo, mfs, ntfm, offset) = dod->readJson(fnjson);
10011002
if (!rdn.empty()) {
10021003
resdir = rdn;
10031004
}
@@ -1068,11 +1069,18 @@ std::shared_ptr<DataOutputDirector> WorkflowHelpers::getDataOutputDirector(Confi
10681069
}
10691070
}
10701071
}
1072+
if (options.isSet("aod-writer-df-offset")) {
1073+
auto off = options.get<uint64_t>("aod-writer-df-offset");
1074+
if (off > 0) {
1075+
offset = off;
1076+
}
1077+
}
10711078
dod->setResultDir(resdir);
10721079
dod->setFilenameBase(fnbase);
10731080
dod->setFileMode(filemode);
10741081
dod->setMaximumFileSize(maxfilesize);
10751082
dod->setNumberTimeFramesToMerge(ntfmerge);
1083+
dod->setDFOffset(offset);
10761084

10771085
return dod;
10781086
}

Framework/Core/src/runDataProcessing.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2024,6 +2024,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
20242024
"--aod-writer-resmode",
20252025
"--aod-writer-maxfilesize",
20262026
"--aod-writer-keep",
2027+
"--aod-writer-df-offset",
20272028
"--aod-parent-access-level",
20282029
"--aod-parent-base-path-replacement",
20292030
"--driver-client-backend",

Framework/Core/test/test_DataOutputDirector.cxx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,15 @@ TEST_CASE("TestDataOutputDirector")
5050
std::string fmode("");
5151
float mfs = -1.;
5252
int ntf = -1;
53+
uint64_t offset = 0;
5354

5455
dh = DataHeader(DataDescription{"DUE"},
5556
DataOrigin{"AOD"},
5657
DataHeader::SubSpecificationType{0});
5758
std::string jsonString(R"({"OutputDirector": {"resfile": "defresults", "resfilemode": "RECREATE", "ntfmerge": 10, "OutputDescriptors": [{"table": "AOD/UNO/0", "columns": ["fEta1","fMom1"], "treename": "uno", "filename": "unoresults"}, {"table": "AOD/DUE/0", "columns": ["fPhi2"], "treename": "due"}]}})");
5859

5960
dod.reset();
60-
std::tie(rdn, dfn, fmode, mfs, ntf) = dod.readJsonString(jsonString);
61+
std::tie(rdn, dfn, fmode, mfs, ntf, offset) = dod.readJsonString(jsonString);
6162
ds = dod.getDataOutputDescriptors(dh);
6263

6364
REQUIRE(ds.size() == 1);
@@ -78,6 +79,7 @@ TEST_CASE("TestDataOutputDirector")
7879
jf << R"( "resfile": "defresults",)" << std::endl;
7980
jf << R"( "resfilemode": "NEW",)" << std::endl;
8081
jf << R"( "ntfmerge": 10,)" << std::endl;
82+
jf << R"( "offset": 10,)" << std::endl;
8183
jf << R"( "OutputDescriptors": [)" << std::endl;
8284
jf << R"( {)" << std::endl;
8385
jf << R"( "table": "AOD/DUE/0",)" << std::endl;
@@ -102,14 +104,15 @@ TEST_CASE("TestDataOutputDirector")
102104
jf.close();
103105

104106
dod.reset();
105-
std::tie(rdn, dfn, fmode, mfs, ntf) = dod.readJson(jsonFile);
107+
std::tie(rdn, dfn, fmode, mfs, ntf, offset) = dod.readJson(jsonFile);
106108
dod.setFilenameBase("AnalysisResults");
107109
ds = dod.getDataOutputDescriptors(dh);
108110

109111
REQUIRE(ds.size() == 2);
110112
REQUIRE(dfn == std::string("defresults"));
111113
REQUIRE(fmode == std::string("NEW"));
112114
REQUIRE(ntf == 10);
115+
REQUIRE(offset == 10);
113116

114117
REQUIRE(ds[0]->getFilenameBase() == std::string("unoresults"));
115118
REQUIRE(ds[0]->tablename == std::string("DUE"));

0 commit comments

Comments
 (0)