Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 57 additions & 164 deletions imap_processing/ialirt/calculate_ingest.py
Original file line number Diff line number Diff line change
@@ -1,151 +1,78 @@
"""Packet ingest and tcp connection times for each station."""
"""Packet ingest times and rates for each station."""

import logging
from datetime import datetime, timedelta, timezone
from typing import Any

from imap_processing.ialirt.constants import STATIONS

logger = logging.getLogger(__name__)

STATIONS = ["Kiel"]


def find_tcp_connections( # noqa: PLR0912
start_file_creation: datetime,
end_file_creation: datetime,
lines: list,
realtime_summary: dict,
) -> dict:
def packets_created(start_file_creation: datetime, lines: list) -> dict:
    """
    Find timestamps and rates when packets were ingested based on log lines.

    Parameters
    ----------
    start_file_creation : datetime
        File creation time of last file minus 48 hrs.
    lines : list
        All lines of log files.

    Returns
    -------
    station_dict : dict
        Timestamps and rates when packets were ingested, keyed by station
        name, each value holding "last_data_received" (ISO-8601 UTC strings
        with a trailing "Z") and "rate_kbps" (floats) lists.
    """
    station_dict: dict[str, dict[str, list[Any]]] = {
        station: {"last_data_received": [], "rate_kbps": []}
        for station in list(STATIONS)
    }

    # Log timestamps carry only day-of-year; seed each station's year from
    # the window start and bump it when the DOY wraps (end-of-year rollover).
    station_year: dict[str, int] = {
        station: start_file_creation.year for station in station_dict
    }
    prev_doy: dict[str, int | None] = {station: None for station in station_dict}

    for line in lines:
        # Split once per line; the original re-split the line four times.
        parts = line.split()
        # Guard against blank/short lines before indexing — an empty log
        # line would otherwise raise IndexError on parts[0].
        if len(parts) < 3:
            continue
        # Only process lines that begin with a digit for a known station.
        if not (parts[0].isdigit() and parts[1] in STATIONS):
            continue

        # Rate in kbps is the last whitespace-separated token.
        rate = float(parts[-1])
        # "last data received" timestamp in %j-%H:%M:%S form.
        data_last_received = parts[2]
        # Day of year — assumes the DOY field is zero-padded to 3 digits.
        doy = int(data_last_received[:3])
        station = parts[1]

        # Handle end-of-year rollover: a DOY smaller than the previous one
        # for the same station means we crossed into the next year.
        prev = prev_doy[station]

        if prev is not None and doy < prev:
            station_year[station] += 1

        prev_doy[station] = doy

        # Build an ISO-8601 UTC string, normalizing "+00:00" to "Z".
        dt = (
            datetime.strptime(
                f"{station_year[station]}/{data_last_received}",
                "%Y/%j-%H:%M:%S",
            )
            .replace(tzinfo=timezone.utc)
            .isoformat()
            .replace("+00:00", "Z")
        )
        station_dict[station]["last_data_received"].append(dt)
        station_dict[station]["rate_kbps"].append(rate)

    return station_dict


def format_ingest_data(last_filename: str, log_lines: list) -> dict:
"""
Format TCP connection and packet ingest data from multiple log files.
Format packet ingest times and rates from log file.

Parameters
----------
Expand All @@ -157,8 +84,7 @@ def format_ingest_data(last_filename: str, log_lines: list) -> dict:
Returns
-------
realtime_summary : dict
Structured output with TCP connection windows per station
and global packet ingest timestamps.
Structured output with packet receipt info per station.

Notes
-----
Expand All @@ -167,71 +93,38 @@ def format_ingest_data(last_filename: str, log_lines: list) -> dict:
"summary": "I-ALiRT Real-time Ingest Summary",
"generated": "2025-08-07T21:36:09Z",
"time_format": "UTC (ISOC)",
"stations": [
"Kiel"
],
"time_range": [
"2025-07-30T23:00:00",
"2025-07-31T02:00:00"
"2025-01-21T09:50:58Z",
"2025-01-21T09:55:58Z"
],
"packet_ingest": [
"2025-07-31T00:00:00",
"2025-07-31T02:01:00"
],
"connection_times": {
"Kiel": [
{
"start": "2025-07-30T23:00:00",
"end": "2025-07-31T00:15:00"
},
{
"start": "2025-07-31T02:00:00",
"end": "2025-07-31T02:00:00"
}
]
}
"Kiel": {"last_data_received": ["2025-01-21T09:50:58Z", "2025-01-21T09:51:58Z"],
"rate_kbps": [2.0, 2.0]}
}

where time_range is the overall time range of the data,
packet_ingest contains timestamps when packets were finalized,
and tcp contains connection windows for each station.
"""
# File creation time.
last_timestamp_str = last_filename.split(".")[2]
last_timestamp_str = last_timestamp_str.replace("_", ":")
end_of_time = datetime.strptime(last_timestamp_str, "%Y-%jT%H:%M:%S")

# File creation time of last file minus 48 hrs.
# File is created every 5 minutes.
start_of_time = datetime.strptime(last_timestamp_str, "%Y-%jT%H:%M:%S") - timedelta(
hours=48
minutes=5
)

# Parse file.
station_dict = packets_created(start_of_time, log_lines)

realtime_summary: dict[str, Any] = {
"summary": "I-ALiRT Real-time Ingest Summary",
"generated": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"time_format": "UTC (ISOC)",
"stations": list(STATIONS),
"time_range": [
start_of_time.isoformat(),
end_of_time.isoformat(),
], # Overall time range of the data
"packet_ingest": [], # Global packet ingest times
"connection_times": {
station: [] for station in list(STATIONS)
}, # Per-station TCP connection windows
**station_dict,
}

# TCP connection data for each station
realtime_summary = find_tcp_connections(
start_of_time, end_of_time, log_lines, realtime_summary
)

# Global packet ingest timestamps
packet_times = packets_created(start_of_time, log_lines)
realtime_summary["packet_ingest"] = [
pkt_time.isoformat() for pkt_time in packet_times
]

logger.info(f"Created ingest files for {realtime_summary['time_range']}")

return realtime_summary
Loading