Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/actions/get-version/version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ else
export sha1=-$(git rev-parse --short HEAD)
export changelist="-SNAPSHOT"
fi
echo "::set-output name=revision::$rev"
echo "::set-output name=sha1::$sha1"
echo "::set-output name=changelist::$changelist"
echo "revision=$rev" >> "$GITHUB_OUTPUT"
echo "sha1=$sha1" >> "$GITHUB_OUTPUT"
echo "changelist=$changelist" >> "$GITHUB_OUTPUT"
echo "Version will be:"
echo "${rev}${sha1}${changelist}"
2 changes: 0 additions & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ jobs:
run: git fetch --depth=1 origin +refs/tags/*:refs/tags/*
- id: version
uses: ./.github/actions/get-version
with:
versionPattern: '${{ env.JAVA_REFERENCE_VERSION }}.[0-9]*.[0-9]*'
- uses: actions/cache@v4 # cache maven packages to speed up build
with:
path: ~/.m2
Expand Down
55 changes: 34 additions & 21 deletions client/src/main/java/io/opencmw/client/DataSourcePublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -317,9 +314,9 @@ protected boolean handleDataSourceSockets() {
return dataAvailable;
}

protected <R> ThePromisedFuture<R, ?> newRequestFuture(final URI endpoint, final Class<R> requestedDomainObjType, final Command requestType, final String requestId) {
protected <R, C> ThePromisedFuture<R, C> newRequestFuture(final URI endpoint, final Class<R> requestedDomainObjType, final Command requestType, final String requestId) {
FilterRegistry.checkClassForNewFilters(requestedDomainObjType);
final ThePromisedFuture<R, ?> requestFuture = new ThePromisedFuture<>(endpoint, requestedDomainObjType, null, requestType, requestId, null);
final ThePromisedFuture<R, C> requestFuture = new ThePromisedFuture<>(endpoint, requestedDomainObjType, null, requestType, requestId, null);
final Object oldEntry = requests.put(requestId, requestFuture);
assert oldEntry == null : "requestID '" + requestId + "' already present in requestFutureMap";
return requestFuture;
Expand Down Expand Up @@ -367,19 +364,19 @@ protected void internalEventHandler(final RingBufferEvent event, final long sequ
replyDomainObject = ioClassSerialiser.deserialiseObject(reqClassType);
ioClassSerialiser.setDataBuffer(byteBuffer); // allow received byte array to be released
}
if (notifyFuture) {
domainObject.future.castAndSetReply(replyDomainObject); // notify callback
final Object contextObject;
if (domainObject.future.contextType == null || domainObject.future.contextType.equals(Map.class)) {
contextObject = QueryParameterParser.getMap(endpointURI.getQuery());
} else {
contextObject = QueryParameterParser.parseQueryParameter(domainObject.future.contextType, endpointURI.getQuery());
}
if (domainObject.future.listener != null) {
final var finalDomainObj = replyDomainObject;
final Object contextObject;
if (domainObject.future.contextType == null) {
contextObject = QueryParameterParser.getMap(endpointURI.getQuery());
} else {
contextObject = QueryParameterParser.parseQueryParameter(domainObject.future.contextType, endpointURI.getQuery());
}
executor.submit(() -> domainObject.future.notifyListener(finalDomainObj, contextObject)); // NOPMD - threads are ok, not a webapp
}
if (notifyFuture) {
domainObject.future.castAndSetReplyWithContext(replyDomainObject, contextObject); // notify callback
}
} catch (Exception e) { // NOPMD: exception is forwarded to client
final var sw = new StringWriter();
final var pw = new PrintWriter(sw);
Expand Down Expand Up @@ -453,18 +450,18 @@ private Client() { // accessed via outer class method
clientSocket.connect(inprocCtrl);
}

public <R, C> Future<R> get(URI endpoint, final C requestContext, final Class<R> requestedDomainObjType, final RbacProvider... rbacProvider) {
public <R, C> ThePromisedFuture<R, C> get(URI endpoint, final C requestContext, final Class<R> requestedDomainObjType, final RbacProvider... rbacProvider) {
final String requestId = clientId + internalReqIdGenerator.incrementAndGet();
final URI endpointQuery = getEndpointQuery(endpoint, requestContext);
final ThePromisedFuture<R, ?> rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.GET_REQUEST, requestId);
final ThePromisedFuture<R, C> rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.GET_REQUEST, requestId);
request(requestId, Command.GET_REQUEST, endpointQuery, null, requestContext, rbacProvider);
return rThePromisedFuture;
}

public <R, C> Future<R> set(final URI endpoint, final R requestBody, final C requestContext, final Class<R> requestedDomainObjType, final RbacProvider... rbacProvider) {
public <R, C> ThePromisedFuture<R, C> set(final URI endpoint, final R requestBody, final C requestContext, final Class<R> requestedDomainObjType, final RbacProvider... rbacProvider) {
final String requestId = clientId + internalReqIdGenerator.incrementAndGet();
final URI endpointQuery = getEndpointQuery(endpoint, requestContext);
final ThePromisedFuture<R, ?> rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.SET_REQUEST, requestId);
final ThePromisedFuture<R, C> rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.SET_REQUEST, requestId);
request(requestId, Command.SET_REQUEST, endpointQuery, requestBody, requestContext, rbacProvider);
return rThePromisedFuture;
}
Expand Down Expand Up @@ -529,6 +526,8 @@ private <R, C> void request(final String requestId, final Command requestType, f
msg.add(endpoint.toString());
if (requestBody == null) {
msg.add(EMPTY_ZFRAME);
} else if (requestBody instanceof BinaryData binaryData) {
msg.add(Arrays.copyOfRange(binaryData.data, 0, binaryData.dataSize > 0 ? binaryData.dataSize : binaryData.data.length));
} else {
final Class<? extends IoSerialiser> matchingSerialiser = DataSource.getFactory(endpoint).getMatchingSerialiserType(endpoint);
final IoClassSerialiser serialiser = getSerialiser(); // lazily initialize IoClassSerialiser
Expand All @@ -551,8 +550,9 @@ private <R, C> void request(final String requestId, final Command requestType, f
}
}

protected static class ThePromisedFuture<R, C> extends CustomFuture<R> { // NOPMD - no need for setters/getters here
public static class ThePromisedFuture<R, C> extends CustomFuture<R> { // NOPMD - no need for setters/getters here
private final URI endpoint;
private C replyContext;
private final Class<R> requestedDomainObjType;
private final Class<C> contextType;
private final Command requestType;
Expand Down Expand Up @@ -586,10 +586,12 @@ public String getInternalRequestID() {
}

public void notifyListener(final Object obj, final Object contextObject) {
if (obj == null || !requestedDomainObjType.isAssignableFrom(obj.getClass()) || !contextType.isAssignableFrom(contextObject.getClass())) {
if (obj == null || !requestedDomainObjType.isAssignableFrom(obj.getClass())) {
LOGGER.atError().addArgument(requestedDomainObjType.getName()).addArgument(obj == null ? "null" : obj.getClass().getName()).log("Got wrong type for notification, got {} expected {}");
} else if (contextType != null && !contextType.isAssignableFrom(contextObject.getClass())) {
LOGGER.atError().addArgument(contextObject.getClass().getName()).addArgument(contextType.getName()).log("Got wrong context type for notification, got {} expected {}");
} else {
//noinspection unchecked - cast is checked dynamically
// noinspection unchecked - cast is checked dynamically
listener.dataUpdate((R) obj, (C) contextObject); // NOPMD NOSONAR - cast is checked before implicitly
}
}
Expand All @@ -598,6 +600,17 @@ public void notifyListener(final Object obj, final Object contextObject) {
protected void castAndSetReply(final Object newValue) {
this.setReply((R) newValue);
}

@SuppressWarnings("unchecked")
protected void castAndSetReplyWithContext(final Object newValue, final Object contextObject) {
this.replyContext = (C) contextObject;
this.setReply((R) newValue);
}

public C getReplyContext() throws ExecutionException, InterruptedException {
super.get();
return this.replyContext;
}
}

protected static class InternalDomainObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
Expand Down Expand Up @@ -257,18 +258,21 @@ void testGetRequest() throws InterruptedException, ExecutionException, TimeoutEx
// get request
final URI requestURI = URI.create(brokerAddress + "/testWorker?ctx=FAIR.SELECTOR.C=3&contentType=application/octet-stream");
LOGGER.atDebug().addArgument(requestURI).log("requesting GET from endpoint: {}");
final Future<TestObject> future;
final DataSourcePublisher.ThePromisedFuture<TestObject, Map<String, List<String>>> future;
try (final DataSourcePublisher.Client client = dataSourcePublisher.getClient()) {
future = client.get(requestURI, null, TestObject.class); // uri_without_query oder serviceName + resolver, requestContext, type
}

// assert result
final TestObject result = future.get(1000, TimeUnit.MILLISECONDS);
assertEquals(referenceObject, result);
final Map<String, List<String>> context = future.getReplyContext();
assertEquals("FAIR.SELECTOR.C=3:P=5", context.get("ctx").get(0));

eventStore.stop();
dataSourcePublisher.stop();
}

@Test
void testGetRequestWithContext() throws InterruptedException, ExecutionException, TimeoutException {
final TestObject referenceObject = new TestObject("asdf", 42);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/opencmw/utils/CustomFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ public boolean isDone() {
* @throws IllegalStateException in case this method has been already called or Future has been cancelled
*/
public void setReply(final T newValue) {
this.reply = newValue;
if (done.getAndSet(true)) {
throw new IllegalStateException("future is not running anymore (either cancelled or already notified)");
}
this.reply = newValue;
notifyListener();
}

public void setException(final Throwable exception) {
this.exception = exception;
if (done.getAndSet(true)) {
throw new IllegalStateException("future is not running anymore (either cancelled or already notified)");
}
this.exception = exception;
notifyListener();
}

Expand Down
4 changes: 1 addition & 3 deletions server/src/main/java/io/opencmw/server/MajordomoWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,7 @@ protected void serialiseData(final IoClassSerialiser classSerialiser, final IoBu
final URI reqTopic = rawCtx.req.topic;
rawCtx.rep.topic = new URI(reqTopic.getScheme(), reqTopic.getAuthority(), reqTopic.getPath(), replyQuery, reqTopic.getFragment());
} else {
final String oldQuery = rawCtx.rep.topic.getQuery();
final String newQuery = oldQuery == null || oldQuery.isBlank() ? replyQuery : (oldQuery + "&" + replyQuery);
rawCtx.rep.topic = new URI(rawCtx.rep.topic.getScheme(), rawCtx.rep.topic.getAuthority(), rawCtx.rep.topic.getPath(), newQuery, null);
rawCtx.rep.topic = new URI(rawCtx.rep.topic.getScheme(), rawCtx.rep.topic.getAuthority(), rawCtx.rep.topic.getPath(), replyQuery, null);
}
final MimeType replyMimeType = QueryParameterParser.getMimeType(replyQuery);
// no MIME type given -> stick with the one specified in the request (if it exists) or keep default: copy of raw binary data
Expand Down