Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.adk.summarizer;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.adk.events.Event;
import com.google.adk.events.EventCompaction;
import com.google.adk.sessions.BaseSessionService;
import com.google.adk.sessions.Session;
import com.google.genai.types.Content;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class performs event compaction by retaining the tail of the event stream.
*
* <ul>
* <li>Keeps the {@code retentionSize} most recent events raw.
* <li>Compacts all events that never compacted and older than the retained tail, including the
* most recent compaction event, into a new summary event in a sliding window fashion.
* <li>The new summary event is generated by the {@link BaseEventSummarizer}.
* <li>Appends this new summary event to the end of the event stream.
* </ul>
*/
public final class TailRetentionEventCompactor implements EventCompactor {

private static final Logger logger = LoggerFactory.getLogger(TailRetentionEventCompactor.class);

private final BaseEventSummarizer summarizer;
private final int retentionSize;

public TailRetentionEventCompactor(BaseEventSummarizer summarizer, int retentionSize) {
this.summarizer = summarizer;
this.retentionSize = retentionSize;
}

@Override
public Completable compact(Session session, BaseSessionService sessionService) {
checkArgument(summarizer != null, "Missing BaseEventSummarizer for event compaction");
logger.debug("Running tail retention event compaction for session {}", session.id());

return Completable.fromMaybe(
getCompactionEvents(session.events())
.flatMap(summarizer::summarizeEvents)
.flatMapSingle(e -> sessionService.appendEvent(session, e)));
}

/**
* Identifies events to be compacted based on the tail retention strategy.
*
* <p>This method iterates backwards through the event list to find the most recent compaction
* event (if any) and collects all uncompacted events that occurred after the range covered by
* that compaction. It then applies the retention policy, excluding the most recent {@code
* retentionSize} events from the summary.
*
* <p><b>Example Scenario:</b>
*
* <p>Consider a case where retention size is 3. An event (E4) appears before an older compaction
* event (C1) in the list due to previous retention, but is not covered by C1. Later, a newer
* compaction (C2) occurs covering E2 and E3.
*
* <ul>
* <li>T=1: E1
* <li>T=2: E2
* <li>T=3: E3
* <li>T=4: E4
* <li>T=5: C1 (Covers T=1). <i>List: E2, E3, E4 </i> are preserved.
* <li>T=6: E6
* <li>T=7: E7
* <li>T=8: C2 (Covers T=2 to T=3). <i>List: E4, E6, E7</i> are preserved. The compaction events
* in this round is <i>List: C1, E2, E3</i>
* <li>T=9: E9.
* </ul>
*
* <p><b>Execution with Retention = 3:</b>
*
* <ol>
* <li>The method scans backward: E7, C2, E6, E5, C1, E4...
* <li><b>C2</b> is identified as the most recent compaction event (end timestamp T=3).
* <li><b>E7, E6, E5</b> are collected as they are newer than T=3.
* <li><b>C1</b> is ignored as we only care about the boundary set by the latest compaction.
* <li><b>E4</b> (T=4) is collected because it is newer than T=3.
* <li>Scanning stops at E3 (or earlier) as it is covered by C2 (timestamp <= T=3).
* <li>The initial list of events to summarize (reversed back to chronological order): <b>[C2,
* E4, E5, E6, E7]</b>.
* <li>Applying retention (keep last 3): <b>E5, E6, E7</b> are removed from the summary list.
* <li><b>Final Output:</b> {@code [C2, E4]}. E4 and the previous summary C2 will be compacted
* together.
* </ol>
*/
private Maybe<List<Event>> getCompactionEvents(List<Event> events) {
// If there are not enough events to summarize, we can return early.
if (events.size() <= retentionSize) {
return Maybe.empty();
}

long compactionEndTimestamp = Long.MIN_VALUE;
Event lastCompactionEvent = null;
List<Event> eventsToSummarize = new ArrayList<>();

// Iterate backwards from the end of the window to summarize.
ListIterator<Event> iter = events.listIterator(events.size());
while (iter.hasPrevious()) {
Event event = iter.previous();

if (!isCompactEvent(event)) {
// Only include events that are strictly after the last compaction range.
if (event.timestamp() > compactionEndTimestamp) {
eventsToSummarize.add(event);
continue;
} else {
// Exit early if we have reached the last event of last compaction range.
break;
}
}
EventCompaction compaction = event.actions().compaction().orElse(null);

// We only rely on the most recent compaction event to set the boundary. Older compaction
// events are ignored.
if (lastCompactionEvent == null) {
compactionEndTimestamp = compaction.endTimestamp();
lastCompactionEvent = event;
}
}

if (eventsToSummarize.size() <= retentionSize) {
return Maybe.empty();
}

// Add the last compaction event to the list of events to summarize.
// This is to ensure that the last compaction event is included in the summary.
if (lastCompactionEvent != null) {
// Use the compacted content for the compaction event.
Content content = lastCompactionEvent.actions().compaction().get().compactedContent();
eventsToSummarize.add(lastCompactionEvent.toBuilder().content(content).build());
}

Collections.reverse(eventsToSummarize);

// Apply retention: keep the most recent 'retentionSize' events out of the summary.
// We do this by removing them from the list of events to be summarized.
eventsToSummarize
.subList(eventsToSummarize.size() - retentionSize, eventsToSummarize.size())
.clear();
return Maybe.just(eventsToSummarize);
}

private static boolean isCompactEvent(Event event) {
return event.actions() != null && event.actions().compaction().isPresent();
}
}
Loading