Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions conserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,13 @@ def _process_storage(self, storage_name: str) -> None:
"chain_name": self.chain_details["name"]
}
):
started = time.time()
outcome = "success"
try:
logger.debug("Saving vCon %s to storage %s", self.vcon_id, storage_name)
Storage(storage_name).save(self.vcon_id)
except Exception as e:
outcome = "error"
# Record exception in the span
current_span = trace.get_current_span()
if current_span:
Expand All @@ -389,6 +392,11 @@ def _process_storage(self, storage_name: str) -> None:
str(e),
exc_info=True
)
finally:
duration_ms = round((time.time() - started) * 1000, 3)
attrs = {"backend": storage_name, "outcome": outcome}
increment_counter("conserver.storage.count", attributes=attrs)
record_histogram("conserver.storage.duration_ms", duration_ms, attributes=attrs)

def _process_storage_parallel(self, storage_backends: List[str]) -> None:
"""Save vCon to multiple storage backends concurrently.
Expand Down Expand Up @@ -545,6 +553,10 @@ def _process_link(self, links: list[str], link_index: int) -> bool:
"chain.name": self.chain_details["name"],
},
)
increment_counter(
"conserver.link.count",
attributes={"link_name": link_name, "outcome": "success"},
)
logger.info(
"Completed link %s (module: %s) for vCon: %s in %s seconds",
link_name,
Expand All @@ -563,6 +575,10 @@ def _process_link(self, links: list[str], link_index: int) -> bool:
self._process_tracers(self.vcon_id, self.vcon_id, links, link_index)
return should_continue_chain
except Exception as e:
increment_counter(
"conserver.link.count",
attributes={"link_name": link_name, "outcome": "error"},
)
try:
after_link_hook.after_link(
self.vcon_id, link_name, module, options, link_hook_config, "error", e, parties
Expand Down Expand Up @@ -703,6 +719,10 @@ def worker_loop(worker_id: int) -> None:

ingress_list, vcon_id = popped_item
logger.debug("[%s] Received vCon %s from ingress list %s", worker_name, vcon_id, ingress_list)
increment_counter(
"conserver.main_loop.count_vcons_received",
attributes={"ingress_list": ingress_list},
)

if shutdown_requested:
logger.info(
Expand Down
Loading