Skip to content

feat(wirings): Kafka Streams DSL tier wirings + capstone demo (stacks on #3; Kafka mirror of Flink #6→#10)#4

Open
estebanzimanyi wants to merge 5 commits into
MobilityDB:mainfrom
estebanzimanyi:feat/kafka-tier-wirings
Open

feat(wirings): Kafka Streams DSL tier wirings + capstone demo (stacks on #3; Kafka mirror of Flink #6→#10)#4
estebanzimanyi wants to merge 5 commits into
MobilityDB:mainfrom
estebanzimanyi:feat/kafka-tier-wirings

Conversation

@estebanzimanyi
Copy link
Copy Markdown
Member

Kafka-side mirror of the MobilityFlink wirings stack (PRs #6→#10). Single PR with all four tier wirings + runnable composite demo, since Kafka Streams' DSL is naturally lambda-driven and most tier wirings collapse to small static-factory classes returning serializable functional-interface implementations.

Stacks on PR #3 (the Kafka codegen mirror). Additive-only — no existing file touched.

What's in this PR (6 new files under kafka-streams-app/src/main/java/org/mobilitydb/kafka/meos/wirings/)

Tier Wiring class Pattern
stateless (804 methods) MeosStatelessOps static factory: predicate(...) / intPredicate(...) / mapper(...)Predicate<K,V> / ValueMapper<V,R> for KStream.filter / .mapValues
bounded-state (797 methods) MeosBoundedStateProcessor full Processor<KIn,VIn,KOut,VOut> class with KeyValueStore<KIn,byte[]> for per-key MEOS-handle state that survives changelog replay / rebalance
windowed (161 methods) MeosWindowedAggregator static factory: initializer(...) / aggregator(...) for KStream.groupByKey().windowedBy(...).aggregate(...)
cross-stream (140 methods) MeosCrossStreamJoiner static factory: joiner(...) wrapping serializable ValueJoiner for KStream.join(other, joiner, JoinWindows), same-key pairing, time-bounded match window
io-meta (195 methods) covered by MeosStatelessOps.mapper(...) (no state, no window)
sequence-only (14 methods) inherently non-streamable, no wiring

Plus:

  • MeosOpsRuntime.java — wirings-package alias for the codegen package's MEOS_AVAILABLE flag
  • demo/MeosWiringsDemoTopology.java — runnable Kafka Streams topology composing all 4 tier wirings; uses TopologyTestDriver (no broker required)
  • README.md — full tier vocabulary, lambda-first design rationale, demo walkthrough, coexistence with berlinmod.MEOSBridge

Cumulative wirings coverage: 2,097 of 2,097 emitted methods (100%) wirable through 5 generic classes; zero per-method registration. Identical to Flink side.

Design choice — lambda-first

Kafka Streams' DSL accepts lambdas directly (Predicate, ValueMapper, Aggregator, ValueJoiner). Only bounded-state needs a real class for state-store binding via Processor.init(ProcessorContext). The wirings reflect that asymmetry — small static-factory classes for the four lambda-shaped tiers, one full Processor class for bounded-state.

Adopters wanting a class-shaped wiring (matching Flink for cross-binding parity) can subclass any of the helpers — the serializable functional interfaces are public.

State-store discipline (bounded-state)

Same as the Flink wirings: raw jnr.ffi.Pointer does not survive Kafka Streams' changelog-replay / rebalance / state-rebuild paths (the state-store changelog is a Kafka topic; state must be byte-serializable to be replayable). MeosBoundedStateProcessor stores state as byte[] (typically MEOS-WKB or MEOS-WKT) with three adopter-supplied lambdas mediating the round-trip:

byte[] state               -- per-key serialized MEOS value (in changelog-backed KeyValueStore)
    ↓ deserialize (bytes → Pointer)
Pointer prev               -- in-flight MEOS handle
    ↓ step(prev, record) → (newPointer, output record)
Pointer next, Record       -- new in-flight handle + optional forward
    ↓ serialize (Pointer → bytes)
byte[] newState            -- back to KeyValueStore

First record for a key sees prior == null (no prior state); wiring skips deserialize and lets step seed.

End-to-end demo

MeosWiringsDemoTopology composes all four tier wirings into one Kafka Streams topology:

  1. vehicle-events source → 2. stateless filter → 3. bounded-state processor (per-region running tbox union) → 4. windowed aggregator (30s tumbling) → 5. cross-stream joiner (±1m bound, joined against region-queries topic) → 6. overlap-output sink.

Run via mvn exec:java. main() always prints Topology.describe(); when MEOS available, instantiates TopologyTestDriver (no broker required).

Compile verification

Locally green: 110 .class files total (94 from PR #3 base + 16 new — 4 wiring classes + 11 nested lambda interfaces + MeosOpsRuntime + 1 demo class).

Stacking

Stacks on codegen/kafka-meos-ops (PR #3). Whole 11-file diff is contained under kafka-streams-app/src/main/java/org/mobilitydb/kafka/meos/wirings/.

…eams (27/27)

All 27 cells of the BerlinMOD-9 × 3-form parity matrix on MobilityKafka,
matching MobilityFlink MobilityDB#3's coverage on the Kafka-Streams runtime.

Continuous form (9): per-event emission via record-by-record dispatch.
Windowed form (9):   STREAM_TIME punctuator at WINDOW_SIZE_MILLIS
                     (10_000 ms) emitting closed windows.
Snapshot form (9):   STREAM_TIME punctuator at SNAPSHOT_TICK_MILLIS
                     (5_000 ms) emitting per-tick state.

State patterns:
  - Stateless filter (Q2-c, Q3-c, Q8-c)
  - Single keyed flag (Q1-c, Q1-s, Q4-c)
  - Single-key state (Q2-s, Q9-s)
  - Per-vehicle accumulator (Q6-c, Q6-s)
  - Per-(vehicle, POI) keyed state (Q7-c, Q7-s)
  - Cross-vehicle shared via selectKey(0) (Q5-c, Q3-s, Q5-s, Q7-s, Q8-s)
  - Paired shared via selectKey(0) (Q9-c, Q4-s, Q9-s)
  - winStart-keyed comma-separated vehicleId set (Q1-w, Q3-w, Q8-w)
  - winStart-keyed encoded last-known per vehicle (Q2-w, Q5-w, Q9-w)
  - winStart-keyed per-vehicle accumulator (Q6-w)
  - winStart-keyed per-vehicle entries log (Q4-w, Q7-w)

Output counts (TopologyTestDriver, 21-event sorted-by-event-time
corpus + 2 sentinel records at t=15001 and t=20001):

  Q  | continuous | windowed | snapshot
  ---|------------|----------|---------
  Q1 |          3 |        2 |       13
  Q2 |          7 |        2 |        4
  Q3 |         21 |        2 |        9
  Q4 |          1 |        2 |        5
  Q5 |         19 |        2 |        4
  Q6 |         21 |        6 |       13
  Q7 |          3 |        6 |       13
  Q8 |         21 |        2 |        9
  Q9 |         13 |        2 |        4

All 9 windowed cells emit 2 lines per Q (Q1/Q2/Q3/Q4/Q5/Q8/Q9) or 6
lines per Q (Q6/Q7 — one per vehicle × 2 windows for per-vehicle
outputs). Continuous and snapshot count differences vs MobilityFlink
reflect Kafka Streams' STREAM_TIME punctuator semantics: fires on
stream-time advance with state-at-fire-moment, multi-interval jumps
coalesced; vs Flink's bounded source flushing all keyed timers with
final state at +infty. Dual-sentinel pattern in LocalTest steps the
punctuator through the desired tick boundaries.

Bump-isolation: zero JMEOS calls, zero MEOS-C calls, zero PyMEOS
dependency. Pure Kafka Streams + Jackson + Java. Spatial predicates
use Haversine / SegmentDistance / point-in-box, marked TODO(meos)
for JMEOS-bridge migration.
… 1.4 MEOSBridge

Mirrors the MobilityFlink bridge-swap PR. Introduces MEOSBridge as the
runtime spatial-predicate surface for all 27 BerlinMOD-9 × 3-form Kafka
Streams cells. The bridge calls MEOS via JMEOS 1.4 (geog_dwithin over
WGS84 geographies) when libmeos is loadable and falls back to the
pure-Java Haversine / SegmentDistance utilities when it is not.

- New kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java
  with the dwithinMetres / dwithinSegmentMetres / distanceMetres
  surface; fail-soft static init flips MEOS_AVAILABLE to false on
  UnsatisfiedLinkError.
- All 27 Q<N>{Continuous,Snapshot,Windowed}Processor classes rewritten
  to call MEOSBridge instead of Haversine / SegmentDistance directly.
- JMEOS.jar (478 305 bytes, JMEOS#15 regen artefact) brought into
  kafka-streams-app/jar/ and declared as a system-path dependency in
  pom.xml; jnr-ffi added explicitly since system-path jars do not bring
  transitive dependencies.
- BerlinMODQ1LocalTest sets mobilitykafka.meos.enabled=false at main()
  entry so the TopologyTestDriver run stays green without libmeos.so
  on the runtime path.
- target/ build artefacts gitignored.

Build: mvn clean package -DskipTests green.
Verify: BerlinMODQ1LocalTest finishes clean, all 27 cells emit the
expected output.
…s + extended types + utils.spatial)

Updates the bundled `kafka-streams-app/jar/JMEOS.jar` to a combined
build of JMEOS PR #19 (regen against MEOS-API meos-idl.json, 2,699
methods including extended types) AND PR #18 (utils.spatial.Haversine +
utils.spatial.PointToSegment wrappers that MEOSBridge.java imports).

Surface delta vs the previous bundled jar:
  - public static methods: 2 699 (was 1 685)
  - utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2) → double
  - utils.spatial.PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat) → double
  - tnpoint_ methods: 50
  - tcbuffer / tpose / trgeo: now exposed
  - sha: a5895c9b94…  size: 1,210,863 B

Unblocks the MEOSBridge.java import path (line 116) — previously the
jar shipped PR #19's GeneratedFunctions but not PR #18's utils.spatial,
so base-branch mvn compile was RED.  Both PRs now coalesced into a
single jar.

Unblocks codegen/kafka-meos-ops wedge stacked on this branch.
… surface (Kafka mirror of MobilityFlink #5)

Mirror of MobilityFlink codegen/flink-meos-ops (PR #5) for the Kafka
Streams binding. Same generators, same tier classification, same
catalog source — differs only in package path
(`org.mobilitydb.kafka.meos`) and module layout (`kafka-streams-app/`
vs `flink-processor/`).

Adds 50 `MeosOps<Class>` + 6 `MeosOpsFree<Header>` + 1 shared
`MeosOpsRuntime` = 57 Java classes, 2,097 methods (77.7% of JMEOS
PR #19's 2,699-method surface).

Stacks on feat/jmeos-bridge-swap; additive-only; touches no existing
file. See MobilityFlink #5 for the full tier vocabulary, regeneration
recipe, and coexistence design with `berlinmod.MEOSBridge`.
…facades + capstone demo

Adds the org.mobilitydb.kafka.meos.wirings package — Kafka mirror of
the MobilityFlink wirings (PRs #6→#10). Single PR with all four tiers
+ runnable composite demo, since Kafka Streams' DSL is naturally
lambda-driven and most tier wirings collapse to small static-factory
classes returning serializable functional-interface implementations.

## Files

- MeosStatelessOps.java — predicate/intPredicate/mapper factories
  returning Predicate<K,V> / ValueMapper<V,R> for KStream.filter /
  .mapValues (covers stateless tier: 804 methods + io-meta: 195
  methods)
- MeosBoundedStateProcessor.java — full Processor<KIn,VIn,KOut,VOut>
  class with KeyValueStore<KIn,byte[]> for per-key MEOS-handle state
  that survives changelog replay / rebalance (covers bounded-state
  tier: 797 methods)
- MeosWindowedAggregator.java — initializer/aggregator factories for
  KStream.groupByKey().windowedBy(...).aggregate(...) (covers
  windowed tier: 161 methods)
- MeosCrossStreamJoiner.java — joiner factory wrapping a serializable
  ValueJoiner for KStream.join(other, joiner, JoinWindows) (covers
  cross-stream tier: 140 methods)
- MeosOpsRuntime.java — wirings-package alias for the codegen package's
  MeosOpsRuntime.MEOS_AVAILABLE flag (libmeos probed once per JVM)
- demo/MeosWiringsDemoTopology.java — runnable Kafka Streams topology
  composing all four tier wirings into one pipeline (per-region
  running-union → 30s tumbling aggregate → ±1m cross-stream join
  against region-queries); main() prints topology description always,
  instantiates TopologyTestDriver when MEOS_AVAILABLE (no broker
  required)
- README.md — tier vocabulary, lambda-first design rationale, full
  recipe + demo walkthrough, coexistence with berlinmod.MEOSBridge

## Cumulative wirings-layer coverage

Same as the Flink side: 2,097 of 2,097 emitted methods (100%)
wirable through 5 generic classes; zero per-method registration.

## Design choice — lambda-first

Kafka Streams' DSL accepts lambdas directly (Predicate, ValueMapper,
Aggregator, ValueJoiner). Only bounded-state needs a real class for
state-store binding via Processor.init(ProcessorContext). The wirings
reflect that asymmetry: small static-factory classes for the four
lambda-shaped tiers, one full Processor class for bounded-state.
Adopters wanting a class-shaped wiring (matching Flink for
cross-binding parity) can subclass any of the helpers — the
serializable functional interfaces are public.

## Stacks on PR MobilityDB#3 (codegen mirror)

Additive-only: 6 new files under
kafka-streams-app/src/main/java/org/mobilitydb/kafka/meos/wirings/.
Touches no existing file. Locally compile-verified: 110 .class files
total (94 from PR MobilityDB#3 base + 16 new — 4 wiring classes + 11 nested
lambda interfaces + MeosOpsRuntime + 1 demo class).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant