Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion src/memos/mem_scheduler/base_mixins/queue_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
logger.warning("status_tracker.task_submitted failed", exc_info=True)

if self.disabled_handlers and msg.label in self.disabled_handlers:
logger.info("Skipping disabled handler: %s - %s", msg.label, msg.content)
logger.debug(
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
msg.label,
msg.item_id,
msg.user_id,
msg.mem_cube_id,
)
continue

task_priority = self.orchestrator.get_task_priority(task_label=msg.label)
Expand All @@ -74,6 +80,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
else:
queued_msgs.append(msg)

logger.info(
"Submit scheduler messages summary. total=%s immediate=%s queued=%s queue_backend=%s",
len(messages),
len(immediate_msgs),
len(queued_msgs),
"redis_queue" if self.use_redis_queue else "local_queue",
)

if immediate_msgs:
for m in immediate_msgs:
emit_monitor_event(
Expand Down Expand Up @@ -199,6 +213,15 @@ def _message_consumer(self) -> None:
if messages:
self.dispatcher.on_messages_enqueued(messages)

if len(messages) >= self.consume_batch:
unique_labels = sorted({msg.label for msg in messages})
logger.debug(
"Consumer dequeued batch. batch_size=%s consume_batch=%s unique_labels=%s queue_backend=%s",
len(messages),
self.consume_batch,
unique_labels,
"redis_queue" if self.use_redis_queue else "local_queue",
)
self.dispatcher.dispatch(messages)
except Exception as e:
logger.error("Error dispatching messages: %s", e)
Expand Down
12 changes: 9 additions & 3 deletions src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
if task_item.item_id in self._running_tasks:
task_item.mark_completed(result)
del self._running_tasks[task_item.item_id]
logger.info(f"Task completed: {task_item.get_execution_info()}")
logger.debug(f"Task completed: {task_item.get_execution_info()}")
return result

except Exception as e:
Expand Down Expand Up @@ -630,12 +630,12 @@ def execute_task(
with self._task_lock:
self._futures.add(future)
future.add_done_callback(self._handle_future_result)
logger.info(
logger.debug(
f"Dispatch {len(msgs)} message(s) to {task_label} handler for user {user_id} and mem_cube {mem_cube_id}."
)
else:
# For synchronous execution, the wrapper will run and remove the task upon completion
logger.info(
logger.debug(
f"Execute {len(msgs)} message(s) synchronously for {task_label} for user {user_id} and mem_cube {mem_cube_id}."
)
wrapped_handler(msgs)
Expand All @@ -653,6 +653,12 @@ def dispatch(self, msg_list: list[ScheduleMessageItem]):

# Group messages by user_id and mem_cube_id first
user_cube_groups = group_messages_by_user_and_mem_cube(msg_list)
logger.info(
"Dispatcher received batch. total_messages=%s user_groups=%s unique_labels=%s",
len(msg_list),
len(user_cube_groups),
sorted({msg.label for msg in msg_list}),
)

# Process each user and mem_cube combination
for user_id, cube_groups in user_cube_groups.items():
Expand Down
20 changes: 17 additions & 3 deletions src/memos/mem_scheduler/task_schedule_modules/local_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ def put(

try:
self.queue_streams[stream_key].put(item=message, block=block, timeout=timeout)
logger.info(
f"Message successfully put into queue '{stream_key}'. Current size: {self.queue_streams[stream_key].qsize()}"
logger.debug(
"Local queue enqueued. stream=%s size=%s label=%s item_id=%s",
stream_key,
self.queue_streams[stream_key].qsize(),
message.label,
message.item_id,
)
except Exception as e:
logger.error(f"Failed to put message into queue '{stream_key}': {e}", exc_info=True)
Expand All @@ -117,7 +121,7 @@ def get(

# Return empty list if queue does not exist
if stream_key not in self.queue_streams:
logger.error(f"Stream {stream_key} does not exist when trying to get messages.")
logger.debug("Stream %s does not exist when trying to get messages", stream_key)
return []

# Ensure we always request a batch so we get a list back
Expand Down Expand Up @@ -174,6 +178,14 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
fetched = self.get_nowait(stream_key=stream_key, batch_size=needed)
messages.extend(fetched)

if messages and len(messages) >= batch_size:
logger.debug(
"Local queue dequeued batch. batch_size=%s requested_batch_size=%s active_streams=%s",
len(messages),
batch_size,
len(stream_keys),
)

return messages

def qsize(self) -> dict:
Expand All @@ -196,9 +208,11 @@ def clear(self, stream_key: str | None = None) -> None:
if stream_key:
if stream_key in self.queue_streams:
self.queue_streams[stream_key].clear()
logger.info("Cleared local queue stream: %s", stream_key)
else:
for queue in self.queue_streams.values():
queue.clear()
logger.info("Cleared all local queue streams. stream_count=%s", len(self.queue_streams))

@property
def unfinished_tasks(self) -> int:
Expand Down
35 changes: 26 additions & 9 deletions src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,16 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
if len(self.message_pack_cache) == 0:
return []
else:
return self.message_pack_cache.popleft()
batch = self.message_pack_cache.popleft()
if len(batch) >= batch_size:
logger.debug(
"[REDIS_QUEUE] Dequeued batch. batch_size=%s requested_batch_size=%s cache_packs_remaining=%s stream_count=%s",
len(batch),
batch_size,
len(self.message_pack_cache),
len(self.get_stream_keys()),
)
return batch

def _ensure_consumer_group(self, stream_key) -> None:
"""Ensure the consumer group exists for the stream."""
Expand Down Expand Up @@ -449,9 +458,13 @@ def put(
message_id = self._redis_conn.xadd(
stream_key, message_data, maxlen=self.max_len, approximate=True
)

logger.info(
f"Added message {message_id} to Redis stream: {message.label} - {message.content[:100]}..."
logger.debug(
"[REDIS_QUEUE] Enqueued message. message_id=%s stream=%s label=%s item_id=%s stream_cache_size=%s",
message_id,
stream_key,
message.label,
message.item_id,
len(self._stream_keys_cache),
)

except Exception as e:
Expand Down Expand Up @@ -494,7 +507,11 @@ def ack_message(
# Optionally delete the message from the stream to keep it clean
try:
self._redis_conn.xdel(stream_key, redis_message_id)
logger.info(f"Successfully delete acknowledged message {redis_message_id}")
logger.debug(
"[REDIS_QUEUE] Ack/delete message. redis_message_id=%s stream=%s",
redis_message_id,
stream_key,
)
except Exception as e:
logger.warning(f"Failed to delete acknowledged message {redis_message_id}: {e}")

Expand Down Expand Up @@ -989,7 +1006,7 @@ def show_task_status(self, stream_key_prefix: str | None = None) -> dict[str, di
)
stream_keys = self.get_stream_keys(stream_key_prefix=effective_prefix)
if not stream_keys:
logger.info(f"No Redis streams found for the configured prefix: {effective_prefix}")
logger.debug(f"No Redis streams found for the configured prefix: {effective_prefix}")
return {}

grouped: dict[str, dict[str, int]] = {}
Expand Down Expand Up @@ -1157,7 +1174,7 @@ def connect(self) -> None:
self._redis_conn.ping()
self._is_connected = True
self._check_xautoclaim_support()
logger.debug("Redis connection established successfully")
logger.info("Redis connection established successfully")
# Start stream keys refresher when connected
self._start_stream_keys_refresh_thread()
except Exception as e:
Expand All @@ -1174,7 +1191,7 @@ def disconnect(self) -> None:
self._stop_stream_keys_refresh_thread()
if self._is_listening:
self.stop_listening()
logger.debug("Disconnected from Redis")
logger.info("Disconnected from Redis")

def __enter__(self):
"""Context manager entry."""
Expand Down Expand Up @@ -1379,7 +1396,7 @@ def _update_stream_cache_with_log(
self._stream_keys_cache = active_stream_keys
self._stream_keys_last_refresh = time.time()
cache_count = len(self._stream_keys_cache)
logger.info(
logger.debug(
f"Refreshed stream keys cache: {cache_count} active keys, "
f"{deleted_count} deleted, {len(candidate_keys)} candidates examined."
)
47 changes: 32 additions & 15 deletions src/memos/mem_scheduler/task_schedule_modules/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
"""Submit messages to the message queue (either local queue or Redis)."""
if isinstance(messages, ScheduleMessageItem):
messages = [messages]
if len(messages) < 1:
logger.error("submit_messages called with empty payload")
return

current_trace_id = get_current_trace_id()

Expand All @@ -104,18 +107,25 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, task_label=msg.label
)

if len(messages) < 1:
logger.error("Submit empty")
elif len(messages) == 1:
if len(messages) == 1:
if getattr(messages[0], "timestamp", None) is None:
messages[0].timestamp = get_utc_now()
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
emit_monitor_event(
"enqueue",
messages[0],
{"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
)
self.memos_message_queue.put(messages[0])
if self.disabled_handlers and messages[0].label in self.disabled_handlers:
logger.debug(
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
messages[0].label,
messages[0].item_id,
messages[0].user_id,
messages[0].mem_cube_id,
)
else:
enqueue_ts = to_iso(getattr(messages[0], "timestamp", None))
emit_monitor_event(
"enqueue",
messages[0],
{"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0},
)
self.memos_message_queue.put(messages[0])
else:
user_cube_groups = group_messages_by_user_and_mem_cube(messages)

Expand All @@ -132,8 +142,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
message.timestamp = get_utc_now()

if self.disabled_handlers and message.label in self.disabled_handlers:
logger.info(
f"Skipping disabled handler: {message.label} - {message.content}"
logger.debug(
"Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s",
message.label,
message.item_id,
message.user_id,
message.mem_cube_id,
)
continue

Expand All @@ -148,9 +162,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
},
)
self.memos_message_queue.put(message)
logger.info(
f"Submitted message to local queue: {message.label} - {message.content}"
)

logger.info(
"Queue submit completed. backend=%s total=%s",
"redis_queue" if self.use_redis_queue else "local_queue",
len(messages),
)

def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
return self.memos_message_queue.get_messages(batch_size=batch_size)
Expand Down
Loading