Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Input/input_berlinmod.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
1735711200,100,4.3517,50.8503
1735711200,300,4.2000,50.7500
1735711201,200,4.3060,50.8270
1735711202,100,4.3517,50.8503
1735711202,300,4.2000,50.7500
1735711203,200,4.3060,50.8270
1735711204,100,4.3517,50.8503
1735711204,300,4.2000,50.7500
1735711205,200,4.3060,50.8270
1735711206,100,4.3517,50.8503
1735711206,300,4.2000,50.7500
1735711207,200,4.3060,50.8270
1735711208,100,4.3517,50.8503
1735711208,300,4.2000,50.7500
1735711209,200,4.3060,50.8270
1735711210,100,4.3517,50.8503
1735711210,300,4.2000,50.7500
1735711211,200,4.3060,50.8270
1735711212,100,4.3517,50.8503
1735711212,300,4.2000,50.7500
1735711213,200,4.3060,50.8270
47 changes: 47 additions & 0 deletions Queries/berlinmod/q1_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# BerlinMOD-Q1 — continuous form
# "Which vehicles have appeared in the stream?"
# For every 1-second window the query emits one row per vehicle:
# (start, end, vehicle_id, number of events counted in that window).
# The window is declared SLIDING, but SIZE and ADVANCE BY are both 1 SEC,
# so consecutive windows should not overlap.
# The distinct-vehicles-seen set is the union of vehicle_id over the emitted rows.
# NOTE(review): time_utc values in Input/input_berlinmod.csv advance by 1 per
# tick (they look like epoch seconds) — confirm the engine reads them in the
# unit that "SIZE 1 SEC" assumes.

# Query text: groups by vehicle_id, counts time_utc per window, writes to file_sink.
query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

# Output: a single File sink. Its fields carry the BERLINMOD_STREAM$ prefix and
# mirror the SELECT list (start, end, vehicle_id, events), all typed UINT64.
sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_continuous.csv"
input_format: CSV

# Logical source: four fields (timestamp, vehicle id, longitude, latitude).
# Presumably the column order of Input/input_berlinmod.csv — verify.
logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

# Physical source: feeds BERLINMOD_STREAM from a TCP socket at
# host.docker.internal:32325, parsed as CSV (fields split on ",", tuples on "\n").
physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
46 changes: 46 additions & 0 deletions Queries/berlinmod/q1_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# BerlinMOD-Q1 — snapshot form
# "At each 5-second tick, list the distinct vehicles seen in the tick window."
# Streaming approximation of the batch BerlinMOD-Q1 snapshot at time T.
# Each 5-second tumbling window emits one row per vehicle:
# (start, end, vehicle_id, number of events counted in that window).
# NOTE(review): time_utc values in Input/input_berlinmod.csv advance by 1 per
# tick (they look like epoch seconds) — confirm the engine reads them in the
# unit that "SIZE 5 SEC" assumes.

# Query text: groups by vehicle_id, counts time_utc per window, writes to file_sink.
query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW TUMBLING(time_utc, SIZE 5 SEC)
INTO file_sink;

# Output: a single File sink. Its fields carry the BERLINMOD_STREAM$ prefix and
# mirror the SELECT list (start, end, vehicle_id, events), all typed UINT64.
sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_snapshot.csv"
input_format: CSV

# Logical source: four fields (timestamp, vehicle id, longitude, latitude).
# Presumably the column order of Input/input_berlinmod.csv — verify.
logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

# Physical source: feeds BERLINMOD_STREAM from a TCP socket at
# host.docker.internal:32325, parsed as CSV (fields split on ",", tuples on "\n").
physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
46 changes: 46 additions & 0 deletions Queries/berlinmod/q1_windowed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# BerlinMOD-Q1 — windowed form
# "Per 10-second tumbling window, distinct vehicles seen."
# Emits one row per (window, vehicle) pair:
# (start, end, vehicle_id, number of events counted in that window).
# The number of rows emitted for a window is that window's distinct-vehicle count.
# NOTE(review): time_utc values in Input/input_berlinmod.csv advance by 1 per
# tick (they look like epoch seconds) — confirm the engine reads them in the
# unit that "SIZE 10 SEC" assumes.

# Query text: groups by vehicle_id, counts time_utc per window, writes to file_sink.
query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO file_sink;

# Output: a single File sink. Its fields carry the BERLINMOD_STREAM$ prefix and
# mirror the SELECT list (start, end, vehicle_id, events), all typed UINT64.
sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_windowed.csv"
input_format: CSV

# Logical source: four fields (timestamp, vehicle id, longitude, latitude).
# Presumably the column order of Input/input_berlinmod.csv — verify.
logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

# Physical source: feeds BERLINMOD_STREAM from a TCP socket at
# host.docker.internal:32325, parsed as CSV (fields split on ",", tuples on "\n").
physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
44 changes: 44 additions & 0 deletions Queries/berlinmod/q2_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# BerlinMOD-Q2 — continuous form
# "Where is vehicle X (= 200) right now?"
# For every 1-second window, emits (start, end, trajectory), where trajectory is
# the TEMPORAL_SEQUENCE aggregate over that vehicle's (gps_lon, gps_lat, time_utc).
# The window is declared SLIDING, but SIZE and ADVANCE BY are both 1 SEC,
# so consecutive windows should not overlap.
# NOTE(review): vehicle 200 appears only every third tick in
# Input/input_berlinmod.csv — confirm what the engine emits for windows
# that contain no matching event.

# Query text: keeps only vehicle_id = 200; there is no GROUP BY, so each window
# aggregates all remaining events into one trajectory value.
query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

# Output: a single File sink. Its fields carry the BERLINMOD_STREAM$ prefix and
# mirror the SELECT list; the trajectory is declared VARSIZED.
sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_continuous.csv"
input_format: CSV

# Logical source: four fields (timestamp, vehicle id, longitude, latitude).
# Presumably the column order of Input/input_berlinmod.csv — verify.
logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

# Physical source: feeds BERLINMOD_STREAM from a TCP socket at
# host.docker.internal:32325, parsed as CSV (fields split on ",", tuples on "\n").
physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
43 changes: 43 additions & 0 deletions Queries/berlinmod/q2_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# BerlinMOD-Q2 — snapshot form
# "At each 5-second tick, snapshot of vehicle X's (= 200) trajectory in the tick."
# Each 5-second tumbling window emits (start, end, trajectory), where trajectory
# is the TEMPORAL_SEQUENCE aggregate over (gps_lon, gps_lat, time_utc).

# Query text: keeps only vehicle_id = 200; there is no GROUP BY, so each window
# aggregates all remaining events into one trajectory value.
query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW TUMBLING(time_utc, SIZE 5 SEC)
INTO file_sink;

# Output: a single File sink. Its fields carry the BERLINMOD_STREAM$ prefix and
# mirror the SELECT list; the trajectory is declared VARSIZED.
sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_snapshot.csv"
input_format: CSV

# Logical source: four fields (timestamp, vehicle id, longitude, latitude).
# Presumably the column order of Input/input_berlinmod.csv — verify.
logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

# Physical source: feeds BERLINMOD_STREAM from a TCP socket at
# host.docker.internal:32325, parsed as CSV (fields split on ",", tuples on "\n").
physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
43 changes: 43 additions & 0 deletions Queries/berlinmod/q2_windowed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# BerlinMOD-Q2 — windowed form
# "Per 10-second tumbling window, trajectory of vehicle X (= 200)."
# Each window emits (start, end, trajectory), where trajectory is the
# TEMPORAL_SEQUENCE aggregate over (gps_lon, gps_lat, time_utc).

# Query text: keeps only vehicle_id = 200; there is no GROUP BY, so each window
# aggregates all remaining events into one trajectory value.
query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO file_sink;

# Output: a single File sink. Its fields carry the BERLINMOD_STREAM$ prefix and
# mirror the SELECT list; the trajectory is declared VARSIZED.
sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_windowed.csv"
input_format: CSV

# Logical source: four fields (timestamp, vehicle id, longitude, latitude).
# Presumably the column order of Input/input_berlinmod.csv — verify.
logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

# Physical source: feeds BERLINMOD_STREAM from a TCP socket at
# host.docker.internal:32325, parsed as CSV (fields split on ",", tuples on "\n").
physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
49 changes: 49 additions & 0 deletions Queries/berlinmod/q3_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# BerlinMOD-Q3 — continuous form
# "Vehicles within 5 km of Brussels city centre, right now."
# For every 1-second window, emits (start, end, vehicle_id) for vehicles that had
# at least one event passing the edwithin_tgeo_geo filter against the point
# POINT(4.3517 50.8503).
# The window is declared SLIDING, but SIZE and ADVANCE BY are both 1 SEC,
# so consecutive windows should not overlap.
# NOTE(review): the SELECT list has GROUP BY + WINDOW but no aggregate function
# (no COUNT or similar) — confirm the engine accepts a windowed GROUP BY
# without one.
# NOTE(review): the distance argument is FLOAT64(5000.0) and the header calls it
# 5 km; the unit actually applied to an SRID=4326 point is not visible here —
# confirm it is metres rather than degrees.

# Query text: keeps events where edwithin_tgeo_geo(...) returns INT32(1), then
# groups the survivors by vehicle_id per window.
query: |
SELECT start,
end,
vehicle_id
FROM berlinmod_stream
WHERE edwithin_tgeo_geo(gps_lon,
gps_lat,
time_utc,
'SRID=4326;POINT(4.3517 50.8503)',
FLOAT64(5000.0)) = INT32(1)
GROUP BY vehicle_id
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

# Output: a single File sink. Its fields carry the BERLINMOD_STREAM$ prefix and
# mirror the SELECT list (start, end, vehicle_id), all typed UINT64.
sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q3_continuous.csv"
input_format: CSV

# Logical source: four fields (timestamp, vehicle id, longitude, latitude).
# Presumably the column order of Input/input_berlinmod.csv — verify.
logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

# Physical source: feeds BERLINMOD_STREAM from a TCP socket at
# host.docker.internal:32325, parsed as CSV (fields split on ",", tuples on "\n").
physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
50 changes: 50 additions & 0 deletions Queries/berlinmod/q3_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# BerlinMOD-Q3 — snapshot form
# "At each 5-second tick, distinct vehicles within 5 km of P."
# P is the point literal used in the WHERE clause: POINT(4.3517 50.8503), SRID 4326.
# Each 5-second tumbling window emits one row per vehicle that passed the filter:
# (start, end, vehicle_id, events_near_p), where events_near_p counts that
# vehicle's matching events in the window.
# NOTE(review): the distance argument is FLOAT64(5000.0) and the header calls it
# 5 km; the unit actually applied to an SRID=4326 point is not visible here —
# confirm it is metres rather than degrees.

# Query text: keeps events where edwithin_tgeo_geo(...) returns INT32(1), then
# groups by vehicle_id and counts time_utc per window.
query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events_near_p
FROM berlinmod_stream
WHERE edwithin_tgeo_geo(gps_lon,
gps_lat,
time_utc,
'SRID=4326;POINT(4.3517 50.8503)',
FLOAT64(5000.0)) = INT32(1)
GROUP BY vehicle_id
WINDOW TUMBLING(time_utc, SIZE 5 SEC)
INTO file_sink;

# Output: a single File sink. Its fields carry the BERLINMOD_STREAM$ prefix and
# mirror the SELECT list (start, end, vehicle_id, events_near_p), all typed UINT64.
sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS_NEAR_P, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q3_snapshot.csv"
input_format: CSV

# Logical source: four fields (timestamp, vehicle id, longitude, latitude).
# Presumably the column order of Input/input_berlinmod.csv — verify.
logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

# Physical source: feeds BERLINMOD_STREAM from a TCP socket at
# host.docker.internal:32325, parsed as CSV (fields split on ",", tuples on "\n").
physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
Loading
Loading