Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
import io.javaoperatorsdk.operator.processing.event.source.Cache;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.processing.event.source.controller;
package io.javaoperatorsdk.operator.processing.event.source;

public enum ResourceAction {
ADDED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
Expand Down Expand Up @@ -83,26 +84,21 @@ public synchronized void start() {
}

@Override
public synchronized void handleEvent(
ResourceAction action,
T resource,
T oldResource,
Boolean deletedFinalStateUnknown,
boolean filterEvent) {
protected synchronized void handleEvent(
ResourceAction action, T resource, T oldResource, Boolean deletedFinalStateUnknown) {
try {
if (log.isDebugEnabled()) {
log.debug(
"Event received for resource: {} version: {} uuid: {} action: {} filter event: {}",
"Event received for resource: {} version: {} uuid: {} action: {}",
ResourceID.fromResource(resource),
getVersion(resource),
resource.getMetadata().getUid(),
action,
filterEvent);
action);
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
}
MDCUtils.addResourceInfo(resource);
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);
if (isAcceptedByFilters(action, resource, oldResource) && !filterEvent) {
if (isAcceptedByFilters(action, resource, oldResource)) {
if (deletedFinalStateUnknown != null) {
getEventHandler()
.handleEvent(
Expand Down Expand Up @@ -138,28 +134,36 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso
}

@Override
public void onAdd(T resource) {
var handling = temporaryResourceCache.onAddOrUpdateEvent(resource);
handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW);
public synchronized void onAdd(T resource) {
handleOnAddOrUpdate(ResourceAction.ADDED, null, resource);
}

@Override
public void onUpdate(T oldCustomResource, T newCustomResource) {
var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
handleEvent(
ResourceAction.UPDATED,
newCustomResource,
oldCustomResource,
null,
handling != EventHandling.NEW);
public synchronized void onUpdate(T oldCustomResource, T newCustomResource) {
handleOnAddOrUpdate(ResourceAction.UPDATED, oldCustomResource, newCustomResource);
}

private void handleOnAddOrUpdate(
ResourceAction action, T oldCustomResource, T newCustomResource) {
var handling =
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
if (handling == EventHandling.NEW) {
handleEvent(action, newCustomResource, oldCustomResource, null);
} else if (log.isDebugEnabled()) {
log.debug(
"{} event propagation for action: {} resource id: {} ",
handling,
action,
ResourceID.fromResource(newCustomResource));
}
}

@Override
public void onDelete(T resource, boolean deletedFinalStateUnknown) {
public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown) {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up caches on
// delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown, false);
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;

/**
* Extends ResourceEvent for informer Delete events, it holds also information if the final state is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;

public class ResourceEvent extends Event {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Java Operator SDK Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;

/** Used only for resource event filtering. */
public class ExtendedResourceEvent extends ResourceEvent {

private HasMetadata previousResource;

public ExtendedResourceEvent(
ResourceAction action,
ResourceID resourceID,
HasMetadata latestResource,
HasMetadata previousResource) {
super(action, resourceID, latestResource);
this.previousResource = previousResource;
}

public Optional<HasMetadata> getPreviousResource() {
return Optional.ofNullable(previousResource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
Expand Down Expand Up @@ -107,7 +107,7 @@ public void onAdd(R newResource) {
resourceType().getSimpleName(),
newResource.getMetadata().getResourceVersion());
}
onAddOrUpdate(Operation.ADD, newResource, null);
onAddOrUpdate(ResourceAction.ADDED, newResource, null);
}

@Override
Expand All @@ -120,7 +120,7 @@ public void onUpdate(R oldObject, R newObject) {
newObject.getMetadata().getResourceVersion(),
oldObject.getMetadata().getResourceVersion());
}
onAddOrUpdate(Operation.UPDATE, newObject, oldObject);
onAddOrUpdate(ResourceAction.UPDATED, newObject, oldObject);
}

@Override
Expand All @@ -139,12 +139,8 @@ public synchronized void onDelete(R resource, boolean b) {
}

@Override
public void handleEvent(
ResourceAction action,
R resource,
R oldResource,
Boolean deletedFinalStateUnknown,
boolean filterEvent) {
protected void handleEvent(
ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown) {
propagateEvent(resource);
}

Expand All @@ -156,27 +152,27 @@ public synchronized void start() {
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
}

private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) {
private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) {
primaryToSecondaryIndex.onAddOrUpdate(newObject);
var resourceID = ResourceID.fromResource(newObject);

var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject);
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);

if (eventHandling != EventHandling.NEW) {
log.debug(
"{} event propagation for {}. Resource ID: {}",
eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping",
operation,
action,
ResourceID.fromResource(newObject));
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
log.debug(
"Propagating event for {}, resource with same version not result of a reconciliation."
+ " Resource ID: {}",
operation,
action,
resourceID);
propagateEvent(newObject);
} else {
log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
}
}

Expand Down Expand Up @@ -251,11 +247,11 @@ public boolean allowsNamespaceChanges() {
return configuration().followControllerNamespaceChanges();
}

private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
private boolean eventAcceptedByFilter(ResourceAction action, R newObject, R oldObject) {
if (genericFilter != null && !genericFilter.accept(newObject)) {
return false;
}
if (operation == Operation.ADD) {
if (action == ResourceAction.ADDED) {
return onAddFilter == null || onAddFilter.accept(newObject);
} else {
return onUpdateFilter == null || onUpdateFilter.accept(newObject, oldObject);
Expand All @@ -266,9 +262,4 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
return (onDeleteFilter == null || onDeleteFilter.accept(resource, b))
&& (genericFilter == null || genericFilter.accept(resource));
}

private enum Operation {
ADD,
UPDATE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.*;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -90,6 +90,7 @@ public void changeNamespaces(Set<String> namespaces) {
* Also makes sure that the even produced by this update is filtered, thus does not trigger the
* reconciliation.
*/
@SuppressWarnings("unchecked")
public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<R> updateMethod) {
ResourceID id = ResourceID.fromResource(resourceToUpdate);
if (log.isDebugEnabled()) {
Expand All @@ -107,32 +108,38 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
id,
updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion());
var updatedForLambda = updatedResource;
res.ifPresent(
res.ifPresentOrElse(
r -> {
R latestResource = (R) r.getResource().orElseThrow();
// for update we need to have a historic resource, this might be improved to mimic more
// realistic scenario

// as previous resource version we use the one from successful update, since
// we process new event here only if that is more recent then the event from our update.
// Note that this is equivalent with the scenario when an informer watch connection
// would
// reconnect and loose some events in between.
// If that update was not successful we still record the previous version from the
// actual
// event in the ExtendedResourceEvent.
R extendedResourcePrevVersion =
(r instanceof ExtendedResourceEvent)
? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
: null;
R prevVersionOfResource =
updatedForLambda != null
? updatedForLambda
: (r.getAction() == ResourceAction.UPDATED ? latestResource : null);
updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
handleEvent(
r.getAction(),
latestResource,
prevVersionOfResource,
!(r instanceof ResourceDeleteEvent)
|| ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown(),
false);
});
(r instanceof ResourceDeleteEvent)
? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown()
: null);
},
() -> log.debug("No new event present after the filtering update; id: {}", id));
}
}

public abstract void handleEvent(
ResourceAction action,
R resource,
R oldResource,
Boolean deletedFinalStateUnknown,
boolean filterEvent);
protected abstract void handleEvent(
ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown);

@SuppressWarnings("unchecked")
@Override
Expand Down
Loading