feat(berlinmod): scaffold the full BerlinMOD-9 streaming-form parity matrix on MobilityFlink (27/27 cells)#3
Draft
estebanzimanyi wants to merge 1 commit into
…matrix on MobilityFlink
All nine BerlinMOD reference queries × three streaming forms each
(continuous, windowed, snapshot) on MobilityFlink — the complete 27-cell
stream-layers parity-matrix row, locally verified end-to-end with no
external dependencies (no Kafka, no Docker, no MEOS native lib, no
JMEOS call).
Queries:
Q1 which vehicles have appeared in the stream
Q2 where is vehicle X at time T
Q3 which vehicles within d of P at time T
Q4 which vehicles entered region R, and when
Q5 pairs of vehicles meeting near point P
Q6 cumulative distance per vehicle
Q7 first passage of vehicles through POIs
Q8 vehicles close to a road segment
Q9 distance between vehicles X and Y at time T
Each query has three form classes (Q<N>{Continuous,Windowed,Snapshot}Function)
and a companion BerlinMODQ<N>LocalTest driver running the three forms
through a Flink mini-cluster against a hardcoded synthetic corpus.
Spatial predicates today are pure Java: Haversine distance for
point-to-point (Q3, Q5, Q6, Q7, Q9), point-in-box for region
containment (Q4), and a planar-projection point-to-line-segment
distance (Q8). Each spatial call site is marked TODO(meos) for
migration to the JMEOS bridge of the corresponding MEOS operator once
the in-flight MEOS 1.4 bump signals that JMEOS has settled (Q3
edwithin_tgeo_geo; Q4 STBox eintersects; Q5 NAD / edwithin_tgeo_tgeo;
Q6 trajectory length; Q7 edwithin_tgeo_geo; Q8 distance(tgeompoint,
geometry(LINESTRING)); Q9 tdistance). Q1 and Q2 have no spatial
predicate.
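For orientation, the three pure-Java predicates named above can be sketched roughly as follows. This is an illustrative sketch only, not the code in this PR: the class name SpatialSketch, the method names, and the choice of a 6,371,000 m mean Earth radius are assumptions, and the actual Haversine and SegmentDistance classes may differ in signature and constants.

```java
/** Hypothetical helper for illustration; not the PR's Haversine / SegmentDistance classes. */
final class SpatialSketch {
    // Assumed mean Earth radius in metres; the PR does not state which value it uses.
    static final double EARTH_RADIUS_M = 6_371_000.0;

    /** Great-circle (Haversine) distance in metres between two lat/lon points given in degrees. */
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double sinDPhi = Math.sin(Math.toRadians(lat2 - lat1) / 2);
        double sinDLambda = Math.sin(Math.toRadians(lon2 - lon1) / 2);
        double a = sinDPhi * sinDPhi + Math.cos(phi1) * Math.cos(phi2) * sinDLambda * sinDLambda;
        // Clamp to 1.0 to guard against rounding just above the asin domain.
        return 2 * EARTH_RADIUS_M * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    /** Point-in-box containment on raw lat/lon bounds (bounds inclusive). */
    static boolean inBox(double lat, double lon,
                         double minLat, double minLon, double maxLat, double maxLon) {
        return lat >= minLat && lat <= maxLat && lon >= minLon && lon <= maxLon;
    }

    /** Distance in metres from point P to segment AB via an equirectangular projection centred on P. */
    static double pointToSegmentMeters(double pLat, double pLon,
                                       double aLat, double aLon, double bLat, double bLon) {
        double cosLat = Math.cos(Math.toRadians(pLat));
        // Project A and B to planar metres with P as the origin.
        double ax = Math.toRadians(aLon - pLon) * cosLat * EARTH_RADIUS_M;
        double ay = Math.toRadians(aLat - pLat) * EARTH_RADIUS_M;
        double bx = Math.toRadians(bLon - pLon) * cosLat * EARTH_RADIUS_M;
        double by = Math.toRadians(bLat - pLat) * EARTH_RADIUS_M;
        double dx = bx - ax;
        double dy = by - ay;
        double len2 = dx * dx + dy * dy;
        // t is the projection of the origin onto AB, clamped to the segment [0, 1].
        double t = (len2 == 0.0) ? 0.0 : Math.max(0.0, Math.min(1.0, -(ax * dx + ay * dy) / len2));
        return Math.hypot(ax + t * dx, ay + t * dy);
    }
}
```

The planar projection is only reasonable for short segments near the query point, which is presumably why these call sites are the ones flagged for replacement by MEOS operators.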
State patterns exercised:
- keyed simple flag (Q1)
- keyed last-known position (Q2, Q8)
- keyed transition + entry log (Q4)
- keyed accumulator (Q6)
- keyed first-passage map (Q7)
- shared key-by-constant state (Q9 pair-wise, Q5 multi-pair MapState)
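As a rough illustration of the keyed last-known-position and keyed accumulator patterns (Q2, Q6), the plain-Java sketch below uses two HashMaps keyed by vehicle id as a stand-in for Flink keyed state. In the actual form classes this state would presumably live in Flink ValueState or MapState inside a keyed function; the class name KeyedAccumulatorSketch, the onEvent method, and the Earth radius are hypothetical and not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java analogue of per-key state; not a Flink function. */
final class KeyedAccumulatorSketch {
    // Stand-in for keyed "last-known position" state: vehicleId -> {lat, lon}.
    private final Map<Integer, double[]> lastPosition = new HashMap<>();
    // Stand-in for keyed accumulator state: vehicleId -> cumulative metres.
    private final Map<Integer, Double> totalMeters = new HashMap<>();

    /** Applies one position event and returns the vehicle's cumulative distance so far. */
    double onEvent(int vehicleId, double lat, double lon) {
        double[] prev = lastPosition.put(vehicleId, new double[] {lat, lon});
        // The first event for a key contributes no distance.
        double step = (prev == null) ? 0.0 : haversineMeters(prev[0], prev[1], lat, lon);
        return totalMeters.merge(vehicleId, step, Double::sum);
    }

    /** Haversine distance in metres, assuming a 6,371,000 m mean Earth radius. */
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double sinDPhi = Math.sin(Math.toRadians(lat2 - lat1) / 2);
        double sinDLambda = Math.sin(Math.toRadians(lon2 - lon1) / 2);
        double a = sinDPhi * sinDPhi
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * sinDLambda * sinDLambda;
        return 2 * 6_371_000.0 * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }
}
```

Each key's total depends only on that key's own events, so interleaving events from different vehicles does not change any vehicle's result.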
Verified output counts (see PR description for the exact-line excerpts):
Q | continuous | windowed | snapshot
---|------------|----------|---------
Q1 | 3 | 2 | 9
Q2 | 7 | 2 | 3
Q3 | 21 | 2 | 6
Q4 | 4 | 5 | 9
Q5 | 14 | 2 | 3 (only pair (100,200) qualifies for our P + radii)
Q6 | 21 | 6 | 9 (drift corpus; v100=601m, v200=300m, v300=1205m)
Q7 | 3 | 6 | 9 (3 (vehicle, POI) first-passages; intra-window scope)
Q8 | 21 | 2 | 6 (same shape as Q3 with segment-distance)
Q9 | 7 | 2 | 3 (X=100, Y=200; distance 4124m = ~4.1km)
Build verification: mvn clean package green; all nine LocalTests run to
completion (Flink mini-cluster, parallelism=1) producing exactly the
expected output shapes.
Draft. Structural scaffold for the full BerlinMOD-9 streaming-form parity-matrix row on MobilityFlink — all nine BerlinMOD reference queries × three streaming forms each (continuous, windowed, snapshot), 27 / 27 cells, locally verified end-to-end with no external dependencies (no Kafka, no Docker, no MEOS native lib, no JMEOS call).
Full coverage
27 / 27 MobilityFlink parity-matrix cells — 100 %.
Verified output counts
Reproducible via:
State patterns exercised across the matrix
- MapState<poiId, t> (Q7)
- MapState<vehicleId, pos> (Q5)
What this PR adds
27 form classes named Q<N>{Continuous,Windowed,Snapshot}Function, plus 9 local test drivers BerlinMODQ<N>LocalTest, plus shared infrastructure:
- BerlinMODTrip: data class
- BerlinMODDeserializationSchema: JSON → BerlinMODTrip
- Haversine: great-circle distance utility (used by Q3, Q5, Q6, Q7, Q9)
- SegmentDistance: point-to-line-segment distance via planar equirectangular projection (used by Q8)
- PointOfInterest: simple POI record (used by Q7)
- kafka-producer/python-producer-berlinmod.py: BerlinMOD CSV → Kafka topic berlinmod
- doc/berlinmod-q3-streaming-forms.md: per-form definitions and predicate semantics
- README.md: section describing the BerlinMOD example
Two Kafka-source Main entry points exist for Q2 and Q3 (BerlinMODQ2Main, BerlinMODQ3Main). Equivalents for Q1/Q4/Q5/Q6/Q7/Q8/Q9 follow once the docker-compose path opens.
Notable verified arithmetic
Bounded-source snapshot caveat
Snapshot output for Q2 and Q6 (the forms whose output value depends on the exact last-known state) shows the same "as-of" state at every tick because env.fromCollection(...) is a bounded source: when it closes, the watermark jumps to +∞ and all event-time timers fire after the final event has been processed, so each tick sees the latest-known state rather than the state as of the tick's event time. With a Kafka source, the watermark advances incrementally and each tick fires with the state as of that tick's event time, matching the batch BerlinMOD result point-in-time. The form logic is correct; only the local test's bounded source approximates the snapshot semantics. Documented in each snapshot class.
For Q1/Q3/Q5/Q7/Q8/Q9 the artefact does not change output because the per-vehicle answer is invariant across the corpus (membership; near-or-far; meeting-or-not; (vehicle, POI) first-passage time; X-Y stationary distance).
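The caveat can be reproduced in a small plain-Java model, shown here as a sketch rather than as Flink code. It assumes a single key, in-order events, and an idealised incremental mode in which the watermark passes each tick time before any later event updates the state; the class SnapshotTimerSketch and its snapshot method are hypothetical names, and -1 is an arbitrary "nothing seen yet" marker.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of event-time snapshot timers; not Flink code. */
final class SnapshotTimerSketch {
    /**
     * events[i] = {eventTime, value}, sorted by eventTime; ticks sorted ascending.
     * Returns the last-known value each tick timer observes when it fires.
     */
    static List<Long> snapshot(long[][] events, long[] ticks, boolean watermarkOnlyAtEnd) {
        List<Long> seen = new ArrayList<>();
        long state = -1;  // last-known value; -1 means no event seen yet
        int next = 0;     // index of the next pending tick timer
        for (long[] event : events) {
            if (!watermarkOnlyAtEnd) {
                // Idealised incremental watermark: ticks earlier than this event
                // fire before the event is applied to the state.
                while (next < ticks.length && ticks[next] < event[0]) {
                    seen.add(state);
                    next++;
                }
            }
            state = event[1];
        }
        // End of input: the watermark jumps to +infinity and every remaining timer fires.
        while (next < ticks.length) {
            seen.add(state);
            next++;
        }
        return seen;
    }
}
```

With events at times 1, 5, 9 carrying values 10, 20, 30 and ticks at 3, 7, 11, the bounded mode yields 30 for every tick, while the incremental mode yields 10, 20, 30, which is the difference described above.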
Bump-isolation summary
All 3405 added lines are pure Java or pure Python: no JMEOS calls, no MEOS-C calls, no PyMEOS dependency. Spatial predicates use Haversine / point-in-box / point-to-segment arithmetic in pure Java with TODO(meos) markers at each predicate site for the future JMEOS bridge. Zero file collision with the in-flight MEOS 1.4 bump's scope.
What's deliberately not in this PR
- meos/examples/data/generate_berlinmod_trips.sql in MobilityDB at any SF.
- TODO(meos) in each form class. Deferred until the in-flight MEOS 1.4 bump signals JMEOS has settled.
- Main entry points: Q2 and Q3 have theirs as patterns; equivalents follow once the docker-compose path opens.
Build verification
All nine BerlinMODQ<N>LocalTest classes run to completion (Flink mini-cluster, parallelism = 1) producing exactly the output shapes shown above. Single squashed commit, no AI attribution.
Companion artefacts
Draft — kept open while the MEOS 1.4 bump task is in flight; the JMEOS-bridge follow-ups (Q3/Q4/Q5/Q6/Q7/Q8/Q9 spatial calls) and the BerlinMOD docker-compose follow-up both wait for that to settle.