feat(wirings): capstone demo composing all 4 tier wirings in one pipeline (closes the #6→#9 stack) #10

Open
estebanzimanyi wants to merge 9 commits into MobilityDB:main from estebanzimanyi:feat/flink-tier-wirings-capstone
@estebanzimanyi
Member

Capstone follow-up on the PR #6 → #7 → #8 → #9 tier-wirings stack. A single Flink DataStream job composes all four tier wirings in one coherent end-to-end pipeline, showing that the wirings compose into a realistic pipeline shape rather than only working in isolation.

Pipeline

Each stage uses one tier-wiring class:

Stream A (vehicles)                Stream B (region queries)
     │                                  │
① MeosStatelessFilter                   │   ← keep events in regions of interest
     │                                  │
② MeosBoundedStateMap                   │   ← per-vehicle running tbox union (byte[] state)
     │                                  │
③ MeosWindowedAggregate                 │   ← per-vehicle 30s tumbling tbox summary
     │                                  │
     └──────────────┐            ┌──────┘
                    ↓            ↓
④ MeosCrossStreamJoin                       ← interval-join on regionId, ±1 min bound
                    ↓
              output

Answers: "for each region, which vehicles had an aggregate trajectory (running union) overlapping the region's query bbox during the latest 30-second window?"

Tier-per-stage breakdown

| Stage | Wiring | Tier contract demonstrated |
|---|---|---|
| ① Filter | MeosStatelessFilter | Per-event predicate, no state |
| ② Running union | MeosBoundedStateMap | MEOS handle persisted across events as byte[] state (checkpoint-safe) |
| ③ Window summary | MeosWindowedAggregate | Window-close-only aggregation; no handle persistence across windows |
| ④ Region overlap | MeosCrossStreamJoin | Interval-join, same-key pairing, time-bounded match |

File / compile

Stacking + closure

This PR closes the PR #6→#9 wiring stack with the composite-demo capstone. The 5-PR stack collectively delivers:

| PR | What it adds |
|---|---|
| PR #6 | MeosStatelessMap + MeosStatelessFilter (stateless tier: 804 methods) |
| PR #7 | MeosBoundedStateMap (bounded-state tier: 797 methods) |
| PR #8 | MeosWindowedAggregate (windowed tier: 161 methods) |
| PR #9 | MeosCrossStreamJoin (cross-stream tier: 140 methods) |
| PR #10 (this) | Capstone demo composing all 4 in one pipeline |

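Cumulative wirings-layer coverage = 2,097 of 2,097 emitted methods (100%) wirable through 5 generic classes: the four tiers above account for 1,902 methods, and the remaining 195 io-meta methods are covered by MeosStatelessMap. This PR proves they compose.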

…matrix on MobilityFlink

All nine BerlinMOD reference queries × three streaming forms each
(continuous, windowed, snapshot) on MobilityFlink — the complete 27-cell
stream-layers parity-matrix row, locally verified end-to-end with no
external dependencies (no Kafka, no Docker, no MEOS native lib, no
JMEOS call).

Queries:

  Q1  which vehicles have appeared in the stream
  Q2  where is vehicle X at time T
  Q3  which vehicles within d of P at time T
  Q4  which vehicles entered region R, and when
  Q5  pairs of vehicles meeting near point P
  Q6  cumulative distance per vehicle
  Q7  first passage of vehicles through POIs
  Q8  vehicles close to a road segment
  Q9  distance between vehicles X and Y at time T

Each query has three form classes (Q<N>{Continuous,Windowed,Snapshot}Function)
and a companion BerlinMODQ<N>LocalTest driver running the three forms
through a Flink mini-cluster against a hardcoded synthetic corpus.

Spatial predicates today are pure Java — Haversine distance for
point-to-point (Q3, Q5, Q6, Q9), point-in-box for region containment
(Q4), and a planar-projection point-to-line-segment distance (Q8). Each
spatial call site is marked TODO(meos) for migration to the JMEOS
bridge of the corresponding MEOS operator once the in-flight MEOS 1.4
bump has settled (Q3 edwithin_tgeo_geo; Q4 STBox eintersects; Q5
NAD / edwithin_tgeo_tgeo; Q6 trajectory length; Q7 edwithin_tgeo_geo;
Q8 distance(tgeompoint, geometry(LINESTRING)); Q9 tdistance). Q1 and
Q2 have no spatial predicate.
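
For orientation, the pure-Java predicates described above can be sketched roughly as follows. This is a minimal sketch, not the code in this repository: the class name SpatialSketch, the method names, and the 6,371,000 m mean Earth radius are assumptions made for illustration.

```java
// Hypothetical sketch; names are illustrative, not the repository's actual utilities.
final class SpatialSketch {
    // Assumed mean Earth radius in metres (spherical approximation).
    static final double EARTH_RADIUS_M = 6_371_000.0;

    /** Great-circle (Haversine) distance in metres between two lon/lat points in degrees. */
    static double haversineMetres(double lon1, double lat1, double lon2, double lat2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double halfDPhi = Math.toRadians(lat2 - lat1) / 2.0;
        double halfDLambda = Math.toRadians(lon2 - lon1) / 2.0;
        double a = Math.sin(halfDPhi) * Math.sin(halfDPhi)
                + Math.cos(phi1) * Math.cos(phi2)
                * Math.sin(halfDLambda) * Math.sin(halfDLambda);
        // Clamp to 1.0 to guard against rounding slightly above the asin domain.
        return 2.0 * EARTH_RADIUS_M * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    /** Point-to-point "within d metres" test, the shape used by Q3-style predicates. */
    static boolean withinMetres(double lon1, double lat1, double lon2, double lat2, double d) {
        return haversineMetres(lon1, lat1, lon2, lat2) <= d;
    }

    /** Point-in-box region containment, the shape used by Q4-style predicates. */
    static boolean inBox(double lon, double lat,
                         double minLon, double minLat, double maxLon, double maxLat) {
        return lon >= minLon && lon <= maxLon && lat >= minLat && lat <= maxLat;
    }
}
```

One degree of latitude comes out at roughly 111 km under this radius, which is a quick sanity check for any replacement implementation.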

State patterns exercised:
  - keyed simple flag (Q1)
  - keyed last-known position (Q2, Q8)
  - keyed transition + entry log (Q4)
  - keyed accumulator (Q6)
  - keyed first-passage map (Q7)
  - shared key-by-constant state (Q9 pair-wise, Q5 multi-pair MapState)

Verified output counts (see PR description for the exact-line excerpts):

  Q  | continuous | windowed | snapshot
  ---|------------|----------|---------
  Q1 |          3 |        2 |        9
  Q2 |          7 |        2 |        3
  Q3 |         21 |        2 |        6
  Q4 |          4 |        5 |        9
  Q5 |         14 |        2 |        3   (only pair (100,200) qualifies for our P + radii)
  Q6 |         21 |        6 |        9   (drift corpus; v100=601m, v200=300m, v300=1205m)
  Q7 |          3 |        6 |        9   (3 (vehicle, POI) first-passages; intra-window scope)
  Q8 |         21 |        2 |        6   (same shape as Q3 with segment-distance)
  Q9 |          7 |        2 |        3   (X=100, Y=200; distance 4124m = ~4.1km)

Build verification: mvn clean package green; all nine LocalTests run to
completion (Flink mini-cluster, parallelism=1) producing exactly the
expected output shapes.
… 1.4 MEOSBridge

Introduce MEOSBridge as the runtime spatial-predicate surface for all
BerlinMOD-9 × 3-form streaming cells. The bridge calls into MEOS via
JMEOS 1.4 (geog_dwithin over WGS84 geographies) when libmeos is loadable
and falls back to the pure-Java Haversine / SegmentDistance utilities
when it is not — the fallback path is what the BerlinMODQ*LocalTest
mini-cluster drivers exercise (system property mobilityflink.meos.enabled=false).

- New berlinmod/MEOSBridge.java with the dwithinMetres /
  dwithinSegmentMetres / distanceMetres surface and a fail-soft
  static init that flips MEOS_AVAILABLE to false on UnsatisfiedLinkError.
- All BerlinMOD-9 × 3-form spatial predicates rewritten to call
  MEOSBridge instead of Haversine / SegmentDistance directly. 27 cells,
  one bridge call surface, identical predicate semantics.
- JMEOS.jar updated to the JMEOS#15 regen branch artefact (478 305
  bytes); this is the JMEOS 1.4 regen build that exposes geog_dwithin /
  geom_in / geom_to_geog / edwithin_tgeo_geo / nad_tgeo_geo / tpoint_length.
- aisdata/Main.java and aisdata/TrajectoryWindowFunction.java adapted
  to the JMEOS 1.4 meos_initialize() / meos_initialize_timezone()
  split (the old two-arg meos_initialize(String, error_handler_fn)
  signature is gone in JMEOS#15).
- All nine BerlinMODQ*LocalTest mini-cluster drivers set
  mobilityflink.meos.enabled=false at main() entry so they stay
  green in CI without libmeos.so on the runtime path.
- target/ build artefacts gitignored.
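
The fail-soft initialisation listed above might look roughly like the sketch below. It is an illustration under assumptions, not the MEOSBridge source: the class NativeProbeSketch, the probe and route helpers, the "true" default for the system property, and loading through System.loadLibrary("meos") are all hypothetical (the real bridge goes through JMEOS).

```java
import java.util.function.Supplier;

// Hypothetical sketch of a fail-soft native probe; not the actual MEOSBridge code.
final class NativeProbeSketch {
    /** True only when the feature is enabled and the loader finishes without a link error. */
    static boolean probe(boolean enabled, Runnable loader) {
        if (!enabled) {
            return false;
        }
        try {
            loader.run();
            return true;
        } catch (UnsatisfiedLinkError e) {
            // Library missing from the runtime path: degrade to the pure-Java fallback.
            return false;
        }
    }

    // Evaluated once at class load. The property default ("true") is an assumption.
    static final boolean MEOS_AVAILABLE = probe(
            Boolean.parseBoolean(System.getProperty("mobilityflink.meos.enabled", "true")),
            () -> System.loadLibrary("meos"));

    /** Routes one call to the native path when available, otherwise to the fallback. */
    static <T> T route(Supplier<T> nativePath, Supplier<T> fallback) {
        return MEOS_AVAILABLE ? nativePath.get() : fallback.get();
    }
}
```

Keeping the probe in a single static flag means every predicate call site pays only a boolean check, and test drivers can force the fallback by setting the property before the class loads.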

The README's spatial-predicate paragraph is updated to describe the
MEOSBridge route as the production path; the TODO(meos) markers across
the BerlinMOD cells are gone.

Build: mvn clean package -DskipTests green.
Verify: BerlinMODQ{1,3,5,8}LocalTest all finish with FINISHED state
on the mini-cluster fallback path.
…s + extended types + utils.spatial)

Updates the bundled `flink-processor/jar/JMEOS.jar` to a combined build
of JMEOS PR #19 (regen against MEOS-API meos-idl.json, 2,699 methods
including extended types) AND PR #18 (utils.spatial.Haversine +
utils.spatial.PointToSegment wrappers that MEOSBridge.java imports).

Surface delta vs the previous bundled jar:
  - public static methods: 2 699 (was 1 685)
  - utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2) → double
  - utils.spatial.PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat) → double
  - tnpoint_ methods: 50
  - tcbuffer / tpose / trgeo: now exposed
  - sha: a5895c9b94…  size: 1,210,863 B

Unblocks the MEOSBridge.java import path (line 116): previously the
jar shipped PR #19's GeneratedFunctions but not PR #18's utils.spatial,
so base-branch mvn compile was RED. Both PRs are now coalesced into a
single jar built by:

    mvn -pl codegen,jmeos-core compile -Dmaven.test.skip=true
    cd jmeos-core/target/classes && jar cf JMEOS.jar .

Unblocks codegen/flink-meos-ops wedge stacked on this branch.
… surface

Add a generated, tier-aware Java facade over the MEOS public API,
organized as one Java class per MEOS object-model class plus one per
public-MEOS-header for free functions:

- 50 `MeosOps<Class>` classes (751 methods): one per MEOS object-model
  class (TFloat, TInt, TBool, TText, TGeomPoint, TGeogPoint, TCbuffer,
  TNpoint, TPose, TRGeometry, TBox, STBox, Set, Span, SpanSet, …).
- 6 `MeosOpsFree<Header>` classes (1,346 methods): one per public MEOS
  header for functions not assigned to any object-model class
  (MeosOpsFreeCore, MeosOpsFreeGeo, MeosOpsFreeCbuffer, MeosOpsFreeNpoint,
  MeosOpsFreePose, MeosOpsFreeRgeo).
- 1 shared `MeosOpsRuntime` (single `MEOS_AVAILABLE` static-init across
  all 56 facades).

Each emitted method forwards to `functions.GeneratedFunctions.<name>(...)`
after probing the shared `MeosOpsRuntime.MEOS_AVAILABLE` flag. Each
method carries a Javadoc tier marker (stateless / bounded-state /
windowed / cross-stream / io-meta) so consumers know the per-method
wiring shape.

Total emit: 2,097 of JMEOS PR #19's 2,699-method surface (77.7%);
remainder is the JMEOS-deliberately-omitted type-catalog helpers plus
the streaming-relevance-baseline ambiguous (59) and sequence-only (14)
buckets, both surfaced separately for design decisions before emit.

Two generators under flink-processor/tools/codegen/:
- codegen-oo.py: reads JMEOS jar signatures via javap -p +
  streaming-relevance baseline + MEOS object model → emits per-OO-class
  facades.
- codegen-free.py: same shape, but for functions not in the OO model →
  emits per-header facades.

Both are ~250 LOC, deterministic, audit-by-regeneration. Manifests
record provenance (JMEOS method total, baseline target count, emit
count, per-tier breakdown, per-class/per-header method count, sample
of functions absent from JMEOS).

Coexists with the existing berlinmod.MEOSBridge hand-written
BerlinMOD-scoped bridge (high-level, query-shaped); the generated
MeosOps* facades expose the raw MEOS surface tier-by-tier
(low-level, catalog-shaped). Both share the same MEOS_AVAILABLE
discipline and `functions.GeneratedFunctions` delegation.

Stacks on feat/jmeos-bridge-swap; additive-only; touches no existing
file. Locally compile-verified against the union of JMEOS PR #19's
jmeos-core + PR #18's utils.spatial (the latter needed by MEOSBridge,
separately tracked).
…OS facades

Adds the org.mobilitydb.flink.meos.wirings package — thin, generic
Flink-DataStream wrappers around the generated MeosOps* facades from
PR MobilityDB#5, organized per streaming tier.

This PR ships the stateless tier:
- MeosStatelessMap<IN, OUT>: generic MapFunction wrapping any stateless
  MeosOps* method (804 of the 2,097 generated methods qualify per the
  v4 baseline — 92 OO-classified + 712 free-fn)
- MeosStatelessFilter<IN>: generic FilterFunction wrapping any stateless
  boolean-returning MeosOps* method, plus a .fromIntPredicate(...)
  adapter for JMEOS' int-coded predicates
- demo/MeosWiringsDemoJob: runnable end-to-end DataStream pipeline
  parsing TBox WKT → filtering by overlap with a query box →
  serializing surviving boxes to hex-WKB, all through the generated
  facades wired via this package
- README documenting tier vocabulary, the wrap-once-use-everywhere
  pattern, the DataStream-API-only design choice (Table API as future
  follow-up), and coexistence with berlinmod.MEOSBridge
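
As a rough illustration of the int-coded predicate adapter mentioned above, the sketch below converts an int-returning function into a boolean predicate. The class IntPredicateSketch and the "non-zero means true" convention are assumptions; the real MeosStatelessFilter.fromIntPredicate may differ, and a Flink version would also need the lambda to be serializable.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

// Hypothetical sketch of an int-to-boolean predicate adapter; not the wiring class itself.
final class IntPredicateSketch {
    /** Adapts an int-coded predicate (assumed convention: non-zero means true). */
    static <T> Predicate<T> fromIntPredicate(ToIntFunction<T> intCoded) {
        return value -> intCoded.applyAsInt(value) != 0;
    }

    /** Stand-in for a filter operator: keeps the events the predicate accepts. */
    static <T> List<T> keep(List<T> events, Predicate<T> predicate) {
        return events.stream().filter(predicate).collect(Collectors.toList());
    }
}
```

The point of the adapter is that call sites stay boolean-shaped even though the wrapped function returns a C-style int.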

Future follow-ups (one PR per tier, mirroring this one's shape):
- MeosBoundedStateMap (generic KeyedProcessFunction with
  ValueState<Pointer> for MEOS handle per key — covers 797 of the
  generated methods)
- MeosWindowedAggregate (generic ProcessWindowFunction — 161 methods)
- MeosCrossStreamJoin (generic KeyedCoProcessFunction or interval-join
  — 140 methods)
- Optional: Table API sibling (MeosScalarUDF + MeosCatalogRegistrar)
  if the repo adopts Table API for other reasons

Stacks on codegen/flink-meos-ops (PR MobilityDB#5). Additive-only; touches no
existing file. Locally compile-verified: 129 .class files total (123
from the parent PR + 6 new from this package's classes + demo + their
nested lambdas).
Adds MeosBoundedStateMap<K, IN, OUT> — the second tier-wiring class
in the org.mobilitydb.flink.meos.wirings package, stacked on PR MobilityDB#6
(stateless wirings).

Bounded-state is the second-largest streaming tier in the v4 baseline
(797 of 2,097 emitted methods — 513 OO-classified + 284 free-fn). The
canonical pattern is per-key MEOS-handle accumulation: a running tbox
union, a running temporal value, a per-vehicle accumulator that keeps
the MEOS value alive across events.

## Design — state lives as bytes, not as Pointer

A jnr.ffi.Pointer is a raw native-memory address. It is not portable
across JVM restarts; Flink could not checkpoint, savepoint, or replay
state if the wiring stored raw pointers. MeosBoundedStateMap stores
state as byte[] (MEOS-WKB or MEOS-WKT, adopter's choice) with three
adopter-supplied lambdas mediating the round-trip:

- PointerSerialize:   Pointer → byte[]  (called after each step)
- PointerDeserialize: byte[] → Pointer  (called before each step)
- MeosStepFn:         (prior Pointer, event) → (new Pointer, output)

The first event for a key sees prior == null; the wiring handles that
case by skipping deserialize and seeding state with the first event's
result. Subsequent events re-hydrate, mutate, re-serialize.

Net effect: state crossing the operator boundary is always byte[];
checkpoints, savepoints, and rescaling all work correctly. This is
the same serde discipline MobilityDuck's persistent state machines
use.
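
The byte[] round-trip described in this section can be sketched without Flink or MEOS as follows. Everything here is hypothetical: a HashMap stands in for Flink keyed state, a generic handle type H stands in for jnr.ffi.Pointer, and the class and interface names are illustrative rather than the actual MeosBoundedStateMap API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the serialize / deserialize / step discipline; not the wiring class.
final class BoundedStateSketch<K, IN, H, OUT> {
    /** Result of one step: the new handle to persist and the value to emit. */
    static final class Step<S, R> {
        final S next;
        final R output;
        Step(S next, R output) { this.next = next; this.output = output; }
    }

    /** (prior handle or null, event) -> (new handle, output). */
    interface StepFn<S, E, R> {
        Step<S, R> apply(S prior, E event);
    }

    // Stand-in for per-key state; only bytes are ever stored here.
    private final Map<K, byte[]> state = new HashMap<>();
    private final Function<H, byte[]> serialize;
    private final Function<byte[], H> deserialize;
    private final StepFn<H, IN, OUT> step;

    BoundedStateSketch(Function<H, byte[]> serialize,
                       Function<byte[], H> deserialize,
                       StepFn<H, IN, OUT> step) {
        this.serialize = serialize;
        this.deserialize = deserialize;
        this.step = step;
    }

    OUT process(K key, IN event) {
        byte[] stored = state.get(key);
        // First event for a key: no stored bytes, so deserialize is skipped and prior is null.
        H prior = (stored == null) ? null : deserialize.apply(stored);
        Step<H, OUT> result = step.apply(prior, event);
        // Re-serialize so that only byte[] crosses the operator boundary.
        state.put(key, serialize.apply(result.next));
        return result.output;
    }
}
```

With a running-maximum step function, for example, two keys accumulate independently and each key's first event seeds its own state.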

## Files

- MeosBoundedStateMap.java — the generic wiring class
- demo/MeosBoundedStateDemoJob.java — runnable per-vehicle running
  tbox union pipeline (6 events × 2 vehicles; demonstrates per-key
  isolation, first-event-null correctness, and checkpoint-safe state)
- README — bounded-state row marked ✅ shipped

## Stacks on PR MobilityDB#6

Additive-only; touches no existing file. Locally compile-verified:
135 .class files total (129 from PR MobilityDB#6 base + 6 new — 1 wiring class
+ 3 nested lambda interfaces + MeosStep tuple + 1 demo class).
Adds MeosWindowedAggregate<K, IN, OUT, W> — the third tier-wiring
class in the org.mobilitydb.flink.meos.wirings package, stacked on
PR MobilityDB#7 (bounded-state wirings).

Windowed is the third streaming tier (161 of 2,097 emitted methods,
~8%) — output cardinality changes; one MEOS aggregate per window.
Canonical examples: temporal_length(tgeo) for per-window trajectory
length, temporal_twavg(tnumber) for time-weighted average per
window, per-class _trajectory / _time / _timespan accessors.

## Design

Wraps any windowed MeosOps call as a ProcessWindowFunction with a
slim adopter-facing signature: the lambda receives the window
metadata, the iterable of in-window events, and a slim Context
exposing key + processing-time + watermark (free of Flink internals).

Unlike bounded-state, NO MEOS handle persists across window
boundaries — each window's MEOS value is built fresh from the
iterable on window close, used to compute the output, discarded.
The iterable's events are Flink-side data; MEOS handles are
short-lived per-window.
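
The fresh-per-window behaviour can be illustrated with a plain-Java sketch of tumbling-window bucketing. This is an assumption-laden stand-in, not MeosWindowedAggregate: the class TumblingWindowSketch, the Event type, and the "key@windowStart" bucket id are hypothetical, and real Flink windows close on watermarks rather than on a batch pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical sketch of per-key tumbling-window aggregation; not the wiring class.
final class TumblingWindowSketch {
    static final class Event {
        final String key;
        final long tsMillis;
        final double value;
        Event(String key, long tsMillis, double value) {
            this.key = key;
            this.tsMillis = tsMillis;
            this.value = value;
        }
    }

    /** Start of the tumbling window (zero offset) that contains the timestamp. */
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    /** Buckets events per (key, window), then builds each aggregate from scratch. */
    static <OUT> Map<String, OUT> aggregate(List<Event> events, long sizeMillis,
                                            Function<List<Event>, OUT> perWindow) {
        Map<String, List<Event>> buckets = new TreeMap<>();
        for (Event e : events) {
            String id = e.key + "@" + windowStart(e.tsMillis, sizeMillis);
            buckets.computeIfAbsent(id, k -> new ArrayList<>()).add(e);
        }
        Map<String, OUT> out = new TreeMap<>();
        for (Map.Entry<String, List<Event>> bucket : buckets.entrySet()) {
            // Nothing carries over between buckets: each aggregate sees only its own events.
            out.put(bucket.getKey(), perWindow.apply(bucket.getValue()));
        }
        return out;
    }
}
```

In the real wiring the perWindow function is where a MEOS value would be built, used, and discarded.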

## Files

- MeosWindowedAggregate.java — the generic wiring class
- demo/MeosWindowedDemoJob.java — runnable 30s-tumbling-window
  per-vehicle aggregate-tbox demo (8 events × 2 vehicles × 2 windows;
  demonstrates window-close timing, per-key isolation, fresh-per-
  window aggregation)
- README — windowed row marked ✅ shipped

## Stacks on PR MobilityDB#7

Additive-only; touches no existing file beyond the README row.
Locally compile-verified: 140 .class files total (135 from PR MobilityDB#7
base + 5 new — 1 wiring class + 2 nested lambda interfaces + 1
anonymous ContextLike + 1 demo class).
…ompletes the 4-tier matrix)

Adds MeosCrossStreamJoin<L, R, OUT> — the fourth and final
tier-wiring class in the org.mobilitydb.flink.meos.wirings package,
stacked on PR MobilityDB#8 (windowed wirings).

Cross-stream is the smallest streamable tier (140 of 2,097 emitted
methods, ~7%) — pairwise across two pre-keyed streams, time-bounded
match window. Canonical examples: spatial-relations between two
trajectories (edwithin_tgeo_tgeo, eintersects_tgeo_tgeo), distance
on two temporals (nad_tgeo_tgeo, mindistance_tgeo_tgeo).

## Design

Wraps any cross-stream MeosOps call as a ProcessJoinFunction — the
operator backing KeyedStream.intervalJoin(other). Both streams must
be pre-keyed by the same K; only events sharing a key are considered
for pairing. The .between(lowerBound, upperBound) declaration bounds
the time window for match-eligibility, and matches are emitted
event-time-aware (watermark-driven).

The adopter-facing signature keeps the slim ContextLike-pattern used
in MeosWindowedAggregate: the lambda receives the matched (left,
right) pair and a slim Context exposing left/right timestamps (the
bits a MEOS cross-stream call typically needs), free of Flink
internals.
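
The match-eligibility rule can be made concrete with a small plain-Java sketch of interval-join semantics: a pair matches when the keys are equal and the right timestamp lies within [left + lowerBound, left + upperBound], both bounds inclusive. The class IntervalJoinSketch and the Timed type are hypothetical, and the nested loop ignores the watermark-driven buffering and cleanup that Flink performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch of interval-join pairing; not the wiring class.
final class IntervalJoinSketch {
    static final class Timed<T> {
        final String key;
        final long tsMillis;
        final T value;
        Timed(String key, long tsMillis, T value) {
            this.key = key;
            this.tsMillis = tsMillis;
            this.value = value;
        }
    }

    /** Emits fn(left, right) for every same-key pair inside the inclusive time bounds. */
    static <L, R, OUT> List<OUT> join(List<Timed<L>> left, List<Timed<R>> right,
                                      long lowerMillis, long upperMillis,
                                      BiFunction<L, R, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        for (Timed<L> l : left) {
            for (Timed<R> r : right) {
                long delta = r.tsMillis - l.tsMillis;
                boolean inBounds = delta >= lowerMillis && delta <= upperMillis;
                if (l.key.equals(r.key) && inBounds) {
                    out.add(fn.apply(l.value, r.value));
                }
            }
        }
        return out;
    }
}
```

With bounds of minus and plus 60,000 ms this reproduces the ±1 minute pairing used by the demo; the per-pair function is where a MEOS cross-stream call would go.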

## Files

- MeosCrossStreamJoin.java — the generic wiring class
- demo/MeosCrossStreamDemoJob.java — runnable interval-join demo
  matching two streams of (regionId, vehicleId, tboxWKT, ts) on
  shared regionId key within ±1 minute; emits per-pair overlap
  events via MeosOpsFreeCore.overlaps_tbox_tbox
- README — cross-stream row marked ✅ shipped

## Completes the 4-tier wiring matrix

After this PR, every streamable tier in the v4 baseline has a
generic wiring class in this package:

  stateless        804 methods   →  MeosStatelessMap / MeosStatelessFilter (PR MobilityDB#6)
  bounded-state    797 methods   →  MeosBoundedStateMap (PR MobilityDB#7)
  windowed         161 methods   →  MeosWindowedAggregate (PR MobilityDB#8)
  cross-stream     140 methods   →  MeosCrossStreamJoin (THIS PR)
  io-meta          195 methods   →  covered by MeosStatelessMap
  sequence-only     14 methods   →  inherently non-streamable

Total: 2,097 of 2,097 = 100% of streamable + io-meta generated
MeosOps* methods are wirable through 4 (+ 1 filter sibling) generic
classes; no per-method registration; adopters provide a serializable
lambda per use site.

## Stacks on PR MobilityDB#8

Additive-only; touches no existing file beyond the README row.
Locally compile-verified: 145 .class files total (140 from PR MobilityDB#8
base + 5 new — 1 wiring class + 2 nested lambda interfaces + 1
anonymous ContextLike + 1 demo class).
…peline

Adds MeosAllTiersCapstoneDemo — a single Flink DataStream job that
exercises all four tier-wiring classes from the PR MobilityDB#6 → MobilityDB#7 → MobilityDB#8 → MobilityDB#9 stack
in a coherent end-to-end pipeline.

Pipeline (each stage uses one tier-wiring class from the stack):

  ① MeosStatelessFilter    — drop events outside regions of interest
  ② MeosBoundedStateMap    — per-vehicle running tbox union (byte[] state)
  ③ MeosWindowedAggregate  — per-vehicle 30s tumbling tbox summary
  ④ MeosCrossStreamJoin    — interval-join vehicle aggregates against
                             region queries (±1 min bound, regionId key)

The pipeline answers: 'for each region, which vehicles had an aggregate
trajectory (running union) overlapping the region's query bbox during
the latest 30-second window?'

Proves the wirings compose into a realistic pipeline shape (not just
work in isolation), each tier delivering its specific contract:
stateless filter is per-event, bounded-state persists handle state
across events as bytes, windowed aggregates window-close-only, cross-
stream interval-joins on shared key.

Stacks on PR MobilityDB#9; additive-only (1 new demo file). Locally compile-
verified: 146 .class files total (145 from PR MobilityDB#9 base + 1 new demo).