-
Notifications
You must be signed in to change notification settings - Fork 100
feat(strands-memory): add event metadata support to AgentCoreMemorySessionManager #339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -219,6 +219,8 @@ result = agent_with_tools("/path/to/image.png") | |
| - `actor_id`: Unique identifier for the user/actor | ||
| - `retrieval_config`: Dictionary mapping namespaces to RetrievalConfig objects | ||
| - `batch_size`: Number of messages to buffer before sending to AgentCore Memory (1-100, default: 1). A value of 1 sends immediately (no batching). | ||
| - `default_metadata`: Optional dictionary of key-value metadata to attach to every message event. Maximum 15 total keys per event (including internal keys). Example: `{"location": {"stringValue": "NYC"}}` | ||
| - `metadata_provider`: Optional callable returning a metadata dictionary. Called at each event creation for dynamic values (e.g., traceId). Merged after `default_metadata`. | ||
|
|
||
| ### RetrievalConfig Parameters | ||
|
|
||
|
|
@@ -239,6 +241,68 @@ https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-strategies. | |
| - `/summaries/{actorId}/{sessionId}/`: Session-specific summaries | ||
|
|
||
|
|
||
| --- | ||
|
|
||
| ## Event Metadata | ||
|
|
||
| You can attach custom key-value metadata to every message event. This is useful for tagging | ||
| conversations with contextual information (e.g., location, project, case type) that can later | ||
| be used to filter events with `list_events`. | ||
|
|
||
| ### Default Metadata (applied to all messages) | ||
|
|
||
| ```python | ||
| config = AgentCoreMemoryConfig( | ||
| memory_id=MEM_ID, | ||
| session_id=SESSION_ID, | ||
| actor_id=ACTOR_ID, | ||
| default_metadata={ | ||
| "project": {"stringValue": "atlas"}, | ||
| "env": {"stringValue": "production"}, | ||
| }, | ||
| ) | ||
| session_manager = AgentCoreMemorySessionManager(config, region_name='us-east-1') | ||
| agent = Agent(session_manager=session_manager) | ||
| agent("Hello!") # This event will have project=atlas and env=production metadata | ||
| ``` | ||
|
|
||
| ### Dynamic Metadata (metadata_provider) | ||
|
|
||
| For values that change per invocation (e.g., traceId for Langfuse), use `metadata_provider` — | ||
| a callable invoked at each event creation: | ||
|
|
||
| ```python | ||
| from langfuse.decorators import langfuse_context | ||
|
|
||
| def get_trace_metadata(): | ||
| return {"traceId": {"stringValue": langfuse_context.get_current_trace_id() or ""}} | ||
|
|
||
| config = AgentCoreMemoryConfig( | ||
| memory_id=MEM_ID, | ||
| session_id=SESSION_ID, | ||
| actor_id=ACTOR_ID, | ||
| metadata_provider=get_trace_metadata, | ||
| ) | ||
| session_manager = AgentCoreMemorySessionManager(config, region_name='us-east-1') | ||
| agent = Agent(session_manager=session_manager) | ||
| agent("Hello!") # Event gets the current traceId automatically | ||
| ``` | ||
|
|
||
| ### Per-call Metadata | ||
|
|
||
| You can also pass metadata on individual `create_message` calls. Per-call metadata is merged | ||
| with `default_metadata` and `metadata_provider` (per-call values override both for the same key): | ||
|
|
||
| ```python | ||
| session_manager.create_message( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what happens when we flush messages in a batch and the metadata is different on each message? Does all the metadata get merged? |
||
| session_id, agent_id, message, | ||
| metadata={"priority": {"stringValue": "high"}}, | ||
| ) | ||
| ``` | ||
|
|
||
| > **Note:** The API allows a maximum of 15 metadata key-value pairs per event. | ||
| > The keys `stateType` and `agentId` are reserved for internal use. | ||
|
|
||
| --- | ||
|
|
||
| ## Message Batching | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,8 @@ | ||
| """Configuration for AgentCore Memory Session Manager.""" | ||
|
|
||
| from typing import Dict, Optional | ||
| from typing import Any, Callable, Dict, Optional | ||
|
|
||
| from pydantic import BaseModel, Field | ||
| from pydantic import BaseModel, ConfigDict, Field | ||
|
|
||
|
|
||
| class RetrievalConfig(BaseModel): | ||
|
|
@@ -38,8 +38,16 @@ class AgentCoreMemoryConfig(BaseModel): | |
| Default is "user_context". | ||
| filter_restored_tool_context: When True, strip historical toolUse/toolResult blocks from | ||
| restored messages before loading them into Strands runtime memory. Default is False. | ||
| default_metadata: Optional default metadata key-value pairs to attach to every message event. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this actually solve the customer's ask?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we sure this is the interface the customer is looking for? Could we ask them to send an example code block of the support they want? |
||
| Merged with any per-call metadata. Maximum 15 total keys per event (including internal keys). | ||
| Example: {"location": {"stringValue": "NYC"}} | ||
| metadata_provider: Optional callable that returns metadata key-value pairs. Called at each | ||
| event creation, so it can return dynamic values (e.g. current traceId). The returned | ||
| dict is merged after default_metadata but before per-call metadata. | ||
| """ | ||
|
|
||
| model_config = ConfigDict(arbitrary_types_allowed=True) | ||
|
|
||
| memory_id: str = Field(min_length=1) | ||
| session_id: str = Field(min_length=1) | ||
| actor_id: str = Field(min_length=1) | ||
|
|
@@ -48,3 +56,5 @@ class AgentCoreMemoryConfig(BaseModel): | |
| flush_interval_seconds: Optional[float] = Field(default=None, gt=0) | ||
| context_tag: str = Field(default="user_context", min_length=1) | ||
| filter_restored_tool_context: bool = Field(default=False) | ||
| default_metadata: Optional[Dict[str, Any]] = None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason we need |
||
| metadata_provider: Optional[Callable[[], Dict[str, Any]]] = None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,7 @@ | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||
| from datetime import datetime, timedelta, timezone | ||
| from enum import Enum | ||
| from typing import TYPE_CHECKING, Any, Optional | ||
| from typing import TYPE_CHECKING, Any, Dict, NamedTuple, Optional | ||
|
|
||
| import boto3 | ||
| from botocore.config import Config as BotocoreConfig | ||
|
|
@@ -23,6 +23,7 @@ | |
| from bedrock_agentcore.memory.models.filters import ( | ||
| EventMetadataFilter, | ||
| LeftExpression, | ||
| MetadataValue, | ||
| OperatorType, | ||
| RightExpression, | ||
| ) | ||
|
|
@@ -46,6 +47,22 @@ | |
| STATE_TYPE_KEY = "stateType" | ||
| AGENT_ID_KEY = "agentId" | ||
|
|
||
| # Maximum metadata key-value pairs per event (API limit) | ||
| MAX_METADATA_KEYS = 15 | ||
|
|
||
| # Reserved internal metadata keys that users cannot override | ||
| RESERVED_METADATA_KEYS = frozenset({STATE_TYPE_KEY, AGENT_ID_KEY}) | ||
|
|
||
|
|
||
| class BufferedMessage(NamedTuple): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice, I agree with the decision to add some structure here. |
||
| """A pre-processed message waiting to be flushed to AgentCore Memory.""" | ||
|
|
||
| session_id: str | ||
| messages: list[tuple[str, str]] | ||
| is_blob: bool | ||
| timestamp: datetime | ||
| metadata: Optional[Dict[str, MetadataValue]] = None | ||
|
|
||
|
|
||
| class StateType(Enum): | ||
| """State type for distinguishing session and agent metadata in events.""" | ||
|
|
@@ -129,8 +146,8 @@ def __init__( | |
| session = boto_session or boto3.Session(region_name=region_name) | ||
| self.has_existing_agent = False | ||
|
|
||
| # Batching support - stores pre-processed messages: (session_id, messages, is_blob, timestamp) | ||
| self._message_buffer: list[tuple[str, list[tuple[str, str]], bool, datetime]] = [] | ||
| # Batching support - stores pre-processed messages | ||
| self._message_buffer: list[BufferedMessage] = [] | ||
| self._message_lock = threading.Lock() | ||
|
|
||
| # Agent state buffering - stores all agent state updates: (session_id, agent) | ||
|
|
@@ -169,6 +186,58 @@ def __init__( | |
| if self.config.flush_interval_seconds: | ||
| self._start_flush_timer() | ||
|
|
||
| def _build_metadata( | ||
| self, | ||
| internal_metadata: Optional[Dict[str, MetadataValue]] = None, | ||
| per_call_metadata: Optional[Dict[str, MetadataValue]] = None, | ||
| ) -> Optional[Dict[str, MetadataValue]]: | ||
| """Build merged metadata from config defaults, provider, per-call overrides, and internal keys. | ||
|
|
||
| Merge precedence (highest wins): | ||
| 1. internal_metadata (stateType, agentId) — always wins | ||
| 2. per_call_metadata (passed via **kwargs) | ||
| 3. metadata_provider() (called at event creation time for dynamic values) | ||
| 4. self.config.default_metadata (set at config construction time) | ||
|
|
||
| Args: | ||
| internal_metadata: System-reserved metadata (e.g. stateType, agentId). | ||
| per_call_metadata: Caller-supplied metadata for a single operation. | ||
|
|
||
| Returns: | ||
| Merged metadata dict, or None if empty. | ||
|
|
||
| Raises: | ||
| ValueError: If user metadata contains reserved keys or total keys exceed MAX_METADATA_KEYS. | ||
| """ | ||
| merged: Dict[str, MetadataValue] = {} | ||
|
|
||
| if self.config.default_metadata: | ||
| merged.update(self.config.default_metadata) | ||
|
|
||
| if self.config.metadata_provider: | ||
| merged.update(self.config.metadata_provider()) | ||
|
|
||
| if per_call_metadata: | ||
| merged.update(per_call_metadata) | ||
|
|
||
| # Validate user-supplied keys before merging internal keys | ||
| user_reserved = RESERVED_METADATA_KEYS & merged.keys() | ||
| if user_reserved: | ||
| raise ValueError( | ||
| f"Metadata keys {user_reserved} are reserved for internal use. " | ||
| f"Reserved keys: {RESERVED_METADATA_KEYS}" | ||
| ) | ||
|
|
||
| if internal_metadata: | ||
| merged.update(internal_metadata) | ||
|
|
||
| if len(merged) > MAX_METADATA_KEYS: | ||
| raise ValueError( | ||
| f"Combined metadata has {len(merged)} keys, exceeding the maximum of {MAX_METADATA_KEYS}." | ||
| ) | ||
|
|
||
| return merged or None | ||
|
|
||
| # region SessionRepository interface implementation | ||
| def create_session(self, session: Session, **kwargs: Any) -> Session: | ||
| """Create a new session in AgentCore Memory. | ||
|
|
@@ -482,6 +551,9 @@ def create_message( | |
|
|
||
| is_blob = self.converter.exceeds_conversational_limit(messages[0]) | ||
|
|
||
| # Build merged metadata from config defaults + per-call overrides | ||
| merged_metadata = self._build_metadata(per_call_metadata=kwargs.get("metadata")) | ||
|
|
||
| # Parse the original timestamp and use it as desired timestamp | ||
| original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00")) | ||
| monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp) | ||
|
|
@@ -490,7 +562,15 @@ def create_message( | |
| # Buffer the pre-processed message | ||
| should_flush = False | ||
| with self._message_lock: | ||
| self._message_buffer.append((session_id, messages, is_blob, monotonic_timestamp)) | ||
| self._message_buffer.append( | ||
| BufferedMessage( | ||
| session_id=session_id, | ||
| messages=messages, | ||
| is_blob=is_blob, | ||
| timestamp=monotonic_timestamp, | ||
| metadata=merged_metadata, | ||
| ) | ||
| ) | ||
| should_flush = len(self._message_buffer) >= self.config.batch_size | ||
|
|
||
| # Flush only messages outside the lock to prevent deadlock | ||
|
|
@@ -508,17 +588,19 @@ def create_message( | |
| session_id=session_id, | ||
| messages=messages, | ||
| event_timestamp=monotonic_timestamp, | ||
| metadata=merged_metadata, | ||
| ) | ||
| else: | ||
| event = self.memory_client.gmdp_client.create_event( | ||
| memoryId=self.config.memory_id, | ||
| actorId=self.config.actor_id, | ||
| sessionId=session_id, | ||
| payload=[ | ||
| {"blob": json.dumps(messages[0])}, | ||
| ], | ||
| eventTimestamp=monotonic_timestamp, | ||
| ) | ||
| create_event_kwargs: dict[str, Any] = { | ||
| "memoryId": self.config.memory_id, | ||
| "actorId": self.config.actor_id, | ||
| "sessionId": session_id, | ||
| "payload": [{"blob": json.dumps(messages[0])}], | ||
| "eventTimestamp": monotonic_timestamp, | ||
| } | ||
| if merged_metadata: | ||
| create_event_kwargs["metadata"] = merged_metadata | ||
| event = self.memory_client.gmdp_client.create_event(**create_event_kwargs) | ||
| logger.debug("Created event: %s for message: %s", event.get("eventId"), session_message.message_id) | ||
| return event | ||
| except Exception as e: | ||
|
|
@@ -790,39 +872,45 @@ def _flush_messages_only(self) -> list[dict[str, Any]]: | |
| return [] | ||
|
|
||
| # Group all messages by session_id, combining conversational and blob messages | ||
| # Structure: {session_id: {"payload": [...], "timestamp": latest_timestamp}} | ||
| # Structure: {session_id: {"payload": [...], "timestamp": latest_timestamp, "metadata": {...}}} | ||
| session_groups: dict[str, dict[str, Any]] = {} | ||
|
|
||
| for session_id, messages, is_blob, monotonic_timestamp in messages_to_send: | ||
| if session_id not in session_groups: | ||
| session_groups[session_id] = {"payload": [], "timestamp": monotonic_timestamp} | ||
| for buffered_msg in messages_to_send: | ||
| sid = buffered_msg.session_id | ||
| if sid not in session_groups: | ||
| session_groups[sid] = {"payload": [], "timestamp": buffered_msg.timestamp, "metadata": {}} | ||
|
|
||
| if is_blob: | ||
| # Add blob messages to payload | ||
| for msg in messages: | ||
| session_groups[session_id]["payload"].append({"blob": json.dumps(msg)}) | ||
| if buffered_msg.is_blob: | ||
| for msg in buffered_msg.messages: | ||
| session_groups[sid]["payload"].append({"blob": json.dumps(msg)}) | ||
| else: | ||
| # Add conversational messages to payload | ||
| for text, role in messages: | ||
| session_groups[session_id]["payload"].append( | ||
| for text, role in buffered_msg.messages: | ||
| session_groups[sid]["payload"].append( | ||
| {"conversational": {"content": {"text": text}, "role": role.upper()}} | ||
| ) | ||
|
|
||
| # Use the latest timestamp for the combined event | ||
| if monotonic_timestamp > session_groups[session_id]["timestamp"]: | ||
| session_groups[session_id]["timestamp"] = monotonic_timestamp | ||
| if buffered_msg.timestamp > session_groups[sid]["timestamp"]: | ||
| session_groups[sid]["timestamp"] = buffered_msg.timestamp | ||
|
|
||
| # Merge metadata (later entries override earlier for same key) | ||
| if buffered_msg.metadata: | ||
| session_groups[sid]["metadata"].update(buffered_msg.metadata) | ||
|
|
||
| results = [] | ||
| try: | ||
| # Send one create_event per session_id with all messages (conversational + blob) | ||
| for session_id, group in session_groups.items(): | ||
| event = self.memory_client.gmdp_client.create_event( | ||
| memoryId=self.config.memory_id, | ||
| actorId=self.config.actor_id, | ||
| sessionId=session_id, | ||
| payload=group["payload"], | ||
| eventTimestamp=group["timestamp"], | ||
| ) | ||
| create_event_kwargs: dict[str, Any] = { | ||
| "memoryId": self.config.memory_id, | ||
| "actorId": self.config.actor_id, | ||
| "sessionId": session_id, | ||
| "payload": group["payload"], | ||
| "eventTimestamp": group["timestamp"], | ||
| } | ||
| if group["metadata"]: | ||
| create_event_kwargs["metadata"] = group["metadata"] | ||
| event = self.memory_client.gmdp_client.create_event(**create_event_kwargs) | ||
| results.append(event) | ||
| logger.debug( | ||
| "Flushed batched event for session %s with %d messages: %s", | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would we be able to build this map on behalf of the customer? It feels very verbose.
Ex:
{"project" : "atlas"}-->{"project" : { "stringValue": "atlas"}}