-
Notifications
You must be signed in to change notification settings - Fork 31
feat(server): observability improvements and startup/retry fixes #159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
0fcdbbe
3208c8a
a61e995
466f9ec
d220c5a
72ffda0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,13 +4,14 @@ | |
| import random | ||
|
|
||
| from twisted.application import service | ||
| from twisted.internet import defer, pollreactor | ||
| from twisted.internet import defer, pollreactor, task | ||
| from twisted.internet.error import CannotListenError | ||
| from twisted.python import log, usage | ||
| from zope.interface import implementer | ||
|
|
||
| from twisted import plugin | ||
|
|
||
| from . import metrics | ||
| from .announce import Announcer | ||
| from .processors import ProcessorController | ||
| from .recast import CastFactory | ||
|
|
@@ -51,6 +52,8 @@ def __init__(self, config): | |
| self.addrlist = [] | ||
|
|
||
| self.port = int(portn or "0") | ||
| self.statusInterval = float(config.get("statusInterval", "60.0")) | ||
| self.metricsPort = int(config.get("metricsPort", "0")) | ||
|
|
||
| for addr in config.get("addrlist", "").split(","): | ||
| if not addr: | ||
|
|
@@ -110,9 +113,35 @@ def privilegedStartService(self): | |
| # This will start up plugin Processors | ||
| service.MultiService.privilegedStartService(self) | ||
|
|
||
| if self.metricsPort > 0: | ||
| if metrics.available: | ||
| self.reactor.listenTCP(self.metricsPort, metrics.make_site(), interface=self.bind) | ||
| _log.info("Prometheus metrics available on port %d", self.metricsPort) | ||
| else: | ||
| _log.warning("metricsPort configured but prometheus_client is not installed; metrics disabled") | ||
|
|
||
| metrics.connections_limit.set(self.tcpFactory.maxActive) | ||
|
|
||
| if self.statusInterval > 0: | ||
| self._statusLoop = task.LoopingCall(self._logStatus) | ||
| self._statusLoop.start(self.statusInterval, now=False) | ||
|
|
||
| def _logStatus(self): | ||
| metrics.connections_active.set(self.tcpFactory.NActive) | ||
| metrics.connections_waiting.set(len(self.tcpFactory.Wait)) | ||
| _log.info( | ||
| "status: connections active=%d/%d queued=%d", | ||
| self.tcpFactory.NActive, | ||
| self.tcpFactory.maxActive, | ||
| len(self.tcpFactory.Wait), | ||
| ) | ||
|
|
||
| def stopService(self): | ||
| _log.info("Stopping RecService") | ||
|
|
||
| if hasattr(self, "_statusLoop") and self._statusLoop.running: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ugh, I don't love using |
||
| self._statusLoop.stop() | ||
|
|
||
| # This will stop plugin Processors | ||
| D2 = defer.maybeDeferred(service.MultiService.stopService, self) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,12 +12,12 @@ | |
| from channelfinder import ChannelFinderClient | ||
| from requests import ConnectionError, RequestException | ||
| from twisted.application import service | ||
| from twisted.internet import defer | ||
| from twisted.internet import defer, task | ||
| from twisted.internet.defer import DeferredLock | ||
| from twisted.internet.threads import deferToThread | ||
| from zope.interface import implementer | ||
|
|
||
| from . import interfaces | ||
| from . import interfaces, metrics | ||
| from .interfaces import CommitTransaction | ||
| from .processors import ConfigAdapter | ||
|
|
||
|
|
@@ -59,7 +59,7 @@ class CFConfig: | |
| cf_password: Optional[str] = None | ||
| verify_ssl: Optional[bool] = None | ||
| push_max_retries: int = 10 | ||
| push_always_retry: bool = True | ||
| push_always_retry: bool = False | ||
|
|
||
| @classmethod | ||
| def loads(cls, conf: ConfigAdapter) -> "CFConfig": | ||
|
|
@@ -86,7 +86,7 @@ def loads(cls, conf: ConfigAdapter) -> "CFConfig": | |
| cf_password=conf.get("cfPassword"), | ||
| verify_ssl=conf.getboolean("verifySSL"), | ||
| push_max_retries=conf.getint("pushMaxRetries", 10), | ||
| push_always_retry=conf.getboolean("pushAlwaysRetry", True), | ||
| push_always_retry=conf.getboolean("pushAlwaysRetry", False), | ||
| ) | ||
|
|
||
| def __repr__(self) -> str: | ||
|
|
@@ -298,6 +298,14 @@ def startService(self): | |
| finally: | ||
| self.lock.release() | ||
|
|
||
| self._statusLoop = task.LoopingCall(self._logStatus) | ||
| self._statusLoop.start(60.0, now=False) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't look very configurable to me... |
||
|
|
||
| def _logStatus(self): | ||
| metrics.known_iocs.set(len(self.iocs)) | ||
| metrics.tracked_channels.set(len(self.channel_ioc_ids)) | ||
| _log.info("CF status: known_iocs=%d tracked_channels=%d", len(self.iocs), len(self.channel_ioc_ids)) | ||
|
|
||
| def _start_service_with_lock(self): | ||
| """Start the CFProcessor service with lock held. | ||
|
|
||
|
|
@@ -367,25 +375,38 @@ def _start_service_with_lock(self): | |
| raise | ||
| else: | ||
| if self.cf_config.clean_on_start: | ||
| self.clean_service() | ||
| _log.info("CF Clean: scheduling background startup sweep") | ||
| from twisted.internet import reactor | ||
|
|
||
| reactor.callLater(0, self._start_background_clean) | ||
|
|
||
| def stopService(self): | ||
| """Stop the CFProcessor service. | ||
|
|
||
| Overridden method of service.Service.stopService() | ||
| """ | ||
| _log.info("CF_STOP") | ||
| if hasattr(self, "_statusLoop") and self._statusLoop.running: | ||
| self._statusLoop.stop() | ||
| service.Service.stopService(self) | ||
| return self.lock.run(self._stop_service_with_lock) | ||
|
|
||
| def _stop_service_with_lock(self): | ||
| """Stop the CFProcessor service with lock held. | ||
|
|
||
| If clean_on_stop is enabled, mark all channels as inactive. | ||
| The sweep runs in a thread so the reactor stays live during shutdown. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't want reactor live during any type of clean do we? |
||
| The lock is held throughout, preventing new commits from interleaving. | ||
| """ | ||
| if self.cf_config.clean_on_stop: | ||
| self.clean_service() | ||
| _log.info("CF_STOP with lock") | ||
| if self.cf_config.clean_on_stop: | ||
| return deferToThread(self.clean_service) | ||
|
|
||
| def _start_background_clean(self): | ||
| _log.info("CF Clean: background startup sweep beginning") | ||
| deferToThread(self.clean_service).addErrback( | ||
| lambda err: _log.error("CF Clean background sweep failed: %s", err) | ||
| ) | ||
|
|
||
| # @defer.inlineCallbacks # Twisted v16 does not support cancellation! | ||
| def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: | ||
|
|
@@ -576,7 +597,15 @@ def _commit_with_thread(self, transaction: CommitTransaction): | |
| ioc_name = transaction.client_infos.get("IOCNAME") | ||
| if not ioc_name: | ||
| ioc_name = str(port) | ||
| _log.debug("IOC at %s:%d did not send IOCNAME; using port as iocName", host, port) | ||
| _log.debug("IOC at %s:%d has no iocName; using source port as iocName", host, port) | ||
| if ioc_name.isdigit() and 1024 <= int(ioc_name) <= 65535: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This condition seems odd to me. Does this ever occur outside of the conditional right above it? |
||
| _log.warning( | ||
| "IOC at %s has numeric iocName '%s' (looks like an ephemeral port) — " | ||
| "iocid will change on every reconnect, causing stale channels in CF; " | ||
| "configure a stable iocName via reccaster", | ||
| host, | ||
| ioc_name, | ||
| ) | ||
|
|
||
| owner = ( | ||
| transaction.client_infos.get(self.cf_config.env_owner_variable) | ||
|
|
@@ -608,6 +637,12 @@ def _commit_with_thread(self, transaction: CommitTransaction): | |
|
|
||
| record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) | ||
| self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) | ||
| if not transaction.connected and ioc_info.ioc_id not in self.iocs: | ||
| _log.warning( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we exit here? |
||
| "IOC at %s:%d disconnected before completing initial upload (0 channels registered)", | ||
| host, | ||
| port, | ||
| ) | ||
| poll_success = push_to_cf(_update_channelfinder, self, record_info_by_name, records_to_delete, ioc_info) | ||
| if not poll_success: | ||
| raise defer.CancelledError(f"Failed to commit transaction after polling retries: {transaction}") | ||
|
|
@@ -1120,9 +1155,10 @@ def _update_channelfinder( | |
| new_channels = set(record_info_by_name.keys()) | ||
| iocid = ioc_info.ioc_id | ||
|
|
||
| if iocid not in iocs: | ||
| if iocid not in iocs and record_info_by_name: | ||
| # Disconnect-before-upload is already logged in _commit_with_thread. | ||
| _log.warning( | ||
| "IOC %s did not send an initial transaction to join IOC list (%d IOCs known)", | ||
| "IOC %s committed update without prior initial transaction (%d IOCs known)", | ||
| ioc_info, | ||
| len(iocs), | ||
| ) | ||
|
|
@@ -1332,19 +1368,30 @@ def push_to_cf( | |
| records_to_delete: The records to delete. | ||
| ioc_info: The IOC information. | ||
| """ | ||
| _log.info("Pushing updates for %s begins...", ioc_info) | ||
| _log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) | ||
| count = 0 | ||
| sleep = 1.0 | ||
| while processor.cf_config.push_always_retry or count < processor.cf_config.push_max_retries: | ||
| if not processor.running: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way to have a test for this? |
||
| _log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) | ||
| return False | ||
| count += 1 | ||
| t0 = time.monotonic() | ||
| try: | ||
| update_method(processor, record_info_by_name, records_to_delete, ioc_info) | ||
| elapsed = time.monotonic() - t0 | ||
| metrics.cf_commit_duration_seconds.observe(elapsed) | ||
| metrics.cf_commits_total.labels(result="success").inc() | ||
| _log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) | ||
| return True | ||
| except RequestException as e: | ||
| _log.error("ChannelFinder update failed: %s", e) | ||
| elapsed = time.monotonic() - t0 | ||
| metrics.cf_commits_total.labels(result="failed").inc() | ||
| _log.error("CF push failed after %.2fs (attempt %d): %s: %s", elapsed, count, ioc_info, e) | ||
| retry_seconds = min(60, sleep) | ||
| _log.info("ChannelFinder update retry in %s seconds", retry_seconds) | ||
| _log.info("CF push retry in %s seconds", retry_seconds) | ||
| time.sleep(retry_seconds) | ||
| sleep *= 1.5 | ||
| _log.error("Pushing updates for %s complete, failed after %d attempts", ioc_info, count) | ||
| metrics.cf_commits_total.labels(result="cancelled").inc() | ||
| _log.error("CF push gave up after %d attempts: %s", count, ioc_info) | ||
| return False | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| # -*- coding: utf-8 -*- | ||
|
|
||
| import logging | ||
|
|
||
| _log = logging.getLogger(__name__) | ||
|
|
||
| try: | ||
| from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Counter, Gauge, Histogram, generate_latest | ||
| from twisted.web.resource import Resource | ||
| from twisted.web.server import Site | ||
|
|
||
| _registry = CollectorRegistry(auto_describe=True) | ||
|
|
||
| connections_active = Gauge( | ||
| "recceiver_connections_active", | ||
| "Active uploading IOC connections", | ||
| registry=_registry, | ||
| ) | ||
| connections_waiting = Gauge( | ||
| "recceiver_connections_waiting", | ||
| "IOC connections waiting for an upload slot", | ||
| registry=_registry, | ||
| ) | ||
| connections_limit = Gauge( | ||
| "recceiver_connections_limit", | ||
| "Maximum concurrent active connections (maxActive)", | ||
| registry=_registry, | ||
| ) | ||
| known_iocs = Gauge( | ||
| "recceiver_known_iocs", | ||
| "IOCs with channels currently registered in CF", | ||
| registry=_registry, | ||
| ) | ||
| tracked_channels = Gauge( | ||
| "recceiver_tracked_channels", | ||
| "Unique channel names tracked by the CF processor", | ||
| registry=_registry, | ||
| ) | ||
| cf_commits_total = Counter( | ||
| "recceiver_cf_commits_total", | ||
| "CF push attempts by result", | ||
| ["result"], | ||
| registry=_registry, | ||
| ) | ||
| cf_commit_duration_seconds = Histogram( | ||
| "recceiver_cf_commit_duration_seconds", | ||
| "CF push duration in seconds", | ||
| buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0], | ||
| registry=_registry, | ||
| ) | ||
|
|
||
|
jacomago marked this conversation as resolved.
|
||
| class _MetricsResource(Resource): | ||
| isLeaf = True | ||
|
|
||
| def render_GET(self, request): | ||
| request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode()) | ||
| return generate_latest(_registry) | ||
|
|
||
| def make_site(): | ||
| return Site(_MetricsResource()) | ||
|
|
||
| available = True | ||
|
|
||
| except ImportError: | ||
| available = False | ||
|
|
||
| class _Noop: | ||
| def set(self, v=None): | ||
|
Check failure on line 68 in server/recceiver/metrics.py
|
||
| pass | ||
|
|
||
| def inc(self, v=1): | ||
|
Check failure on line 71 in server/recceiver/metrics.py
|
||
| pass | ||
|
|
||
| def labels(self, **kw): | ||
|
Check warning on line 74 in server/recceiver/metrics.py
|
||
| return self | ||
|
|
||
| def observe(self, v): | ||
|
Check failure on line 77 in server/recceiver/metrics.py
|
||
| pass | ||
|
|
||
| connections_active = _Noop() | ||
| connections_waiting = _Noop() | ||
| connections_limit = _Noop() | ||
| known_iocs = _Noop() | ||
| tracked_channels = _Noop() | ||
| cf_commits_total = _Noop() | ||
| cf_commit_duration_seconds = _Noop() | ||
|
|
||
| def make_site(): | ||
| raise RuntimeError("prometheus_client is not installed") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not 100% sure about this, but ok.
And also update recceiver-full.conf.