Skip to content

Commit 2d306c4

Browse files
jnioche and claude authored
Extract WaitAckCache to deduplicate OpenSearch bulk response handling (#1869)
* Extract WaitAckCache to deduplicate bulk response handling in OpenSearch bolts The waitAck cache logic and bulk response processing were duplicated across DeletionBolt, IndexerBolt, and StatusUpdaterBolt. This extracts the shared logic into a new WaitAckCache class and adds unit tests covering the core scenarios (success, failure, conflicts, eviction, duplicate doc IDs). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix formatting Signed-off-by: Julien Nioche <julien@digitalpebble.com> * checkstyle fix Signed-off-by: Julien Nioche <julien@digitalpebble.com> * checkstyle fix (again - should go back to eating chocolate instead of working in my spare time) Signed-off-by: Julien Nioche <julien@digitalpebble.com> * Addressed comments by reviewer Signed-off-by: Julien Nioche <julien@digitalpebble.com> * Formatting Signed-off-by: Julien Nioche <julien@digitalpebble.com> * Formatting Signed-off-by: Julien Nioche <julien@digitalpebble.com> --------- Signed-off-by: Julien Nioche <julien@digitalpebble.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1548d2b commit 2d306c4

5 files changed

Lines changed: 727 additions & 585 deletions

File tree

Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.stormcrawler.opensearch;
19+
20+
import com.github.benmanes.caffeine.cache.Cache;
21+
import com.github.benmanes.caffeine.cache.Caffeine;
22+
import com.github.benmanes.caffeine.cache.RemovalCause;
23+
import com.github.benmanes.caffeine.cache.Ticker;
24+
import java.util.Arrays;
25+
import java.util.HashSet;
26+
import java.util.LinkedList;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.function.Consumer;
32+
import java.util.stream.Collectors;
33+
import org.apache.storm.tuple.Tuple;
34+
import org.apache.stormcrawler.metrics.ScopedCounter;
35+
import org.jetbrains.annotations.Nullable;
36+
import org.opensearch.action.DocWriteRequest;
37+
import org.opensearch.action.bulk.BulkItemResponse;
38+
import org.opensearch.action.bulk.BulkRequest;
39+
import org.opensearch.action.bulk.BulkResponse;
40+
import org.opensearch.core.rest.RestStatus;
41+
import org.slf4j.Logger;
42+
43+
/**
44+
* Thread-safe cache that tracks in-flight tuples awaiting bulk acknowledgment from OpenSearch.
45+
* Provides shared logic for processing bulk responses and failing tuples on error, used by
46+
* IndexerBolt, DeletionBolt, and StatusUpdaterBolt.
47+
*/
48+
public class WaitAckCache {
49+
50+
/** Callback invoked for each tuple when processing a successful bulk response. */
51+
@FunctionalInterface
52+
public interface TupleAction {
53+
void handle(String id, Tuple tuple, BulkItemResponseToFailedFlag selected);
54+
}
55+
56+
private final Cache<String, List<Tuple>> cache;
57+
private final java.util.concurrent.locks.ReentrantLock lock =
58+
new java.util.concurrent.locks.ReentrantLock(true);
59+
private final Logger log;
60+
private final Consumer<Tuple> onEviction;
61+
62+
/** Creates a cache with a fixed 60-second expiry. */
63+
public WaitAckCache(Logger log, Consumer<Tuple> onEviction) {
64+
this(Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS), log, onEviction);
65+
}
66+
67+
/** Creates a cache from a Caffeine spec string (e.g. "expireAfterWrite=300s"). */
68+
public WaitAckCache(String cacheSpec, Logger log, Consumer<Tuple> onEviction) {
69+
this(Caffeine.from(cacheSpec), log, onEviction);
70+
}
71+
72+
/** Creates a cache with a custom ticker for deterministic time control in tests. */
73+
WaitAckCache(String cacheSpec, Logger log, Consumer<Tuple> onEviction, Ticker ticker) {
74+
this(Caffeine.from(cacheSpec).ticker(ticker).executor(Runnable::run), log, onEviction);
75+
}
76+
77+
private WaitAckCache(Caffeine<Object, Object> builder, Logger log, Consumer<Tuple> onEviction) {
78+
this.log = log;
79+
this.onEviction = onEviction;
80+
this.cache =
81+
builder.<String, List<Tuple>>removalListener(
82+
(String key, List<Tuple> value, RemovalCause cause) -> {
83+
if (!cause.wasEvicted()) {
84+
return;
85+
}
86+
if (value != null) {
87+
log.error(
88+
"Purged from waitAck {} with {} values",
89+
key,
90+
value.size());
91+
for (Tuple t : value) {
92+
onEviction.accept(t);
93+
}
94+
} else {
95+
log.error("Purged from waitAck {} with no values", key);
96+
}
97+
})
98+
.build();
99+
}
100+
101+
public long estimatedSize() {
102+
return cache.estimatedSize();
103+
}
104+
105+
/** Adds a tuple to the cache under the given document ID, creating the list if needed. */
106+
public void addTuple(String docID, Tuple tuple) {
107+
lock.lock();
108+
try {
109+
List<Tuple> tt = cache.get(docID, k -> new LinkedList<>());
110+
tt.add(tuple);
111+
if (log.isDebugEnabled()) {
112+
String url = (String) tuple.getValueByField("url");
113+
log.debug("Added to waitAck {} with ID {} total {}", url, docID, tt.size());
114+
}
115+
} finally {
116+
lock.unlock();
117+
}
118+
}
119+
120+
/** Returns true if the cache contains an entry for the given document ID. */
121+
public boolean contains(String docID) {
122+
lock.lock();
123+
try {
124+
return cache.getIfPresent(docID) != null;
125+
} finally {
126+
lock.unlock();
127+
}
128+
}
129+
130+
/** Forces pending cache maintenance, triggering eviction listeners for expired entries. */
131+
public void cleanUp() {
132+
cache.cleanUp();
133+
}
134+
135+
/** Fails all remaining tuples in the cache and invalidates all entries. */
136+
public void shutdown() {
137+
lock.lock();
138+
try {
139+
Map<String, List<Tuple>> remaining = cache.asMap();
140+
for (var entry : remaining.entrySet()) {
141+
log.warn(
142+
"Shutdown: failing {} tuple(s) for ID {}",
143+
entry.getValue().size(),
144+
entry.getKey());
145+
for (Tuple t : entry.getValue()) {
146+
onEviction.accept(t);
147+
}
148+
}
149+
cache.invalidateAll();
150+
} finally {
151+
lock.unlock();
152+
}
153+
}
154+
155+
/** Invalidates a single cache entry. */
156+
public void invalidate(String docID) {
157+
lock.lock();
158+
try {
159+
cache.invalidate(docID);
160+
} finally {
161+
lock.unlock();
162+
}
163+
}
164+
165+
/**
166+
* Processes a successful bulk response: classifies each item (conflict vs failure), retrieves
167+
* cached tuples, selects the best response per document ID, and invokes the action for each
168+
* tuple.
169+
*
170+
* @param conflictCounter optional metric counter; if non-null, increments "doc_conflicts" scope
171+
* for each conflict
172+
*/
173+
public void processBulkResponse(
174+
BulkResponse response,
175+
long executionId,
176+
@Nullable ScopedCounter conflictCounter,
177+
TupleAction action) {
178+
179+
var idsToBulkItems =
180+
Arrays.stream(response.getItems())
181+
.map(
182+
bir -> {
183+
BulkItemResponse.Failure f = bir.getFailure();
184+
boolean failed = false;
185+
if (f != null) {
186+
if (f.getStatus().equals(RestStatus.CONFLICT)) {
187+
if (conflictCounter != null) {
188+
conflictCounter.scope("doc_conflicts").incrBy(1);
189+
}
190+
log.debug("Doc conflict ID {}", bir.getId());
191+
} else {
192+
log.error(
193+
"Bulk item failure ID {}: {}", bir.getId(), f);
194+
failed = true;
195+
}
196+
}
197+
return new BulkItemResponseToFailedFlag(bir, failed);
198+
})
199+
.collect(
200+
// https://github.com/apache/stormcrawler/issues/832
201+
Collectors.groupingBy(b -> b.id, Collectors.toUnmodifiableList()));
202+
203+
Map<String, List<Tuple>> presentTuples;
204+
long estimatedSize;
205+
Set<String> debugInfo = null;
206+
lock.lock();
207+
try {
208+
presentTuples = cache.getAllPresent(idsToBulkItems.keySet());
209+
if (!presentTuples.isEmpty()) {
210+
cache.invalidateAll(presentTuples.keySet());
211+
}
212+
estimatedSize = cache.estimatedSize();
213+
if (log.isDebugEnabled() && estimatedSize > 0L) {
214+
debugInfo = new HashSet<>(cache.asMap().keySet());
215+
}
216+
} finally {
217+
lock.unlock();
218+
}
219+
220+
int ackCount = 0;
221+
int failureCount = 0;
222+
223+
for (var entry : presentTuples.entrySet()) {
224+
final var id = entry.getKey();
225+
final var tuples = entry.getValue();
226+
final var bulkItems = idsToBulkItems.get(id);
227+
228+
BulkItemResponseToFailedFlag selected = selectBest(bulkItems, id);
229+
230+
if (tuples != null) {
231+
log.debug("Found {} tuple(s) for ID {}", tuples.size(), id);
232+
for (Tuple t : tuples) {
233+
if (selected.failed) {
234+
failureCount++;
235+
} else {
236+
ackCount++;
237+
}
238+
action.handle(id, t, selected);
239+
}
240+
} else {
241+
log.warn("Could not find unacked tuples for {}", id);
242+
}
243+
}
244+
245+
log.info(
246+
"Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}",
247+
executionId,
248+
idsToBulkItems.size(),
249+
estimatedSize,
250+
ackCount,
251+
failureCount);
252+
253+
if (debugInfo != null) {
254+
for (String k : debugInfo) {
255+
log.debug("Still in wait ack after bulk response [{}] => {}", executionId, k);
256+
}
257+
}
258+
}
259+
260+
/** Processes a failed bulk request by failing all associated tuples. */
261+
public void processFailedBulk(
262+
BulkRequest request, long executionId, Throwable failure, Consumer<Tuple> failAction) {
263+
264+
log.error("Exception with bulk {} - failing the whole lot ", executionId, failure);
265+
266+
final var failedIds =
267+
request.requests().stream()
268+
.map(DocWriteRequest::id)
269+
.collect(Collectors.toUnmodifiableSet());
270+
271+
Map<String, List<Tuple>> failedTupleLists;
272+
lock.lock();
273+
try {
274+
failedTupleLists = cache.getAllPresent(failedIds);
275+
if (!failedTupleLists.isEmpty()) {
276+
cache.invalidateAll(failedTupleLists.keySet());
277+
}
278+
} finally {
279+
lock.unlock();
280+
}
281+
282+
for (var id : failedIds) {
283+
var tuples = failedTupleLists.get(id);
284+
if (tuples != null) {
285+
log.debug("Failed {} tuple(s) for ID {}", tuples.size(), id);
286+
for (Tuple t : tuples) {
287+
failAction.accept(t);
288+
}
289+
} else {
290+
log.warn("Could not find unacked tuple for {}", id);
291+
}
292+
}
293+
}
294+
295+
/**
296+
* Selects the best response when there are multiple bulk items for the same document ID.
297+
* Prefers non-failed responses; warns when there is a mix of success and failure. If all items
298+
* are failed, returns the first one (no warning logged since there is no ambiguity).
299+
*/
300+
private BulkItemResponseToFailedFlag selectBest(
301+
List<BulkItemResponseToFailedFlag> items, String id) {
302+
if (items.size() == 1) {
303+
return items.get(0);
304+
}
305+
306+
BulkItemResponseToFailedFlag best = items.get(0);
307+
int failedCount = 0;
308+
for (var item : items) {
309+
if (item.failed) {
310+
failedCount++;
311+
} else {
312+
best = item;
313+
}
314+
}
315+
if (failedCount > 0 && failedCount < items.size()) {
316+
log.warn(
317+
"The id {} would result in an ack and a failure."
318+
+ " Using only the ack for processing.",
319+
id);
320+
}
321+
return best;
322+
}
323+
}

0 commit comments

Comments
 (0)