Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ kotlinx-io = "0.9.0"
ktor = "3.3.3"
logging = "8.0.01"
mockk = "1.14.9"
mokksy = "0.8.1"
mokksy = "0.9.1"
serialization = "1.10.0"
slf4j = "2.0.17"
logback = "1.5.32"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const val MCP_SESSION_ID_HEADER = "MCP-Session-Id"

internal class MockMcp(verbose: Boolean = false) : AutoCloseable {

private val mokksy: Mokksy = Mokksy(verbose = verbose).apply {
private val mokksy = Mokksy(verbose = verbose).apply {
start()
}

Expand Down Expand Up @@ -90,7 +90,7 @@ internal class MockMcp(verbose: Boolean = false) : AutoCloseable {
expectedSessionId: String? = null,
vararg bodyPredicates: (JSONRPCRequest) -> Boolean,
): BuildingStep<JSONRPCRequest> = mokksy.method(
configuration = StubConfiguration(removeAfterMatch = true),
configuration = StubConfiguration.once(),
httpMethod = httpMethod,
requestType = JSONRPCRequest::class,
) {
Expand Down Expand Up @@ -219,20 +219,18 @@ internal class MockMcp(verbose: Boolean = false) : AutoCloseable {
sessionId = sessionId,
) respondsWithSseStream {
headers += MCP_SESSION_ID_HEADER to sessionId
this.flow = block.invoke()
flow = block.invoke()
}
}

fun mockUnsubscribeRequest(sessionId: String) {
mokksy.delete(
configuration = StubConfiguration(removeAfterMatch = true),
configuration = StubConfiguration.once(),
requestType = JSONRPCRequest::class,
) {
path("/mcp")
containsHeader(MCP_SESSION_ID_HEADER, sessionId)
} respondsWith {
body = null
}
} respondsWithStatus HttpStatusCode.OK
}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.modelcontextprotocol.kotlin.sdk.client

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldContain
import io.kotest.matchers.shouldBe
import io.ktor.http.ContentType
Expand All @@ -9,17 +10,22 @@ import io.ktor.sse.ServerSentEvent
import io.modelcontextprotocol.kotlin.sdk.types.ClientCapabilities
import io.modelcontextprotocol.kotlin.sdk.types.EmptyJsonObject
import io.modelcontextprotocol.kotlin.sdk.types.Implementation
import io.modelcontextprotocol.kotlin.sdk.types.Method
import io.modelcontextprotocol.kotlin.sdk.types.ProgressNotification
import io.modelcontextprotocol.kotlin.sdk.types.ProgressToken
import io.modelcontextprotocol.kotlin.sdk.types.Tool
import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
import kotlinx.serialization.json.putJsonObject
import org.junit.jupiter.api.TestInstance
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.test.Test
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
Expand All @@ -46,28 +52,29 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() {
mockMcp.onJSONRPCRequest(
httpMethod = HttpMethod.Post,
jsonRpcMethod = "initialize",
).respondsWithStream {
) respondsWithSseStream {
headers += MCP_SESSION_ID_HEADER to sessionId
flow = flowOf(
"id: ${Uuid.random()}\n",
"data:\n", // empty data
"\n",
"id: ${Uuid.random()}\n",
"data: \t \n", // tabs and spaces
"\n",
"id: ${Uuid.random()}\n",
"event: message\n",
// multiline data
"data: {\n",
"data: \"result\":{\n" +
"data: \"protocolVersion\":\"2025-06-18\",\n" +
"data: \"capabilities\":{},\n" +
"data: \"serverInfo\":{\"name\":\"simple-streamable-http-server\",\"version\":\"1.0.0\"}\n" +
"data: },\n" +
"data: \"jsonrpc\":\"2.0\",\n" +
"data: \"id\":\"7ce065b0678f49e5b04ce5a0fcc7d518\"\n" +
"data: }\n",
"\n",
// empty data — should be skipped by client
chunks += ServerSentEvent(data = "", id = Uuid.random().toString())
// whitespace-only data — should be skipped by client
chunks += ServerSentEvent(data = " \t ", id = Uuid.random().toString())
// valid initialize response with multiline JSON
@Suppress("MaxLineLength")
chunks += ServerSentEvent(
event = "message",
id = Uuid.random().toString(),
//language=json
data = """{
|"result":{
| "protocolVersion":"2025-06-18",
| "capabilities":{},
| "serverInfo":{"name":"simple-streamable-http-server","version":"1.0.0"}
|},
|"jsonrpc":"2.0",
|"id":"7ce065b0678f49e5b04ce5a0fcc7d518"
|}
|
""".trimMargin(),
)
}

Expand Down Expand Up @@ -111,30 +118,30 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() {
statusCode = HttpStatusCode.Accepted,
)

@Suppress("MaxLineLength")
mockMcp.handleSubscribeWithGet(sessionId) {
flow {
delay(500.milliseconds)
emit(
ServerSentEvent(
event = "message",
id = "1",
data = @Suppress("MaxLineLength")
//language=json
"""{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":50,"total":100}}""",
),
)
delay(200.milliseconds)
emit(
ServerSentEvent(
data = @Suppress("MaxLineLength")
//language=json
"""{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":50,"total":100}}""",
),
)
for (i in 0..10) {
delay(100.milliseconds)
emit(
ServerSentEvent(
event = "message",
id = "1",
//language=json
data =
"""{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"upload-123","progress":${i * 10},"total":100}}""",
),
)
}
awaitCancellation()
}
}

// TODO: how to get notifications via Client API?
val receivedNotifications = CopyOnWriteArrayList<ProgressNotification>()
client.setNotificationHandler<ProgressNotification>(Method.Defined.NotificationsProgress) {
receivedNotifications.add(it)
CompletableDeferred(Unit)
}

mockMcp.handleWithResult(
jsonRpcMethod = "tools/list",
Expand Down Expand Up @@ -176,6 +183,16 @@ internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() {

connect(client)

eventually(5.seconds) {
receivedNotifications.size shouldBe 11 // 0..100 with step 10

receivedNotifications.forEachIndexed { index, notification ->
notification.params.progressToken shouldBe ProgressToken("upload-123")
notification.params.progress shouldBe index * 10.0
notification.params.total shouldBe 100.0
}
}

val listToolsResult = client.listTools()

listToolsResult.tools shouldContain Tool(
Expand Down
Loading