Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 76 additions & 71 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
Expand Down Expand Up @@ -92,6 +93,31 @@ async def get_event(self, event_id: str) -> Event | None:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, self._get_event_sync, event_id)

def _event_passes_filters(
self,
event: Event,
kind: str | None,
source: str | None,
body: str | None,
timestamp_gte_str: str | None,
timestamp_lt_str: str | None,
) -> bool:
"""Return True if the event matches all active filters."""
if (
kind is not None
and f"{event.__class__.__module__}.{event.__class__.__name__}" != kind
):
return False
if source is not None and event.source != source:
return False
if body is not None and not self._event_matches_body(event, body):
return False
if timestamp_gte_str is not None and event.timestamp < timestamp_gte_str:
return False
if timestamp_lt_str is not None and event.timestamp >= timestamp_lt_str:
return False
return True

def _search_events_sync(
self,
page_id: str | None = None,
Expand All @@ -107,66 +133,56 @@ def _search_events_sync(
if not self._conversation:
raise ValueError("inactive_service")

# Convert datetime to ISO string for comparison (ISO strings are comparable)
timestamp_gte_str = timestamp__gte.isoformat() if timestamp__gte else None
timestamp_lt_str = timestamp__lt.isoformat() if timestamp__lt else None

# Collect all events
all_events = []
# Minimal lock scope: capture event_log reference and count only.
# Event files are immutable once written; reading indices < event_count
# outside the lock is safe.
with self._conversation._state as state:
for event in state.events:
# Apply kind filter if provided
if (
kind is not None
and f"{event.__class__.__module__}.{event.__class__.__name__}"
!= kind
):
continue

# Apply source filter if provided
if source is not None and event.source != source:
continue

# Apply body filter if provided (case-insensitive substring match)
if body is not None:
if not self._event_matches_body(event, body):
continue

# Apply timestamp filters if provided (ISO string comparison)
if (
timestamp_gte_str is not None
and event.timestamp < timestamp_gte_str
):
continue
if timestamp_lt_str is not None and event.timestamp >= timestamp_lt_str:
continue

all_events.append(event)
event_log = state.events
event_count = len(event_log)

def read_event(idx: int) -> Event | None:
try:
return event_log[idx]
except Exception:
return None

# Read all events in parallel outside the lock
with ThreadPoolExecutor(max_workers=8) as executor:
all_raw = list(executor.map(read_event, range(event_count)))

# Filter events
all_events = [
e
for e in all_raw
if e is not None
and self._event_passes_filters(
e, kind, source, body, timestamp_gte_str, timestamp_lt_str
)
]

# Sort events based on sort_order
if sort_order == EventSortOrder.TIMESTAMP:
all_events.sort(key=lambda x: x.timestamp)
elif sort_order == EventSortOrder.TIMESTAMP_DESC:
all_events.sort(key=lambda x: x.timestamp, reverse=True)

# Handle pagination
items = []
start_index = 0

# Find the starting point if page_id is provided
if page_id:
start_index = 0
if page_id is not None:
for i, event in enumerate(all_events):
if event.id == page_id:
start_index = i
break

# Collect items for this page
items = []
next_page_id = None
for i in range(start_index, len(all_events)):
if len(items) >= limit:
# We have more items, set next_page_id
if i < len(all_events):
next_page_id = all_events[i].id
next_page_id = all_events[i].id
break
items.append(all_events[i])

Expand Down Expand Up @@ -211,42 +227,31 @@ def _count_events_sync(
if not self._conversation:
raise ValueError("inactive_service")

# Convert datetime to ISO string for comparison (ISO strings are comparable)
timestamp_gte_str = timestamp__gte.isoformat() if timestamp__gte else None
timestamp_lt_str = timestamp__lt.isoformat() if timestamp__lt else None

count = 0
# Minimal lock scope: capture event_log reference and count only.
with self._conversation._state as state:
for event in state.events:
# Apply kind filter if provided
if (
kind is not None
and f"{event.__class__.__module__}.{event.__class__.__name__}"
!= kind
):
continue

# Apply source filter if provided
if source is not None and event.source != source:
continue

# Apply body filter if provided (case-insensitive substring match)
if body is not None:
if not self._event_matches_body(event, body):
continue

# Apply timestamp filters if provided (ISO string comparison)
if (
timestamp_gte_str is not None
and event.timestamp < timestamp_gte_str
):
continue
if timestamp_lt_str is not None and event.timestamp >= timestamp_lt_str:
continue

count += 1

return count
event_log = state.events
event_count = len(event_log)

def read_event(idx: int) -> Event | None:
try:
return event_log[idx]
except Exception:
return None

with ThreadPoolExecutor(max_workers=8) as executor:
all_events = list(executor.map(read_event, range(event_count)))

return sum(
1
for e in all_events
if e is not None
and self._event_passes_filters(
e, kind, source, body, timestamp_gte_str, timestamp_lt_str
)
)

async def count_events(
self,
Expand Down
Loading