Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/release-notes/eclair-vnext.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ However, when using zero-conf, this event may be emitted before the `channel-con

See #3237 for more details.

### Major changes to the AuditDb

We make a collection of backwards-incompatible changes to all tables of the `audit` database.
The main change is that it is way more relevant to track statistics for peer nodes instead of individual channels, so we want to track the `node_id` associated with each event.
We also track more data about transactions we make and relayed payments, to more easily score peers based on the fees we're earning vs the fees we're paying (for on-chain transactions or for liquidity purchases).

Note that we cannot migrate existing data (since it is lacking information that we now need), so we simply rename older tables with a `_before_v14` suffix and create new ones.
Past data will thus not be accessible through the APIs, but can be queried directly using SQL if necessary.
It should be acceptable, since liquidity decisions should be taken based on relatively recent data (a few weeks) in order to be economically relevant (nodes that generated fees months ago but aren't generating any new fees since then are probably not good peers).

We expose a now `relaystats` API that ranks peers based on the routing fees they're generating.
See #3245 for more details.

### Plugin validation of interactive transactions

We add a new `ValidateInteractiveTxPlugin` trait that can be extended by plugins that want to perform custom validation of remote inputs and outputs added to interactive transactions.
Expand Down Expand Up @@ -86,6 +99,7 @@ eclair.relay.reserved-for-accountable = 0.0

- `findroute`, `findroutetonode` and `findroutebetweennodes` now include a `maxCltvExpiryDelta` parameter (#3234)
- `channel-opened` was removed from the websocket in favor of `channel-funding-created`, `channel-confirmed` and `channel-ready` (#3237 and #3256)
- `networkfees` and `channelstats` are removed in favor in `relaystats` (#3245)

### Miscellaneous improvements and bug fixes

Expand Down
14 changes: 7 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient.{AddressType, D
import fr.acinq.eclair.blockchain.fee.{ConfirmationTarget, FeeratePerByte, FeeratePerKw}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.AuditDb.RelayStats
import fr.acinq.eclair.db.{IncomingPayment, OfferData, OutgoingPayment, OutgoingPaymentStatus}
import fr.acinq.eclair.io.Peer.{GetPeerInfo, OpenChannelResponse, PeerInfo}
import fr.acinq.eclair.io._
Expand Down Expand Up @@ -160,9 +160,9 @@ trait Eclair {

def audit(from: TimestampSecond, to: TimestampSecond, paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[AuditResponse]

def networkFees(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[NetworkFee]]
def relayStats(remoteNodeId: PublicKey, from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[RelayStats]

def channelStats(from: TimestampSecond, to: TimestampSecond, paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Seq[Stats]]
def relayStats(from: TimestampSecond, to: TimestampSecond, paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Seq[RelayStats]]

def getInvoice(paymentHash: ByteVector32)(implicit timeout: Timeout): Future[Option[Invoice]]

Expand Down Expand Up @@ -600,12 +600,12 @@ class EclairImpl(val appKit: Kit) extends Eclair with Logging with SpendFromChan
))
}

override def networkFees(from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[Seq[NetworkFee]] = {
Future(appKit.nodeParams.db.audit.listNetworkFees(from.toTimestampMilli, to.toTimestampMilli))
override def relayStats(remoteNodeId: PublicKey, from: TimestampSecond, to: TimestampSecond)(implicit timeout: Timeout): Future[RelayStats] = {
Future(appKit.nodeParams.db.audit.relayStats(remoteNodeId, from.toTimestampMilli, to.toTimestampMilli))
}

override def channelStats(from: TimestampSecond, to: TimestampSecond, paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Seq[Stats]] = {
Future(appKit.nodeParams.db.audit.stats(from.toTimestampMilli, to.toTimestampMilli, paginated_opt))
override def relayStats(from: TimestampSecond, to: TimestampSecond, paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Seq[RelayStats]] = {
Future(appKit.nodeParams.db.audit.relayStats(from.toTimestampMilli, to.toTimestampMilli, paginated_opt))
}

override def allInvoices(from: TimestampSecond, to: TimestampSecond, paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Seq[Invoice]] = Future {
Expand Down
7 changes: 7 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/Paginated.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,10 @@ case class Paginated(count: Int, skip: Int) {
require(count >= 0, "count must be a positive number")
require(skip >= 0, "skip must be a positive number")
}

object Paginated {
def paginate[T](results: Seq[T], paginated_opt: Option[Paginated]): Seq[T] = paginated_opt match {
case Some(paginated) => results.slice(paginated.skip, paginated.skip + paginated.count)
case None => results
}
}
139 changes: 127 additions & 12 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, TxId}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong, TxId}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, PublishedTransaction, Stats}
import fr.acinq.eclair.db.AuditDb.{ConfirmedTransaction, PublishedTransaction, RelayStats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.payment.{PathFindingExperimentMetrics, PaymentReceived, PaymentRelayed, PaymentSent}
import fr.acinq.eclair.{Paginated, TimestampMilli}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.wire.protocol.LiquidityAds
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, Paginated, TimestampMilli}

trait AuditDb {

Expand All @@ -44,24 +46,137 @@ trait AuditDb {

def listPublished(channelId: ByteVector32): Seq[PublishedTransaction]

def listPublished(remoteNodeId: PublicKey, from: TimestampMilli, to: TimestampMilli): Seq[PublishedTransaction]
Comment thread
pm47 marked this conversation as resolved.

def listConfirmed(channelId: ByteVector32): Seq[ConfirmedTransaction]

def listConfirmed(remoteNodeId: PublicKey, from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated]): Seq[ConfirmedTransaction]

def listConfirmed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[ConfirmedTransaction]

def listChannelEvents(channelId: ByteVector32, from: TimestampMilli, to: TimestampMilli): Seq[ChannelEvent]

def listChannelEvents(remoteNodeId: PublicKey, from: TimestampMilli, to: TimestampMilli): Seq[ChannelEvent]

def listSent(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentSent]

def listReceived(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentReceived]

def listRelayed(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[PaymentRelayed]

def listNetworkFees(from: TimestampMilli, to: TimestampMilli): Seq[NetworkFee]

def stats(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[Stats]
def relayStats(remoteNodeId: PublicKey, from: TimestampMilli, to: TimestampMilli): RelayStats = {
val relayed = listRelayed(from, to).filter(e => e.incoming.exists(_.remoteNodeId == remoteNodeId) || e.outgoing.exists(_.remoteNodeId == remoteNodeId))
val relayFeeEarned = relayed.map(e => {
// When using MPP and trampoline, payments can be relayed through multiple nodes at once.
// We split the fee according to the proportional amount relayed through the requested node.
e.relayFee * (e.outgoing.filter(_.remoteNodeId == remoteNodeId).map(_.amount).sum.toLong.toDouble / e.amountOut.toLong)
}).sum
val incomingPayments = relayed.flatMap(_.incoming).filter(_.remoteNodeId == remoteNodeId)
val outgoingPayments = relayed.flatMap(_.outgoing).filter(_.remoteNodeId == remoteNodeId)
val confirmedTransactions = listConfirmed(remoteNodeId, from, to, None)
val onChainFeePaid = confirmedTransactions.map(_.onChainFeePaid).sum
val liquidityFeeEarned = confirmedTransactions.flatMap(_.liquidityPurchase_opt).filter(_.isSeller).map(_.fees.total).sum
val liquidityFeePaid = confirmedTransactions.flatMap(_.liquidityPurchase_opt).filter(_.isBuyer).map(_.fees.total).sum
RelayStats(remoteNodeId, incomingPayments.size, incomingPayments.map(_.amount).sum, outgoingPayments.size, outgoingPayments.map(_.amount).sum, relayFeeEarned, confirmedTransactions.size, onChainFeePaid, liquidityFeeEarned, liquidityFeePaid, from, to)
}

def relayStats(from: TimestampMilli, to: TimestampMilli, paginated_opt: Option[Paginated] = None): Seq[RelayStats] = {
// We fill payment data from all relayed payments.
val perNodeStats = listRelayed(from, to).foldLeft(Map.empty[PublicKey, RelayStats]) {
case (perNodeStats, e) =>
val withIncoming = e.incoming.foldLeft(perNodeStats) {
case (perNodeStats, i) =>
val current = perNodeStats.getOrElse(i.remoteNodeId, RelayStats(i.remoteNodeId, from, to))
val updated = current.copy(incomingPaymentCount = current.incomingPaymentCount + 1, totalAmountIn = current.totalAmountIn + i.amount)
perNodeStats + (i.remoteNodeId -> updated)
}
val withOutgoing = e.outgoing.foldLeft(withIncoming) {
case (perNodeStats, o) =>
val current = perNodeStats.getOrElse(o.remoteNodeId, RelayStats(o.remoteNodeId, from, to))
val updated = current.copy(outgoingPaymentCount = current.outgoingPaymentCount + 1, totalAmountOut = current.totalAmountOut + o.amount)
perNodeStats + (o.remoteNodeId -> updated)
}
val withRelayFee = e.outgoing.map(_.remoteNodeId).toSet.foldLeft(withOutgoing) {
case (perNodeStats, remoteNodeId) =>
val current = perNodeStats.getOrElse(remoteNodeId, RelayStats(remoteNodeId, from, to))
val updated = current.copy(relayFeeEarned = current.relayFeeEarned + e.relayFee * (e.outgoing.filter(_.remoteNodeId == remoteNodeId).map(_.amount).sum.toLong.toDouble / e.amountOut.toLong))
perNodeStats + (remoteNodeId -> updated)
}
withRelayFee
}.values.toSeq.sortBy(_.relayFeeEarned)(Ordering[MilliSatoshi].reverse)
// We add on-chain fees paid for each node.
val confirmedTransactions = listConfirmed(from, to)
Paginated.paginate(perNodeStats.map(stats => {
val transactionsWithPeer = confirmedTransactions.filter(_.remoteNodeId == stats.remoteNodeId)
val onChainFeePaid = transactionsWithPeer.map(_.onChainFeePaid).sum
val liquidityFeeEarned = transactionsWithPeer.flatMap(_.liquidityPurchase_opt).filter(_.isSeller).map(_.fees.total).sum
val liquidityFeePaid = transactionsWithPeer.flatMap(_.liquidityPurchase_opt).filter(_.isBuyer).map(_.fees.total).sum
stats.copy(onChainTransactionsCount = transactionsWithPeer.size, onChainFeePaid = onChainFeePaid, liquidityFeeEarned = liquidityFeeEarned, liquidityFeePaid = liquidityFeePaid)
}), paginated_opt)
}

}

object AuditDb {

case class PublishedTransaction(txId: TxId, desc: String, miningFee: Satoshi)

case class NetworkFee(remoteNodeId: PublicKey, channelId: ByteVector32, txId: ByteVector32, fee: Satoshi, txType: String, timestamp: TimestampMilli)

case class Stats(channelId: ByteVector32, direction: String, avgPaymentAmount: Satoshi, paymentCount: Int, relayFee: Satoshi, networkFee: Satoshi)
case class PublishedTransaction(txId: TxId, desc: String, localMiningFee: Satoshi, remoteMiningFee: Satoshi, feerate: FeeratePerKw, liquidityPurchase_opt: Option[LiquidityAds.PurchaseBasicInfo], timestamp: TimestampMilli)

object PublishedTransaction {
def apply(tx: TransactionPublished): PublishedTransaction = PublishedTransaction(tx.tx.txid, tx.desc, tx.localMiningFee, tx.remoteMiningFee, tx.feerate, tx.liquidityPurchase_opt, tx.timestamp)
}

case class ConfirmedTransaction(remoteNodeId: PublicKey, channelId: ByteVector32, txId: TxId, onChainFeePaid: Satoshi, txType: String, liquidityPurchase_opt: Option[LiquidityAds.PurchaseBasicInfo], timestamp: TimestampMilli)

case class RelayStats(remoteNodeId: PublicKey, incomingPaymentCount: Int, totalAmountIn: MilliSatoshi, outgoingPaymentCount: Int, totalAmountOut: MilliSatoshi, relayFeeEarned: MilliSatoshi, onChainTransactionsCount: Int, onChainFeePaid: Satoshi, liquidityFeeEarned: Satoshi, liquidityFeePaid: Satoshi, from: TimestampMilli, to: TimestampMilli)

object RelayStats {
def apply(remoteNodeId: PublicKey, from: TimestampMilli, to: TimestampMilli): RelayStats = RelayStats(remoteNodeId, 0, 0 msat, 0, 0 msat, 0 msat, 0, 0 sat, 0 sat, 0 sat, from, to)
}

case class RelayedPart(channelId: ByteVector32, remoteNodeId: PublicKey, amount: MilliSatoshi, direction: String, relayType: String, timestamp: TimestampMilli)

def relayType(e: PaymentRelayed): String = e match {
case _: ChannelPaymentRelayed => "channel"
case _: TrampolinePaymentRelayed => "trampoline"
case _: OnTheFlyFundingPaymentRelayed => "on-the-fly-funding"
}

private def incomingParts(parts: Seq[RelayedPart]): Seq[PaymentEvent.IncomingPayment] = {
parts.filter(_.direction == "IN").map(p => PaymentEvent.IncomingPayment(p.channelId, p.remoteNodeId, p.amount, p.timestamp)).sortBy(_.receivedAt)
}

private def outgoingParts(parts: Seq[RelayedPart]): Seq[PaymentEvent.OutgoingPayment] = {
parts.filter(_.direction == "OUT").map(p => PaymentEvent.OutgoingPayment(p.channelId, p.remoteNodeId, p.amount, p.timestamp)).sortBy(_.settledAt)
}

private def verifyInAndOut(parts: Seq[RelayedPart]): Boolean = {
parts.exists(_.direction == "IN") && parts.exists(_.direction == "OUT")
}

def listRelayedInternal(relayedByHash: Map[ByteVector32, Seq[RelayedPart]], trampolineDetails: Map[ByteVector32, (PublicKey, MilliSatoshi)], paginated_opt: Option[Paginated]): Seq[PaymentRelayed] = {
Paginated.paginate(relayedByHash.flatMap {
case (paymentHash, parts) =>
// We may have been routing multiple payments for the same payment_hash with different relay types.
// That's fine, we simply separate each part into the correct event.
val channelParts = parts.filter(_.relayType == "channel")
val trampolineParts = parts.filter(_.relayType == "trampoline")
val onTheFlyParts = parts.filter(_.relayType == "on-the-fly-funding")
val channelRelayed_opt = if (verifyInAndOut(channelParts)) {
Some(ChannelPaymentRelayed(paymentHash, incomingParts(channelParts), outgoingParts(channelParts)))
} else {
None
}
val trampolineRelayed_opt = trampolineDetails.get(paymentHash) match {
case Some((nextTrampolineNode, nextTrampolineAmount)) if verifyInAndOut(trampolineParts) => Some(TrampolinePaymentRelayed(paymentHash, incomingParts(trampolineParts), outgoingParts(trampolineParts), nextTrampolineNode, nextTrampolineAmount))
case _ => None
}
val onTheFlyRelayed_opt = if (verifyInAndOut(onTheFlyParts)) {
Some(OnTheFlyFundingPaymentRelayed(paymentHash, incomingParts(onTheFlyParts), outgoingParts(onTheFlyParts)))
} else {
None
}
channelRelayed_opt.toSeq ++ trampolineRelayed_opt.toSeq ++ onTheFlyRelayed_opt.toSeq
}.toSeq.sortBy(_.settledAt), paginated_opt)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ object Databases extends Logging {

if (urlFile.exists()) {
val oldUrl = readString(urlFile.toPath)
if (oldUrl != url)
if (url != null && oldUrl != null && oldUrl != url) {
Comment thread
pm47 marked this conversation as resolved.
throw JdbcUrlChanged(oldUrl, url)
}
} else {
writeString(urlFile.toPath, url)
}
Expand Down
Loading
Loading