Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 96 additions & 7 deletions client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,84 @@ val <Res> Response<Res>.response: Res
val <Res> SendResponse<Res>.sendStatus: SendResponse.SendStatus
get() = this.sendStatus()

/**
* Scope client communication, to send requests to services, virtual objects and workflows within a
* scope. Requires Restate >= 1.7.
*
* Example usage:
* ```kotlin
* val greeter = client.scope("my-scope").service<Greeter>()
* val response = greeter.greet("Alice")
* ```
*
* @param scopeKey the scope key to prepend to all invocation targets
* @return a scoped client
*/
@org.jetbrains.annotations.ApiStatus.Experimental
fun Client.scope(scopeKey: String): ScopedKotlinClient = ScopedKotlinClient(this, scopeKey)

/**
* Scope client communication, to send requests to services, virtual objects and workflows within a
* scope. Requires Restate >= 1.7.
*
* Obtain an instance via [Client.scope].
*/
@org.jetbrains.annotations.ApiStatus.Experimental
class ScopedKotlinClient
@PublishedApi
internal constructor(
@PublishedApi internal val client: Client,
@PublishedApi internal val scopeKey: String,
) {
/** @see Client.service */
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> service(): SVC {
return service(client, SVC::class.java, scopeKey)
}

/** @see Client.virtualObject */
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> virtualObject(key: String): SVC {
return virtualObject(client, SVC::class.java, key, scopeKey)
}

/** @see Client.workflow */
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> workflow(key: String): SVC {
return workflow(client, SVC::class.java, key, scopeKey)
}

/** @see Client.toService */
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> toService(): KClientRequestBuilder<SVC> {
ReflectionUtils.mustHaveServiceAnnotation(SVC::class.java)
require(ReflectionUtils.isKotlinClass(SVC::class.java)) {
"Using Java classes with Kotlin's API is not supported"
}
return KClientRequestBuilder(client, SVC::class.java, null, scopeKey)
}

/** @see Client.toVirtualObject */
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> toVirtualObject(key: String): KClientRequestBuilder<SVC> {
ReflectionUtils.mustHaveVirtualObjectAnnotation(SVC::class.java)
require(ReflectionUtils.isKotlinClass(SVC::class.java)) {
"Using Java classes with Kotlin's API is not supported"
}
return KClientRequestBuilder(client, SVC::class.java, key, scopeKey)
}

/** @see Client.toWorkflow */
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> toWorkflow(key: String): KClientRequestBuilder<SVC> {
ReflectionUtils.mustHaveWorkflowAnnotation(SVC::class.java)
require(ReflectionUtils.isKotlinClass(SVC::class.java)) {
"Using Java classes with Kotlin's API is not supported"
}
return KClientRequestBuilder(client, SVC::class.java, key, scopeKey)
}
}

/**
* Create a proxy client for a Restate service.
*
Expand Down Expand Up @@ -332,15 +410,15 @@ inline fun <reified SVC : Any> Client.workflow(key: String): SVC {
* @return a proxy that intercepts method calls and executes them via the client
*/
@PublishedApi
internal fun <SVC : Any> service(client: Client, clazz: Class<SVC>): SVC {
internal fun <SVC : Any> service(client: Client, clazz: Class<SVC>, scope: String? = null): SVC {
ReflectionUtils.mustHaveServiceAnnotation(clazz)
require(ReflectionUtils.isKotlinClass(clazz)) {
"Using Java classes with Kotlin's API is not supported"
}

val serviceName = ReflectionUtils.extractServiceName(clazz)
return ProxySupport.createProxy(clazz) { invocation ->
val request = invocation.captureInvocation(serviceName, null).toRequest()
val request = invocation.captureInvocation(serviceName, null, scope).toRequest()
@Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation<Any?>

// Start a coroutine that calls the client and resumes the continuation
Expand All @@ -359,15 +437,20 @@ internal fun <SVC : Any> service(client: Client, clazz: Class<SVC>): SVC {
* @return a proxy that intercepts method calls and executes them via the client
*/
@PublishedApi
internal fun <SVC : Any> virtualObject(client: Client, clazz: Class<SVC>, key: String): SVC {
internal fun <SVC : Any> virtualObject(
client: Client,
clazz: Class<SVC>,
key: String,
scope: String? = null,
): SVC {
ReflectionUtils.mustHaveVirtualObjectAnnotation(clazz)
require(ReflectionUtils.isKotlinClass(clazz)) {
"Using Java classes with Kotlin's API is not supported"
}

val serviceName = ReflectionUtils.extractServiceName(clazz)
return ProxySupport.createProxy(clazz) { invocation ->
val request = invocation.captureInvocation(serviceName, key).toRequest()
val request = invocation.captureInvocation(serviceName, key, scope).toRequest()
@Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation<Any?>

// Start a coroutine that calls the client and resumes the continuation
Expand All @@ -386,15 +469,20 @@ internal fun <SVC : Any> virtualObject(client: Client, clazz: Class<SVC>, key: S
* @return a proxy that intercepts method calls and executes them via the client
*/
@PublishedApi
internal fun <SVC : Any> workflow(client: Client, clazz: Class<SVC>, key: String): SVC {
internal fun <SVC : Any> workflow(
client: Client,
clazz: Class<SVC>,
key: String,
scope: String? = null,
): SVC {
ReflectionUtils.mustHaveWorkflowAnnotation(clazz)
require(ReflectionUtils.isKotlinClass(clazz)) {
"Using Java classes with Kotlin's API is not supported"
}

val serviceName = ReflectionUtils.extractServiceName(clazz)
return ProxySupport.createProxy(clazz) { invocation ->
val request = invocation.captureInvocation(serviceName, key).toRequest()
val request = invocation.captureInvocation(serviceName, key, scope).toRequest()
@Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation<Any?>

// Start a coroutine that calls the client and resumes the continuation
Expand All @@ -418,6 +506,7 @@ internal constructor(
private val client: Client,
private val clazz: Class<SVC>,
private val key: String?,
private val scope: String? = null,
) {
/**
* Create a request by invoking a method on the target.
Expand All @@ -432,7 +521,7 @@ internal constructor(
suspend fun <Res> request(block: suspend SVC.() -> Res): KClientRequest<Any?, Res> {
return KClientRequestImpl(
client,
RequestCaptureProxy(clazz, key).capture(block as suspend SVC.() -> Any?).toRequest(),
RequestCaptureProxy(clazz, key, scope).capture(block as suspend SVC.() -> Any?).toRequest(),
)
as KClientRequest<Any?, Res>
}
Expand Down
19 changes: 19 additions & 0 deletions client/src/main/java/dev/restate/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,25 @@ default Response<Output<Res>> getOutput() throws IngressException {
}
}

/**
* <b>EXPERIMENTAL API:</b> Scope client communication, to send requests to services, virtual
* objects and workflows within a scope. Requires Restate >= 1.7.
*
* <pre>{@code
* Client client = Client.connect("http://localhost:8080");
*
* var greeterProxy = client.scope("my-scope").service(Greeter.class);
* GreetingResponse output = greeterProxy.greet(new Greeting("Alice"));
* }</pre>
*
* @param scopeKey the scope key to prepend to all invocation targets
* @return a scoped client
*/
@org.jetbrains.annotations.ApiStatus.Experimental
default ScopedClient scope(String scopeKey) {
return new ScopedClient(this, scopeKey);
}

/**
* <b>EXPERIMENTAL API:</b> Simple API to invoke a Restate service from the ingress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@ final class ClientServiceHandleImpl<SVC> implements ClientServiceHandle<SVC> {
private final Class<SVC> clazz;
private final String serviceName;
private final @Nullable String key;
private final @Nullable String scope;

private MethodInfoCollector<SVC> methodInfoCollector;

ClientServiceHandleImpl(Client innerClient, Class<SVC> clazz, @Nullable String key) {
this(innerClient, clazz, key, null);
}

ClientServiceHandleImpl(
Client innerClient, Class<SVC> clazz, @Nullable String key, @Nullable String scope) {
this.innerClient = innerClient;
this.clazz = clazz;
this.serviceName = ReflectionUtils.extractServiceName(clazz);
this.key = key;
this.scope = scope;
}

@SuppressWarnings("unchecked")
Expand All @@ -46,6 +53,7 @@ public <I, O> CompletableFuture<Response<O>> callAsync(
MethodInfo methodInfo = getMethodInfoCollector().resolve(s, input);
return innerClient.callAsync(
toRequest(
scope,
serviceName,
key,
methodInfo.getHandlerName(),
Expand All @@ -62,6 +70,7 @@ public <I> CompletableFuture<Response<Void>> callAsync(
MethodInfo methodInfo = getMethodInfoCollector().resolve(s, input);
return innerClient.callAsync(
toRequest(
scope,
serviceName,
key,
methodInfo.getHandlerName(),
Expand All @@ -78,6 +87,7 @@ public <O> CompletableFuture<Response<O>> callAsync(
MethodInfo methodInfo = getMethodInfoCollector().resolve(s);
return innerClient.callAsync(
toRequest(
scope,
serviceName,
key,
methodInfo.getHandlerName(),
Expand All @@ -93,6 +103,7 @@ public CompletableFuture<Response<Void>> callAsync(
MethodInfo methodInfo = getMethodInfoCollector().resolve(s);
return innerClient.callAsync(
toRequest(
scope,
serviceName,
key,
methodInfo.getHandlerName(),
Expand All @@ -109,6 +120,7 @@ public <I, O> CompletableFuture<SendResponse<O>> sendAsync(
MethodInfo methodInfo = getMethodInfoCollector().resolve(s, input);
return innerClient.sendAsync(
toRequest(
scope,
serviceName,
key,
methodInfo.getHandlerName(),
Expand All @@ -126,6 +138,7 @@ public <I> CompletableFuture<SendResponse<Void>> sendAsync(
MethodInfo methodInfo = getMethodInfoCollector().resolve(s, input);
return innerClient.sendAsync(
toRequest(
scope,
serviceName,
key,
methodInfo.getHandlerName(),
Expand All @@ -143,6 +156,7 @@ public <O> CompletableFuture<SendResponse<O>> sendAsync(
MethodInfo methodInfo = getMethodInfoCollector().resolve(s);
return innerClient.sendAsync(
toRequest(
scope,
serviceName,
key,
methodInfo.getHandlerName(),
Expand All @@ -159,6 +173,7 @@ public CompletableFuture<SendResponse<Void>> sendAsync(
MethodInfo methodInfo = getMethodInfoCollector().resolve(s);
return innerClient.sendAsync(
toRequest(
scope,
serviceName,
key,
methodInfo.getHandlerName(),
Expand Down
Loading
Loading