Skip to content

Commit caa7571

Browse files
committed
Partition DB data
1 parent 605ba12 commit caa7571

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

dbutils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def migration_step_2():
145145
output_snmp SMALLINT NOT NULL,
146146
ipv4_dst_addr INET NOT NULL,
147147
ipv4_src_addr INET NOT NULL
148-
);
148+
) PARTITION BY RANGE (ts);
149149
""")
150150
c.execute(f'CREATE INDEX {DB_PREFIX}flows_ts on {DB_PREFIX}flows (ts);')
151151

netflowwriter.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import argparse
22
import base64
3+
from datetime import datetime, timedelta
34
import gzip
45
import json
56
import logging
@@ -47,6 +48,7 @@ def process_named_pipe(named_pipe_filename):
4748
raise
4849

4950
templates = {}
51+
last_day_seq = None
5052
while True:
5153
with open(named_pipe_filename, "rb") as fp:
5254
log.info(f"Opened named pipe {named_pipe_filename}")
@@ -60,9 +62,16 @@ def process_named_pipe(named_pipe_filename):
6062
data_b64, ts, client = json.loads(line)
6163
data = base64.b64decode(data_b64)
6264

65+
# sequence number of the (24h) day from UNIX epoch helps us determine the
66+
# DB partition we are working with:
67+
day_seq = int(ts // (24 * 3600))
68+
if day_seq != last_day_seq:
69+
create_flow_table_partition(day_seq)
70+
last_day_seq = day_seq
71+
6372
try:
6473
export = parse_packet(data, templates)
65-
write_record(ts, client, export)
74+
write_record(ts, client, export, day_seq)
6675
except UnknownNetFlowVersion:
6776
log.warning("Unknown NetFlow version")
6877
continue
@@ -75,10 +84,22 @@ def process_named_pipe(named_pipe_filename):
7584
log.exception("Error writing line, skipping...")
7685

7786

87+
# Based on timestamp, make sure that the partition exists:
88+
def create_flow_table_partition(day_seq):
89+
day_start = day_seq * (24 * 3600)
90+
day_end = day_start + (24 * 3600)
91+
with get_db_cursor() as c:
92+
# "When creating a range partition, the lower bound specified with FROM is an inclusive bound, whereas
93+
# the upper bound specified with TO is an exclusive bound."
94+
# https://www.postgresql.org/docs/12/sql-createtable.html
95+
c.execute(f"CREATE UNLOGGED TABLE IF NOT EXISTS {DB_PREFIX}flows_{day_seq} PARTITION OF {DB_PREFIX}flows FOR VALUES FROM ({day_start}) TO ({day_end})")
96+
return day_seq
97+
98+
7899
last_record_seqs = {}
79100

80101

81-
def write_record(ts, client, export):
102+
def write_record(ts, client, export, day_seq):
82103
# {
83104
# "DST_AS": 0,
84105
# "SRC_AS": 0,
@@ -175,7 +196,7 @@ def _get_data(netflow_version, ts, client_ip, flows):
175196
data_iterator = _get_data(export.header.version, ts, client_ip, export.flows)
176197
psycopg2.extras.execute_values(
177198
c,
178-
f"INSERT INTO {DB_PREFIX}flows (ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s",
199+
f"INSERT INTO {DB_PREFIX}flows_{day_seq} (ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s",
179200
data_iterator,
180201
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
181202
page_size=100

0 commit comments

Comments
 (0)