Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,30 @@ import scalus.cardano.ledger.{CardanoInfo, ProtocolParams, SlotNo, Transaction,
import scalus.cardano.node.{BlockchainProvider, NodeSubmitError, SubmitError, TransactionStatus, UtxoQuery, UtxoQueryError}
import scalus.cardano.node.stream.{BackupDiagnostics, BackupDiagnosticsSnapshot}
import scalus.cardano.network.NetworkMagic
import scalus.cardano.network.chainsync.Point
import scalus.cardano.network.infra.MiniProtocolId
import scalus.cardano.network.n2c.localstatequery.{LocalStateQueryDriver, LocalStateQueryMessage, LsqQuery}
import scalus.cardano.network.n2c.localtxsubmission.{LocalTxSubmissionDriver, LocalTxSubmissionRejection}
import scalus.uplc.builtin.{ByteString, Data}
import scalus.utils.Hex.toHex

import java.nio.file.Path
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}

/** Submit-only `BlockchainProvider` over `LocalTxSubmission` against a local cardano-node.
/** `BlockchainProvider` against a local cardano-node over N2C.
*
* Owns its own [[NodeToClientConnection]] (one connection per provider instance) plus a
* [[LocalTxSubmissionDriver]] talking on [[MiniProtocolId.LocalTxSubmission]]. Read methods
* (`findUtxos`, `fetchLatestParams`, `currentSlot`, `getDatum`, `checkTransaction`) raise
* [[UnsupportedOperationException]] until `LocalStateQuery` lands in M12 — pair this backup with a
* `BackupSource.Blockfrost` if read coverage is needed before then.
* Owns its own [[NodeToClientConnection]] plus three mini-protocol drivers:
* - [[LocalTxSubmissionDriver]] on [[MiniProtocolId.LocalTxSubmission]] for `submit`
* - [[LocalStateQueryDriver]] on [[MiniProtocolId.LocalStateQuery]] for `currentSlot` and
* (eventually) the rest of the read surface
* - [[LocalTxSubmissionDriver]]'s connection-root for KeepAlive
Comment on lines +21 to +25
*
* Read methods backed by LSQ today: `currentSlot`. The remaining stubs (`fetchLatestParams`,
* `findUtxos`, `checkTransaction`) raise [[UnsupportedOperationException]] until their per-query
* result decoders land — `getDatum` returns `None` because LSQ has no datum-by-hash query.
*
* The N2C handshake negotiates `query = true` so the server permits LSQ queries.
*
* Connection-sharing with `ChainSyncSource.N2C` (when both point at the same socket) is a planned
* optimisation; today each component opens its own connection.
Expand All @@ -34,6 +42,7 @@ import scala.concurrent.{ExecutionContext, Future}
final class LocalNodeProvider private (
conn: NodeToClientConnection,
driver: LocalTxSubmissionDriver,
lsqDriver: LocalStateQueryDriver,
val cardanoInfo: CardanoInfo,
submitEra: Int,
connectedSinceMillis: Long
Expand Down Expand Up @@ -83,23 +92,59 @@ final class LocalNodeProvider private (

/** Point-in-time snapshot of this provider's backup diagnostics (atomic read of `diagState`). */
def diagnostics: BackupDiagnosticsSnapshot = diagState.get

/** Tear down the driver + connection. Idempotent. */
def close(): Future[Unit] = driver.close().flatMap(_ => conn.close())
/** Tear down all drivers + connection. Idempotent. */
def close(): Future[Unit] =
lsqDriver.close().flatMap(_ => driver.close()).flatMap(_ => conn.close())

// -------- LSQ-backed reads --------

/** Slot of the node's current chain tip, read via an LSQ `GetChainPoint` round-trip under the
  * LSQ lock. `Point.Origin` (chain has no blocks yet) maps to slot 0.
  */
override def currentSlot: Future[SlotNo] = withLsqSnapshot {
  val tip = lsqDriver.query(LsqQuery.GetChainPoint)
  tip.map {
    case Point.BlockPoint(slot, _) => slot
    case Point.Origin              => 0L
  }
}

// ---- Read methods deferred to M12 LSQ. ----
/** Async mutex around `acquire → body → release`. Multiple concurrent provider calls (e.g.
 * parallel `currentSlot` invocations) would otherwise race the LSQ driver's single-in-flight
 * contract. The previous gate is awaited before this op runs; the new gate is completed
 * regardless of op outcome.
 */
// Holds the completion future of the most recently enqueued op; each new op swaps in its own.
private val lsqGate = new AtomicReference[Future[Unit]](Future.unit)

/** Serialise LSQ operations: each call queues behind the previously enqueued one, then opens
  * the gate for its successor once `op` settles — whether it succeeded or failed.
  */
private def withLsqLock[A](op: => Future[A]): Future[A] = {
  val myTurnDone = Promise[Unit]()
  val predecessor = lsqGate.getAndSet(myTurnDone.future)
  predecessor
    .transformWith(_ => op)
    .andThen { case _ => myTurnDone.success(()) }
}

/** Run `body` under the LSQ lock against a freshly acquired volatile-tip snapshot:
  * acquire → body → release. Release errors are swallowed so the caller always sees the
  * body's own outcome; an acquire rejection fails the returned future.
  */
private def withLsqSnapshot[A](body: => Future[A]): Future[A] = withLsqLock {
  lsqDriver.acquire(LocalStateQueryMessage.AcquireTarget.VolatileTip).flatMap {
    case Left(failure) =>
      Future.failed(new RuntimeException(s"LSQ acquire failed: $failure"))
    case Right(()) =>
      body.transformWith { outcome =>
        val released = lsqDriver.release().recover { case _ => () }
        released.flatMap(_ => Future.fromTry(outcome))
      }
  }
}

// -------- Reads still deferred (per-query result CBOR decoders TBD) --------

private def unsupportedRead(name: String): Nothing =
throw new UnsupportedOperationException(
s"$name is not supported by BackupSource.LocalNode — pair with BackupSource.Blockfrost " +
"or wait for M12 (LocalStateQuery)"
s"$name is not yet implemented by BackupSource.LocalNode — pair with " +
"BackupSource.Blockfrost, or wait for the per-query LSQ result decoder"
)

override def fetchLatestParams: Future[ProtocolParams] = unsupportedRead("fetchLatestParams")
override def findUtxos(query: UtxoQuery): Future[Either[UtxoQueryError, Utxos]] =
unsupportedRead("findUtxos")
override def currentSlot: Future[SlotNo] = unsupportedRead("currentSlot")
override def getDatum(datumHash: scalus.cardano.ledger.DataHash): Future[Option[Data]] =
unsupportedRead("getDatum")
Future.successful(None)
override def checkTransaction(txHash: TransactionHash): Future[TransactionStatus] =
unsupportedRead("checkTransaction")
}
Expand All @@ -119,14 +164,20 @@ object LocalNodeProvider {
cardanoInfo: CardanoInfo,
submitEra: Int = 6
)(using ExecutionContext): Future[LocalNodeProvider] = {
Comment on lines 164 to 166
NodeToClientClient.connect(socketPath, networkMagic).map { conn =>
val driver = new LocalTxSubmissionDriver(
val config = ClientConfig.default.copy(query = true)
NodeToClientClient.connect(socketPath, networkMagic, config).map { conn =>
val submitDriver = new LocalTxSubmissionDriver(
conn.channel(MiniProtocolId.LocalTxSubmission),
conn.rootToken
)
val lsqDriver = new LocalStateQueryDriver(
conn.channel(MiniProtocolId.LocalStateQuery),
conn.rootToken
)
new LocalNodeProvider(
conn,
driver,
submitDriver,
lsqDriver,
cardanoInfo,
submitEra,
connectedSinceMillis = System.currentTimeMillis()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scalus.cardano.network.infra

import io.bullet.borer.{Borer, Cbor as Cborer, Decoder, Encoder}
import scalus.cardano.infra.CancelToken
import scalus.cardano.ledger.OriginalCborByteArray
import scalus.serialization.cbor.Cbor
import scalus.uplc.builtin.ByteString

Expand Down Expand Up @@ -51,7 +52,15 @@ final class CborMessageStream[M](
protocol: MiniProtocolId,
handle: MiniProtocolBytes,
initialCapacity: Int = 512
)(using encoder: Encoder[M], decoder: Decoder[M], ec: ExecutionContext) {
)(using
encoder: Encoder[M],
// Decoder is resolved lazily per decode so message types whose decoders depend on the
// active CBOR buffer (`given OriginalCborByteArray` — see `KeepRaw[A]` and the LSQ
// `MsgResult` decoder) get the right buffer in scope. Callers with a context-free
// `Decoder[M]` are auto-lifted by the compiler.
decoderProvider: OriginalCborByteArray ?=> Decoder[M],
Comment on lines +57 to +61
ec: ExecutionContext
) {

// Accumulator for bytes that arrived but haven't yet been decoded into a message. `storage` is
// the backing array (grows geometrically on demand) and `size` is the logical length of
Expand Down Expand Up @@ -118,7 +127,12 @@ final class CborMessageStream[M](
// stop after the first message instead of failing on leftover bytes — crucial when
// multiple pipelined messages arrive in the same SDU.
val view = java.util.Arrays.copyOfRange(storage, 0, size)
Cborer.decode(view).withPrefixOnly.to[M].valueAndInputEither match {
// Make the buffer available to message decoders that want to capture raw CBOR slices
// (LSQ `MsgResult`, anything wrapped in `KeepRaw[A]`) via the scalus
// `OriginalCborByteArray` pattern. Cheap when unused.
given OriginalCborByteArray = OriginalCborByteArray(view)
val decoder: Decoder[M] = decoderProvider
Cborer.decode(view).withPrefixOnly.to[M](using decoder).valueAndInputEither match {
case Right((value, input)) =>
val cursorPos = input.cursor.toInt
val leftover = size - cursorPos
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package scalus.cardano.network.n2c.localstatequery

import scalus.cardano.infra.CancelToken
import scalus.cardano.network.infra.{CborMessageStream, MiniProtocolBytes, MiniProtocolId}
import scalus.cardano.network.n2c.localstatequery.LocalStateQueryMessage.*
import scalus.serialization.cbor.Cbor
import scalus.uplc.builtin.ByteString

import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

/** Initiator-side driver for the LocalStateQuery mini-protocol (id 7).
*
* State machine (CDDL `local-state-query.cddl`, with V2 ImmutableTip support):
*
* {{{
* Idle ──MsgAcquire(target)────→ Acquiring
* Acquiring ──MsgAcquired──→ Acquired
* Acquiring ──MsgFailure──→ Idle (snapshot rejected)
* Acquired ──MsgQuery─────────────→ Querying
* Querying ──MsgResult──→ Acquired
* Acquired ──MsgRelease───────────→ Idle
* Acquired ──MsgReAcquire(target)─→ Acquiring
* Idle ──MsgDone──────────────→ Done (best-effort on shutdown)
* }}}
*
* This driver implements `acquire` / `query[A]` / `release` / `close`; `reAcquire` lands in a
* follow-up.
Comment on lines +27 to +28
*
* Single in-flight contract: callers MUST await the previous `Future` before issuing the next
* call. Mirrors the same single-consumer contract as
* [[scalus.cardano.network.n2c.localtxsubmission.LocalTxSubmissionDriver]] /
* [[scalus.cardano.network.chainsync.ChainSyncDriver]] — multi-tenant coordination is the caller's
* job.
*
* Wire-protocol surprises (unexpected message for the current state, peer EOF mid-acquire, decode
* failure) fail the returned future with a typed cause; the driver does not itself fire the
* connection-root cancel. Ownership of fatal-fault propagation lives one level up — the provider
* decides whether a wire-decode failure should bring down the connection (it does, via the
* connection root the caller supplies as `cancelToken`).
*/
final class LocalStateQueryDriver(
    handle: MiniProtocolBytes,
    cancelToken: CancelToken,
    logger: scribe.Logger = LocalStateQueryDriver.defaultLogger
)(using ExecutionContext) {

  // CBOR message codec over the raw mini-protocol byte handle; every send/receive in this
  // driver goes through this single stream.
  private val stream =
    new CborMessageStream[LocalStateQueryMessage](
      MiniProtocolId.LocalStateQuery,
      handle
    )

  // Set once by close(); guards every public entry point.
  @volatile private var closed: Boolean = false
  // Logical state — `false` = Idle (no snapshot held), `true` = Acquired. The two transient
  // states (Acquiring / Querying) live inside the in-flight Future so we don't need to model
  // them here; the single-consumer contract guarantees no observer sees them.
  @volatile private var acquired: Boolean = false

  /** Acquire a snapshot at `target`. The returned future completes once the server replies.
    *
    * - `Right(())` on `MsgAcquired` — driver is now in `Acquired`; queries can be issued.
    * - `Left(failure)` on `MsgFailure` — driver stays in `Idle`; caller may try a different
    *   target (typical recovery: drop a stale `SpecificPoint` and acquire `VolatileTip`).
    * - `Future.failed(...)` on wire-protocol surprises (unexpected message, peer EOF, decode
    *   failure, cancel).
    */
  def acquire(target: AcquireTarget): Future[Either[AcquireFailure, Unit]] = {
    if closed then return Future.failed(new IllegalStateException("driver closed"))
    if acquired then
      return Future.failed(
        new IllegalStateException(
          "snapshot already acquired — release() first, or use reAcquire() once it lands"
        )
      )
    stream
      .send(MsgAcquire(target), cancelToken)
      .flatMap(_ => stream.receive(cancelToken))
      .flatMap {
        case Some(MsgAcquired) =>
          acquired = true
          Future.successful(Right(()))
        case Some(MsgFailure(f)) =>
          // server rejected — we stay in Idle
          Future.successful(Left(f))
        case Some(other) =>
          Future.failed(unexpectedMessage("awaiting Acquired/Failure", other))
        case None =>
          Future.failed(
            new IllegalStateException("peer closed LSQ mid-acquire; no Acquired/Failure")
          )
      }
  }

  /** Issue a typed query against the held snapshot. Caller must have completed an [[acquire]]
    * (`Right(())`) before calling.
    *
    * The future fails on:
    * - `IllegalStateException` if the driver is closed or no snapshot is held
    * - `IllegalStateException` if the peer sends an unexpected message in `Querying`
    * - whatever the per-query result decoder raises on malformed result bytes
    * - `IllegalStateException` if the peer EOFs mid-query
    */
  def query[A](q: LsqQuery[A]): Future[A] = {
    if closed then return Future.failed(new IllegalStateException("driver closed"))
    if !acquired then
      return Future.failed(
        new IllegalStateException("no snapshot acquired; call acquire() first")
      )
    // Encode the query to CBOR up front; the wire encoder takes the existential LsqQuery[?].
    val queryBytes = ByteString.fromArray(Cbor.encode(q: LsqQuery[?]))
    stream
      .send(MsgQuery(queryBytes), cancelToken)
      .flatMap(_ => stream.receive(cancelToken))
      .flatMap {
        case Some(MsgResult(resultBytes)) =>
          // MsgResult returns the protocol to Acquired (see the state machine in the class
          // scaladoc), so `acquired` stays true and further queries may follow. A decode
          // failure is surfaced as a failed future without changing local state.
          try Future.successful(LsqQuery.decodeResult(q, resultBytes.bytes))
          catch case NonFatal(t) => Future.failed(t)
        case Some(other) =>
          Future.failed(unexpectedMessage("awaiting Result", other))
        case None =>
          Future.failed(new IllegalStateException("peer closed LSQ mid-query"))
      }
  }

  /** Release the current snapshot, returning to `Idle`. One-way notification — the server does
    * not reply. Caller may follow with another [[acquire]] or [[close]].
    *
    * Local `acquired` flag is reset before the send so that, on a wire failure mid-release, the
    * next [[acquire]] surfaces the connection error rather than a stale "already acquired" guard.
    */
  def release(): Future[Unit] = {
    if closed then return Future.failed(new IllegalStateException("driver closed"))
    if !acquired then return Future.failed(new IllegalStateException("no snapshot acquired"))
    acquired = false
    stream.send(MsgRelease, cancelToken)
  }

  /** Send `MsgDone` best-effort and mark the driver closed. If a snapshot is currently acquired,
    * sends `MsgRelease` first to reach `Idle` (the only state from which `MsgDone` is valid per
    * the protocol). Both sends are best-effort — failures are logged at debug, because the
    * connection is being torn down anyway. Idempotent.
    */
  def close(): Future[Unit] = {
    if closed then return Future.unit
    closed = true
    // Teardown sends deliberately use CancelToken.never: we want them attempted even if the
    // connection-root cancel has already fired.
    val toIdle: Future[Unit] =
      if acquired then {
        acquired = false
        stream
          .send(MsgRelease, CancelToken.never)
          .recover { case t =>
            logger.debug(s"MsgRelease best-effort send failed: $t")
            ()
          }
      } else Future.unit
    toIdle.flatMap { _ =>
      stream.send(MsgDone, CancelToken.never).recover { case t =>
        logger.debug(s"MsgDone best-effort send failed: $t")
        ()
      }
    }
  }

  /** True when a snapshot is held and queries can be issued. */
  def isAcquired: Boolean = acquired

  // Uniform failure for a wire message that is not valid in the driver's current state.
  private def unexpectedMessage(state: String, msg: LocalStateQueryMessage): Throwable =
    new IllegalStateException(s"unexpected $msg in state $state")
}

object LocalStateQueryDriver {

  // Shared fallback logger for driver instances whose constructor does not supply one.
  private val defaultLogger: scribe.Logger =
    scribe.Logger("scalus.cardano.network.n2c.localstatequery")
}
Loading