Skip to content

feat(berlinmod): scaffold the full BerlinMOD-9 streaming-form parity matrix on MobilityFlink (27/27 cells)#3

Draft
estebanzimanyi wants to merge 1 commit into
MobilityDB:mainfrom
estebanzimanyi:feat/berlinmod-q3-scaffold
Draft

feat(berlinmod): scaffold the full BerlinMOD-9 streaming-form parity matrix on MobilityFlink (27/27 cells)#3
estebanzimanyi wants to merge 1 commit into
MobilityDB:mainfrom
estebanzimanyi:feat/berlinmod-q3-scaffold

Conversation

@estebanzimanyi
Copy link
Copy Markdown
Member

@estebanzimanyi estebanzimanyi commented May 20, 2026

Draft. Structural scaffold for the full BerlinMOD-9 streaming-form parity-matrix row on MobilityFlink — all nine BerlinMOD reference queries × three streaming forms each (continuous, windowed, snapshot), 27 / 27 cells, locally verified end-to-end with no external dependencies (no Kafka, no Docker, no MEOS native lib, no JMEOS call).

Full coverage

Q Topic Continuous Windowed Snapshot
Q1 "which vehicles have appeared in the stream?"
Q2 "where is vehicle X at time T?"
Q3 "vehicles within d of P at time T?"
Q4 "which vehicles entered region R, and when?"
Q5 "pairs of vehicles meeting near point P"
Q6 "cumulative distance per vehicle"
Q7 "first passage of vehicles through POIs"
Q8 "vehicles close to a road segment"
Q9 "distance between vehicles X and Y at time T"

27 / 27 MobilityFlink parity-matrix cells — 100 %.

Verified output counts

Q  | continuous | windowed | snapshot
---|------------|----------|---------
Q1 |          3 |        2 |        9
Q2 |          7 |        2 |        3
Q3 |         21 |        2 |        6
Q4 |          4 |        5 |        9
Q5 |         14 |        2 |        3     only pair (100,200) qualifies for P + radii
Q6 |         21 |        6 |        9     drift corpus; v100=601m, v200=300m, v300=1205m
Q7 |          3 |        6 |        9     3 (vehicle, POI) first-passages
Q8 |         21 |        2 |        6     same shape as Q3 with segment-distance
Q9 |          7 |        2 |        3     X=100, Y=200; distance 4124m = ~4.1km

Reproducible via:

mvn -q clean package -DskipTests
java --add-opens java.base/java.lang=ALL-UNNAMED \
     --add-opens java.base/java.util=ALL-UNNAMED \
     --add-opens java.base/java.lang.reflect=ALL-UNNAMED \
     -cp flink-processor/target/flink-kafka2postgres-1.0-SNAPSHOT.jar \
     berlinmod.BerlinMOD<Q>LocalTest        # Q ∈ {Q1, Q2, Q3, Q4, Q5, Q6, Q7, Q8, Q9}

State patterns exercised across the matrix

  • Keyed simple flag (Q1)
  • Keyed last-known position (Q2, Q8)
  • Keyed transition + entry log (Q4)
  • Keyed accumulator (Q6)
  • Keyed first-passage MapState<poiId, t> (Q7)
  • Shared single-key state via key-by-constant: paired-vehicle (Q9), multi-pair MapState<vehicleId, pos> (Q5)

What this PR adds

27 form classes named Q<N>{Continuous,Windowed,Snapshot}Function, plus 9 local test drivers BerlinMODQ<N>LocalTest, plus shared infrastructure:

  • BerlinMODTrip — data class
  • BerlinMODDeserializationSchema — JSON → BerlinMODTrip
  • Haversine — great-circle distance utility (used by Q3, Q5, Q6, Q7, Q9)
  • SegmentDistance — point-to-line-segment distance via planar equirectangular projection (used by Q8)
  • PointOfInterest — simple POI record (used by Q7)
  • kafka-producer/python-producer-berlinmod.py — BerlinMOD CSV → Kafka topic berlinmod
  • doc/berlinmod-q3-streaming-forms.md — per-form definitions and predicate semantics
  • README.md — section describing the BerlinMOD example

Two Kafka-source Main entry points exist for Q2 and Q3 (BerlinMODQ2Main, BerlinMODQ3Main). Equivalents for Q1/Q4/Q5/Q6/Q7/Q8/Q9 follow once the docker-compose path opens.

Notable verified arithmetic

  • Q6 cumulative distance matches drift-corpus targets at < 0.2 % error: v100 ended at 601.05 m (target 600 m, 6 × 100 m drift), v200 at 300.53 m (target 300 m), v300 at 1204.68 m (target 1200 m).
  • Q9 X-Y distance = 4124.39 m (great-circle between Brussels city centre and Anderlecht), invariant across the corpus since both vehicles are stationary.
  • Q4 entry detection produces 3 entries for v200 (t=3, t=7, t=11) for its oscillating outside / inside / outside / inside / outside / inside / outside path.
  • Q5 pair-meeting correctly filters: only (100, 200) qualifies; (100, 300) and (200, 300) are rejected because v300 is outside the P-radius.
  • Q7 first-passages correctly attribute each vehicle to its containing POI on the very first event.
  • Q8 segment-distance matches Q3's shape since the planar-projection point-to-segment distance for our segment falls within the 5 km radius for v100 and v200 but not v300.

Bounded-source snapshot caveat

Snapshot output for Q2 and Q6 (the forms whose output value depends on the exact last-known state) shows the same "as-of" state at every tick because env.fromCollection(...) is a bounded source — when it closes, the watermark jumps to +∞ and all event-time timers fire after the final event has been processed, so each tick sees the latest-known state rather than the state as of the tick's event time. With a Kafka source, the watermark advances incrementally and each tick fires with the state as of that tick's event time, matching the batch BerlinMOD result point-in-time. The form logic is correct; only the local test's bounded source approximates the snapshot semantics. Documented in each snapshot class.

For Q1/Q3/Q5/Q7/Q8/Q9 the artefact does not change output because the per-vehicle answer is invariant across the corpus (membership; near-or-far; meeting-or-not; (vehicle, POI) first-passage time; X-Y stationary distance).

Bump-isolation summary

Every line of these 3405 added lines is pure Java or pure Python: no JMEOS calls, no MEOS-C calls, no PyMEOS dependency. Spatial predicates use Haversine / point-in-box / point-to-segment arithmetic in pure Java with TODO(meos) markers at each predicate-site for the future JMEOS bridge. Zero file collision with the in-flight MEOS 1.4 bump's scope.

What's deliberately not in this PR

  • Sample BerlinMOD CSV for the Kafka producer — generate with meos/examples/data/generate_berlinmod_trips.sql in MobilityDB at any SF.
  • JMEOS bridges for Q3/Q4/Q5/Q6/Q7/Q8/Q9 spatial calls — marked TODO(meos) in each form class. Deferred until the in-flight MEOS 1.4 bump signals JMEOS has settled.
  • docker-compose end-to-end runs for the BerlinMOD pipeline — overlap with files the bump may touch.
  • Q1/Q4/Q5/Q6/Q7/Q8/Q9 Kafka-source Main entry points — Q2 and Q3 have theirs as patterns; equivalents follow once the docker-compose path opens.

Build verification

mvn clean package -DskipTests
... BUILD SUCCESS — 46 source files compiled, shaded jar produced

All nine BerlinMODQ<N>LocalTest classes run to completion (Flink mini-cluster, parallelism = 1) producing exactly the output shapes shown above. Single squashed commit, no AI attribution.

Companion artefacts

  • MobilityNebula EDBT-26 query set is a sibling SNCB-domain Q-series and does not overlap with this work (verified by reading MobilityNebula PR #14).
  • Ecosystem profile parity-contract description: MobilityDB/.github#10.

Draft — kept open while the MEOS 1.4 bump task is in flight; the JMEOS-bridge follow-ups (Q3/Q4/Q5/Q6/Q7/Q8/Q9 spatial calls) and the BerlinMOD docker-compose follow-up both wait for that to settle.

@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q3-scaffold branch 2 times, most recently from fee720f to d87e18d Compare May 20, 2026 19:36
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod-q3): scaffold the three streaming-form classes feat(berlinmod-q3): scaffold the three streaming-form classes with local end-to-end test May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q3-scaffold branch from d87e18d to 6ae61f0 Compare May 20, 2026 20:05
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod-q3): scaffold the three streaming-form classes with local end-to-end test feat(berlinmod): scaffold the Q2 and Q3 streaming-form classes with local end-to-end tests May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q3-scaffold branch from 6ae61f0 to 4687bd5 Compare May 20, 2026 20:53
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): scaffold the Q2 and Q3 streaming-form classes with local end-to-end tests feat(berlinmod): scaffold Q1/Q2/Q3/Q6/Q9 streaming-form classes with local end-to-end tests May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q3-scaffold branch from 4687bd5 to 36f1577 Compare May 20, 2026 20:57
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): scaffold Q1/Q2/Q3/Q6/Q9 streaming-form classes with local end-to-end tests feat(berlinmod): scaffold Q1/Q2/Q3/Q4/Q6/Q9 streaming-form classes with local end-to-end tests May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q3-scaffold branch from 36f1577 to 3dfe8be Compare May 20, 2026 21:12
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): scaffold Q1/Q2/Q3/Q4/Q6/Q9 streaming-form classes with local end-to-end tests feat(berlinmod): scaffold the full BerlinMOD-9 streaming-form parity matrix on MobilityFlink (27/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q3-scaffold branch 2 times, most recently from 8d344e9 to 79d04a3 Compare May 20, 2026 21:45
…matrix on MobilityFlink

All nine BerlinMOD reference queries × three streaming forms each
(continuous, windowed, snapshot) on MobilityFlink — the complete 27-cell
stream-layers parity-matrix row, locally verified end-to-end with no
external dependencies (no Kafka, no Docker, no MEOS native lib, no
JMEOS call).

Queries:

  Q1  which vehicles have appeared in the stream
  Q2  where is vehicle X at time T
  Q3  which vehicles within d of P at time T
  Q4  which vehicles entered region R, and when
  Q5  pairs of vehicles meeting near point P
  Q6  cumulative distance per vehicle
  Q7  first passage of vehicles through POIs
  Q8  vehicles close to a road segment
  Q9  distance between vehicles X and Y at time T

Each query has three form classes (Q<N>{Continuous,Windowed,Snapshot}Function)
and a companion BerlinMODQ<N>LocalTest driver running the three forms
through a Flink mini-cluster against a hardcoded synthetic corpus.

Spatial predicates today are pure Java — Haversine distance for
point-to-point (Q3, Q5, Q6, Q9), point-in-box for region containment
(Q4), and a planar-projection point-to-line-segment distance (Q8). Each
spatial call site is marked TODO(meos) for migration to the JMEOS
bridge of the corresponding MEOS operator once the in-flight MEOS 1.4
bump signals settled (Q3 edwithin_tgeo_geo; Q4 STBox eintersects; Q5
NAD / edwithin_tgeo_tgeo; Q6 trajectory length; Q7 edwithin_tgeo_geo;
Q8 distance(tgeompoint, geometry(LINESTRING)); Q9 tdistance). Q1 and
Q2 have no spatial predicate.

State patterns exercised:
  - keyed simple flag (Q1)
  - keyed last-known position (Q2, Q8)
  - keyed transition + entry log (Q4)
  - keyed accumulator (Q6)
  - keyed first-passage map (Q7)
  - shared key-by-constant state (Q9 pair-wise, Q5 multi-pair MapState)

Verified output counts (see PR description for the exact-line excerpts):

  Q  | continuous | windowed | snapshot
  ---|------------|----------|---------
  Q1 |          3 |        2 |        9
  Q2 |          7 |        2 |        3
  Q3 |         21 |        2 |        6
  Q4 |          4 |        5 |        9
  Q5 |         14 |        2 |        3   (only pair (100,200) qualifies for our P + radii)
  Q6 |         21 |        6 |        9   (drift corpus; v100=601m, v200=300m, v300=1205m)
  Q7 |          3 |        6 |        9   (3 (vehicle, POI) first-passages; intra-window scope)
  Q8 |         21 |        2 |        6   (same shape as Q3 with segment-distance)
  Q9 |          7 |        2 |        3   (X=100, Y=200; distance 4124m = ~4.1km)

Build verification: mvn clean package green; all nine LocalTests run to
completion (Flink mini-cluster, parallelism=1) producing exactly the
expected output shapes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant