feat(wirings): Kafka Streams DSL tier wirings + capstone demo (stacks on #3; Kafka mirror of Flink #6→#10) #4
Open
estebanzimanyi wants to merge 5 commits into
…eams (27/27)

All 27 cells of the BerlinMOD-9 × 3-form parity matrix on MobilityKafka, matching MobilityFlink MobilityDB#3's coverage on the Kafka-Streams runtime.

- Continuous form (9): per-event emission via record-by-record dispatch.
- Windowed form (9): STREAM_TIME punctuator at WINDOW_SIZE_MILLIS (10_000 ms) emitting closed windows.
- Snapshot form (9): STREAM_TIME punctuator at SNAPSHOT_TICK_MILLIS (5_000 ms) emitting per-tick state.

State patterns:
- Stateless filter (Q2-c, Q3-c, Q8-c)
- Single keyed flag (Q1-c, Q1-s, Q4-c)
- Single-key state (Q2-s, Q9-s)
- Per-vehicle accumulator (Q6-c, Q6-s)
- Per-(vehicle, POI) keyed state (Q7-c, Q7-s)
- Cross-vehicle shared via selectKey(0) (Q5-c, Q3-s, Q5-s, Q7-s, Q8-s)
- Paired shared via selectKey(0) (Q9-c, Q4-s, Q9-s)
- winStart-keyed comma-separated vehicleId set (Q1-w, Q3-w, Q8-w)
- winStart-keyed encoded last-known per vehicle (Q2-w, Q5-w, Q9-w)
- winStart-keyed per-vehicle accumulator (Q6-w)
- winStart-keyed per-vehicle entries log (Q4-w, Q7-w)

Output counts (TopologyTestDriver, 21-event sorted-by-event-time corpus + 2 sentinel records at t=15001 and t=20001):

Q  | continuous | windowed | snapshot
---|------------|----------|---------
Q1 | 3          | 2        | 13
Q2 | 7          | 2        | 4
Q3 | 21         | 2        | 9
Q4 | 1          | 2        | 5
Q5 | 19         | 2        | 4
Q6 | 21         | 6        | 13
Q7 | 3          | 6        | 13
Q8 | 21         | 2        | 9
Q9 | 13         | 2        | 4

All 9 windowed cells emit 2 lines per Q (Q1/Q2/Q3/Q4/Q5/Q8/Q9) or 6 lines per Q (Q6/Q7: one per vehicle × 2 windows for per-vehicle outputs).

Continuous and snapshot count differences vs MobilityFlink reflect Kafka Streams' STREAM_TIME punctuator semantics: it fires on stream-time advance with state-at-fire-moment, and multi-interval jumps are coalesced; Flink's bounded source instead flushes all keyed timers with final state at +infty. The dual-sentinel pattern in LocalTest steps the punctuator through the desired tick boundaries.

Bump-isolation: zero JMEOS calls, zero MEOS-C calls, zero PyMEOS dependency. Pure Kafka Streams + Jackson + Java. Spatial predicates use Haversine / SegmentDistance / point-in-box, marked TODO(meos) for JMEOS-bridge migration.
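For readers unfamiliar with the pure-Java predicates this commit mentions, the following is a minimal sketch of a Haversine distance, a distance-within check, and a point-in-box test. The class name `HaversineSketch`, the method names, and the Earth-radius constant are assumptions made for illustration; the commit does not show the actual utility code. The argument order (longitude first) follows the `Haversine.distance(lon1, lat1, lon2, lat2)` signature quoted later in this PR.

```java
/** Illustrative stand-in for the pure-Java spatial helpers; not the PR's actual classes. */
final class HaversineSketch {
    // Mean Earth radius in metres (assumed; the commit does not say which radius is used).
    static final double EARTH_RADIUS_M = 6_371_008.8;

    /** Great-circle distance in metres between two lon/lat points given in degrees. */
    static double distanceMetres(double lon1, double lat1, double lon2, double lat2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double sinHalfDPhi = Math.sin(Math.toRadians(lat2 - lat1) / 2);
        double sinHalfDLambda = Math.sin(Math.toRadians(lon2 - lon1) / 2);
        double a = sinHalfDPhi * sinHalfDPhi
                + Math.cos(phi1) * Math.cos(phi2) * sinHalfDLambda * sinHalfDLambda;
        // The clamp guards against rounding slightly above 1 for near-antipodal points.
        return 2 * EARTH_RADIUS_M * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    /** True when the two points lie within radiusMetres of each other. */
    static boolean dwithinMetres(double lon1, double lat1, double lon2, double lat2,
                                 double radiusMetres) {
        return distanceMetres(lon1, lat1, lon2, lat2) <= radiusMetres;
    }

    /** Inclusive point-in-box test on raw lon/lat degrees (no antimeridian handling). */
    static boolean inBox(double lon, double lat,
                         double minLon, double minLat, double maxLon, double maxLat) {
        return lon >= minLon && lon <= maxLon && lat >= minLat && lat <= maxLat;
    }

    public static void main(String[] args) {
        // Two arbitrary points in the Berlin area, chosen only for the demo.
        System.out.println(distanceMetres(13.40, 52.50, 13.41, 52.51));
        System.out.println(dwithinMetres(13.40, 52.50, 13.41, 52.51, 2_000.0));
    }
}
```

A spherical model like this differs slightly from a geodesic computed on the WGS84 ellipsoid, which may be one reason the predicates are marked for migration to the bridge.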
… 1.4 MEOSBridge
Mirrors the MobilityFlink bridge-swap PR. Introduces MEOSBridge as the
runtime spatial-predicate surface for all 27 BerlinMOD-9 × 3-form Kafka
Streams cells. The bridge calls MEOS via JMEOS 1.4 (geog_dwithin over
WGS84 geographies) when libmeos is loadable and falls back to the
pure-Java Haversine / SegmentDistance utilities when it is not.
- New kafka-streams-app/src/main/java/berlinmod/MEOSBridge.java
with the dwithinMetres / dwithinSegmentMetres / distanceMetres
surface; fail-soft static init flips MEOS_AVAILABLE to false on
UnsatisfiedLinkError.
- All 27 Q<N>{Continuous,Snapshot,Windowed}Processor classes rewritten
to call MEOSBridge instead of Haversine / SegmentDistance directly.
- JMEOS.jar (478 305 bytes, JMEOS#15 regen artefact) brought into
kafka-streams-app/jar/ and declared as a system-path dependency in
pom.xml; jnr-ffi added explicitly since system-path jars do not bring
transitive dependencies.
- BerlinMODQ1LocalTest sets mobilitykafka.meos.enabled=false at main()
entry so the TopologyTestDriver run stays green without libmeos.so
on the runtime path.
- target/ build artefacts gitignored.
Build: mvn clean package -DskipTests green.
Verify: BerlinMODQ1LocalTest finishes clean, all 27 cells emit the
expected output.
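The fail-soft behaviour described in this commit can be sketched as follows. This is an illustration only: the class `FailSoftBridgeSketch`, the `DistanceFn` interface, and the constructor-injected loader are hypothetical, and the real MEOSBridge performs its probe in a static initializer through JMEOS rather than through an injected supplier. Only the `mobilitykafka.meos.enabled` property name and the `UnsatisfiedLinkError` fallback come from the commit message.

```java
import java.util.function.Supplier;

/** Illustrative fail-soft bridge: prefer a native implementation, fall back to pure Java. */
final class FailSoftBridgeSketch {
    /** Hypothetical distance surface shared by the native and fallback paths. */
    interface DistanceFn {
        double metres(double lon1, double lat1, double lon2, double lat2);
    }

    private final DistanceFn active;
    private final boolean nativeAvailable;

    /**
     * @param nativeLoader loads the native-backed implementation; expected to throw
     *                     UnsatisfiedLinkError when the shared library cannot be found
     * @param fallback     pure-Java implementation used when the native path is unavailable
     */
    FailSoftBridgeSketch(Supplier<DistanceFn> nativeLoader, DistanceFn fallback) {
        DistanceFn chosen = fallback;
        boolean ok = false;
        // Opt-out switch; any value other than "true" disables the native probe.
        if (Boolean.parseBoolean(System.getProperty("mobilitykafka.meos.enabled", "true"))) {
            try {
                chosen = nativeLoader.get();
                ok = true;
            } catch (UnsatisfiedLinkError e) {
                // Library not loadable: keep the fallback and leave the flag false.
            }
        }
        this.active = chosen;
        this.nativeAvailable = ok;
    }

    boolean nativeAvailable() {
        return nativeAvailable;
    }

    double distanceMetres(double lon1, double lat1, double lon2, double lat2) {
        return active.metres(lon1, lat1, lon2, lat2);
    }

    boolean dwithinMetres(double lon1, double lat1, double lon2, double lat2, double radius) {
        return distanceMetres(lon1, lat1, lon2, lat2) <= radius;
    }

    public static void main(String[] args) {
        // Simulate a missing library; the placeholder fallback returns a fixed distance.
        FailSoftBridgeSketch bridge = new FailSoftBridgeSketch(
                () -> { throw new UnsatisfiedLinkError("libmeos not found (simulated)"); },
                (lon1, lat1, lon2, lat2) -> 0.0);
        System.out.println("native available: " + bridge.nativeAvailable());
    }
}
```

Probing once and caching the decision keeps the per-record path free of exception handling, which matters when every processor call goes through the bridge.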
…s + extended types + utils.spatial)

Updates the bundled `kafka-streams-app/jar/JMEOS.jar` to a combined build of JMEOS PR #19 (regen against MEOS-API meos-idl.json, 2,699 methods including extended types) AND PR #18 (utils.spatial.Haversine + utils.spatial.PointToSegment wrappers that MEOSBridge.java imports).

Surface delta vs the previous bundled jar:
- public static methods: 2 699 (was 1 685)
- utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2) → double
- utils.spatial.PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat) → double
- tnpoint_ methods: 50
- tcbuffer / tpose / trgeo: now exposed
- sha: a5895c9b94… size: 1,210,863 B

Unblocks the MEOSBridge.java import path (line 116): previously the jar shipped PR #19's GeneratedFunctions but not PR #18's utils.spatial, so base-branch mvn compile was RED. Both PRs are now coalesced into a single jar.

Unblocks codegen/kafka-meos-ops wedge stacked on this branch.
… surface (Kafka mirror of MobilityFlink #5)

Mirror of MobilityFlink codegen/flink-meos-ops (PR #5) for the Kafka Streams binding. Same generators, same tier classification, same catalog source; differs only in package path (`org.mobilitydb.kafka.meos`) and module layout (`kafka-streams-app/` vs `flink-processor/`).

Adds 50 `MeosOps<Class>` + 6 `MeosOpsFree<Header>` + 1 shared `MeosOpsRuntime` = 57 Java classes, 2,097 methods (77.7% of JMEOS PR #19's 2,699-method surface).

Stacks on feat/jmeos-bridge-swap; additive-only; touches no existing file.

See MobilityFlink #5 for the full tier vocabulary, regeneration recipe, and coexistence design with `berlinmod.MEOSBridge`.
…facades + capstone demo

Adds the org.mobilitydb.kafka.meos.wirings package, the Kafka mirror of the MobilityFlink wirings (PRs #6→#10). Single PR with all four tiers + runnable composite demo, since Kafka Streams' DSL is naturally lambda-driven and most tier wirings collapse to small static-factory classes returning serializable functional-interface implementations.

## Files

- MeosStatelessOps.java: predicate/intPredicate/mapper factories returning Predicate<K,V> / ValueMapper<V,R> for KStream.filter / .mapValues (covers stateless tier: 804 methods + io-meta: 195 methods)
- MeosBoundedStateProcessor.java: full Processor<KIn,VIn,KOut,VOut> class with KeyValueStore<KIn,byte[]> for per-key MEOS-handle state that survives changelog replay / rebalance (covers bounded-state tier: 797 methods)
- MeosWindowedAggregator.java: initializer/aggregator factories for KStream.groupByKey().windowedBy(...).aggregate(...) (covers windowed tier: 161 methods)
- MeosCrossStreamJoiner.java: joiner factory wrapping a serializable ValueJoiner for KStream.join(other, joiner, JoinWindows) (covers cross-stream tier: 140 methods)
- MeosOpsRuntime.java: wirings-package alias for the codegen package's MeosOpsRuntime.MEOS_AVAILABLE flag (libmeos probed once per JVM)
- demo/MeosWiringsDemoTopology.java: runnable Kafka Streams topology composing all four tier wirings into one pipeline (per-region running-union → 30s tumbling aggregate → ±1m cross-stream join against region-queries); main() always prints the topology description and instantiates TopologyTestDriver when MEOS_AVAILABLE (no broker required)
- README.md: tier vocabulary, lambda-first design rationale, full recipe + demo walkthrough, coexistence with berlinmod.MEOSBridge

## Cumulative wirings-layer coverage

Same as the Flink side: 2,097 of 2,097 emitted methods (100%) wirable through 5 generic classes; zero per-method registration.

## Design choice: lambda-first

Kafka Streams' DSL accepts lambdas directly (Predicate, ValueMapper, Aggregator, ValueJoiner). Only bounded-state needs a real class for state-store binding via Processor.init(ProcessorContext). The wirings reflect that asymmetry: small static-factory classes for the four lambda-shaped tiers, one full Processor class for bounded-state. Adopters wanting a class-shaped wiring (matching Flink for cross-binding parity) can subclass any of the helpers; the serializable functional interfaces are public.

## Stacks on PR MobilityDB#3 (codegen mirror)

Additive-only: 6 new files under kafka-streams-app/src/main/java/org/mobilitydb/kafka/meos/wirings/. Touches no existing file. Locally compile-verified: 110 .class files total (94 from PR MobilityDB#3 base + 16 new: 4 wiring classes + 11 nested lambda interfaces + MeosOpsRuntime + 1 demo class).
Kafka-side mirror of the MobilityFlink wirings stack (PRs #6→#10). Single PR with all four tier wirings + runnable composite demo, since Kafka Streams' DSL is naturally lambda-driven and most tier wirings collapse to small static-factory classes returning serializable functional-interface implementations.
Stacks on PR #3 (the Kafka codegen mirror). Additive-only — no existing file touched.
## What's in this PR (6 new files under `kafka-streams-app/src/main/java/org/mobilitydb/kafka/meos/wirings/`)

| Tier | Wiring | Shape |
|---|---|---|
| `stateless` (804 methods) | `MeosStatelessOps` | `predicate(...)` / `intPredicate(...)` / `mapper(...)` → `Predicate<K,V>` / `ValueMapper<V,R>` for `KStream.filter` / `.mapValues` |
| `bounded-state` (797 methods) | `MeosBoundedStateProcessor` | `Processor<KIn,VIn,KOut,VOut>` class with `KeyValueStore<KIn,byte[]>` for per-key MEOS-handle state that survives changelog replay / rebalance |
| `windowed` (161 methods) | `MeosWindowedAggregator` | `initializer(...)` / `aggregator(...)` for `KStream.groupByKey().windowedBy(...).aggregate(...)` |
| `cross-stream` (140 methods) | `MeosCrossStreamJoiner` | `joiner(...)` wrapping serializable `ValueJoiner` for `KStream.join(other, joiner, JoinWindows)`, same-key pairing, time-bounded match window |
| `io-meta` (195 methods) | `MeosStatelessOps.mapper(...)` | |
| `sequence-only` (14 methods) | | |

Plus:

- `MeosOpsRuntime.java`: wirings-package alias for the codegen package's `MEOS_AVAILABLE` flag
- `demo/MeosWiringsDemoTopology.java`: runnable Kafka Streams topology composing all 4 tier wirings; uses `TopologyTestDriver` (no broker required)
- `README.md`: full tier vocabulary, lambda-first design rationale, demo walkthrough, coexistence with `berlinmod.MEOSBridge`

Cumulative wirings coverage: 2,097 of 2,097 emitted methods (100%) wirable through 5 generic classes; zero per-method registration. Identical to the Flink side.
## Design choice: lambda-first

Kafka Streams' DSL accepts lambdas directly (`Predicate`, `ValueMapper`, `Aggregator`, `ValueJoiner`). Only `bounded-state` needs a real class for state-store binding via `Processor.init(ProcessorContext)`. The wirings reflect that asymmetry: small static-factory classes for the four lambda-shaped tiers, one full `Processor` class for `bounded-state`.

Adopters wanting a class-shaped wiring (matching Flink for cross-binding parity) can subclass any of the helpers; the serializable functional interfaces are public.
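To make the static-factory idea concrete, here is a minimal sketch of factories that return serializable functional-interface implementations. Everything in it is an assumption for illustration: `StatelessOpsSketch`, `KeyValuePredicate`, and `SerializableFn` are stand-ins defined locally so the example runs without Kafka on the classpath, and the PR's real `MeosStatelessOps` signatures are not shown in this description. The stand-in method shapes mirror Kafka Streams' `Predicate.test(key, value)` and `ValueMapper.apply(value)`.

```java
import java.io.Serializable;

/** Illustrative static factories; stand-ins for the PR's wiring classes, not their real API. */
final class StatelessOpsSketch {
    /** Same method shape as Kafka Streams' Predicate<K, V>, plus Serializable. */
    interface KeyValuePredicate<K, V> extends Serializable {
        boolean test(K key, V value);
    }

    /** Same method shape as Kafka Streams' ValueMapper<V, R>, plus Serializable. */
    interface SerializableFn<V, R> extends Serializable {
        R apply(V value);
    }

    private StatelessOpsSketch() {
    }

    /** Lifts a value-only boolean operation into a key/value predicate; null values fail. */
    static <K, V> KeyValuePredicate<K, V> predicate(SerializableFn<V, Boolean> op) {
        return (key, value) -> value != null && Boolean.TRUE.equals(op.apply(value));
    }

    /** Wraps a value transformation so that null values pass through untouched. */
    static <V, R> SerializableFn<V, R> mapper(SerializableFn<V, R> op) {
        return value -> value == null ? null : op.apply(value);
    }

    public static void main(String[] args) {
        // A string check stands in for a generated MEOS operation.
        KeyValuePredicate<String, String> isPoint = predicate(wkt -> wkt.startsWith("POINT"));
        SerializableFn<String, Integer> length = mapper(String::length);
        System.out.println(isPoint.test("vehicle-1", "POINT(13.4 52.5)"));
        System.out.println(length.apply("POINT(13.4 52.5)"));
    }
}
```

With the real DSL, a factory of this shape would be passed directly to calls such as `KStream.filter` or `KStream.mapValues`, so no per-operation class is needed.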
## State-store discipline (bounded-state)

Same as the Flink wirings: a raw `jnr.ffi.Pointer` does not survive Kafka Streams' changelog-replay / rebalance / state-rebuild paths (the state-store changelog is a Kafka topic, so state must be byte-serializable to be replayable). `MeosBoundedStateProcessor` stores state as `byte[]` (typically MEOS-WKB or MEOS-WKT), with three adopter-supplied lambdas mediating the round-trip.

The first record for a key sees `prior == null` (no prior state); the wiring skips deserialize and lets step seed the state.

## End-to-end demo

`MeosWiringsDemoTopology` composes all four tier wirings into one Kafka Streams topology:

1. `vehicle-events` source
2. stateless filter
3. bounded-state processor (per-region running tbox union)
4. windowed aggregator (30s tumbling)
5. cross-stream joiner (±1m bound, joined against the `region-queries` topic)
6. `overlap-output` sink

Run via `mvn exec:java`. `main()` always prints `Topology.describe()`; when MEOS is available, it instantiates `TopologyTestDriver` (no broker required).

## Compile verification

Locally green: 110 .class files total (94 from PR #3 base + 16 new: 4 wiring classes + 11 nested lambda interfaces + `MeosOpsRuntime` + 1 demo class).

## Stacking

Stacks on `codegen/kafka-meos-ops` (PR #3). The whole 11-file diff is contained under `kafka-streams-app/src/main/java/org/mobilitydb/kafka/meos/wirings/`.
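The byte-array round-trip from the state-store discipline section can be sketched in plain Java as below. This is a hedged illustration, not the PR's `MeosBoundedStateProcessor`: the class `BoundedStateSketch` is hypothetical, a `HashMap` stands in for `KeyValueStore<K, byte[]>`, and the lambda names `deserialize`, `step`, and `serialize` are assumed (the description names deserialize and step; serialize is a guess at the third). The demo keeps a running maximum as text bytes in place of MEOS-WKT.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Illustrative per-key state machine that only ever persists byte[] state. */
final class BoundedStateSketch<K, V, S> {
    // Stand-in for a changelog-backed KeyValueStore<K, byte[]>.
    private final Map<K, byte[]> store = new HashMap<>();
    private final Function<byte[], S> deserialize;
    private final BiFunction<S, V, S> step; // receives prior == null on a key's first record
    private final Function<S, byte[]> serialize;

    BoundedStateSketch(Function<byte[], S> deserialize,
                       BiFunction<S, V, S> step,
                       Function<S, byte[]> serialize) {
        this.deserialize = deserialize;
        this.step = step;
        this.serialize = serialize;
    }

    /** Loads prior state, applies step, writes the new state back as bytes, and returns it. */
    S process(K key, V value) {
        byte[] bytes = store.get(key);
        // First record for a key: skip deserialize and let step seed the state.
        S prior = bytes == null ? null : deserialize.apply(bytes);
        S next = step.apply(prior, value);
        store.put(key, serialize.apply(next));
        return next;
    }

    /** Exposes the stored bytes, i.e. what a changelog topic would carry. */
    byte[] stored(K key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        BoundedStateSketch<String, Integer, Integer> runningMax = new BoundedStateSketch<>(
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)),
                (prior, v) -> prior == null ? v : Math.max(prior, v),
                state -> Integer.toString(state).getBytes(StandardCharsets.UTF_8));
        runningMax.process("region-1", 3);
        runningMax.process("region-1", 7);
        System.out.println(new String(runningMax.stored("region-1"), StandardCharsets.UTF_8));
    }
}
```

Because only bytes cross the store boundary, a replayed changelog can rebuild the state on another instance without any native handle having to survive the move.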