Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,35 @@ Kafka producer
Flink Processor
<img src="doc/images/flink-processor.png" width="700" alt="Flink Processor" />


# BerlinMOD-9 × 3 streaming forms — the parity matrix on Flink

The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1..Q9) in three streaming forms each on this runtime: **continuous** (always-on, per-event emission), **windowed** (tumbling 10-second aggregation), and **snapshot** (5-second tick — the parity-oracle form whose output at watermark T equals the batch BerlinMOD-Q result on data up to T).

| Q | Topic | Continuous | Windowed | Snapshot |
|---|---|---|---|---|
| Q1 | "which vehicles have appeared in the stream?" | ✓ | ✓ | ✓ |
| Q2 | "where is vehicle X at time T?" | ✓ | ✓ | ✓ |
| Q3 | "vehicles within d of P at time T?" | ✓ | ✓ | ✓ |
| Q4 | "vehicles entered region R, and when?" | ✓ | ✓ | ✓ |
| Q5 | "pairs of vehicles meeting near P" | ✓ | ✓ | ✓ |
| Q6 | "cumulative distance per vehicle" | ✓ | ✓ | ✓ |
| Q7 | "first passage of vehicles through POIs" | ✓ | ✓ | ✓ |
| Q8 | "vehicles close to a road segment" | ✓ | ✓ | ✓ |
| Q9 | "distance between vehicles X and Y at time T" | ✓ | ✓ | ✓ |

**27 / 27 cells** = the full MobilityFlink parity-matrix row. Each cell has a dedicated `Q<N>{Continuous,Windowed,Snapshot}Function` class in [`flink-processor/src/main/java/berlinmod/`](flink-processor/src/main/java/berlinmod/) and is locally verified via the companion `BerlinMODQ<N>LocalTest` driver running on a Flink mini-cluster.

The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD).

Spatial predicates today use pure-Java great-circle ([`Haversine`](flink-processor/src/main/java/berlinmod/Haversine.java)) and planar segment-distance ([`SegmentDistance`](flink-processor/src/main/java/berlinmod/SegmentDistance.java)) utilities; each call site is marked `TODO(meos)` for JMEOS-bridge migration after [JMEOS#15](https://github.com/MobilityDB/JMEOS/pull/15) (the MEOS 1.4 regen) settles.

The Kafka-source entry points for Q2 and Q3 are [`BerlinMODQ2Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java) and [`BerlinMODQ3Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java); the companion producer is [`python-producer-berlinmod.py`](kafka-producer/python-producer-berlinmod.py). Generate a BerlinMOD CSV with the upstream generator (`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB) at any scale factor and feed it to the producer. The form-by-form definition with default parameters lives in [`doc/berlinmod-q3-streaming-forms.md`](doc/berlinmod-q3-streaming-forms.md).

### Sibling parity work in the ecosystem

- [MobilityKafka#1](https://github.com/MobilityDB/MobilityKafka/pull/1) — the same 27-cell row on Kafka Streams
- [MobilityNebula#15](https://github.com/MobilityDB/MobilityNebula/pull/15) — 15 of 27 cells on NebulaStream (Q1, Q2, Q3, Q4, Q7-via-POI-fanout)
- [MobilityDB-BerlinMOD#29](https://github.com/MobilityDB/MobilityDB-BerlinMOD/pull/29) — the batch BerlinMOD-9 cross-platform timings (the snapshot form's gold-answer source)
- [MobilityDB/.github#10](https://github.com/MobilityDB/.github/pull/10) — the ecosystem-profile description of the stream-layers tier

107 changes: 107 additions & 0 deletions doc/berlinmod-q3-streaming-forms.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# BerlinMOD-Q3 streaming forms

This document defines what **BerlinMOD-Q3** means in each of the three
streaming forms the parity contract specifies for the MobilityFlink /
MobilityKafka / MobilityNebula trio (see the planned-tier section of the
[ecosystem profile](https://github.com/MobilityDB/.github)).

## The batch query

> *Which vehicles were within distance `d` of point `P` at time `T`?*

Parameters: a point `P = (lon, lat)`, a radius `d` in metres, and a time `T`.
Returns: the set of `vehicle_id`s whose trajectory passed within `d` of `P` at `T`.

The batch reference implementation lives in
[MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD) and
runs against the three SQL surfaces (MobilityDB / MobilityDuck / MobilitySpark)
with byte-identical results — the batch oracle for the snapshot streaming form
below.

## The three streaming forms

### 1. Continuous form

> *"At every moment, which vehicles are currently within `d` of `P`?"*

For each incoming GPS event `(vehicle_id, t, lon, lat)`:

- Evaluate the radius predicate `distance((lon, lat), P) ≤ d`.
- Emit `(vehicle_id, t, near)` per event.

No window; output updates per event. Watermark-independent.

Use case: real-time geofence alerting where each event matters.

Implemented by [`Q3ContinuousFunction`](../flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java).

### 2. Windowed form

> *"Per N-second tumbling window, how many distinct vehicles were
> within `d` of `P` at any time during the window?"*

Tumbling event-time window of size `W` (default `W = 10s`). For each window:

- Collect all events whose timestamp falls in the window.
- Compute the distinct set `{vehicle_id : ∃ event in window with distance ≤ d}`.
- Emit `(window_start, window_end, distinct_count)`.

Use case: time-bucketed dashboards, near-real-time aggregates.

Implemented by [`Q3WindowedFunction`](../flink-processor/src/main/java/berlinmod/Q3WindowedFunction.java).

### 3. Snapshot form — **the parity oracle**

> *"At time `T`, which vehicles are within `d` of `P`?"*

Watermark-driven. Per vehicle, maintain `lastKnownPosition` state. At each
snapshot tick (event-time timer at multiples of `snapshotTickMillis`,
default `5000 ms`):

- For each vehicle's most recent `(lon, lat)`, evaluate the radius predicate.
- Emit `(T, vehicle_id)` for every vehicle satisfying the predicate at `T`.

As the watermark advances to `T = max(event_times)`, the streaming snapshot
output **equals the batch BerlinMOD-Q3 result** on the same scale-factor
corpus. This is the parity property the contract enforces:

```
streaming-Q3-snapshot(T) ≡ batch-BerlinMOD-Q3 on data up to T
(same SF, same P, same d)
```

Use case: lambda-architecture style verification — streaming pipeline's
output must converge to the batch reference.

Implemented by [`Q3SnapshotFunction`](../flink-processor/src/main/java/berlinmod/Q3SnapshotFunction.java).

## Default parameters

The `BerlinMODQ3Main` entry point uses:

| Parameter | Value | Source |
|---|---|---|
| `P` (lon, lat) | (4.3517, 50.8503) — Brussels city centre | Default centre for the BerlinMOD-Brussels corpus |
| `d` (radius) | 5 000 m | Within-city-centre scale |
| `W` (window size) | 10 s | Same as the AIS example for consistency |
| Snapshot tick | 5 s | Half the window for finer parity-oracle granularity |
| Topic | `berlinmod` | Single shared topic across the three forms |

## Predicate implementation

The scaffold today uses a pure-Java great-circle (Haversine) distance check in
[`Haversine`](../flink-processor/src/main/java/berlinmod/Haversine.java). This
matches the predicate semantics of the MEOS `edwithin_tgeo_geo` operator (the
same call used by `MobilityNebula/Queries/Query1.yaml`), so swapping the
predicate body for a JMEOS-bridged `edwithin_tgeo_geo` call is a one-line
change once the JMEOS surface for that operator is verified — it is marked
`TODO(meos)` in each form's class.

## Companion producer

The BerlinMOD CSV → Kafka producer lives at
[`kafka-producer/python-producer-berlinmod.py`](../kafka-producer/python-producer-berlinmod.py).
Generate a BerlinMOD CSV at scale factor SF with the upstream generator
(`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB), name the
columns `(t, vehicle_id, lon, lat)`, and the producer streams it to the
`berlinmod` topic.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package berlinmod;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
* JSON → {@link BerlinMODTrip} deserializer for the Kafka {@code berlinmod} topic.
*
* <p>Expected JSON shape per record:
* <pre>
* { "t": "2007-05-28 06:00:00", "vehicle_id": 42, "lon": 4.36, "lat": 50.84 }
* </pre>
*
* <p>The timestamp format is the same {@code yyyy-MM-dd HH:mm:ss} the BerlinMOD
* generator emits in {@code generate_berlinmod_trips.sql}; we parse it as UTC
* to match the AIS pipeline's event-time convention.
*/
public class BerlinMODDeserializationSchema implements DeserializationSchema<BerlinMODTrip> {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final DateTimeFormatter TS_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

public BerlinMODDeserializationSchema() {
OBJECT_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Override
public BerlinMODTrip deserialize(byte[] message) throws IOException {
JsonNode node = OBJECT_MAPPER.readTree(message);
BerlinMODTrip trip = new BerlinMODTrip();
trip.setTimestamp(parseTimestamp(node.get("t").asText()));
trip.setVehicleId(node.get("vehicle_id").asInt());
trip.setLon(node.get("lon").asDouble());
trip.setLat(node.get("lat").asDouble());
return trip;
}

private long parseTimestamp(String s) {
LocalDateTime dt = LocalDateTime.parse(s, TS_FORMATTER);
return dt.atZone(ZoneId.of("UTC")).toInstant().toEpochMilli();
}

@Override
public boolean isEndOfStream(BerlinMODTrip nextElement) {
return false;
}

@Override
public TypeInformation<BerlinMODTrip> getProducedType() {
return TypeExtractor.getForClass(BerlinMODTrip.class);
}
}
95 changes: 95 additions & 0 deletions flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package berlinmod;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
* Local end-to-end test driver for the BerlinMOD-Q1 three streaming forms.
*
* <p>Same 21-event synthetic corpus as Q2/Q3 local tests. Q1 has no spatial
* predicate and no per-event filter parameter — it simply enumerates vehicles
* seen.
*
* <p>Expected output:
* <ul>
* <li><b>Q1-continuous</b>: 3 lines, one per distinct vehicle (firstSeenTime)</li>
* <li><b>Q1-windowed</b>: 2 windows, each with distinctCount=3</li>
* <li><b>Q1-snapshot</b>: 9 lines (3 ticks × 3 vehicles all seen by source-close)</li>
* </ul>
*/
public class BerlinMODQ1LocalTest {

private static final Logger LOG = LoggerFactory.getLogger(BerlinMODQ1LocalTest.class);

private static final long WINDOW_SIZE_SECONDS = 10L;
private static final long SNAPSHOT_TICK_MILLIS = 5_000L;
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
LOG.info("BerlinMODQ1LocalTest starting; window={}s tick={}ms",
WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

List<BerlinMODTrip> events = buildEvents();
DataStreamSource<BerlinMODTrip> raw = env.fromCollection(events);
DataStream<BerlinMODTrip> trips = raw.assignTimestampsAndWatermarks(
WatermarkStrategy
.<BerlinMODTrip>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((e, t) -> e.getTimestamp()));

DataStream<Tuple2<Integer, Long>> cont = trips
.keyBy(BerlinMODTrip::getVehicleId)
.process(new Q1ContinuousFunction());
cont.print("Q1-continuous");

DataStream<Tuple3<Long, Long, Long>> win = trips
.windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
.process(new Q1WindowedFunction());
win.print("Q1-windowed");

DataStream<Tuple2<Long, Integer>> snap = trips
.keyBy(BerlinMODTrip::getVehicleId)
.process(new Q1SnapshotFunction(SNAPSHOT_TICK_MILLIS));
snap.print("Q1-snapshot");

env.execute("BerlinMODQ1LocalTest");
LOG.info("BerlinMODQ1LocalTest done");
}

private static List<BerlinMODTrip> buildEvents() {
List<BerlinMODTrip> events = new ArrayList<>();
for (int i = 0; i <= 12; i += 2) {
events.add(make(100, T0 + i * 1000L, 4.3517, 50.8503));
}
for (int i = 1; i <= 13; i += 2) {
events.add(make(200, T0 + i * 1000L, 4.3060, 50.8270));
}
for (int i = 0; i <= 12; i += 2) {
events.add(make(300, T0 + i * 1000L, 4.2000, 50.7500));
}
return events;
}

private static BerlinMODTrip make(int vid, long t, double lon, double lat) {
BerlinMODTrip trip = new BerlinMODTrip();
trip.setVehicleId(vid);
trip.setTimestamp(t);
trip.setLon(lon);
trip.setLat(lat);
return trip;
}
}
Loading