64 changes: 64 additions & 0 deletions src/bedrock_agentcore/memory/integrations/strands/README.md
@@ -219,6 +219,8 @@ result = agent_with_tools("/path/to/image.png")
- `actor_id`: Unique identifier for the user/actor
- `retrieval_config`: Dictionary mapping namespaces to RetrievalConfig objects
- `batch_size`: Number of messages to buffer before sending to AgentCore Memory (1-100, default: 1). A value of 1 sends immediately (no batching).
- `default_metadata`: Optional dictionary of key-value metadata to attach to every message event. Maximum 15 total keys per event (including internal keys). Example: `{"location": {"stringValue": "NYC"}}`
- `metadata_provider`: Optional callable returning a metadata dictionary. Called at each event creation for dynamic values (e.g., traceId). Merged after `default_metadata`.

### RetrievalConfig Parameters

@@ -239,6 +241,68 @@ https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-strategies.
- `/summaries/{actorId}/{sessionId}/`: Session-specific summaries


---

## Event Metadata

You can attach custom key-value metadata to every message event. This is useful for tagging
conversations with contextual information (e.g., location, project, case type) that can later
be used to filter events with `list_events`.

### Default Metadata (applied to all messages)

```python
config = AgentCoreMemoryConfig(
    memory_id=MEM_ID,
    session_id=SESSION_ID,
    actor_id=ACTOR_ID,
    default_metadata={
        "project": {"stringValue": "atlas"},
        "env": {"stringValue": "production"},
    },
)
session_manager = AgentCoreMemorySessionManager(config, region_name='us-east-1')
agent = Agent(session_manager=session_manager)
agent("Hello!")  # This event will have project=atlas and env=production metadata
```

> **Review comment (Contributor):** would we be able to build this map on behalf of the customer? It feels very verbose. Ex: `{"project": "atlas"}` --> `{"project": {"stringValue": "atlas"}}`

### Dynamic Metadata (metadata_provider)

For values that change per invocation (e.g., traceId for Langfuse), use `metadata_provider` —
a callable invoked at each event creation:

```python
from langfuse.decorators import langfuse_context

def get_trace_metadata():
return {"traceId": {"stringValue": langfuse_context.get_current_trace_id() or ""}}

config = AgentCoreMemoryConfig(
memory_id=MEM_ID,
session_id=SESSION_ID,
actor_id=ACTOR_ID,
metadata_provider=get_trace_metadata,
)
session_manager = AgentCoreMemorySessionManager(config, region_name='us-east-1')
agent = Agent(session_manager=session_manager)
agent("Hello!") # Event gets the current traceId automatically
```
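Because the provider is re-invoked at each event creation, it can return a different value every time. A library-free sketch of that contract (the generator-based trace source here is illustrative, not part of the library):

```python
import itertools

# Stand-in for a tracing context: each call yields a fresh id.
_trace_ids = (f"trace-{n}" for n in itertools.count(1))

def get_trace_metadata():
    return {"traceId": {"stringValue": next(_trace_ids)}}

# The session manager calls the provider once per event, so successive
# events pick up successive ids:
first = get_trace_metadata()   # {"traceId": {"stringValue": "trace-1"}}
second = get_trace_metadata()  # {"traceId": {"stringValue": "trace-2"}}
```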

### Per-call Metadata

You can also pass metadata on individual `create_message` calls. Per-call metadata is merged
with `default_metadata` and `metadata_provider` (per-call values override both for the same key):

```python
session_manager.create_message(
    session_id, agent_id, message,
    metadata={"priority": {"stringValue": "high"}},
)
```

> **Review comment (Contributor):** what happens when we flush messages in a batch and the metadata is different on each message? Does all the metadata get merged?

> **Note:** The API allows a maximum of 15 metadata key-value pairs per event.
> The keys `stateType` and `agentId` are reserved for internal use. When batching
> is enabled, messages flushed together into a single event have their metadata
> merged per session, with later messages overriding earlier ones for the same key.
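The merge precedence (config defaults, then `metadata_provider`, then per-call values, capped at 15 keys with reserved keys rejected) can be sketched with plain dictionaries. This is an illustration of the documented behavior, not the library's internal code:

```python
# Illustrative sketch of the metadata merge rules described above
# (not the library's internal implementation).
RESERVED_KEYS = {"stateType", "agentId"}
MAX_KEYS = 15

def merge_metadata(default=None, provider=None, per_call=None):
    merged = {}
    merged.update(default or {})                  # lowest precedence
    merged.update(provider() if provider else {})
    merged.update(per_call or {})                 # highest precedence
    clash = RESERVED_KEYS & merged.keys()
    if clash:
        raise ValueError(f"Metadata keys {clash} are reserved for internal use")
    if len(merged) > MAX_KEYS:
        raise ValueError(f"{len(merged)} metadata keys exceeds the maximum of {MAX_KEYS}")
    return merged or None

merged = merge_metadata(
    default={"project": {"stringValue": "atlas"}, "env": {"stringValue": "production"}},
    provider=lambda: {"traceId": {"stringValue": "t-123"}},
    per_call={"project": {"stringValue": "phoenix"}},  # overrides the default "atlas"
)
# merged["project"] is {"stringValue": "phoenix"}; "env" and "traceId" are kept
```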

---

## Message Batching
14 changes: 12 additions & 2 deletions src/bedrock_agentcore/memory/integrations/strands/config.py
@@ -1,8 +1,8 @@
"""Configuration for AgentCore Memory Session Manager."""

from typing import Dict, Optional
from typing import Any, Callable, Dict, Optional

from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field


class RetrievalConfig(BaseModel):
@@ -38,8 +38,16 @@ class AgentCoreMemoryConfig(BaseModel):
Default is "user_context".
filter_restored_tool_context: When True, strip historical toolUse/toolResult blocks from
restored messages before loading them into Strands runtime memory. Default is False.
default_metadata: Optional default metadata key-value pairs to attach to every message event.
Merged with any per-call metadata. Maximum 15 total keys per event (including internal keys).
Example: {"location": {"stringValue": "NYC"}}
metadata_provider: Optional callable that returns metadata key-value pairs. Called at each
event creation, so it can return dynamic values (e.g. current traceId). The returned
dict is merged after default_metadata but before per-call metadata.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

memory_id: str = Field(min_length=1)
session_id: str = Field(min_length=1)
actor_id: str = Field(min_length=1)
@@ -48,3 +56,5 @@ class AgentCoreMemoryConfig(BaseModel):
flush_interval_seconds: Optional[float] = Field(default=None, gt=0)
context_tag: str = Field(default="user_context", min_length=1)
filter_restored_tool_context: bool = Field(default=False)
default_metadata: Optional[Dict[str, Any]] = None
metadata_provider: Optional[Callable[[], Dict[str, Any]]] = None

> **Review comment (Contributor)** on `default_metadata`: Does this actually solve the customer's ask? What if they need different metadata for each message event? Also, what exactly do they mean by "message_event" — are they referring to memory records, AgentCore Memory events, or individual conversation turns? Are they trying to attach a distinct metadata field to each conversation turn?

> **Review comment (Contributor)** on `default_metadata`: are we sure this is the interface the customer is looking for? Could we ask them to send an example code block of the support they want?

> **Review comment (Contributor)** on `default_metadata: Optional[Dict[str, Any]]`: Is there a reason we need Any here instead of the MetadataValue used internally?
154 changes: 121 additions & 33 deletions src/bedrock_agentcore/memory/integrations/strands/session_manager.py
@@ -6,7 +6,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Dict, NamedTuple, Optional

import boto3
from botocore.config import Config as BotocoreConfig
@@ -23,6 +23,7 @@
from bedrock_agentcore.memory.models.filters import (
EventMetadataFilter,
LeftExpression,
MetadataValue,
OperatorType,
RightExpression,
)
@@ -46,6 +47,22 @@
STATE_TYPE_KEY = "stateType"
AGENT_ID_KEY = "agentId"

# Maximum metadata key-value pairs per event (API limit)
MAX_METADATA_KEYS = 15

# Reserved internal metadata keys that users cannot override
RESERVED_METADATA_KEYS = frozenset({STATE_TYPE_KEY, AGENT_ID_KEY})


class BufferedMessage(NamedTuple):
    """A pre-processed message waiting to be flushed to AgentCore Memory."""

    session_id: str
    messages: list[tuple[str, str]]
    is_blob: bool
    timestamp: datetime
    metadata: Optional[Dict[str, MetadataValue]] = None

> **Review comment (Contributor):** nice, I agree with the decision to add some structure here.


class StateType(Enum):
"""State type for distinguishing session and agent metadata in events."""
@@ -129,8 +146,8 @@ def __init__(
session = boto_session or boto3.Session(region_name=region_name)
self.has_existing_agent = False

# Batching support - stores pre-processed messages: (session_id, messages, is_blob, timestamp)
self._message_buffer: list[tuple[str, list[tuple[str, str]], bool, datetime]] = []
# Batching support - stores pre-processed messages
self._message_buffer: list[BufferedMessage] = []
self._message_lock = threading.Lock()

# Agent state buffering - stores all agent state updates: (session_id, agent)
@@ -169,6 +186,58 @@ def __init__(
if self.config.flush_interval_seconds:
self._start_flush_timer()

def _build_metadata(
    self,
    internal_metadata: Optional[Dict[str, MetadataValue]] = None,
    per_call_metadata: Optional[Dict[str, MetadataValue]] = None,
) -> Optional[Dict[str, MetadataValue]]:
    """Build merged metadata from config defaults, provider, per-call overrides, and internal keys.

    Merge precedence (highest wins):
    1. internal_metadata (stateType, agentId) — always wins
    2. per_call_metadata (passed via **kwargs)
    3. metadata_provider() (called at event creation time for dynamic values)
    4. self.config.default_metadata (set at config construction time)

    Args:
        internal_metadata: System-reserved metadata (e.g. stateType, agentId).
        per_call_metadata: Caller-supplied metadata for a single operation.

    Returns:
        Merged metadata dict, or None if empty.

    Raises:
        ValueError: If user metadata contains reserved keys or total keys exceed MAX_METADATA_KEYS.
    """
    merged: Dict[str, MetadataValue] = {}

    if self.config.default_metadata:
        merged.update(self.config.default_metadata)

    if self.config.metadata_provider:
        merged.update(self.config.metadata_provider())

    if per_call_metadata:
        merged.update(per_call_metadata)

    # Validate user-supplied keys before merging internal keys
    user_reserved = RESERVED_METADATA_KEYS & merged.keys()
    if user_reserved:
        raise ValueError(
            f"Metadata keys {user_reserved} are reserved for internal use. "
            f"Reserved keys: {RESERVED_METADATA_KEYS}"
        )

    if internal_metadata:
        merged.update(internal_metadata)

    if len(merged) > MAX_METADATA_KEYS:
        raise ValueError(
            f"Combined metadata has {len(merged)} keys, exceeding the maximum of {MAX_METADATA_KEYS}."
        )

    return merged or None

# region SessionRepository interface implementation
def create_session(self, session: Session, **kwargs: Any) -> Session:
"""Create a new session in AgentCore Memory.
@@ -482,6 +551,9 @@ def create_message(

is_blob = self.converter.exceeds_conversational_limit(messages[0])

# Build merged metadata from config defaults + per-call overrides
merged_metadata = self._build_metadata(per_call_metadata=kwargs.get("metadata"))

# Parse the original timestamp and use it as desired timestamp
original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00"))
monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp)
@@ -490,7 +562,15 @@
# Buffer the pre-processed message
should_flush = False
with self._message_lock:
self._message_buffer.append((session_id, messages, is_blob, monotonic_timestamp))
self._message_buffer.append(
BufferedMessage(
session_id=session_id,
messages=messages,
is_blob=is_blob,
timestamp=monotonic_timestamp,
metadata=merged_metadata,
)
)
should_flush = len(self._message_buffer) >= self.config.batch_size

# Flush only messages outside the lock to prevent deadlock
@@ -508,17 +588,19 @@
session_id=session_id,
messages=messages,
event_timestamp=monotonic_timestamp,
metadata=merged_metadata,
)
else:
event = self.memory_client.gmdp_client.create_event(
memoryId=self.config.memory_id,
actorId=self.config.actor_id,
sessionId=session_id,
payload=[
{"blob": json.dumps(messages[0])},
],
eventTimestamp=monotonic_timestamp,
)
create_event_kwargs: dict[str, Any] = {
"memoryId": self.config.memory_id,
"actorId": self.config.actor_id,
"sessionId": session_id,
"payload": [{"blob": json.dumps(messages[0])}],
"eventTimestamp": monotonic_timestamp,
}
if merged_metadata:
create_event_kwargs["metadata"] = merged_metadata
event = self.memory_client.gmdp_client.create_event(**create_event_kwargs)
logger.debug("Created event: %s for message: %s", event.get("eventId"), session_message.message_id)
return event
except Exception as e:
@@ -790,39 +872,45 @@ def _flush_messages_only(self) -> list[dict[str, Any]]:
return []

# Group all messages by session_id, combining conversational and blob messages
# Structure: {session_id: {"payload": [...], "timestamp": latest_timestamp}}
# Structure: {session_id: {"payload": [...], "timestamp": latest_timestamp, "metadata": {...}}}
session_groups: dict[str, dict[str, Any]] = {}

for session_id, messages, is_blob, monotonic_timestamp in messages_to_send:
if session_id not in session_groups:
session_groups[session_id] = {"payload": [], "timestamp": monotonic_timestamp}
for buffered_msg in messages_to_send:
sid = buffered_msg.session_id
if sid not in session_groups:
session_groups[sid] = {"payload": [], "timestamp": buffered_msg.timestamp, "metadata": {}}

if is_blob:
# Add blob messages to payload
for msg in messages:
session_groups[session_id]["payload"].append({"blob": json.dumps(msg)})
if buffered_msg.is_blob:
for msg in buffered_msg.messages:
session_groups[sid]["payload"].append({"blob": json.dumps(msg)})
else:
# Add conversational messages to payload
for text, role in messages:
session_groups[session_id]["payload"].append(
for text, role in buffered_msg.messages:
session_groups[sid]["payload"].append(
{"conversational": {"content": {"text": text}, "role": role.upper()}}
)

# Use the latest timestamp for the combined event
if monotonic_timestamp > session_groups[session_id]["timestamp"]:
session_groups[session_id]["timestamp"] = monotonic_timestamp
if buffered_msg.timestamp > session_groups[sid]["timestamp"]:
session_groups[sid]["timestamp"] = buffered_msg.timestamp

# Merge metadata (later entries override earlier for same key)
if buffered_msg.metadata:
session_groups[sid]["metadata"].update(buffered_msg.metadata)

results = []
try:
# Send one create_event per session_id with all messages (conversational + blob)
for session_id, group in session_groups.items():
event = self.memory_client.gmdp_client.create_event(
memoryId=self.config.memory_id,
actorId=self.config.actor_id,
sessionId=session_id,
payload=group["payload"],
eventTimestamp=group["timestamp"],
)
create_event_kwargs: dict[str, Any] = {
"memoryId": self.config.memory_id,
"actorId": self.config.actor_id,
"sessionId": session_id,
"payload": group["payload"],
"eventTimestamp": group["timestamp"],
}
if group["metadata"]:
create_event_kwargs["metadata"] = group["metadata"]
event = self.memory_client.gmdp_client.create_event(**create_event_kwargs)
results.append(event)
logger.debug(
"Flushed batched event for session %s with %d messages: %s",