Skip to content

Commit b5e76a1

Browse files
committed
cleanup
1 parent 1196551 commit b5e76a1

2 files changed

Lines changed: 58 additions & 51 deletions

File tree

openai_agents/memory/postgres_session.py

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -56,42 +56,6 @@ class PostgresSessionConfig(BaseModel):
5656
idempotence_table: str = "activity_idempotence"
5757

5858

59-
async def init_schema(conn: asyncpg.Connection, config: PostgresSessionConfig) -> None:
60-
"""Initialize the PostgreSQL schema."""
61-
async with conn.transaction():
62-
# Create sessions table
63-
sessions_ddl = f"""
64-
CREATE TABLE IF NOT EXISTS {config.sessions_table} (
65-
session_id TEXT NOT NULL,
66-
created_at TIMESTAMP DEFAULT NOW(),
67-
updated_at TIMESTAMP DEFAULT NOW(),
68-
PRIMARY KEY (session_id)
69-
)
70-
"""
71-
await conn.execute(sessions_ddl)
72-
73-
# Create operation_id sequence
74-
operation_id_ddl = f"""
75-
CREATE SEQUENCE IF NOT EXISTS {config.operation_id_sequence} START 1
76-
"""
77-
await conn.execute(operation_id_ddl)
78-
79-
# Create messages table
80-
messages_ddl = f"""
81-
CREATE TABLE IF NOT EXISTS {config.messages_table} (
82-
session_id TEXT NOT NULL,
83-
operation_id INTEGER NOT NULL DEFAULT nextval('{config.operation_id_sequence}'),
84-
message_data TEXT NOT NULL,
85-
created_at TIMESTAMP DEFAULT NOW(),
86-
deleted_at TIMESTAMP NULL,
87-
PRIMARY KEY (session_id, operation_id),
88-
FOREIGN KEY (session_id)
89-
REFERENCES {config.sessions_table} (session_id)
90-
ON DELETE CASCADE
91-
)
92-
"""
93-
await conn.execute(messages_ddl)
94-
9559

9660
class PostgresSessionGetItemsRequest(BaseModel):
9761
config: PostgresSessionConfig
@@ -284,3 +248,50 @@ def _get_connection():
284248
if _connection_factory is None:
285249
raise ValueError("Connection factory not set")
286250
return _connection_factory()
251+
252+
@staticmethod
253+
async def init_schema(config: PostgresSessionConfig) -> None:
254+
conn = PostgresSession._get_connection()
255+
"""Initialize the PostgreSQL schema."""
256+
async with conn.transaction():
257+
# Create sessions table
258+
sessions_ddl = f"""
259+
CREATE TABLE IF NOT EXISTS {config.sessions_table} (
260+
session_id TEXT NOT NULL,
261+
created_at TIMESTAMP DEFAULT NOW(),
262+
updated_at TIMESTAMP DEFAULT NOW(),
263+
PRIMARY KEY (session_id)
264+
)
265+
"""
266+
await conn.execute(sessions_ddl)
267+
268+
# Create operation_id sequence
269+
operation_id_ddl = f"""
270+
CREATE SEQUENCE IF NOT EXISTS {config.operation_id_sequence} START 1
271+
"""
272+
await conn.execute(operation_id_ddl)
273+
274+
# Create messages table
275+
messages_ddl = f"""
276+
CREATE TABLE IF NOT EXISTS {config.messages_table} (
277+
session_id TEXT NOT NULL,
278+
operation_id INTEGER NOT NULL DEFAULT nextval('{config.operation_id_sequence}'),
279+
message_data TEXT NOT NULL,
280+
created_at TIMESTAMP DEFAULT NOW(),
281+
deleted_at TIMESTAMP NULL,
282+
PRIMARY KEY (session_id, operation_id),
283+
FOREIGN KEY (session_id)
284+
REFERENCES {config.sessions_table} (session_id)
285+
ON DELETE CASCADE
286+
)
287+
"""
288+
await conn.execute(messages_ddl)
289+
290+
@staticmethod
291+
def get_activities() -> list[Callable[[], activity.Activity]]:
292+
return [
293+
postgres_session_get_items_activity,
294+
postgres_session_add_items_activity,
295+
postgres_session_pop_item_activity,
296+
postgres_session_clear_session_activity,
297+
]

openai_agents/memory/run_postgres_session_worker.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
)
1515
from openai_agents.memory.postgres_session import (
1616
PostgresSessionConfig,
17-
init_schema,
1817
PostgresSession,
19-
postgres_session_get_items_activity,
20-
postgres_session_add_items_activity,
21-
postgres_session_pop_item_activity,
22-
postgres_session_clear_session_activity,
2318
)
2419
from openai_agents.memory.db_utils import IdempotenceHelper
2520

@@ -32,12 +27,18 @@ async def main():
3227
messages_table="session_messages",
3328
sessions_table="session",
3429
operation_id_sequence="session_operation_id_sequence",
30+
idempotence_table="activity_idempotence",
3531
)
36-
PostgresSession.set_connection_factory(lambda: db_connection)
37-
await init_schema(db_connection, config=postgres_session_config)
38-
idempotence_helper = IdempotenceHelper(table_name="activity_idempotence")
32+
33+
# Create the idempotence table. This is used to ensure that activities are idempotent with
34+
# respect to database modifications.
35+
idempotence_helper = IdempotenceHelper(table_name=postgres_session_config.idempotence_table)
3936
await idempotence_helper.create_table(db_connection)
37+
38+
# Configure the Postgres Session with the database connection.
39+
# Initialize the schema.
4040
PostgresSession.set_connection_factory(lambda: db_connection)
41+
await PostgresSession.init_schema(config=postgres_session_config)
4142

4243
# Create client connected to server at the given address
4344
client = await Client.connect(
@@ -53,16 +54,11 @@ async def main():
5354

5455
worker = Worker(
5556
client,
56-
task_queue="openai-agents-memory-task-queue",
57+
task_queue="openai-postgres-session-task-queue",
5758
workflows=[
5859
PostgresSessionWorkflow,
5960
],
60-
activities=[
61-
postgres_session_get_items_activity,
62-
postgres_session_add_items_activity,
63-
postgres_session_pop_item_activity,
64-
postgres_session_clear_session_activity,
65-
],
61+
activities=[*PostgresSession.get_activities()],
6662
)
6763
await worker.run()
6864

0 commit comments

Comments
 (0)