feat(meos): parameterize PAIR_MEETING dMeet via SQL constant fifth arg (stacks on #18)#19
Open
estebanzimanyi wants to merge 5 commits into
Open
Conversation
… on NebulaStream (33 YAMLs, 27/27 cells) Additive scaffold for the BerlinMOD-9 × 3 streaming-form parity contract on MobilityNebula, sibling to the existing SNCB Q-series and matching the MobilityFlink MobilityDB#3 / MobilityKafka MobilityDB#1 streaming-form definitions. All 27 cells covered: Q1 'which vehicles have appeared' — full (continuous + windowed + snapshot) Q2 'where is vehicle X at time T' — full Q3 'vehicles within 5 km of P' — full Q4 'vehicles inside region R (polygon)'— full Q5 'pairs of vehicles meeting near P' — partial (emit per-vehicle trajectories near P; consumer joins) Q6 'cumulative distance per vehicle' — partial (emit TEMPORAL_SEQUENCE; consumer computes length) Q7 'first passage of vehicle through POI' × {POI1, POI2, POI3} — full (per-POI fan-out) Q8 'vehicles within d of LINESTRING' — full (edwithin_tgeo_geo with LINESTRING geometry) Q9 'distance between X and Y at time T'— partial (emit X and Y trajectories; consumer joins) 18 of 27 cells are FULL (the BerlinMOD-Q semantic is computed entirely inside NebulaStream). 9 cells are PARTIAL — NebulaStream emits the per-window inputs (trajectory, candidate vehicles) and a consumer post-processes for the final BerlinMOD-Q answer. The partial pattern is the natural expression of these queries in NebulaStream's current SQL surface; the path to FULL is documented per-Q in docs/berlinmod-streaming-forms.md (a stream-self-join for Q5/Q9, a temporal_length scalar function for Q6). Form mapping to NebulaStream windows: continuous: SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC) windowed: TUMBLING(time_utc, SIZE 10 SEC) snapshot: TUMBLING(time_utc, SIZE 5 SEC) MEOS-side surface consumed (already exposed by PR MobilityDB#14 + follow-ups): edwithin_tgeo_geo — Q3 (POINT predicate), Q4 (POLYGON, d=0.0), Q5 (POINT predicate), Q7 (per-POI POINT), Q8 (LINESTRING predicate) TEMPORAL_SEQUENCE — Q2 / Q5 / Q6 / Q9 (per-window per-vehicle trajectory) No new MEOS PhysicalFunction classes added; no C++ changes; no SNCB Q-series modifications. All 33 YAMLs are additive in a new Queries/berlinmod/ subdirectory. Add (additions): Queries/berlinmod/q1_{continuous,windowed,snapshot}.yaml (3) Queries/berlinmod/q2_{continuous,windowed,snapshot}.yaml (3) Queries/berlinmod/q3_{continuous,windowed,snapshot}.yaml (3) Queries/berlinmod/q4_{continuous,windowed,snapshot}.yaml (3) Queries/berlinmod/q5_{continuous,windowed,snapshot}.yaml (3, partial) Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml (3, partial) Queries/berlinmod/q7_poi{1,2,3}_{continuous,windowed,snapshot}.yaml (9, full via fan-out) Queries/berlinmod/q8_{continuous,windowed,snapshot}.yaml (3, LINESTRING predicate) Queries/berlinmod/q9_{continuous,windowed,snapshot}.yaml (3, partial) Input/input_berlinmod.csv (sample data: 3 vehicles × 21 events, 14 simulated seconds) docs/berlinmod-streaming-forms.md Validation: every YAML parses cleanly via python3 yaml.safe_load. Runtime verification gated on the NebulaStream test harness. Coverage: 27 of 27 cells (100 %), with 18 FULL and 9 PARTIAL annotated explicitly per Q. Path to FULL for the 9 PARTIAL cells is one MobilityNebula C++ PhysicalFunction class each (or a NebulaStream upstream stream-self-join), documented in docs/berlinmod-streaming-forms.md.
…-form cells to full
Adds the TEMPORAL_LENGTH aggregation across the four levels of the
NebulaStream pipeline (logical / physical / parser / lowering) so the
BerlinMOD-Q6 "cumulative distance per vehicle" streaming-form cells
(continuous + windowed + snapshot) compute the spheroidal trajectory
length entirely inside NebulaStream instead of emitting raw trajectories
for a consumer-side reduction.
Logical: nes-logical-operators/{include,src}/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.{hpp,cpp}
mirroring TemporalSequenceAggregationLogicalFunctionV2 but with finalAggregateStampType = FLOAT64.
Registers as "TemporalLength" in the aggregation registry. Serializes through the existing
TemporalAggregationSerde wire shape with the type tag overridden.
Physical: nes-physical-operators/{include,src}/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.{hpp,cpp}
identical lift / combine / reset / cleanup to TemporalSequenceAggregationPhysicalFunction;
the lower() path builds the same MEOS instant-set trajectory string, parses it via
MEOSWrapper::parseTemporalPoint, and calls MEOS' tpoint_length(Temporal*) to return a single
FLOAT64 result.
Parser: nes-sql-parser/AntlrSQL.g4 adds the TEMPORAL_LENGTH lexer token and includes it in
functionName. AntlrSQLQueryPlanCreator.cpp adds the TEMPORAL_LENGTH dispatch in both the
case-label and string-name paths, parallel to TEMPORAL_SEQUENCE.
Lowering: nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp
adds the TEMPORAL_LENGTH special-case lowering, parallel to TEMPORAL_SEQUENCE, producing a
TemporalLengthAggregationPhysicalFunction with the same (lon, lat, timestamp) state schema.
YAMLs: Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml updated to call
TEMPORAL_LENGTH directly; the FLOAT64 output column replaces the VARSIZED trajectory output;
header comments updated to "FULL".
Docs: docs/berlinmod-streaming-forms.md updated to reflect 21 cells full + 6 cells partial
(Q5 + Q9 only); the path-to-full table now lists those two queries only.
YAML safe_load green on all 3 Q6 cells. Build verification gated on the user's NebulaStream
test harness (vcpkg-bootstrapped); the C++ code follows the established TemporalSequence
template exactly, with the lower() path replaced by tpoint_length.
…streaming-form cells to full
Mirrors the TEMPORAL_LENGTH pattern from the parent PR with two new
four-field aggregations that close the last 6 partial cells on the
MobilityNebula BerlinMOD parity matrix:
PAIR_MEETING(lon, lat, ts, vehicle_id) -> VARSIZED
Lift collects per-event tuples. Lower picks each vehicle's latest known
position in the window, enumerates pairs (a < b), calls MEOS' geog_dwithin
with dMeet = 200 m hardcoded for the BerlinMOD scaffold, and emits a
string-encoded list of meeting pairs (vid_a, vid_b, ts, "<=dMeet" tag).
Future PR can parameterize dMeet via a constant input. Closes Q5 × 3 cells.
CROSS_DISTANCE(lon, lat, ts, vehicle_id) -> FLOAT64
Same lift shape. Lower picks the latest known position of each of the two
target vehicles (VID_A = 100, VID_B = 200 hardcoded), drives the MEOS
nad_tgeo_tgeo distance, and returns a FLOAT64 (NaN if either vehicle is
unobserved). Future PR can parameterize (VID_A, VID_B). Closes Q9 × 3 cells.
Wired across the four pipeline layers identically to TEMPORAL_LENGTH:
- nes-physical-operators/{include,src}/Aggregation/Function/Meos/{PairMeeting,CrossDistance}AggregationPhysicalFunction.{hpp,cpp}
- nes-logical-operators/{include,src}/Operators/Windows/Aggregations/Meos/{PairMeeting,CrossDistance}AggregationLogicalFunction.{hpp,cpp}
- nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt + nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt plugin entries
- nes-sql-parser/AntlrSQL.g4 lexer + functionName tokens
- nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp case-label + string-name dispatch
- nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp special-case lowering with 4-field state schema
YAMLs: Queries/berlinmod/q5_{continuous,windowed,snapshot}.yaml and
q9_{continuous,windowed,snapshot}.yaml rewritten to call the new
aggregations directly; sink schemas updated to FLOAT64 / VARSIZED;
header comments updated to FULL.
Docs: docs/berlinmod-streaming-forms.md updated to reflect 27/27 cells
full (was 21 full + 6 partial); MEOS-operators table now lists
PAIR_MEETING and CROSS_DISTANCE alongside the existing ones.
YAML safe_load green on all 6 rewritten Q5/Q9 cells. C++ follows the
established TemporalLength template from the parent MobilityDB#16; build
verification gated on the user's NebulaStream test harness.
… covered' section After PR MobilityDB#16 (TEMPORAL_LENGTH closes Q6) and PR MobilityDB#17 (PAIR_MEETING + CROSS_DISTANCE close Q5 + Q9), the parity matrix is 27/27 full — the doc's own coverage table at the top confirms it. But the section 'Not covered (15 cells / 5 queries)' at line 77 was a remnant from the pre-MobilityDB#16/MobilityDB#17 state and contradicts the rest of the doc. Remove it. Add a new 'Streaming-semantics tier overlay' section that classifies each BerlinMOD-Q by its streaming-execution tier (stateless / bounded-state / windowed / cross-stream) per the closed 7-value vocabulary proposed for the MEOS-API objectModel.streamingSemantics facet (see the sibling RFC on MEOS-API PR MobilityDB#10). The mapping makes the cross-binding picture explicit: a Q's tier on NebulaStream is the same tier on Flink / Kafka, and the table points to the equivalent generic wiring class on Flink for each tier. Two short follow-up notes explain why cross-stream looks different on NebulaStream (single-aggregation Cartesian enumeration vs Flink's interval-join across two streams — same semantic, different topology) and why Q7 is bounded-state rather than windowed (per-POI fan-out, per-(vehicle, POI) bounded state, no full-sequence reduction needed). Refresh the 'Sibling parity references' section to point at the current state of the Flink and Kafka work — Flink's per-tier wiring infrastructure under org.mobilitydb.flink.meos.wirings (5 generic classes covering 100% of the streamable surface) and Kafka's codegen mirror under org.mobilitydb.kafka.meos. Drops stale PR-number references per the same as-is / no-internal-process discipline applied elsewhere in the ecosystem docs. Stacks on PR MobilityDB#17. Docs-only; touches no YAML, no C++ pipeline-layer file.
The PAIR_MEETING aggregation (added in MobilityDB#17) hardcoded the meeting-distance threshold at 200 m via a static constexpr DMEET_METRES, with the PR body noting parameterization as future work. This PR lands that future work: PAIR_MEETING now takes a fifth argument — a numeric constant in metres — and the physical operator uses it per-query. ## Surface PAIR_MEETING(lon, lat, ts, vehicle_id, dMeet) ^^^^^ new fifth arg (numeric constant, metres) The first four args remain FieldAccess (lon, lat, ts, vehicle_id); the fifth is pulled from the parser's constantBuilder as a numeric literal, parsed via std::stod, and threaded through the logical→physical lowering chain into the lower() lambda alongside the existing state pointers. ## Files (9, all stacked on MobilityDB#18 → MobilityDB#17 → MobilityDB#16 → MobilityDB#15) | Layer | File | |---|---| | Physical .hpp | PairMeetingAggregationPhysicalFunction.hpp — `DMEET_METRES` constexpr → `DEFAULT_DMEET_METRES` + instance field `dMeetMetres` | | Physical .cpp | PairMeetingAggregationPhysicalFunction.cpp — constructor takes dMeet; lower() passes it to the captureless lambda via `nautilus::val<double>` | | Logical .hpp | PairMeetingAggregationLogicalFunction.hpp — constructor + create() factory take dMeet; getter `getDMeetMetres()` | | Logical .cpp | PairMeetingAggregationLogicalFunction.cpp — initialize field; Registrar deserialize path uses DEFAULT_DMEET_METRES (see Serde caveat below) | | Parser | AntlrSQLQueryPlanCreator.cpp — both PAIR_MEETING dispatch sites (lexer-token case + funcName string-name case) extract the constant from constantBuilder, std::stod it, pass to create() | | Lowering | LowerToPhysicalWindowedAggregation.cpp — pmDescriptor->getDMeetMetres() flows to the physical constructor | | YAMLs (×3) | Queries/berlinmod/q5_continuous.yaml, q5_snapshot.yaml, q5_windowed.yaml — add `, 200.0` as the explicit fifth arg; comments updated to reflect the parameterization | ## Serde round-trip caveat (out of scope for this PR) `AggregationLogicalFunctionRegistryArguments` is strongly typed to `vector<FieldAccessLogicalFunction>` — there is no slot for a numeric constant in the existing Registrar interface, and `SerializableAggregationFunction` has no proto field for it either. As a result: - The parser path (live query execution) is FULLY parameterized — dMeet flows from SQL to physical correctly. - The Serde deserialize path falls back to DEFAULT_DMEET_METRES (preserves the 200 m scaffold behaviour). Round-trip fidelity for the dMeet value requires (a) adding a new field to SerializableAggregationFunction.proto, (b) extending AggregationLogicalFunctionRegistryArguments to carry it, and (c) threading both through Serialize/Register. That's an infrastructure change touching every registered aggregation; tracked as a follow-up. ## Build / test verification Cannot compile-verify locally — NebulaStream needs the full C++23 + vcpkg toolchain. Submitted for maintainer build verification (cc @marianaGarcez). Expected to compile cleanly; the only construction-time behaviour change is the constructor signature (5 params → 6 params for physical, 5 → 6 for logical create/ctor); the only runtime behaviour change is that dMeet is now read from the instance field instead of the class constexpr (the lambda receives it via the nautilus::val<double> extra arg). ## Mirrors the CROSS_DISTANCE shape CROSS_DISTANCE (also added by MobilityDB#17, hardcoded VID_A=100, VID_B=200) has the exact same parameterization pattern; a sibling PR can apply the same change with (lon, lat, ts, vid, vid_a, vid_b) — 6 args total instead of 5. Holding for separate PR.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Implements the "Future PR can parameterize DMEET via a constant input to the aggregation" future-work item from PR #17 (
PAIR_MEETINGbody, headerPairMeetingAggregationPhysicalFunction.hpp).cc @marianaGarcez — this is a maintainer-build-verification PR; my environment can't compile NebulaStream (full C++23 + vcpkg toolchain needed). Submitting the spike with explicit Serde caveat (below) for your iteration.
Surface change
The first four args remain
FieldAccess(lon, lat, ts, vehicle_id); the fifth is a numeric constant pulled from the parser'sconstantBuilder, parsed viastd::stod, and threaded through the logical → lowering → physical chain into thelower()lambda's argument list (alongside the existingvehicleMapPtrandpairsBuffer) vianautilus::val<double>.9 files modified (across the 4 NebulaStream pipeline layers + 3 YAML scaffolds)
nes-physical-operators/include/Aggregation/Function/Meos/PairMeetingAggregationPhysicalFunction.hppstatic constexpr DMEET_METRES→static constexpr DEFAULT_DMEET_METRES+ instancedouble dMeetMetresfield; constructor takes itnes-physical-operators/src/Aggregation/Function/Meos/PairMeetingAggregationPhysicalFunction.cppdMeetMetres;lower()passesnautilus::val<double>(dMeetMetres)to the captureless lambda which uses it ingeog_dwithin(ggA, ggB, dMeet, true)nes-logical-operators/include/Operators/Windows/Aggregations/Meos/PairMeetingAggregationLogicalFunction.hppcreate()factory takedouble dMeetMetres; gettergetDMeetMetres()nes-logical-operators/src/Operators/Windows/Aggregations/Meos/PairMeetingAggregationLogicalFunction.cppDEFAULT_DMEET_METRES(see Serde caveat)nes-sql-parser/src/AntlrSQLQueryPlanCreator.cppPAIR_MEETINGdispatch sites (lexer-token case + funcName string-name case) — extract constant fromconstantBuilder,std::stodit, pass tocreate()nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpppmDescriptor->getDMeetMetres()→ physical constructorQueries/berlinmod/q5_continuous.yaml,q5_snapshot.yaml,q5_windowed.yaml, 200.0as the explicit fifth arg; comments updated to reflect parameterizationYAML parse-validated locally (
python3 -c "import yaml; yaml.safe_load(...)"on each). No C++ compile verification possible — see below.Serde round-trip caveat (deliberately out of scope)
AggregationLogicalFunctionRegistryArgumentsis strongly typed tovector<FieldAccessLogicalFunction>— there is no slot for a numeric constant in the Registrar interface, andSerializableAggregationFunction(the proto) has no field for it either. As a result:DEFAULT_DMEET_METRES(200 m, the previous hardcode). Round-trip fidelity for the dMeet value requires:SerializableAggregationFunction.protoAggregationLogicalFunctionRegistryArgumentsto carry the constantSerialize()/Register…()in the logical .cppThis is an infrastructure change touching the Registrar interface (which every NebulaStream aggregation registers through) and the proto schema (which has its own regeneration step). Out of scope for this spike; would be a coherent follow-up if cross-process query-plan serialization for the parameterized dMeet is needed.
Mirrors CROSS_DISTANCE
CROSS_DISTANCE(also added in #17, with hardcodedVID_A=100,VID_B=200) has the exact same parameterization pattern — the only differences are: 2 constants instead of 1, integer instead of double, and the parser+lowering need to extract both. Holding for a sibling PR with the same shape.What needs maintainer attention
nautilus::val<double>passing pattern in the lambda — I extrapolated from the existingnautilus::val<size_t>(0)pattern in the same file, but I'm uncertain whethernautilus::val<double>is the correct val type for adoublelambda arg or whether the lambda signature needsnautilus::val<double>parameters instead of plaindouble. The build will tell.Stacks on PR #18 (docs overlay) → PR #17 (PAIR_MEETING + CROSS_DISTANCE addition) → PR #16 (TEMPORAL_LENGTH) → PR #15 (scaffold).