Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions server/demo.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
# Time interval for sending recceiver advertisements
#announceInterval = 15.0

# Interval in seconds between periodic status log lines (0 to disable)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not 100% sure about this, but ok.

And also update recceiver-full.conf.

#statusInterval = 60.0

# TCP port to expose Prometheus metrics on (0 or absent to disable).
# Requires prometheus_client: pip install recceiver[metrics]
#metricsPort = 0

# Idle Timeout for TCP connections.
#tcptimeout = 15.0

Expand Down Expand Up @@ -126,5 +133,6 @@
# Number of times to retry polling before giving up. Default is 10.
#pushMaxRetries = 10

# Whether to retry polling indefinitely until success. Default is True.
#pushAlwaysRetry = True
# Whether to retry polling indefinitely until success. Default is False.
# Enabling this holds the global CF commit lock until CF recovers; use with caution.
#pushAlwaysRetry = False
1 change: 1 addition & 0 deletions server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"requests",
"twisted",
]
optional-dependencies.metrics = [ "prometheus-client" ]
Comment thread
jacomago marked this conversation as resolved.
optional-dependencies.test = [ "pytest", "testcontainers>=4" ]
urls.Repository = "https://github.com/ChannelFinder/recsync"

Expand Down
31 changes: 30 additions & 1 deletion server/recceiver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import random

from twisted.application import service
from twisted.internet import defer, pollreactor
from twisted.internet import defer, pollreactor, task
from twisted.internet.error import CannotListenError
from twisted.python import log, usage
from zope.interface import implementer

from twisted import plugin

from . import metrics
from .announce import Announcer
from .processors import ProcessorController
from .recast import CastFactory
Expand Down Expand Up @@ -51,6 +52,8 @@ def __init__(self, config):
self.addrlist = []

self.port = int(portn or "0")
self.statusInterval = float(config.get("statusInterval", "60.0"))
self.metricsPort = int(config.get("metricsPort", "0"))

for addr in config.get("addrlist", "").split(","):
if not addr:
Expand Down Expand Up @@ -110,9 +113,35 @@ def privilegedStartService(self):
# This will start up plugin Processors
service.MultiService.privilegedStartService(self)

if self.metricsPort > 0:
if metrics.available:
self.reactor.listenTCP(self.metricsPort, metrics.make_site(), interface=self.bind)
_log.info("Prometheus metrics available on port %d", self.metricsPort)
else:
_log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled")

metrics.connections_limit.set(self.tcpFactory.maxActive)

if self.statusInterval > 0:
self._statusLoop = task.LoopingCall(self._logStatus)
self._statusLoop.start(self.statusInterval, now=False)

def _logStatus(self):
    """Periodic status tick: refresh the connection gauges and emit one log line."""
    active = self.tcpFactory.NActive
    queued = len(self.tcpFactory.Wait)
    metrics.connections_active.set(active)
    metrics.connections_waiting.set(queued)
    _log.info(
        "status: connections active=%d/%d queued=%d",
        active,
        self.tcpFactory.maxActive,
        queued,
    )

def stopService(self):
_log.info("Stopping RecService")

if hasattr(self, "_statusLoop") and self._statusLoop.running:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, I don't love using hasattr. Why not just initialise _statusLoop to None?

self._statusLoop.stop()

# This will stop plugin Processors
D2 = defer.maybeDeferred(service.MultiService.stopService, self)

Expand Down
75 changes: 61 additions & 14 deletions server/recceiver/cfstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
from channelfinder import ChannelFinderClient
from requests import ConnectionError, RequestException
from twisted.application import service
from twisted.internet import defer
from twisted.internet import defer, task
from twisted.internet.defer import DeferredLock
from twisted.internet.threads import deferToThread
from zope.interface import implementer

from . import interfaces
from . import interfaces, metrics
from .interfaces import CommitTransaction
from .processors import ConfigAdapter

Expand Down Expand Up @@ -59,7 +59,7 @@ class CFConfig:
cf_password: Optional[str] = None
verify_ssl: Optional[bool] = None
push_max_retries: int = 10
push_always_retry: bool = True
push_always_retry: bool = False

@classmethod
def loads(cls, conf: ConfigAdapter) -> "CFConfig":
Expand All @@ -86,7 +86,7 @@ def loads(cls, conf: ConfigAdapter) -> "CFConfig":
cf_password=conf.get("cfPassword"),
verify_ssl=conf.getboolean("verifySSL"),
push_max_retries=conf.getint("pushMaxRetries", 10),
push_always_retry=conf.getboolean("pushAlwaysRetry", True),
push_always_retry=conf.getboolean("pushAlwaysRetry", False),
)

def __repr__(self) -> str:
Expand Down Expand Up @@ -298,6 +298,14 @@ def startService(self):
finally:
self.lock.release()

self._statusLoop = task.LoopingCall(self._logStatus)
self._statusLoop.start(60.0, now=False)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look very configurable to me...


def _logStatus(self):
    """Periodic status tick: refresh CF gauges and emit one log line."""
    n_iocs = len(self.iocs)
    n_channels = len(self.channel_ioc_ids)
    metrics.known_iocs.set(n_iocs)
    metrics.tracked_channels.set(n_channels)
    _log.info("CF status: known_iocs=%d tracked_channels=%d", n_iocs, n_channels)

def _start_service_with_lock(self):
"""Start the CFProcessor service with lock held.

Expand Down Expand Up @@ -367,25 +375,38 @@ def _start_service_with_lock(self):
raise
else:
if self.cf_config.clean_on_start:
self.clean_service()
_log.info("CF Clean: scheduling background startup sweep")
from twisted.internet import reactor

reactor.callLater(0, self._start_background_clean)

def stopService(self):
    """Stop the CFProcessor service.

    Overridden method of service.Service.stopService()
    """
    _log.info("CF_STOP")
    # Stop the periodic status loop if startService ever created one.
    loop = getattr(self, "_statusLoop", None)
    if loop is not None and loop.running:
        loop.stop()
    service.Service.stopService(self)
    return self.lock.run(self._stop_service_with_lock)

def _stop_service_with_lock(self):
"""Stop the CFProcessor service with lock held.

If clean_on_stop is enabled, mark all channels as inactive.
The sweep runs in a thread so the reactor stays live during shutdown.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want reactor live during any type of clean do we?

The lock is held throughout, preventing new commits from interleaving.
"""
if self.cf_config.clean_on_stop:
self.clean_service()
_log.info("CF_STOP with lock")
if self.cf_config.clean_on_stop:
return deferToThread(self.clean_service)

def _start_background_clean(self):
    """Run the startup clean sweep on a worker thread; log any failure."""
    _log.info("CF Clean: background startup sweep beginning")
    d = deferToThread(self.clean_service)
    d.addErrback(lambda failure: _log.error("CF Clean background sweep failed: %s", failure))

# @defer.inlineCallbacks # Twisted v16 does not support cancellation!
def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred:
Expand Down Expand Up @@ -576,7 +597,15 @@ def _commit_with_thread(self, transaction: CommitTransaction):
ioc_name = transaction.client_infos.get("IOCNAME")
if not ioc_name:
ioc_name = str(port)
_log.debug("IOC at %s:%d did not send IOCNAME; using port as iocName", host, port)
_log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port)
if ioc_name.isdigit() and 1024 <= int(ioc_name) <= 65535:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition seems odd to me. Does this ever occur outside of the conditional right above it?

_log.warning(
"IOC at %s has numeric iocName '%s' (looks like an ephemeral port) — "
"iocid will change on every reconnect, causing stale channels in CF; "
"configure a stable iocName via reccaster",
host,
ioc_name,
)

owner = (
transaction.client_infos.get(self.cf_config.env_owner_variable)
Expand Down Expand Up @@ -608,6 +637,12 @@ def _commit_with_thread(self, transaction: CommitTransaction):

record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info)
self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name)
if not transaction.connected and ioc_info.ioc_id not in self.iocs:
_log.warning(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we exit here?

"IOC at %s:%d disconnected before completing initial upload (0 channels registered)",
host,
port,
)
poll_success = push_to_cf(_update_channelfinder, self, record_info_by_name, records_to_delete, ioc_info)
if not poll_success:
raise defer.CancelledError(f"Failed to commit transaction after polling retries: {transaction}")
Expand Down Expand Up @@ -1120,9 +1155,10 @@ def _update_channelfinder(
new_channels = set(record_info_by_name.keys())
iocid = ioc_info.ioc_id

if iocid not in iocs:
if iocid not in iocs and record_info_by_name:
# Disconnect-before-upload is already logged in _commit_with_thread.
_log.warning(
"IOC %s did not send an initial transaction to join IOC list (%d IOCs known)",
"IOC %s committed update without prior initial transaction (%d IOCs known)",
ioc_info,
len(iocs),
)
Expand Down Expand Up @@ -1332,19 +1368,30 @@ def push_to_cf(
records_to_delete: The records to delete.
ioc_info: The IOC information.
"""
_log.info("Pushing updates for %s begins...", ioc_info)
_log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name))
count = 0
sleep = 1.0
while processor.cf_config.push_always_retry or count < processor.cf_config.push_max_retries:
if not processor.running:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to have a test for this?

_log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count)
return False
count += 1
t0 = time.monotonic()
try:
update_method(processor, record_info_by_name, records_to_delete, ioc_info)
elapsed = time.monotonic() - t0
metrics.cf_commit_duration_seconds.observe(elapsed)
metrics.cf_commits_total.labels(result="success").inc()
_log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name))
return True
except RequestException as e:
_log.error("ChannelFinder update failed: %s", e)
elapsed = time.monotonic() - t0
metrics.cf_commits_total.labels(result="failed").inc()
_log.error("CF push failed after %.2fs (attempt %d): %s: %s", elapsed, count, ioc_info, e)
retry_seconds = min(60, sleep)
_log.info("ChannelFinder update retry in %s seconds", retry_seconds)
_log.info("CF push retry in %s seconds", retry_seconds)
time.sleep(retry_seconds)
sleep *= 1.5
_log.error("Pushing updates for %s complete, failed after %d attempts", ioc_info, count)
metrics.cf_commits_total.labels(result="cancelled").inc()
_log.error("CF push gave up after %d attempts: %s", count, ioc_info)
return False
89 changes: 89 additions & 0 deletions server/recceiver/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-

import logging

_log = logging.getLogger(__name__)

try:
from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Counter, Gauge, Histogram, generate_latest
from twisted.web.resource import Resource
from twisted.web.server import Site

_registry = CollectorRegistry(auto_describe=True)

connections_active = Gauge(
"recceiver_connections_active",
"Active uploading IOC connections",
registry=_registry,
)
connections_waiting = Gauge(
"recceiver_connections_waiting",
"IOC connections waiting for an upload slot",
registry=_registry,
)
connections_limit = Gauge(
"recceiver_connections_limit",
"Maximum concurrent active connections (maxActive)",
registry=_registry,
)
known_iocs = Gauge(
"recceiver_known_iocs",
"IOCs with channels currently registered in CF",
registry=_registry,
)
tracked_channels = Gauge(
"recceiver_tracked_channels",
"Unique channel names tracked by the CF processor",
registry=_registry,
)
cf_commits_total = Counter(
"recceiver_cf_commits_total",
"CF push attempts by result",
["result"],
registry=_registry,
)
cf_commit_duration_seconds = Histogram(
"recceiver_cf_commit_duration_seconds",
"CF push duration in seconds",
buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
registry=_registry,
)

Comment thread
jacomago marked this conversation as resolved.
class _MetricsResource(Resource):
isLeaf = True

def render_GET(self, request):
request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode())
return generate_latest(_registry)

def make_site():
return Site(_MetricsResource())

available = True

except ImportError:
available = False

class _Noop:
def set(self, v=None):

Check failure on line 68 in server/recceiver/metrics.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=ChannelFinder_recsync&issues=AZ4HHuYH5eR_ztK153t_&open=AZ4HHuYH5eR_ztK153t_&pullRequest=159
pass

def inc(self, v=1):

Check failure on line 71 in server/recceiver/metrics.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=ChannelFinder_recsync&issues=AZ4HHuYH5eR_ztK153uA&open=AZ4HHuYH5eR_ztK153uA&pullRequest=159
pass

def labels(self, **kw):

Check warning on line 74 in server/recceiver/metrics.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused function parameter "kw".

See more on https://sonarcloud.io/project/issues?id=ChannelFinder_recsync&issues=AZ4HHuYH5eR_ztK153uB&open=AZ4HHuYH5eR_ztK153uB&pullRequest=159
return self

def observe(self, v):

Check failure on line 77 in server/recceiver/metrics.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=ChannelFinder_recsync&issues=AZ4HHuYH5eR_ztK153uC&open=AZ4HHuYH5eR_ztK153uC&pullRequest=159
pass

connections_active = _Noop()
connections_waiting = _Noop()
connections_limit = _Noop()
known_iocs = _Noop()
tracked_channels = _Noop()
cf_commits_total = _Noop()
cf_commit_duration_seconds = _Noop()

    def make_site():
        # Fallback when prometheus_client is not importable. Callers are
        # expected to check `metrics.available` first (application.py does),
        # so this should only fire on a direct, unguarded call.
        raise RuntimeError("prometheus_client is not installed")
2 changes: 1 addition & 1 deletion server/tests/unit/test_cfstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_push_max_retries_from_env(self):
def test_default_push_always_retry(self):
    """pushAlwaysRetry must default to False when absent from the config."""
    cfg = CFConfig.loads(make_adapter())
    assert cfg.push_always_retry is False

def test_alias_disabled_by_default(self):
adapter = make_adapter()
Expand Down
Loading