Skip to content

feat(meos): TEMPORAL_LENGTH aggregation closes BerlinMOD-Q6 streaming-form cells to full#16

Draft
estebanzimanyi wants to merge 2 commits into
MobilityDB:mainfrom
estebanzimanyi:feat/temporal-length-aggregation
Draft

feat(meos): TEMPORAL_LENGTH aggregation closes BerlinMOD-Q6 streaming-form cells to full#16
estebanzimanyi wants to merge 2 commits into
MobilityDB:mainfrom
estebanzimanyi:feat/temporal-length-aggregation

Conversation

@estebanzimanyi
Copy link
Copy Markdown
Member

Adds the TEMPORAL_LENGTH aggregation across the four levels of the NebulaStream pipeline (logical / physical / parser / lowering) so the BerlinMOD-Q6 "cumulative distance per vehicle" streaming-form cells (continuous + windowed + snapshot) compute the spheroidal trajectory length entirely inside NebulaStream instead of emitting raw trajectories for a consumer-side reduction.

After this PR, the MobilityNebula BerlinMOD parity row is 21 of 27 cells full (was 18 of 27); the remaining 6 partial cells are Q5 × 3 and Q9 × 3, each addressable by a single follow-up Cartesian-aggregation PR using the same pattern as TemporalLengthAggregation.

What lands

Logical layer

  • New nes-logical-operators/include/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.hpp
  • New nes-logical-operators/src/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.cpp
  • Mirrors TemporalSequenceAggregationLogicalFunctionV2 (same lon/lat/ts/as constructor; same inferStamp; same registry registration shape) — only finalAggregateStampType differs: FLOAT64 vs VARSIZED.
  • Serializes through the existing TemporalAggregationSerde::serializeTemporalSequence wire shape with the type tag overridden to "TemporalLength" so no new Serde class is needed.
  • nes-logical-operators/src/Operators/Windows/Aggregations/Meos/CMakeLists.txt adds the plugin line.

Physical layer

  • New nes-physical-operators/include/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.hpp
  • New nes-physical-operators/src/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.cpp
  • Identical lift / combine / reset / cleanup / state-layout shape as TemporalSequenceAggregationPhysicalFunction (three-field PagedVector of lon/lat/ts).
  • lower() builds the same MEOS instant-set trajectory string, parses it via MEOSWrapper::parseTemporalPoint, and calls MEOS' tpoint_length(Temporal*) (from meos_geo.h) to return a single FLOAT64 result. Frees the MEOS Temporal and the trajectory string before returning.
  • Empty-window case returns 0.0 directly without touching MEOS.
  • nes-physical-operators/src/Aggregation/Function/Meos/CMakeLists.txt adds the plugin line.

Parser

  • nes-sql-parser/AntlrSQL.g4 adds TEMPORAL_LENGTH to functionName and to the lexer (TEMPORAL_LENGTH: 'TEMPORAL_LENGTH' | 'temporal_length').
  • nes-sql-parser/src/AntlrSQLQueryPlanCreator.cpp adds the TEMPORAL_LENGTH dispatch in both the case-label site and the string-name site, parallel to TEMPORAL_SEQUENCE. Validates three field-access arguments; constructs a TemporalLengthAggregationLogicalFunction.

Lowering

  • nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp adds the TemporalLength special-case path next to the existing TemporalSequence one. Produces a TemporalLengthAggregationPhysicalFunction with the same (lon, lat, timestamp) state schema layout.

YAMLs

  • Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml updated to call TEMPORAL_LENGTH(gps_lon, gps_lat, time_utc) AS cumulative_distance directly. Output sink column type switched from VARSIZED (trajectory blob) to FLOAT64 (cumulative distance in metres). Header comments updated to FULL.

Docs

  • docs/berlinmod-streaming-forms.md updated: Q6 row goes ◐/◐/◐ → ✓/✓/✓; coverage paragraph reads 21 cells full, 6 cells partial (Q5 + Q9); path-to-full table reduced to two rows.

Verified

  • python3 -c "import yaml; yaml.safe_load(open(...))" — all three Q6 YAMLs valid.
  • C++ code follows the established TemporalSequence template exactly: same constructor signature, same state schema, same registry registration shape, same Antlr token shape, same lowering case structure.
  • The tpoint_length MEOS C symbol is declared in meos_geo.h which is already included by MEOSWrapper.hpp; my new physical function adds the same extern "C" { #include <meos_geo.h> } block.

Out of scope

  • The remaining six partial cells (Q5 × 3 + Q9 × 3) — each addressable by a single follow-up Cartesian-aggregation PR using the exact same four-layer pattern as this one. The path is documented in docs/berlinmod-streaming-forms.md § "Path to 'full' for the two remaining partial Qs".
  • Runtime verification on the NebulaStream vcpkg-bootstrapped test harness — gated on the user's harness and reviewer pipeline.

Stacked on: #15

… on NebulaStream (33 YAMLs, 27/27 cells)

Additive scaffold for the BerlinMOD-9 × 3 streaming-form parity contract
on MobilityNebula, sibling to the existing SNCB Q-series and matching
the MobilityFlink MobilityDB#3 / MobilityKafka MobilityDB#1 streaming-form definitions.

All 27 cells covered:

  Q1 'which vehicles have appeared'      — full (continuous + windowed + snapshot)
  Q2 'where is vehicle X at time T'      — full
  Q3 'vehicles within 5 km of P'         — full
  Q4 'vehicles inside region R (polygon)'— full
  Q5 'pairs of vehicles meeting near P'  — partial (emit per-vehicle trajectories near P; consumer joins)
  Q6 'cumulative distance per vehicle'   — partial (emit TEMPORAL_SEQUENCE; consumer computes length)
  Q7 'first passage of vehicle through POI' × {POI1, POI2, POI3}
                                          — full (per-POI fan-out)
  Q8 'vehicles within d of LINESTRING'   — full (edwithin_tgeo_geo with LINESTRING geometry)
  Q9 'distance between X and Y at time T'— partial (emit X and Y trajectories; consumer joins)

18 of 27 cells are FULL (the BerlinMOD-Q semantic is computed entirely
inside NebulaStream). 9 cells are PARTIAL — NebulaStream emits the
per-window inputs (trajectory, candidate vehicles) and a consumer
post-processes for the final BerlinMOD-Q answer. The partial pattern
is the natural expression of these queries in NebulaStream's current
SQL surface; the path to FULL is documented per-Q in
docs/berlinmod-streaming-forms.md (a stream-self-join for Q5/Q9, a
temporal_length scalar function for Q6).

Form mapping to NebulaStream windows:

  continuous: SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
  windowed:   TUMBLING(time_utc, SIZE 10 SEC)
  snapshot:   TUMBLING(time_utc, SIZE 5 SEC)

MEOS-side surface consumed (already exposed by PR MobilityDB#14 + follow-ups):

  edwithin_tgeo_geo — Q3 (POINT predicate), Q4 (POLYGON, d=0.0),
                      Q5 (POINT predicate), Q7 (per-POI POINT),
                      Q8 (LINESTRING predicate)
  TEMPORAL_SEQUENCE — Q2 / Q5 / Q6 / Q9 (per-window per-vehicle trajectory)

No new MEOS PhysicalFunction classes added; no C++ changes; no SNCB
Q-series modifications. All 33 YAMLs are additive in a new
Queries/berlinmod/ subdirectory.

Add (additions):
  Queries/berlinmod/q1_{continuous,windowed,snapshot}.yaml          (3)
  Queries/berlinmod/q2_{continuous,windowed,snapshot}.yaml          (3)
  Queries/berlinmod/q3_{continuous,windowed,snapshot}.yaml          (3)
  Queries/berlinmod/q4_{continuous,windowed,snapshot}.yaml          (3)
  Queries/berlinmod/q5_{continuous,windowed,snapshot}.yaml          (3, partial)
  Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml          (3, partial)
  Queries/berlinmod/q7_poi{1,2,3}_{continuous,windowed,snapshot}.yaml (9, full via fan-out)
  Queries/berlinmod/q8_{continuous,windowed,snapshot}.yaml          (3, LINESTRING predicate)
  Queries/berlinmod/q9_{continuous,windowed,snapshot}.yaml          (3, partial)
  Input/input_berlinmod.csv  (sample data: 3 vehicles × 21 events, 14 simulated seconds)
  docs/berlinmod-streaming-forms.md

Validation: every YAML parses cleanly via python3 yaml.safe_load.
Runtime verification gated on the NebulaStream test harness.

Coverage: 27 of 27 cells (100 %), with 18 FULL and 9 PARTIAL annotated
explicitly per Q. Path to FULL for the 9 PARTIAL cells is one
MobilityNebula C++ PhysicalFunction class each (or a NebulaStream
upstream stream-self-join), documented in
docs/berlinmod-streaming-forms.md.
…-form cells to full

Adds the TEMPORAL_LENGTH aggregation across the four levels of the
NebulaStream pipeline (logical / physical / parser / lowering) so the
BerlinMOD-Q6 "cumulative distance per vehicle" streaming-form cells
(continuous + windowed + snapshot) compute the spheroidal trajectory
length entirely inside NebulaStream instead of emitting raw trajectories
for a consumer-side reduction.

Logical: nes-logical-operators/{include,src}/Operators/Windows/Aggregations/Meos/TemporalLengthAggregationLogicalFunction.{hpp,cpp}
mirroring TemporalSequenceAggregationLogicalFunctionV2 but with finalAggregateStampType = FLOAT64.
Registers as "TemporalLength" in the aggregation registry. Serializes through the existing
TemporalAggregationSerde wire shape with the type tag overridden.

Physical: nes-physical-operators/{include,src}/Aggregation/Function/Meos/TemporalLengthAggregationPhysicalFunction.{hpp,cpp}
identical lift / combine / reset / cleanup to TemporalSequenceAggregationPhysicalFunction;
the lower() path builds the same MEOS instant-set trajectory string, parses it via
MEOSWrapper::parseTemporalPoint, and calls MEOS' tpoint_length(Temporal*) to return a single
FLOAT64 result.

Parser: nes-sql-parser/AntlrSQL.g4 adds the TEMPORAL_LENGTH lexer token and includes it in
functionName. AntlrSQLQueryPlanCreator.cpp adds the TEMPORAL_LENGTH dispatch in both the
case-label and string-name paths, parallel to TEMPORAL_SEQUENCE.

Lowering: nes-query-optimizer/src/RewriteRules/LowerToPhysical/LowerToPhysicalWindowedAggregation.cpp
adds the TEMPORAL_LENGTH special-case lowering, parallel to TEMPORAL_SEQUENCE, producing a
TemporalLengthAggregationPhysicalFunction with the same (lon, lat, timestamp) state schema.

YAMLs: Queries/berlinmod/q6_{continuous,windowed,snapshot}.yaml updated to call
TEMPORAL_LENGTH directly; the FLOAT64 output column replaces the VARSIZED trajectory output;
header comments updated to "FULL".

Docs: docs/berlinmod-streaming-forms.md updated to reflect 21 cells full + 6 cells partial
(Q5 + Q9 only); the path-to-full table now lists those two queries only.

YAML safe_load green on all 3 Q6 cells. Build verification gated on the user's NebulaStream
test harness (vcpkg-bootstrapped); the C++ code follows the established TemporalSequence
template exactly, with the lower() path replaced by tpoint_length.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant