Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@
import org.apache.ignite.internal.processors.continuous.StartRequestData;
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
Expand Down Expand Up @@ -508,7 +507,6 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
withNoSchema(StartRequestData.class);
withNoSchema(StartRoutineDiscoveryMessage.class);
withNoSchema(StartRoutineAckDiscoveryMessage.class);
withNoSchema(StartRoutineDiscoveryMessageV2.class);

// [10600-10800]: Affinity & partition maps.
msgIdx = 10600;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker;
import org.apache.ignite.internal.thread.OomExceptionHandler;
Expand Down Expand Up @@ -211,26 +212,13 @@ public GridContinuousProcessor(GridKernalContext ctx) {
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
ClusterNode snd,
StartRoutineDiscoveryMessage msg) {
assert !immutableDiscoCustomMsg;

if (ctx.isStopping())
return;

processStartRequestMutable(snd, msg);
}
});

ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class,
new CustomEventListener<StartRoutineDiscoveryMessageV2>() {
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
ClusterNode snd,
StartRoutineDiscoveryMessageV2 msg) {
assert immutableDiscoCustomMsg;

if (ctx.isStopping())
return;

processStartRequestImmutable(topVer, snd, msg);
if (immutableDiscoCustomMsg)
processStartRequestImmutable(topVer, snd, msg);
else
processStartRequestMutable(snd, msg);
}
});

Expand Down Expand Up @@ -992,17 +980,15 @@ private AbstractContinuousMessage createStartMessage(UUID routineId,
reqData.prepareMarshal(ctx);

if (!immutableDiscoCustomMsg) {
StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(
routineId,
reqData);
StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(routineId, reqData, Mode.MUTABLE);

if (hnd.updateCounters() != null)
msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());

return msg;
}
else
return new StartRoutineDiscoveryMessageV2(routineId, reqData);
return new StartRoutineDiscoveryMessage(routineId, reqData, Mode.IMMUTABLE);
}

/**
Expand Down Expand Up @@ -1468,7 +1454,7 @@ private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStart
*/
private void processStartRequestImmutable(final AffinityTopologyVersion topVer,
final ClusterNode snd,
final StartRoutineDiscoveryMessageV2 msg) {
final StartRoutineDiscoveryMessage msg) {
StartRequestData reqData = msg.startRequestData();

ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,26 @@
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;

import static org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode.MUTABLE;

/**
* Discovery message used for Continuous Query registration.
*/
public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** Discovery message mode. */
enum Mode {
/** Mutable discovery mode. */
MUTABLE,

/** Immutable discovery mode. */
IMMUTABLE
}

/** */
@Order(0)
StartRequestData startReqData;

/** */
/** Errors collected by mutable discovery. */
@Order(1)
Map<UUID, ErrorMessage> errs = new HashMap<>();

Expand All @@ -46,14 +57,20 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
@Order(3)
Map<UUID, Map<Integer, Long>> updateCntrsPerNode;

/** Discovery message mode. */
@Order(4)
Mode mode;

/**
* @param routineId Routine id.
* @param startReqData Start request data.
* @param mode Discovery message mode.
*/
public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, Mode mode) {
super(routineId);

this.startReqData = startReqData;
this.mode = mode;
}

/** */
Expand Down Expand Up @@ -110,11 +127,14 @@ public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {

/** {@inheritDoc} */
@Override public boolean isMutable() {
return true;
return mode == MUTABLE;
}

/** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
if (!isMutable())
return null;

return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,6 @@ org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRo
org.apache.ignite.internal.processors.continuous.StartRequestData
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.internal.DiscoverySpiTestListener;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.P2;
Expand Down Expand Up @@ -1051,7 +1050,7 @@ public void testAsyncOld() throws Exception {
}
}, IllegalStateException.class, null);

lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class);
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);

final String topic = "topic";

Expand Down Expand Up @@ -1149,7 +1148,7 @@ public void testAsync() throws Exception {

discoSpi.setInternalListener(lsnr);

lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class);
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);

final String topic = "topic";

Expand Down
Loading