Skip to content

feat(berlinmod): full BerlinMOD-9 × 3-form parity matrix on Kafka Streams (27/27 cells)#1

Draft
estebanzimanyi wants to merge 1 commit into
MobilityDB:mainfrom
estebanzimanyi:feat/berlinmod-q1-scaffold
Draft

feat(berlinmod): full BerlinMOD-9 × 3-form parity matrix on Kafka Streams (27/27 cells)#1
estebanzimanyi wants to merge 1 commit into
MobilityDB:mainfrom
estebanzimanyi:feat/berlinmod-q1-scaffold

Conversation

@estebanzimanyi
Copy link
Copy Markdown
Member

@estebanzimanyi estebanzimanyi commented May 20, 2026

Draft. BerlinMOD-9 × 3-form parity-matrix work on MobilityKafka (Kafka-Streams runtime). 27 of 27 cells = 100 % of the MobilityKafka parity-matrix row, matching MobilityFlink #3.

Coverage

Q Topic Continuous Windowed Snapshot
Q1 "which vehicles have appeared?"
Q2 "where is vehicle X at time T?"
Q3 "vehicles within d of P at time T?"
Q4 "vehicles entered region R?"
Q5 "pairs of vehicles meeting near P"
Q6 "cumulative distance per vehicle"
Q7 "first passage through POIs"
Q8 "vehicles close to road segment"
Q9 "X-Y distance at time T"

Module contents

kafka-streams-app/ — Maven project (Java 21, Kafka Streams 3.6.0).

Per-form processors (27): Q<N>ContinuousProcessor, Q<N>WindowedProcessor, Q<N>SnapshotProcessor for N ∈ {1..9}. Each Q-Windowed/Q-Snapshot processor schedules a PunctuationType.STREAM_TIME callback at the form-appropriate interval (10 000 ms for windowed, 5 000 ms for snapshot).

Shared infrastructure: BerlinMODTrip (data class), BerlinMODTripSerde (JSON Serde), Haversine (great-circle distance, m), SegmentDistance (planar projection point-to-segment, m), PointOfInterest (Q7 record), BerlinMODTopology (unified topology fanning to per-Q-form output topics), BerlinMODQ1LocalTest (TopologyTestDriver-based local driver).

Output counts (TopologyTestDriver, sorted-event-time corpus + dual sentinels)

Q Continuous Windowed Snapshot
Q1 3 2 13
Q2 7 2 4
Q3 21 2 9
Q4 1 2 5
Q5 19 2 4
Q6 21 6 13
Q7 3 6 13
Q8 21 2 9
Q9 13 2 4

Two windows close in the test ([T0+0, T0+10000) and [T0+10000, T0+20000)); Q1/Q2/Q3/Q4/Q5/Q8/Q9 emit 1 line per closed window; Q6/Q7 emit 1 line per vehicle per closed window.

Snapshot semantics vs MobilityFlink #3

Kafka Streams' STREAM_TIME punctuator fires on stream-time advance with state-at-fire-moment, and coalesces multi-interval stream-time jumps into a single fire at the current stream-time. Flink's bounded-source watermark jumps to +∞ at source-close and flushes all keyed timers with final state.

The LocalTest pipes two sentinel records (at t=T0+15001 and t=T0+20001) to step the punctuator through the desired tick boundaries one at a time. The first event's initial-catchup fire also produces a tick at T=T0 — a Kafka Streams runtime feature with no Flink-bounded-source analogue.

State patterns covered

Pattern Cells
Stateless filter / predicate Q2-c, Q3-c, Q8-c
Single keyed flag Q1-c, Q1-s, Q4-c
Single-key state Q2-s, Q9-s
Per-vehicle accumulator Q6-c, Q6-s
Per-(vehicle, POI) keyed state Q7-c, Q7-s
Cross-vehicle shared via selectKey(0) Q5-c, Q3-s, Q5-s, Q7-s, Q8-s
Paired shared via selectKey(0) Q9-c, Q4-s, Q9-s
winStart-keyed CSV-vehicleId set Q1-w, Q3-w, Q8-w
winStart-keyed encoded last-known per vehicle Q2-w, Q5-w, Q9-w
winStart-keyed per-vehicle accumulator Q6-w
winStart-keyed per-vehicle entries log Q4-w, Q7-w

Bump-isolation

Zero MEOS surface anywhere:

  • No JMEOS jar dependency
  • No libmeos.so linkage
  • No PyMEOS-side code
  • No file collision with the in-flight MEOS 1.4 bump's scope

Spatial predicates use pure-Java Haversine / planar SegmentDistance / point-in-box, marked TODO(meos) at each call site for JMEOS-bridge migration after the bump settles.

Build verification

mvn -q clean package -DskipTests
... BUILD SUCCESS

java --add-opens java.base/java.lang=ALL-UNNAMED \
     --add-opens java.base/java.util=ALL-UNNAMED \
     --add-opens java.base/java.lang.reflect=ALL-UNNAMED \
     -cp target/mobility-kafka-streams-1.0-SNAPSHOT.jar \
     berlinmod.BerlinMODQ1LocalTest

Single squashed commit, no AI attribution, target/ ignored by .gitignore.

Companion artefacts

@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch 3 times, most recently from 44a2fb2 to 3ec6d58 Compare May 20, 2026 21:32
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod-q1): scaffold the Q1 continuous-form cell on Kafka Streams feat(berlinmod): scaffold Q1 + Q2 + Q3 continuous-form cells on Kafka Streams (3/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch from 3ec6d58 to 8e702c4 Compare May 20, 2026 21:40
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): scaffold Q1 + Q2 + Q3 continuous-form cells on Kafka Streams (3/27 cells) feat(berlinmod): Q1..Q9 continuous-form cells on Kafka Streams (9/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch 2 times, most recently from 9369347 to 85d8de6 Compare May 20, 2026 22:03
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): Q1..Q9 continuous-form cells on Kafka Streams (9/27 cells) feat(berlinmod): Q1..Q9 continuous + Q1..Q9 snapshot cells on Kafka Streams (18/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch from 85d8de6 to 97a8a66 Compare May 20, 2026 22:10
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): Q1..Q9 continuous + Q1..Q9 snapshot cells on Kafka Streams (18/27 cells) feat(berlinmod): Q1..Q9 continuous + Q1/Q3/Q8 windowed + Q1..Q9 snapshot on Kafka Streams (21/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch from 97a8a66 to 4ce7999 Compare May 21, 2026 05:42
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): Q1..Q9 continuous + Q1/Q3/Q8 windowed + Q1..Q9 snapshot on Kafka Streams (21/27 cells) feat(berlinmod): full BerlinMOD-9 × 3-form parity matrix on Kafka Streams (27/27 cells) May 21, 2026
…eams (27/27)

All 27 cells of the BerlinMOD-9 × 3-form parity matrix on MobilityKafka,
matching MobilityFlink MobilityDB#3's coverage on the Kafka-Streams runtime.

Continuous form (9): per-event emission via record-by-record dispatch.
Windowed form (9):   STREAM_TIME punctuator at WINDOW_SIZE_MILLIS
                     (10_000 ms) emitting closed windows.
Snapshot form (9):   STREAM_TIME punctuator at SNAPSHOT_TICK_MILLIS
                     (5_000 ms) emitting per-tick state.

State patterns:
  - Stateless filter (Q2-c, Q3-c, Q8-c)
  - Single keyed flag (Q1-c, Q1-s, Q4-c)
  - Single-key state (Q2-s, Q9-s)
  - Per-vehicle accumulator (Q6-c, Q6-s)
  - Per-(vehicle, POI) keyed state (Q7-c, Q7-s)
  - Cross-vehicle shared via selectKey(0) (Q5-c, Q3-s, Q5-s, Q7-s, Q8-s)
  - Paired shared via selectKey(0) (Q9-c, Q4-s, Q9-s)
  - winStart-keyed comma-separated vehicleId set (Q1-w, Q3-w, Q8-w)
  - winStart-keyed encoded last-known per vehicle (Q2-w, Q5-w, Q9-w)
  - winStart-keyed per-vehicle accumulator (Q6-w)
  - winStart-keyed per-vehicle entries log (Q4-w, Q7-w)

Output counts (TopologyTestDriver, 21-event sorted-by-event-time
corpus + 2 sentinel records at t=15001 and t=20001):

  Q  | continuous | windowed | snapshot
  ---|------------|----------|---------
  Q1 |          3 |        2 |       13
  Q2 |          7 |        2 |        4
  Q3 |         21 |        2 |        9
  Q4 |          1 |        2 |        5
  Q5 |         19 |        2 |        4
  Q6 |         21 |        6 |       13
  Q7 |          3 |        6 |       13
  Q8 |         21 |        2 |        9
  Q9 |         13 |        2 |        4

All 9 windowed cells emit 2 lines per Q (Q1/Q2/Q3/Q4/Q5/Q8/Q9) or 6
lines per Q (Q6/Q7 — one per vehicle × 2 windows for per-vehicle
outputs). Continuous and snapshot count differences vs MobilityFlink
reflect Kafka Streams' STREAM_TIME punctuator semantics: fires on
stream-time advance with state-at-fire-moment, multi-interval jumps
coalesced; vs Flink's bounded source flushing all keyed timers with
final state at +infty. Dual-sentinel pattern in LocalTest steps the
punctuator through the desired tick boundaries.

Bump-isolation: zero JMEOS calls, zero MEOS-C calls, zero PyMEOS
dependency. Pure Kafka Streams + Jackson + Java. Spatial predicates
use Haversine / SegmentDistance / point-in-box, marked TODO(meos)
for JMEOS-bridge migration.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant