Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CXXFLAGS := $(CXXFLAGS) -DMAVLINK_SIGNING_TIMESTAMP_LIMIT=600
LIBS := -ltdb -lssl -lcrypto

# Source files
SOURCES := supportproxy.cpp mavlink.cpp util.cpp keydb.cpp conntdb.cpp websocket.cpp
SOURCES := supportproxy.cpp mavlink.cpp util.cpp keydb.cpp conntdb.cpp tlog.cpp cleanup.cpp websocket.cpp
OBJECTS := $(SOURCES:.cpp=.o)
TARGET := supportproxy

Expand Down Expand Up @@ -73,11 +73,13 @@ mavlink.o: mavlink.cpp mavlink.h $(MAVLINK_DIR)/protocol.h

# Dependencies. mavlink.h includes keydb.h, so any object that pulls in
# mavlink.h transitively depends on keydb.h too.
supportproxy.o: supportproxy.cpp mavlink.h util.h keydb.h conntdb.h websocket.h
supportproxy.o: supportproxy.cpp mavlink.h util.h keydb.h conntdb.h tlog.h cleanup.h websocket.h
mavlink.o: mavlink.cpp mavlink.h keydb.h $(MAVLINK_DIR)/protocol.h
util.o: util.cpp util.h
keydb.o: keydb.cpp keydb.h
conntdb.o: conntdb.cpp conntdb.h
tlog.o: tlog.cpp tlog.h
cleanup.o: cleanup.cpp cleanup.h keydb.h
websocket.o: websocket.cpp websocket.h util.h

# Testing
Expand Down
168 changes: 168 additions & 0 deletions cleanup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
hourly tlog cleanup worker
*/
#include "cleanup.h"
#include "keydb.h"

#include <dirent.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <tdb.h>

namespace {

struct PassCtx {
const char *base_dir;
time_t now;
};

static bool ends_with_tlog(const char *name)
{
size_t n = strlen(name);
return n > 5 && strcmp(name + n - 5, ".tlog") == 0;
}

static void cleanup_for_port2(uint32_t port2, double retention_days,
const char *base_dir, time_t now)
{
if (retention_days <= 0.0) {
return; // 0 = keep forever
}
double cutoff_age_s = retention_days * 86400.0;

char port_dir[768];
snprintf(port_dir, sizeof(port_dir), "%s/%u", base_dir, port2);

DIR *d = opendir(port_dir);
if (d == nullptr) {
// missing dir is fine: nothing has been logged for this port yet
return;
}
struct dirent *ent;
while ((ent = readdir(d)) != nullptr) {
if (ent->d_name[0] == '.') {
continue;
}
char date_dir[1024];
snprintf(date_dir, sizeof(date_dir), "%s/%s", port_dir, ent->d_name);

struct stat st;
if (stat(date_dir, &st) != 0 || !S_ISDIR(st.st_mode)) {
continue;
}

DIR *dd = opendir(date_dir);
if (dd == nullptr) {
continue;
}
unsigned remaining = 0;
struct dirent *fent;
while ((fent = readdir(dd)) != nullptr) {
if (fent->d_name[0] == '.') {
continue;
}
char fpath[1280];
snprintf(fpath, sizeof(fpath), "%s/%s", date_dir, fent->d_name);
if (ends_with_tlog(fent->d_name)) {
struct stat fst;
if (stat(fpath, &fst) == 0) {
double age = double(now - fst.st_mtime);
if (age > cutoff_age_s) {
if (unlink(fpath) == 0) {
::printf("tlog cleanup: removed %s (age %.0fs > %.0fs)\n",
fpath, age, cutoff_age_s);
continue;
}
}
}
}
remaining++;
}
closedir(dd);

if (remaining == 0) {
if (rmdir(date_dir) == 0) {
::printf("tlog cleanup: removed empty %s\n", date_dir);
}
}
}
closedir(d);
}

static int traverse_cb(struct tdb_context *db, TDB_DATA key, TDB_DATA data, void *ptr)
{
(void)db;
auto *ctx = static_cast<PassCtx *>(ptr);
if (key.dsize != sizeof(int) || data.dsize < KEYENTRY_MIN_SIZE) {
return 0;
}
int port2 = 0;
memcpy(&port2, key.dptr, sizeof(int));
if (port2 <= 0) {
return 0;
}
struct KeyEntry k {};
size_t copy = data.dsize < sizeof(KeyEntry) ? data.dsize : sizeof(KeyEntry);
memcpy(&k, data.dptr, copy);
if (k.magic != KEY_MAGIC) {
return 0;
}
cleanup_for_port2(uint32_t(port2), double(k.tlog_retention_days),
ctx->base_dir, ctx->now);
return 0;
}

static double cleanup_interval_seconds()
{
const char *env = getenv("SUPPORTPROXY_CLEANUP_INTERVAL");
if (env != nullptr && *env != '\0') {
char *endp = nullptr;
double v = strtod(env, &endp);
if (endp != env && v > 0.0) {
return v;
}
}
return 3600.0;
}

static void sleep_seconds(double s)
{
if (s <= 0.0) {
return;
}
struct timespec ts;
ts.tv_sec = time_t(s);
ts.tv_nsec = long((s - double(ts.tv_sec)) * 1e9);
nanosleep(&ts, nullptr);
}

} // namespace

void tlog_cleanup_once(const char *base_dir)
{
auto *db = db_open();
if (db == nullptr) {
return;
}
PassCtx ctx { base_dir, time(nullptr) };
tdb_traverse(db, traverse_cb, &ctx);
db_close(db);
}

void tlog_cleanup_loop(const char *base_dir)
{
// Run an immediate pass on startup so a fresh restart still cleans up.
tlog_cleanup_once(base_dir);
double interval = cleanup_interval_seconds();
while (true) {
sleep_seconds(interval);
tlog_cleanup_once(base_dir);
}
}
20 changes: 20 additions & 0 deletions cleanup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
hourly tlog cleanup worker
*/
#pragma once

/*
Run forever: every SUPPORTPROXY_CLEANUP_INTERVAL seconds (default 3600,
env var override accepts a float for tests), traverse keys.tdb and
remove .tlog files in logs/<port2>/ whose age in seconds exceeds
tlog_retention_days * 86400. Removes empty date subdirs as a follow-up.
Entries with retention_days == 0.0 are skipped (keep forever). Records
on disk for entries no longer in keys.tdb are NOT auto-deleted.
*/
void tlog_cleanup_loop(const char *base_dir = "logs");

/*
Run a single cleanup pass synchronously and return. Exposed for the
test suite so it can drive cleanup without the sleep loop.
*/
void tlog_cleanup_once(const char *base_dir = "logs");
9 changes: 9 additions & 0 deletions conntdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@
// ever decide to drop a field (we shouldn't).
#define CONNENTRY_MIN_SIZE 64

// flag bits on ConnEntry.flags
//
// CONN_FLAG_DROP_REQUESTED: the web admin has asked the per-port-pair
// child to drop this specific connection. The webadmin sets the bit
// in TDB and sends SIGUSR1 to the child; the child's main_loop scans
// for entries matching its port2 with this bit set, closes the
// matching slot, and deletes the record.
#define CONN_FLAG_DROP_REQUESTED (1u << 0)

struct ConnEntry {
uint64_t magic; // CONN_MAGIC
uint64_t connected_at; // unix seconds
Expand Down
91 changes: 91 additions & 0 deletions conntdb_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@
"""
import errno
import os
import signal
import socket
import struct
import time

import keydb_lib

# Name reported by /proc/<pid>/comm for a live supportproxy process.
# request_drop cross-checks this so we don't accidentally signal a PID
# that died and got recycled by an unrelated process.
SUPPORTPROXY_COMM = 'supportproxy'

# ConnEntry.flags bits — keep in sync with conntdb.h.
CONN_FLAG_DROP_REQUESTED = 1 << 0

CONN_FILE = 'connections.tdb'
CONN_MAGIC = 0x436f6e6e45424553 # "ConnEBES"

Expand Down Expand Up @@ -153,3 +162,85 @@ def list_active(path, **kw):
out = list(iter_active(path, **kw))
out.sort(key=lambda c: (c.port2, c.conn_index))
return out


def _proc_comm(pid):
"""Read /proc/<pid>/comm; return None on any error (file missing,
permission denied, etc). Module-level so tests can monkeypatch."""
try:
with open('/proc/%d/comm' % pid) as f:
return f.read().strip()
except OSError:
return None


def _flip_drop_flag(path, port2, conn_index):
"""Set CONN_FLAG_DROP_REQUESTED on the (port2, conn_index) record.

Returns the PID stored in that record, or None when the record is
missing. Caller is responsible for signalling the PID afterwards.
"""
if not os.path.exists(path):
return None
try:
db = keydb_lib.open_db(path)
except OSError:
return None
pid = None
try:
db.transaction_start()
try:
key = struct.pack(KEY_FORMAT, port2, conn_index)
v = db.get(key)
if v is None or len(v) < CONNENTRY_MIN_SIZE:
return None
ce = ConnEntry.unpack(v)
if ce.magic != CONN_MAGIC:
return None
ce.flags |= CONN_FLAG_DROP_REQUESTED
pid = ce.pid if ce.pid > 0 else None
# rebuild record bytes preserving any forward-compat tail
tail = v[CONNENTRY_CURRENT_SIZE:] if len(v) > CONNENTRY_CURRENT_SIZE else b''
new_body = struct.pack(
PACK_FORMAT,
ce.magic, ce.connected_at, ce.last_update,
ce.port2, ce.conn_index, ce.pid,
ce.rx_msgs, ce.tx_msgs,
ce.peer_ip_be, ce.peer_port_be,
ce.transport, ce.is_user,
ce.flags, 0, # _pad
)
import tdb as _tdb
db.store(key, new_body + tail, _tdb.REPLACE)
db.transaction_prepare_commit()
db.transaction_commit()
except Exception:
db.transaction_cancel()
raise
finally:
db.close()
return pid


def request_drop(path, port2, conn_index, exec_name=SUPPORTPROXY_COMM):
"""Ask the per-port-pair child to drop a single connection.

Sets CONN_FLAG_DROP_REQUESTED on the (port2, conn_index) record so
the child can find it after the signal, then sends SIGUSR1 to the
child PID (validated against /proc/<pid>/comm so we don't hit a
recycled PID). Returns True if the signal was sent.

A user-side row (conn_index=0) ends the whole session because the
child has nothing left to proxy without conn1; an engineer row
(conn_index>=1) drops just that engineer slot.
"""
pid = _flip_drop_flag(path, port2, conn_index)
if pid is None:
return False
if _proc_comm(pid) != exec_name:
return False
try:
os.kill(pid, signal.SIGUSR1)
except OSError:
return False
return True
3 changes: 3 additions & 0 deletions keydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
*/
#define KEY_FLAG_ADMIN (1u << 0)
#define KEY_FLAG_BIDI_SIGN (1u << 1) // require signed MAVLink on the user side too
#define KEY_FLAG_TLOG (1u << 2) // record per-connection MAVProxy-format tlogs

struct KeyEntry {
uint64_t magic;
Expand All @@ -42,6 +43,8 @@ struct KeyEntry {
uint32_t count2;
char name[32];
uint32_t flags;
float tlog_retention_days; // 0.0 = forever; fractional values allowed for tests
uint32_t reserved[16];
};

/*
Expand Down
16 changes: 16 additions & 0 deletions keydb.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def main():
'setname', 'setpass', 'setport1',
'initialise', 'resettimestamp',
'setflag', 'clearflag', 'flags',
'setretention',
'stats'],
help="action to perform")
parser.add_argument("args", default=[], nargs=argparse.REMAINDER)
Expand Down Expand Up @@ -103,6 +104,21 @@ def main():
on = ke.flag_names()
print("flags=0x%x %s" % (ke.flags, ','.join(on) if on else '(none)'))

elif args.action == "setretention":
_expect(args.args, 2,
"keydb.py setretention PORT2 DAYS "
"(float; 0 = keep forever)")
try:
days = float(args.args[1])
except ValueError:
raise CLIError("retention DAYS must be a number, got %r"
% args.args[1])
ke = keydb_lib.set_tlog_retention(db, int(args.args[0]), days)
if days == 0.0:
print("Set tlog retention=0 (keep forever) for %s" % ke)
else:
print("Set tlog retention=%.4g days for %s" % (days, ke))

elif args.action == "stats":
# Live-connection stats from connections.tdb (sibling of
# keys.tdb), joined with each entry's name from this DB.
Expand Down
Loading
Loading