Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def key_to_bool(k: str) -> bool:
import inspect

all_accepted_patch_all_args = inspect.getfullargspec(monkey.patch_all)[0]
provided_options = provided_options.replace(" ", "").replace("--", "").split(",")
provided_options = (
provided_options.replace(" ", "").replace("--", "").split(",")
)

provided_options = [
k for k in provided_options if short_key(k) in all_accepted_patch_all_args
Expand Down Expand Up @@ -210,11 +212,6 @@ def boot_agent() -> None:
server as tornado_server, # noqa: F401
)

# Hooks
from instana.hooks import (
hook_gunicorn, # noqa: F401
)


def _start_profiler() -> None:
"""Start the Instana Auto Profile."""
Expand Down
10 changes: 8 additions & 2 deletions src/instana/agent/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ def handle_fork(self) -> None:
"""
Forks happen. Here we handle them.
"""
# Update boot PID to current PID to prevent duplicate fork detection
self._boot_pid = os.getpid()
# Reset the Agent
self.reset()

Expand Down Expand Up @@ -457,9 +459,13 @@ def diagnostics(self) -> None:
logger.warning(
f"is_collector_thread_running?: {self.collector.is_reporting_thread_running()}"
)
logger.warning(
f"background_report_lock.locked?: {self.collector.background_report_lock.locked()}"
# RLock doesn't have a locked() method, so we check by trying to acquire
lock_acquired = self.collector.background_report_lock.acquire(
blocking=False
)
if lock_acquired:
self.collector.background_report_lock.release()
logger.warning(f"background_report_lock.locked?: {not lock_acquired}")
logger.warning(f"ready_to_start: {self.collector.ready_to_start}")
logger.warning(f"reporting_thread: {self.collector.reporting_thread}")
logger.warning(f"report_interval: {self.collector.report_interval}")
Expand Down
18 changes: 13 additions & 5 deletions src/instana/collector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, List, Type

from instana.log import logger
from instana.util import DictionaryOfStan
from instana.util import DictionaryOfStan, get_lock

if TYPE_CHECKING:
from instana.agent.base import BaseAgent
Expand Down Expand Up @@ -55,7 +55,7 @@ def __init__(self, agent: Type["BaseAgent"]) -> None:
# Lock used synchronize reporting - no updates when sending
# Used by the background reporting thread. Used to synchronize report attempts and so
# that we never have two in progress at once.
self.background_report_lock = threading.Lock()
self.background_report_lock = get_lock()

# Reporting interval for the background thread(s)
self.report_interval = 1
Expand All @@ -70,16 +70,21 @@ def is_reporting_thread_running(self) -> bool:
"""
Indicates if there is a thread running with the name self.THREAD_NAME
"""
for thread in threading.enumerate():
if thread.name == self.THREAD_NAME:
return True
if (
self.started
and self.reporting_thread is not None
and self.reporting_thread.is_alive()
):
return True
return False

def start(self) -> None:
"""
Starts the collector and starts reporting as long as the agent is in a ready state.
@return: None
"""
# Check if we already have a valid running thread using is_alive()
# This is more reliable than is_reporting_thread_running() after fork
if self.is_reporting_thread_running():
if self.thread_shutdown.is_set():
# Force a restart.
Expand All @@ -93,6 +98,7 @@ def start(self) -> None:
logger.debug(
f"BaseCollector.start non-fatal: call but thread already running (started: {self.started})"
)
return

if self.agent.can_send():
logger.debug("BaseCollector.start: launching collection thread")
Expand Down Expand Up @@ -120,6 +126,8 @@ def shutdown(self, report_final: bool = True) -> None:
logger.debug("Collector.shutdown: Reporting final data.")
self.prepare_and_report_data()
self.started = False
# Clear the thread reference to ensure clean restart after fork
self.reporting_thread = None

def background_report(self) -> None:
"""
Expand Down
19 changes: 10 additions & 9 deletions src/instana/collector/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

from time import time
from typing import DefaultDict, Any
from typing import Any, DefaultDict

from instana.collector.base import BaseCollector
from instana.collector.helpers.runtime import RuntimeHelper
Expand Down Expand Up @@ -43,19 +43,20 @@ def prepare_and_report_data(self) -> None:
state machine case.
"""
try:
if self.agent.machine.fsm.current == "wait4init":
with self.agent.machine._lock:
current_state = self.agent.machine.fsm.current

if current_state == "wait4init":
# Test the host agent if we're ready to send data
if self.agent.is_agent_ready():
if self.agent.machine.fsm.current != "good2go":
logger.debug("Agent is ready. Getting to work.")
self.agent.machine.fsm.ready()
with self.agent.machine._lock:
if self.agent.machine.fsm.current != "good2go":
logger.debug("Agent is ready. Getting to work.")
self.agent.machine.fsm.ready()
else:
return

if (
self.agent.machine.fsm.current == "good2go"
and self.agent.is_timed_out()
):
if current_state == "good2go" and self.agent.is_timed_out():
logger.info(
"The Instana host agent has gone offline or is no longer reachable for > 1 min. Will retry periodically."
)
Expand Down
63 changes: 42 additions & 21 deletions src/instana/fsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from fysom import Fysom

from instana.log import logger
from instana.util import get_default_gateway
from instana.util import get_default_gateway, get_lock
from instana.util.process_discovery import Discovery
from instana.version import VERSION

Expand All @@ -25,14 +25,16 @@ class TheMachine:
RETRY_PERIOD = 30
THREAD_NAME = "Instana Machine"

warnedPeriodic = False

def __init__(self, agent: "HostAgent") -> None:
logger.debug("Initializing host agent state machine")

self._lock = get_lock()
self._warned_periodic = False

self.agent = agent
self.fsm = Fysom(
{
"initial": "*",
"events": [
("lookup", "*", "found"),
("announce", "found", "announced"),
Expand All @@ -41,7 +43,7 @@ def __init__(self, agent: "HostAgent") -> None:
],
"callbacks": {
# Can add the following to debug
# "onchangestate": self.print_state_change,
# "onchangestate": self.print_state_change,
"onlookup": self.lookup_agent_host,
"onannounce": self.announce_sensor,
"onpending": self.on_ready,
Expand All @@ -50,17 +52,33 @@ def __init__(self, agent: "HostAgent") -> None:
}
)

self.timer = threading.Timer(1, self.fsm.lookup)
self.timer.daemon = True
self.timer.name = self.THREAD_NAME
self.timer.start()
with self._lock:
self.timer = threading.Timer(1, self._safe_fsm_lookup)
self.timer.daemon = True
self.timer.name = self.THREAD_NAME
self.timer.start()

@staticmethod
def print_state_change(e: Any) -> None:
logger.debug(
f"========= ({os.getpid()}#{threading.current_thread().name}) FSM event: {e.event}, src: {e.src}, dst: {e.dst} =========="
)

def _safe_fsm_lookup(self) -> None:
"""Thread-safe wrapper for FSM lookup."""
with self._lock:
self.fsm.lookup()

def _safe_fsm_announce(self) -> None:
"""Thread-safe wrapper for FSM announce."""
with self._lock:
self.fsm.announce()

def _safe_fsm_pending(self) -> None:
"""Thread-safe wrapper for FSM pending."""
with self._lock:
self.fsm.pending()

def reset(self) -> None:
"""
reset is called to start from scratch in a process. It may be called on first boot or
Expand All @@ -72,14 +90,15 @@ def reset(self) -> None:
:return: void
"""
logger.debug("State machine being reset. Will start a new announce cycle.")
self.fsm.lookup()
with self._lock:
self.fsm.lookup()

def lookup_agent_host(self, e: Any) -> bool:
host = self.agent.options.agent_host
port = self.agent.options.agent_port

if self.agent.is_agent_listening(host, port):
self.fsm.announce()
self._safe_fsm_announce()
return True

if os.path.exists("/proc/"):
Expand All @@ -88,14 +107,15 @@ def lookup_agent_host(self, e: Any) -> bool:
if self.agent.is_agent_listening(host, port):
self.agent.options.agent_host = host
self.agent.options.agent_port = port
self.fsm.announce()
self._safe_fsm_announce()
return True

if self.warnedPeriodic is False:
logger.info(
"Instana Host Agent couldn't be found. Will retry periodically..."
)
self.warnedPeriodic = True
with self._lock:
if self._warned_periodic is False:
logger.info(
"Instana Host Agent couldn't be found. Will retry periodically..."
)
self._warned_periodic = True

self.schedule_retry(
self.lookup_agent_host, e, f"{self.THREAD_NAME}: agent_lookup"
Expand Down Expand Up @@ -156,17 +176,18 @@ def announce_sensor(self, e: Any) -> bool:
return False

self.agent.set_from(payload)
self.fsm.pending()
self._safe_fsm_pending()
logger.debug(
f"Announced PID: {pid} (true PID: {self.agent.announce_data.pid}). Waiting for Agent Ready..."
)
return True

def schedule_retry(self, fun: Callable, e: Any, name: str) -> None:
self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e])
self.timer.daemon = True
self.timer.name = name
self.timer.start()
with self._lock:
self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e])
self.timer.daemon = True
self.timer.name = name
self.timer.start()

def on_ready(self, _: Any) -> None:
self.agent.start()
Expand Down
Empty file removed src/instana/hooks/__init__.py
Empty file.
25 changes: 0 additions & 25 deletions src/instana/hooks/hook_gunicorn.py

This file was deleted.

30 changes: 29 additions & 1 deletion src/instana/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,40 @@

import importlib.metadata
import json
import threading
from collections import defaultdict
from typing import Any, DefaultDict
from typing import Any, DefaultDict, Union
from urllib import parse

from instana.log import logger

try:
import gevent.lock

GeventRLock = gevent.lock.RLock
except ImportError:
GeventRLock = None


def get_lock() -> Union[threading.RLock, Any]:
"""
Get an appropriate lock for the current environment.
Returns a gevent-compatible lock if gevent is active, otherwise a threading.Lock.
"""
try:
# Check if gevent is active by looking for monkey-patched threading
import gevent.monkey

if gevent.monkey.is_module_patched("threading"):
from gevent.lock import RLock

return RLock()
except (ImportError, AttributeError):
pass

# Default to threading.RLock for regular threads
return threading.RLock()


def nested_dictionary() -> DefaultDict[str, Any]:
return defaultdict(DictionaryOfStan)
Expand Down
3 changes: 3 additions & 0 deletions tests/collector/test_base_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ def reporting_function():
name=self.collector.THREAD_NAME, target=reporting_function
)
sample_thread.start()
# Set the required state for is_reporting_thread_running to return True
self.collector.started = True
self.collector.reporting_thread = sample_thread
try:
assert self.collector.is_reporting_thread_running()
finally:
Expand Down
Loading