
feat(meos): PAIR_MEETING + CROSS_DISTANCE aggregations close Q5 + Q9 streaming-form cells to full #17

Draft
estebanzimanyi wants to merge 3 commits into MobilityDB:main from estebanzimanyi:feat/pair-meeting-cross-distance-aggregations

@estebanzimanyi
Member

Closes the last 6 partial cells on the MobilityNebula BerlinMOD-9 × 3-form parity matrix by adding two new four-field Cartesian aggregations that mirror the TEMPORAL_LENGTH pattern from #16. This PR is stacked on #16 and also contains #16's commits; it will be rebased once #16 lands.

After this PR, the MobilityNebula matrix row is 27 / 27 cells full. Combined with MobilityFlink#3 + MobilityKafka#1 (27/27 each), the streaming-side ecosystem reaches 81 / 81 full MEOS-backed cells, with 0 partial.

What lands

PAIR_MEETING(lon, lat, ts, vehicle_id) -> VARSIZED — closes Q5 × 3

  • Lift collects per-event tuples.
  • Lower picks each vehicle's latest known position in the window, enumerates pairs (a, b) with a < b, calls MEOS' geog_dwithin over POINT/POINT geographies with dMeet = 200 m hardcoded for the BerlinMOD scaffold, and emits the meeting pairs as a string-encoded list "vid_a,vid_b,ts,<=dMeet; …".
  • Upstream WHERE edwithin_tgeo_geo(... near-P 2km) pre-filters events; the aggregation focuses purely on the pair enumeration.
  • Future PR can parameterize dMeet via a constant input.
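
The lift and lower steps listed above can be sketched in plain Python. This is a minimal illustration, not the C++ in the PR: a haversine great-circle distance stands in for MEOS' geog_dwithin, the function names and the Earth radius constant are hypothetical, and reporting the later of the two timestamps for each pair is an assumption, since the description does not say which ts is emitted.

```python
import math
from itertools import combinations

D_MEET_M = 200.0  # hardcoded meeting threshold from the PR description
EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius; an assumption of this sketch


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in metres (stand-in for MEOS' geog_dwithin)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def lift(state, lon, lat, ts, vehicle_id):
    """Lift: append one per-event tuple to the window state."""
    state.append((lon, lat, ts, vehicle_id))


def lower(state, d_meet=D_MEET_M):
    """Lower: keep each vehicle's latest position, then test every pair a < b."""
    latest = {}
    for lon, lat, ts, vid in state:
        if vid not in latest or ts >= latest[vid][2]:
            latest[vid] = (lon, lat, ts)
    meetings = []
    for a, b in combinations(sorted(latest), 2):
        lon_a, lat_a, ts_a = latest[a]
        lon_b, lat_b, ts_b = latest[b]
        if haversine_m(lon_a, lat_a, lon_b, lat_b) <= d_meet:
            # Assumption: report the later of the two timestamps for the pair.
            meetings.append(f"{a},{b},{max(ts_a, ts_b)},<=dMeet")
    return "; ".join(meetings)
```

Sorting the vehicle ids before enumerating combinations guarantees a < b in every emitted pair, so each meeting appears once per window.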

CROSS_DISTANCE(lon, lat, ts, vehicle_id) -> FLOAT64 — closes Q9 × 3

  • Same four-field lift shape.
  • Lower picks the latest known position of each of the two target vehicles (VID_A = 100, VID_B = 200 hardcoded), drives the MEOS nad_tgeo_tgeo distance over single-instant tgeompoints, and returns a FLOAT64 (NaN if either vehicle is unobserved).
  • Future PR can parameterize (VID_A, VID_B).
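
A rough Python sketch of this lower step follows. It is an illustration under stated assumptions: a haversine distance in metres stands in for MEOS' nad_tgeo_tgeo (whose actual units depend on the SRID, which the description does not state), and the function names and Earth radius constant are hypothetical.

```python
import math

VID_A, VID_B = 100, 200  # hardcoded target vehicles from the PR description
EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius; an assumption of this sketch


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in metres (stand-in for MEOS' nad_tgeo_tgeo)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def cross_distance(events, vid_a=VID_A, vid_b=VID_B):
    """Distance between the latest positions of two vehicles, NaN if one is missing."""
    latest = {}
    for lon, lat, ts, vid in events:
        if vid in (vid_a, vid_b) and (vid not in latest or ts >= latest[vid][2]):
            latest[vid] = (lon, lat, ts)
    if vid_a not in latest or vid_b not in latest:
        return math.nan  # either vehicle unobserved in this window
    lon_a, lat_a, _ = latest[vid_a]
    lon_b, lat_b, _ = latest[vid_b]
    return haversine_m(lon_a, lat_a, lon_b, lat_b)
```

Returning NaN rather than raising keeps the output column a plain FLOAT64 for windows in which one of the two vehicles never reports.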

Pipeline wiring (identical four-layer pattern as TEMPORAL_LENGTH)

| Layer | Files |
|---|---|
| Physical | nes-physical-operators/{include,src}/Aggregation/Function/Meos/{PairMeeting,CrossDistance}AggregationPhysicalFunction.{hpp,cpp} + CMakeLists plugin |
| Logical | nes-logical-operators/{include,src}/Operators/Windows/Aggregations/Meos/{PairMeeting,CrossDistance}AggregationLogicalFunction.{hpp,cpp} + CMakeLists plugin |
| Parser | nes-sql-parser/AntlrSQL.g4 lexer tokens (PAIR_MEETING, CROSS_DISTANCE) + functionName rule + case-label and string-name dispatch in AntlrSQLQueryPlanCreator.cpp |
| Lowering | nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp special-case lowering with 4-field state schema (lon, lat, ts, vehicle_id) |

YAMLs

  • Queries/berlinmod/q5_{continuous,windowed,snapshot}.yaml rewritten to call PAIR_MEETING(gps_lon, gps_lat, time_utc, vehicle_id) directly, with the near-P filter as the upstream WHERE predicate.
  • Queries/berlinmod/q9_{continuous,windowed,snapshot}.yaml rewritten to call CROSS_DISTANCE directly; sink schema swapped from VARSIZED trajectory to FLOAT64 distance.

Docs

  • docs/berlinmod-streaming-forms.md updated: Q5 + Q9 rows now ✓/✓/✓; coverage paragraph reads 27 of 27 cells full (was 21 + 6 partial); MEOS-operators table lists PAIR_MEETING and CROSS_DISTANCE alongside the existing ones.

Verified

  • All 6 rewritten YAMLs pass python3 -c "import yaml; yaml.safe_load(open(...))".
  • C++ code follows the established TemporalLengthAggregationPhysicalFunction template. The only structural differences are the four-field lift (adds vehicle_id) and the Cartesian inner loop in lower().
  • The geog_dwithin (POINT, POINT) and nad_tgeo_tgeo MEOS C symbols used by the lower steps are declared in meos.h and meos_geo.h, both of which are already included.
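
The YAML check in the first bullet above could be run over all six files with a short script along these lines. It is a sketch that assumes PyYAML is installed and that it is run from the repository root; the helper names are hypothetical, and the check only confirms that each file parses, not that the query is valid for NebulaStream.

```python
from pathlib import Path

FORMS = ("continuous", "windowed", "snapshot")


def rewritten_yaml_paths(root="Queries/berlinmod", queries=("q5", "q9")):
    """Expand q{5,9}_{continuous,windowed,snapshot}.yaml into concrete paths."""
    return [Path(root) / f"{query}_{form}.yaml" for query in queries for form in FORMS]


def validate(paths):
    """Return {path: error message} for every file that fails to load."""
    import yaml  # PyYAML; imported here so path expansion works without it

    failures = {}
    for path in paths:
        try:
            with open(path, encoding="utf-8") as handle:
                yaml.safe_load(handle)
        except (OSError, yaml.YAMLError) as exc:
            failures[str(path)] = str(exc)
    return failures


if __name__ == "__main__":
    for name, error in validate(rewritten_yaml_paths()).items():
        print(f"FAILED {name}: {error}")
```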

Out of scope

  • Constant parameterisation (dMeet, VID_A, VID_B): hardcoded BerlinMOD defaults in this PR; future enhancement to plumb through SerializableAggregationFunction's config map.
  • Runtime verification on the NebulaStream vcpkg-bootstrapped test harness — gated on the user's harness and reviewer pipeline.

Stacked on: #16 (TEMPORAL_LENGTH)
Sibling parity work: MobilityFlink#4, MobilityKafka#2

… on NebulaStream (33 YAMLs, 27/27 cells)

Additive scaffold for the BerlinMOD-9 × 3 streaming-form parity contract
on MobilityNebula, sibling to the existing SNCB Q-series and matching
the MobilityFlink MobilityDB#3 / MobilityKafka MobilityDB#1 streaming-form definitions.

All 27 cells covered:

  Q1 'which vehicles have appeared'      — full (continuous + windowed + snapshot)
  Q2 'where is vehicle X at time T'      — full
  Q3 'vehicles within 5 km of P'         — full
  Q4 'vehicles inside region R (polygon)'— full
  Q5 'pairs of vehicles meeting near P'  — partial (emit per-vehicle trajectories near P; consumer joins)
  Q6 'cumulative distance per vehicle'   — partial (emit TEMPORAL_SEQUENCE; consumer computes length)
  Q7 'first passage of vehicle through POI' × {POI1, POI2, POI3}
                                          — full (per-POI fan-out)
  Q8 'vehicles within d of LINESTRING'   — full (edwithin_tgeo_geo with LINESTRING geometry)
  Q9 'distance between X and Y at time T'— partial (emit X and Y trajectories; consumer joins)

18 of 27 cells are FULL (the BerlinMOD-Q semantic is computed entirely
inside NebulaStream). 9 cells are PARTIAL — NebulaStream emits the
per-window inputs (trajectory, candidate vehicles) and a consumer
post-processes for the final BerlinMOD-Q answer. The partial pattern
is the natural expression of these queries in NebulaStream's current
SQL surface; the path to FULL is documented per-Q in
docs/berlinmod-streaming-forms.md (a stream-self-join for Q5/Q9, a
temporal_length scalar function for Q6).

Form mapping to NebulaStream windows:

  continuous: SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
  windowed:   TUMBLING(time_utc, SIZE 10 SEC)
  snapshot:   TUMBLING(time_utc, SIZE 5 SEC)
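
As an illustration of the mapping above, the following sketch assigns an event timestamp to its windows. It assumes non-negative millisecond timestamps and half-open, epoch-aligned windows [start, end); whether NebulaStream aligns windows exactly this way is an assumption, and the function names are hypothetical.

```python
def tumbling_window(ts_ms, size_ms):
    """Return the single [start, end) tumbling window containing ts_ms."""
    start = ts_ms - ts_ms % size_ms
    return (start, start + size_ms)


def sliding_windows(ts_ms, size_ms, advance_ms):
    """Return every [start, end) sliding window containing ts_ms, oldest first."""
    windows = []
    start = ts_ms - ts_ms % advance_ms  # latest window start at or before ts_ms
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return sorted(windows)
```

With SIZE 1 SEC and ADVANCE BY 1 SEC the sliding window yields exactly one window per event under these assumptions, so the continuous form behaves like a 1-second tumbling window.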

MEOS-side surface consumed (already exposed by PR MobilityDB#14 + follow-ups):

  edwithin_tgeo_geo — Q3 (POINT predicate), Q4 (POLYGON, d=0.0),
                      Q5 (POINT predicate), Q7 (per-POI POINT),
                      Q8 (LINESTRING predicate)
  TEMPORAL_SEQUENCE — Q2 / Q5 / Q6 / Q9 (per-window per-vehicle trajectory)

No new MEOS PhysicalFunction classes added; no C++ changes; no SNCB
Q-series modifications. All 33 YAMLs are additive in a new
Queries/berlinmod/ subdirectory.

Additions:
  Queries/berlinmod/q1_{continuous,windowed,snapshot}.yaml          (3)
  Queries/berlinmod/q2_{continuous,windowed,snapshot}.yaml          (3)
  Queries/berlinmod/q3_{continuous,windowed,snapshot}.yaml          (3)
  Queries/berlinmod/q4_{continuous,windowed,snapshot}.yaml          (3)
  Queries/berlinmod/q5_{continuous,windowed,snapshot}.yaml          (3, partial)
  Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml          (3, partial)
  Queries/berlinmod/q7_poi{1,2,3}_{continuous,windowed,snapshot}.yaml (9, full via fan-out)
  Queries/berlinmod/q8_{continuous,windowed,snapshot}.yaml          (3, LINESTRING predicate)
  Queries/berlinmod/q9_{continuous,windowed,snapshot}.yaml          (3, partial)
  Input/input_berlinmod.csv  (sample data: 3 vehicles × 21 events, 14 simulated seconds)
  docs/berlinmod-streaming-forms.md

Validation: every YAML parses cleanly via python3 yaml.safe_load.
Runtime verification gated on the NebulaStream test harness.

Coverage: 27 of 27 cells (100 %), with 18 FULL and 9 PARTIAL annotated
explicitly per Q. Path to FULL for the 9 PARTIAL cells is one
MobilityNebula C++ PhysicalFunction class each (or a NebulaStream
upstream stream-self-join), documented in
docs/berlinmod-streaming-forms.md.
…-form cells to full

Adds the TEMPORAL_LENGTH aggregation across the four levels of the
NebulaStream pipeline (logical / physical / parser / lowering) so the
BerlinMOD-Q6 "cumulative distance per vehicle" streaming-form cells
(continuous + windowed + snapshot) compute the spheroidal trajectory
length entirely inside NebulaStream instead of emitting raw trajectories
for a consumer-side reduction.

Logical: nes-logical-operators/{include,src}/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.{hpp,cpp}
mirroring TemporalSequenceAggregationLogicalFunctionV2 but with finalAggregateStampType = FLOAT64.
Registers as "TemporalLength" in the aggregation registry. Serializes through the existing
TemporalAggregationSerde wire shape with the type tag overridden.

Physical: nes-physical-operators/{include,src}/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.{hpp,cpp}
identical lift / combine / reset / cleanup to TemporalSequenceAggregationPhysicalFunction;
the lower() path builds the same MEOS instant-set trajectory string, parses it via
MEOSWrapper::parseTemporalPoint, and calls MEOS' tpoint_length(Temporal*) to return a single
FLOAT64 result.
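
The length computation can be approximated in Python as the sum of segment lengths over the time-ordered points. This is a sketch, not the MEOS implementation: it uses a spherical haversine distance rather than the spheroidal length that tpoint_length is described as returning, it returns 0.0 for fewer than two points as an assumption, and the function names are hypothetical.

```python
import math

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius; an assumption of this sketch


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in metres (spherical stand-in for the MEOS length)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def temporal_length_m(events):
    """Sum segment lengths over (lon, lat, timestamp) tuples ordered by timestamp."""
    points = sorted(events, key=lambda event: event[2])
    total = 0.0
    for (lon1, lat1, _), (lon2, lat2, _) in zip(points, points[1:]):
        total += haversine_m(lon1, lat1, lon2, lat2)
    return total
```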

Parser: nes-sql-parser/AntlrSQL.g4 adds the TEMPORAL_LENGTH lexer token and includes it in
functionName. AntlrSQLQueryPlanCreator.cpp adds the TEMPORAL_LENGTH dispatch in both the
case-label and string-name paths, parallel to TEMPORAL_SEQUENCE.

Lowering: nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp
adds the TEMPORAL_LENGTH special-case lowering, parallel to TEMPORAL_SEQUENCE, producing a
TemporalLengthAggregationPhysicalFunction with the same (lon, lat, timestamp) state schema.

YAMLs: Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml updated to call
TEMPORAL_LENGTH directly; the FLOAT64 output column replaces the VARSIZED trajectory output;
header comments updated to "FULL".

Docs: docs/berlinmod-streaming-forms.md updated to reflect 21 cells full + 6 cells partial
(Q5 + Q9 only); the path-to-full table now lists those two queries only.

YAML safe_load green on all 3 Q6 cells. Build verification gated on the user's NebulaStream
test harness (vcpkg-bootstrapped); the C++ code follows the established TemporalSequence
template exactly, with the lower() path replaced by tpoint_length.
…streaming-form cells to full

Mirrors the TEMPORAL_LENGTH pattern from the parent PR with two new
four-field aggregations that close the last 6 partial cells on the
MobilityNebula BerlinMOD parity matrix:

PAIR_MEETING(lon, lat, ts, vehicle_id) -> VARSIZED
  Lift collects per-event tuples. Lower picks each vehicle's latest known
  position in the window, enumerates pairs (a < b), calls MEOS' geog_dwithin
  with dMeet = 200 m hardcoded for the BerlinMOD scaffold, and emits a
  string-encoded list of meeting pairs (vid_a, vid_b, ts, "<=dMeet" tag).
  Future PR can parameterize dMeet via a constant input. Closes Q5 × 3 cells.

CROSS_DISTANCE(lon, lat, ts, vehicle_id) -> FLOAT64
  Same lift shape. Lower picks the latest known position of each of the two
  target vehicles (VID_A = 100, VID_B = 200 hardcoded), drives the MEOS
  nad_tgeo_tgeo distance, and returns a FLOAT64 (NaN if either vehicle is
  unobserved). Future PR can parameterize (VID_A, VID_B). Closes Q9 × 3 cells.

Wired across the four pipeline layers identically to TEMPORAL_LENGTH:
  - nes-physical-operators/{include,src}/Aggregation/Function/Meos/{PairMeeting,CrossDistance}AggregationPhysicalFunction.{hpp,cpp}
  - nes-logical-operators/{include,src}/Operators/Windows/Aggregations/Meos/{PairMeeting,CrossDistance}AggregationLogicalFunction.{hpp,cpp}
  - nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt + nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt plugin entries
  - nes-sql-parser/AntlrSQL.g4 lexer + functionName tokens
  - nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp case-label + string-name dispatch
  - nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp special-case lowering with 4-field state schema

YAMLs: Queries/berlinmod/q5_{continuous,windowed,snapshot}.yaml and
q9_{continuous,windowed,snapshot}.yaml rewritten to call the new
aggregations directly; sink schemas updated to VARSIZED (Q5) / FLOAT64 (Q9);
header comments updated to FULL.

Docs: docs/berlinmod-streaming-forms.md updated to reflect 27/27 cells
full (was 21 full + 6 partial); MEOS-operators table now lists
PAIR_MEETING and CROSS_DISTANCE alongside the existing ones.

YAML safe_load green on all 6 rewritten Q5/Q9 cells. C++ follows the
established TemporalLength template from the parent MobilityDB#16; build
verification gated on the user's NebulaStream test harness.