Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 139 additions & 89 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,19 @@ def get_conversation(self):
return self._conversation

def _get_event_sync(self, event_id: str) -> Event | None:
"""Private sync function to get event with state lock."""
"""Get a single event by ID without holding locks.

Event reading is safe without lock since events are immutable once
appended and the event log index is only modified by append operations.
"""
if not self._conversation:
raise ValueError("inactive_service")
with self._conversation._state as state:
index = state.events.get_index(event_id)
event = state.events[index]
return event
event_log = self._conversation._state.events
try:
index = event_log.get_index(event_id)
return event_log[index]
except (KeyError, IndexError, FileNotFoundError):
return None

async def get_event(self, event_id: str) -> Event | None:
if not self._conversation:
Expand All @@ -101,72 +107,101 @@ def _search_events_sync(
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
) -> EventPage:
"""Private sync function to search events with state lock."""
"""Search events with filtering and pagination.

Optimized to avoid holding locks during I/O and to support early exit
when limit is reached. The kind filter accepts simple class names
(e.g., 'MessageEvent') for usability.
"""
if not self._conversation:
raise ValueError("inactive_service")

# Convert datetime to ISO string for comparison (ISO strings are comparable)
timestamp_gte_str = timestamp__gte.isoformat() if timestamp__gte else None
timestamp_lt_str = timestamp__lt.isoformat() if timestamp__lt else None

# Collect all events
all_events = []
with self._conversation._state as state:
for event in state.events:
# Apply kind filter if provided
if (
kind is not None
and f"{event.__class__.__module__}.{event.__class__.__name__}"
!= kind
):
continue
# Snapshot event log length without holding lock (GIL makes int read atomic)
event_log = self._conversation._state.events
event_count = len(event_log)

# Determine iteration order based on sort order
# For DESC order, iterate from end for efficiency
if sort_order == EventSortOrder.TIMESTAMP_DESC:
indices = range(event_count - 1, -1, -1)
else:
indices = range(event_count)

# Apply source filter if provided
if source is not None and event.source != source:
# Find starting index if page_id is provided (O(1) lookup if EventLog)
start_idx = None
if page_id:
# Try O(1) lookup via get_index if available (EventLog)
if hasattr(event_log, "get_index"):
try:
start_idx = event_log.get_index(page_id)
except KeyError:
start_idx = None
else:
# Fallback to linear search for list-like objects (test mocks)
for i, evt in enumerate(event_log):
if evt.id == page_id:
start_idx = i
break

# Collect matching events
items: list[Event] = []
found_cursor = start_idx is None # If no cursor, start immediately

for i in indices:
# Skip until we reach the cursor position (inclusive)
if not found_cursor:
if i == start_idx:
found_cursor = True
else:
continue

# Apply body filter if provided (case-insensitive substring match)
if body is not None:
if not self._event_matches_body(event, body):
continue
# Read event from storage
try:
event = event_log[i]
except (IndexError, FileNotFoundError):
continue

# Apply timestamp filters if provided (ISO string comparison)
if (
timestamp_gte_str is not None
and event.timestamp < timestamp_gte_str
):
continue
if timestamp_lt_str is not None and event.timestamp >= timestamp_lt_str:
continue
# Apply kind filter - use simple class name for usability
if kind is not None and event.__class__.__name__ != kind:
continue

all_events.append(event)
# Apply source filter
if source is not None and event.source != source:
continue

# Sort events based on sort_order
if sort_order == EventSortOrder.TIMESTAMP:
all_events.sort(key=lambda x: x.timestamp)
elif sort_order == EventSortOrder.TIMESTAMP_DESC:
all_events.sort(key=lambda x: x.timestamp, reverse=True)
# Apply body filter (case-insensitive substring match)
if body is not None and not self._event_matches_body(event, body):
continue

# Handle pagination
items = []
start_index = 0
# Apply timestamp filters (ISO string comparison)
if timestamp_gte_str is not None and event.timestamp < timestamp_gte_str:
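# For ASC order, keep scanning: events before the lower bound come first,
# so later events may still match.
# For DESC order, every remaining event is older (this assumes the event log
# is in timestamp order), so we can stop.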
if sort_order == EventSortOrder.TIMESTAMP_DESC:
break
continue

# Find the starting point if page_id is provided
if page_id:
for i, event in enumerate(all_events):
if event.id == page_id:
start_index = i
if timestamp_lt_str is not None and event.timestamp >= timestamp_lt_str:
# For ASC order, once we hit newer timestamps, we can stop
if sort_order == EventSortOrder.TIMESTAMP:
break
continue

# Collect items for this page
next_page_id = None
for i in range(start_index, len(all_events)):
if len(items) >= limit:
# We have more items, set next_page_id
if i < len(all_events):
next_page_id = all_events[i].id
items.append(event)

# Early exit when limit is reached
if len(items) >= limit + 1:
break
items.append(all_events[i])

# Handle pagination - if we have more than limit, set next_page_id
next_page_id = None
if len(items) > limit:
next_page_id = items[limit].id
items = items[:limit]

return EventPage(items=items, next_page_id=next_page_id)

Expand Down Expand Up @@ -205,44 +240,48 @@ def _count_events_sync(
timestamp__gte: datetime | None = None,
timestamp__lt: datetime | None = None,
) -> int:
"""Private sync function to count events with state lock."""
"""Count events matching the given filters.

The kind filter accepts simple class names (e.g., 'MessageEvent')
for usability.
"""
if not self._conversation:
raise ValueError("inactive_service")

# Convert datetime to ISO string for comparison (ISO strings are comparable)
timestamp_gte_str = timestamp__gte.isoformat() if timestamp__gte else None
timestamp_lt_str = timestamp__lt.isoformat() if timestamp__lt else None

# Snapshot event log length without holding lock
event_log = self._conversation._state.events
event_count = len(event_log)

count = 0
with self._conversation._state as state:
for event in state.events:
# Apply kind filter if provided
if (
kind is not None
and f"{event.__class__.__module__}.{event.__class__.__name__}"
!= kind
):
continue
for i in range(event_count):
try:
event = event_log[i]
except (IndexError, FileNotFoundError):
continue

# Apply source filter if provided
if source is not None and event.source != source:
continue
# Apply kind filter - use simple class name for usability
if kind is not None and event.__class__.__name__ != kind:
continue

# Apply source filter
if source is not None and event.source != source:
continue

# Apply body filter if provided (case-insensitive substring match)
if body is not None:
if not self._event_matches_body(event, body):
continue
# Apply body filter (case-insensitive substring match)
if body is not None and not self._event_matches_body(event, body):
continue

# Apply timestamp filters if provided (ISO string comparison)
if (
timestamp_gte_str is not None
and event.timestamp < timestamp_gte_str
):
continue
if timestamp_lt_str is not None and event.timestamp >= timestamp_lt_str:
continue
# Apply timestamp filters (ISO string comparison)
if timestamp_gte_str is not None and event.timestamp < timestamp_gte_str:
continue
if timestamp_lt_str is not None and event.timestamp >= timestamp_lt_str:
continue

count += 1
count += 1

return count

Expand Down Expand Up @@ -329,20 +368,31 @@ async def _run_with_error_handling():
async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID:
subscriber_id = self._pub_sub.subscribe(subscriber)

# Send current state to the new subscriber immediately
# Send current state to the new subscriber immediately.
# We use non-blocking lock acquisition to avoid blocking if the agent
# is actively running (which can hold the lock for seconds or longer).
if self._conversation:
state = self._conversation._state
# Create state snapshot while holding the lock to ensure consistency.
# ConversationStateUpdateEvent inherits from Event which has frozen=True
# in its model_config, making the snapshot immutable after creation.
with state:
state_update_event = (
ConversationStateUpdateEvent.from_conversation_state(state)
state_update_event = None

# Try non-blocking lock acquisition first
if state.acquire(blocking=False):
try:
state_update_event = (
ConversationStateUpdateEvent.from_conversation_state(state)
)
finally:
state.release()
else:
# Lock is held (agent running) - create a minimal state update
# without full serialization to avoid blocking
logger.debug("Lock held during subscribe, sending minimal state update")
state_update_event = ConversationStateUpdateEvent(
key="execution_status",
value=state.execution_status.value,
)

# Send state update outside the lock - the event is frozen (immutable),
# so we don't need to hold the lock during the async send operation.
# This prevents potential deadlocks between the sync FIFOLock and async I/O.
# Send state update outside any lock context
try:
await subscriber(state_update_event)
except Exception as e:
Expand Down
22 changes: 8 additions & 14 deletions tests/agent_server/test_event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,8 @@ async def test_search_events_kind_filter(
result = await event_service.search_events(kind="ActionEvent")
assert len(result.items) == 0

# Test filtering by MessageEvent
result = await event_service.search_events(
kind="openhands.sdk.event.llm_convertible.message.MessageEvent"
)
# Test filtering by MessageEvent (simple class name)
result = await event_service.search_events(kind="MessageEvent")
assert len(result.items) == 5
for event in result.items:
assert event.__class__.__name__ == "MessageEvent"
Expand Down Expand Up @@ -225,9 +223,9 @@ async def test_search_events_combined_filter_and_sort(
"""Test combining kind filtering with sorting."""
event_service._conversation = mock_conversation_with_events

# Filter by ActionEvent and sort by TIMESTAMP_DESC
# Filter by MessageEvent and sort by TIMESTAMP_DESC
result = await event_service.search_events(
kind="openhands.sdk.event.llm_convertible.message.MessageEvent",
kind="MessageEvent",
sort_order=EventSortOrder.TIMESTAMP_DESC,
)

Expand All @@ -245,16 +243,14 @@ async def test_search_events_pagination_with_filter(
event_service._conversation = mock_conversation_with_events

# Filter by MessageEvent with limit 1
result = await event_service.search_events(
kind="openhands.sdk.event.llm_convertible.message.MessageEvent", limit=1
)
result = await event_service.search_events(kind="MessageEvent", limit=1)
assert len(result.items) == 1
assert result.items[0].__class__.__name__ == "MessageEvent"
assert result.next_page_id is not None

# Get second page
result = await event_service.search_events(
kind="openhands.sdk.event.llm_convertible.message.MessageEvent",
kind="MessageEvent",
page_id=result.next_page_id,
limit=4,
)
Expand Down Expand Up @@ -486,10 +482,8 @@ async def test_count_events_kind_filter(
result = await event_service.count_events()
assert result == 5

# Count ActionEvent events (should be 5)
result = await event_service.count_events(
kind="openhands.sdk.event.llm_convertible.message.MessageEvent"
)
# Count MessageEvent events (should be 5)
result = await event_service.count_events(kind="MessageEvent")
assert result == 5

# Count non-existent event type (should be 0)
Expand Down
Loading
Loading