|
14 | 14 | import requests |
15 | 15 |
|
16 | 16 | from grafoleancollector import Collector, send_results_to_grafolean |
17 | | -from dbutils import get_db_cursor, DB_PREFIX, S_PER_PARTITION, LEAVE_N_PAST_PARTITIONS |
| 17 | +from dbutils import get_db_cursor, DB_PREFIX, LEAVE_N_PAST_DAYS |
18 | 18 | from lookup import PROTOCOLS, DIRECTION_INGRESS, DIRECTION_EGRESS |
19 | 19 |
|
20 | 20 | logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s', |
@@ -76,26 +76,11 @@ def _save_current_max_ts(job_id, max_ts): |
76 | 76 | c.execute(f"INSERT INTO {DB_PREFIX}bot_jobs (job_id, last_used_ts) VALUES (%s, %s) ON CONFLICT (job_id) DO UPDATE SET last_used_ts = %s;", (job_id, max_ts, max_ts)) |
77 | 77 |
|
78 | 78 |
|
79 | | -def job_maint_remove_old_partitions(*args, **kwargs): |
| 79 | +def job_maint_remove_old_data(*args, **kwargs): |
| 80 | + log.info("MAINT: Maintenance started - removing old data") |
80 | 81 | with get_db_cursor() as c: |
81 | | - log.info("MAINT: Maintenance started - removing old partitions") |
82 | | - today_seq = int(time.time() // S_PER_PARTITION) |
83 | | - c.execute(f"SELECT tablename FROM pg_tables WHERE schemaname = 'public' AND tablename LIKE '{DB_PREFIX}flows_%';") |
84 | | - for tablename, in c.fetchall(): |
85 | | - m = re.match(f'^{DB_PREFIX}flows_([0-9]+)$', tablename) |
86 | | - if not m: |
87 | | - log.warning(f"MAINT: Table {tablename} does not match regex, skipping") |
88 | | - continue |
89 | | - day_seq = int(m.group(1)) |
90 | | - if day_seq > today_seq: |
91 | | - log.warning(f"MAINT: CAREFUL! Table {tablename} marks a future day (today is {today_seq}); this should never happen! Skipping.") |
92 | | - continue |
93 | | - if day_seq < today_seq - LEAVE_N_PAST_PARTITIONS: |
94 | | - log.info(f"MAINT: Removing old data: {tablename} (today is {today_seq})") |
95 | | - c.execute(f"DROP TABLE {tablename};") |
96 | | - else: |
97 | | - log.info(f"MAINT: Leaving {tablename} (today is {today_seq})") |
98 | | - log.info("MAINT: Maintenance finished (removing old partitions).") |
| 82 | + c.execute(f"SELECT drop_chunks(INTERVAL '{LEAVE_N_PAST_DAYS} days', '{DB_PREFIX}flows2');") |
| 83 | + log.info("MAINT: Maintenance finished (removing old data).") |
99 | 84 |
|
100 | 85 |
|
101 | 86 | def job_maint_suggest_entities(*args, **job_params): |
@@ -149,9 +134,9 @@ def job_maint_suggest_entities(*args, **job_params): |
149 | 134 | class NetFlowBot(Collector): |
150 | 135 |
|
151 | 136 | def jobs(self): |
152 | | - # remove old partitions: |
| 137 | + # remove old data: |
153 | 138 | job_id = 'maint/remove_old_data' |
154 | | - yield job_id, [3600], job_maint_remove_old_partitions, {}, 50 |
| 139 | + yield job_id, [3600], job_maint_remove_old_data, {}, 50 |
155 | 140 |
|
156 | 141 | # suggest new netflow exporters / entities: |
157 | 142 | job_id = f'maint/suggest_entities' |
|
0 commit comments