Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,27 @@ public int unlisten(String event, EventListener listener) {
return count;
}

/**
* Notify all registered listeners for {@code event} EXCEPT
* {@code ignoredListener}. ANY_EVENT listeners are notified unless they
* are the ignored one.
*
* @return a Future<Integer> resolving to the count of listeners actually
* invoked (the ignored listener is NOT counted)
*/
public Future<Integer> notifyExcept(String event,
EventListener ignoredListener,
@Nullable Object... args) {
return this.notify(event, ignoredListener, args);
}

public Future<Integer> notify(String event, @Nullable Object... args) {
return this.notify(event, null, args);
}

private Future<Integer> notify(String event,
EventListener ignoredListener,
@Nullable Object... args) {
@SuppressWarnings("resource")
ExtendableIterator<EventListener> all = new ExtendableIterator<>();

Expand All @@ -173,8 +193,12 @@ public Future<Integer> notify(String event, @Nullable Object... args) {
int count = 0;
// Notify all listeners, and ignore the results
while (all.hasNext()) {
EventListener listener = all.next();
if (listener == ignoredListener) {
continue;
}
try {
all.next().event(ev);
listener.event(ev);
count++;
} catch (Throwable e) {
LOG.warn("Failed to handle event: {}", ev, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,43 @@ public void testEventNotifyWithArg2() {
Assert.assertEquals(1, count.get());
}

@Test
public void testNotifyExcept() throws Exception {
final String notify = "event-notify";
AtomicInteger listenerACount = new AtomicInteger();
AtomicInteger listenerBCount = new AtomicInteger();
AtomicInteger listenerCCount = new AtomicInteger();

EventListener listenerA = event -> {
event.checkArgs(String.class);
Assert.assertEquals("fake-arg", event.args()[0]);
listenerACount.incrementAndGet();
return true;
};
EventListener listenerB = event -> {
listenerBCount.incrementAndGet();
return true;
};
EventListener listenerC = event -> {
event.checkArgs(String.class);
Assert.assertEquals("fake-arg", event.args()[0]);
listenerCCount.incrementAndGet();
return true;
};

this.eventHub.listen(notify, listenerA);
this.eventHub.listen(notify, listenerB);
this.eventHub.listen(EventHub.ANY_EVENT, listenerC);

Assert.assertEquals(2, (int) this.eventHub
.notifyExcept(notify, listenerB,
"fake-arg")
.get());
Assert.assertEquals(1, listenerACount.get());
Assert.assertEquals(0, listenerBCount.get());
Assert.assertEquals(1, listenerCCount.get());
}

@Test
public void testEventNotifyWithMultiThreads() throws InterruptedException {
final String notify = "event-notify";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
"Expect event action argument");
String action = (String) args[0];
LOG.debug("Event action: {}", action);
if (Cache.ACTION_INVALIDED.equals(action)) {
if (Cache.ACTION_INVALID.equals(action)) {
event.checkArgs(String.class, HugeType.class, Object.class);
HugeType type = (HugeType) args[1];
Object ids = args[2];
Expand All @@ -1410,7 +1410,7 @@ public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
E.checkArgument(false, "Unexpected argument: %s", ids);
}
return true;
} else if (Cache.ACTION_CLEARED.equals(action)) {
} else if (Cache.ACTION_CLEAR.equals(action)) {
event.checkArgs(String.class, HugeType.class);
HugeType type = (HugeType) args[1];
LOG.debug("Calling proxy.clear with type: {}", type);
Expand All @@ -1435,17 +1435,20 @@ public void close() {

@Override
public void invalid(HugeType type, Id id) {
this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id);
this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
Cache.ACTION_INVALID, type, id);
}

@Override
public void invalid2(HugeType type, Object[] ids) {
this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, ids);
this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
Cache.ACTION_INVALID, type, ids);
}

@Override
public void clear(HugeType type) {
this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type);
this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
Cache.ACTION_CLEAR, type);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public interface Cache<K, V> {

String ACTION_INVALID = "invalid";
String ACTION_CLEAR = "clear";
String ACTION_INVALIDED = "invalided";
String ACTION_CLEARED = "cleared";

V get(K id);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.backend.cache;

import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.event.EventListener;

/*
* Listener lifetime must cover all active transactions for the graph.
* The holder is removed from the registry and unregistered from EventHub
* only when the last transaction releases it.
*/
final class CacheListenerHolder {

final EventListener listener;
final EventHub hub;
int refCount;

CacheListenerHolder(EventListener listener, EventHub hub) {
this.listener = listener;
this.hub = hub;
this.refCount = 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.cache.CachedBackendStore.QueryId;
Expand Down Expand Up @@ -60,11 +62,20 @@ public final class CachedGraphTransaction extends GraphTransaction {
private static final long AVG_VERTEX_ENTRY_SIZE = 40L;
private static final long AVG_EDGE_ENTRY_SIZE = 100L;

/*
* Listener lifetime must cover all active transactions for the graph.
* The holder is removed from the registry and unregistered from EventHub
* only when the last transaction releases it.
*/
private static final ConcurrentMap<String, CacheListenerHolder>
graphCacheEventListeners = new ConcurrentHashMap<>();

private final Cache<Id, Object> verticesCache;
private final Cache<Id, Object> edgesCache;

private EventListener storeEventListener;
private EventListener cacheEventListener;
private CacheListenerHolder holder;

public CachedGraphTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
Expand Down Expand Up @@ -138,7 +149,7 @@ private void listenChanges() {
}

// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
EventListener listener = event -> {
LOG.debug("Graph {} received graph cache event: {}",
this.graph(), event);
Object[] args = event.args();
Expand Down Expand Up @@ -184,31 +195,67 @@ private void listenChanges() {
}
return false;
};
if (graphCacheListenStatus.putIfAbsent(this.params().spaceGraphName(), true) == null) {
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.listen(Events.CACHE, this.cacheEventListener);
}
EventHub graphEventHub = this.params().graphEventHub();
String graphName = this.params().spaceGraphName();
CacheListenerHolder acquired = graphCacheEventListeners.compute(
graphName, (key, existing) -> {
if (existing == null || existing.hub != graphEventHub) {
// Graph close/reopen creates a new EventHub for the
// same graph name; replace the stale holder. Old
// transactions skip decrement via identity check.
if (existing != null) {
existing.hub.unlisten(Events.CACHE,
existing.listener);
}
graphEventHub.listen(Events.CACHE, listener);
return new CacheListenerHolder(listener, graphEventHub);
}
existing.refCount++;
return existing;
});
this.holder = acquired;
this.cacheEventListener = acquired.listener;
}

private void unlistenChanges() {
String graphName = this.params().spaceGraphName();
if (graphCacheListenStatus.remove(graphName) != null) {
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
CacheListenerHolder ours = this.holder;
if (ours != null) {
graphCacheEventListeners.compute(graphName, (key, existing) -> {
if (existing == null || existing != ours) {
return existing;
}
existing.refCount--;
if (existing.refCount == 0) {
existing.hub.unlisten(Events.CACHE, existing.listener);
return null;
}
return existing;
});
this.holder = null;
this.cacheEventListener = null;
}
// TODO (follow-up): storeEventListenStatus has the same owner-first
// close bug this PR fixes for graphCacheEventListeners. A non-owner
// transaction can remove the tracking entry, unlisten its own
// never-registered storeEventListener as a no-op, and leave the
// original store listener registered but untracked. Apply the same
// ref-counted holder pattern in a follow-up PR.
if (storeEventListenStatus.remove(graphName) != null) {
this.store().provider().unlisten(this.storeEventListener);
}
}

private void notifyChanges(String action, HugeType type, Id[] ids) {
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.notify(Events.CACHE, action, type, ids);
graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
action, type, ids);
}

private void notifyChanges(String action, HugeType type) {
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.notify(Events.CACHE, action, type);
graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
action, type);
}

public void clearCache(HugeType type, boolean notify) {
Expand All @@ -220,7 +267,7 @@ public void clearCache(HugeType type, boolean notify) {
}

if (notify) {
this.notifyChanges(Cache.ACTION_CLEARED, null);
this.notifyChanges(Cache.ACTION_CLEAR, null);
}
}

Expand Down Expand Up @@ -397,7 +444,7 @@ protected void commitMutation2Backend(BackendMutation... mutations) {
this.verticesCache.invalidate(vertex.id());
}
if (vertexOffset > 0) {
this.notifyChanges(Cache.ACTION_INVALIDED,
this.notifyChanges(Cache.ACTION_INVALID,
HugeType.VERTEX, vertexIds);
}
}
Expand All @@ -411,7 +458,7 @@ protected void commitMutation2Backend(BackendMutation... mutations) {
if (invalidEdgesCache && this.enableCacheEdge()) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
this.notifyChanges(Cache.ACTION_CLEAR, HugeType.EDGE);
}
}
}
Expand All @@ -425,7 +472,7 @@ public void removeIndex(IndexLabel indexLabel) {
if (indexLabel.baseType() == HugeType.EDGE_LABEL) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
this.notifyChanges(Cache.ACTION_CLEAR, HugeType.EDGE);
}
}
}
Expand Down
Loading
Loading