Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Blockcore/P2P/Protocol/Payloads/NotFoundPayload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Blockcore.P2P.Protocol.Payloads
/// <summary>
/// A getdata message for an asked hash is not found by the remote peer.
/// </summary>
[Payload("notfound")]
public class NotFoundPayload : Payload, IEnumerable<InventoryVector>
{
private List<InventoryVector> inventory = new List<InventoryVector>();
Expand Down
84 changes: 84 additions & 0 deletions src/Blockcore/Utilities/LruHashSet.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System.Collections.Generic;

namespace Blockcore.Utilities
{
public class LruHashSet<T>
{
private readonly LinkedList<T> lru;

private HashSet<T> items;

private long maxSize;

private long itemCount;

private readonly object lockObject = new object();

public LruHashSet(long maxSize = long.MaxValue)
{
this.lru = new LinkedList<T>();
this.items = new HashSet<T>();

this.maxSize = maxSize;
this.itemCount = 0;
}

public void AddOrUpdate(T item)
{
lock (this.lockObject)
{
// First check if we are performing the 'Update' case. No change to item count.
if (this.items.Contains(item))
{
this.lru.Remove(item);
this.lru.AddLast(item);

return;
}

// Otherwise it's 'Add'.
// First perform the size test.
if ((this.itemCount + 1) > this.maxSize)
{
LinkedListNode<T> tempItem = this.lru.First;
this.lru.RemoveFirst();
this.items.Remove(tempItem.Value);
this.itemCount--;
}

this.lru.AddLast(item);
this.items.Add(item);
this.itemCount++;
}
}

public void Clear()
{
lock (this.lockObject)
{
this.lru.Clear();
this.items.Clear();
this.itemCount = 0;
}
}

public bool Contains(T item)
{
lock (this.lockObject)
{
// Fastest to check the hashmap.
return this.items.Contains(item);
}
}

public void Remove(T item)
{
lock (this.lockObject)
{
this.lru.Remove(item);
this.items.Remove(item);
this.itemCount--;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,24 @@ private async Task ProcessGetDataAsync(INetworkPeer peer, GetDataPayload getData
// TODO: bring logic from core
foreach (InventoryVector item in getDataPayload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_BLOCK)))
{
if (!peer.IsConnected)
continue;

ChainedHeaderBlock chainedHeaderBlock = this.consensusManager.GetBlockData(item.Hash);

if (chainedHeaderBlock?.Block != null)
{
this.logger.LogDebug("Sending block '{0}' to peer '{1}'.", chainedHeaderBlock.ChainedHeader, peer.RemoteSocketEndpoint);

//TODO strip block of witness if node does not support
await peer.SendMessageAsync(new BlockPayload(chainedHeaderBlock.Block.WithOptions(this.ChainIndexer.Network.Consensus.ConsensusFactory, peer.SupportedTransactionOptions))).ConfigureAwait(false);
}
else
{
this.logger.LogDebug("Block with hash '{0}' requested from peer '{1}' was not found in store.", item.Hash, peer.RemoteSocketEndpoint);

// https://btcinformation.org/en/developer-reference#notfound
// https://github.com/bitcoin/bitcoin/pull/2192
await peer.SendMessageAsync(new NotFoundPayload(InventoryType.MSG_BLOCK, item.Hash)).ConfigureAwait(false);
}

// If the peer is syncing using "getblocks" message we are supposed to send
Expand Down
57 changes: 39 additions & 18 deletions src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public class MempoolBehavior : NetworkPeerBehavior
/// </summary>
private const int InventoryBroadcastMax = 7 * InventoryBroadcastInterval;

/// <summary>
/// Prevents a single peer from sending us huge quantities of fake `inv` messages with nonexistent hashes.
/// <remarks>This value is somewhat arbitrarily chosen, it is far greater than the maximum number of transactions that can be stored in any single block.
/// The number of unmined transactions in the mempool should ideally hover around the number in a single block or below, otherwise the mempool will eventually
/// fill completely. However, on some networks it is not uncommon for there to be surges in transaction volume, so we leave some leeway.</remarks>
/// </summary>
private const int MaximumInventoryToTrackCount = 50000;

/// <summary>Memory pool validator for validating transactions.</summary>
private readonly IMempoolValidator validator;

Expand Down Expand Up @@ -75,7 +83,7 @@ public class MempoolBehavior : NetworkPeerBehavior
/// Filter for inventory known.
/// State that is local to the behavior.
/// </summary>
private readonly HashSet<uint256> filterInventoryKnown;
private readonly LruHashSet<uint256> filterInventoryKnown;

/// <summary>
/// Locking object for memory pool behaviour.
Expand Down Expand Up @@ -120,7 +128,7 @@ public MempoolBehavior(

this.lockObject = new object();
this.inventoryTxToSend = new HashSet<uint256>();
this.filterInventoryKnown = new HashSet<uint256>();
this.filterInventoryKnown = new LruHashSet<uint256>(MaximumInventoryToTrackCount);
this.isPeerWhitelistedForRelay = false;
this.isBlocksOnlyMode = false;

Expand Down Expand Up @@ -267,14 +275,14 @@ private async Task SendMempoolPayloadAsync(INetworkPeer peer, MempoolPayload mes
}
}

this.filterInventoryKnown.Add(hash);
this.filterInventoryKnown.AddOrUpdate(hash);
transactionsToSend.Add(hash);
this.logger.LogDebug("Added transaction ID '{0}' to inventory list.", hash);
}
}

this.logger.LogDebug("Sending transaction inventory to peer '{0}'.", peer.RemoteSocketEndpoint);
await this.SendAsTxInventoryAsync(peer, transactionsToSend);
await this.SendAsTxInventoryAsync(peer, transactionsToSend).ConfigureAwait(false);
this.LastMempoolReq = this.mempoolManager.DateTimeProvider.GetTime();
}

Expand Down Expand Up @@ -303,7 +311,13 @@ private async Task ProcessInvAsync(INetworkPeer peer, InvPayload invPayload)
{
foreach (var inv in inventoryTxs)
{
this.filterInventoryKnown.Add(inv.Hash);
// It is unlikely that the transaction will be in the to-send hashmap, but there is no harm attempting to remove it anyway.
this.inventoryTxToSend.Remove(inv.Hash);

// At this point we have no idea whether the proffered hashes exist or are spurious. So we have to rely on the bounded LRU hashmap implementation
// to control the maximum number of extant hashes that we track for this peer. It isn't really worth trying to evict mined transactions from this
// hashmap.
this.filterInventoryKnown.AddOrUpdate(inv.Hash);
}
}

Expand Down Expand Up @@ -344,18 +358,22 @@ private async Task ProcessGetDataAsync(INetworkPeer peer, GetDataPayload getData

foreach (InventoryVector item in getDataPayload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_TX)))
{
// TODO: check if we need to add support for "not found"
if (!peer.IsConnected)
continue;

TxMempoolInfo trxInfo = await this.mempoolManager.InfoAsync(item.Hash).ConfigureAwait(false);

if (trxInfo != null)
if (trxInfo == null)
{
if (peer.IsConnected)
{
this.logger.LogDebug("Sending transaction '{0}' to peer '{1}'.", item.Hash, peer.RemoteSocketEndpoint);
await peer.SendMessageAsync(new TxPayload(trxInfo.Trx.WithOptions(peer.SupportedTransactionOptions, this.network.Consensus.ConsensusFactory))).ConfigureAwait(false);
}
// https://btcinformation.org/en/developer-reference#notfound
// https://github.com/bitcoin/bitcoin/pull/2192
await peer.SendMessageAsync(new NotFoundPayload(InventoryType.MSG_TX, item.Hash)).ConfigureAwait(false);

continue;
}

this.logger.LogDebug("Sending transaction '{0}' to peer '{1}'.", item.Hash, peer.RemoteSocketEndpoint);
await peer.SendMessageAsync(new TxPayload(trxInfo.Trx.WithOptions(peer.SupportedTransactionOptions, this.network.Consensus.ConsensusFactory))).ConfigureAwait(false);
}
}

Expand All @@ -381,14 +399,14 @@ private async Task ProcessTxPayloadAsync(INetworkPeer peer, TxPayload transactio
// add to local filter
lock (this.lockObject)
{
this.filterInventoryKnown.Add(trxHash);
this.filterInventoryKnown.AddOrUpdate(trxHash);
}
this.logger.LogDebug("Added transaction ID '{0}' to known inventory filter.", trxHash);

var state = new MempoolValidationState(true);
if (!await this.orphans.AlreadyHaveAsync(trxHash) && await this.validator.AcceptToMemoryPool(state, trx))
if (!await this.orphans.AlreadyHaveAsync(trxHash).ConfigureAwait(false) && await this.validator.AcceptToMemoryPool(state, trx).ConfigureAwait(false))
{
await this.validator.SanityCheck();
await this.validator.SanityCheck().ConfigureAwait(false);
this.RelayTransaction(trxHash);

this.signals.Publish(new TransactionReceived(trx));
Expand All @@ -398,7 +416,7 @@ private async Task ProcessTxPayloadAsync(INetworkPeer peer, TxPayload transactio

this.logger.LogDebug("Transaction ID '{0}' accepted to memory pool from peer '{1}' (poolsz {2} txn, {3} kb).", trxHash, peer.RemoteSocketEndpoint, mmsize, memdyn / 1000);

await this.orphans.ProcessesOrphansAsync(this, trx);
await this.orphans.ProcessesOrphansAsync(this, trx).ConfigureAwait(false);
}
else if (state.MissingInputs)
{
Expand Down Expand Up @@ -470,7 +488,10 @@ private void AddTransactionToSend(uint256 hash)
lock (this.lockObject)
{
if (!this.filterInventoryKnown.Contains(hash))
{
{
// We do not need to bound this in the same way as the 'known' map,
// because transactions are only sent/relayed when they are considered valid,
// plus this map is constantly shrinking as txes are sent.
this.inventoryTxToSend.Add(hash);
}
}
Expand Down Expand Up @@ -569,7 +590,7 @@ public async Task SendTrickleAsync()
foreach (uint256 hash in findInMempool)
{
// Not in the mempool anymore? don't bother sending it.
TxMempoolInfo txInfo = await this.mempoolManager.InfoAsync(hash);
TxMempoolInfo txInfo = await this.mempoolManager.InfoAsync(hash).ConfigureAwait(false);
if (txInfo == null)
{
this.logger.LogDebug("Transaction ID '{0}' not added to inventory list, no longer in mempool.", hash);
Expand Down
110 changes: 110 additions & 0 deletions src/Tests/Blockcore.Tests/Utilities/LruHashSetTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
using Blockcore.Utilities;
using Xunit;

namespace Blockcore.Tests.Utilities
{
public class LruHashSetTests
{
[Fact]
public void AddOrUpdate_AddsItemToSet()
{
// Arrange
var set = new LruHashSet<int>(maxSize: 2);

// Act
set.AddOrUpdate(1);

// Assert
Assert.True(set.Contains(1));
}

[Fact]
public void AddOrUpdate_UpdatesExistingItem()
{
// Arrange
var set = new LruHashSet<int>(maxSize: 2);
set.AddOrUpdate(1);

// Act
set.AddOrUpdate(1);

// Assert
Assert.True(set.Contains(1));
}

[Fact]
public void AddOrUpdate_RemovesOldestItemWhenMaxSizeExceeded()
{
// Arrange
var set = new LruHashSet<int>(maxSize: 2);
set.AddOrUpdate(1);
set.AddOrUpdate(2);

// Act
set.AddOrUpdate(3);

// Assert
Assert.False(set.Contains(1));
Assert.True(set.Contains(2));
Assert.True(set.Contains(3));
}

[Fact]
public void Clear_RemovesAllItemsFromSet()
{
// Arrange
var set = new LruHashSet<int>(maxSize: 2);
set.AddOrUpdate(1);
set.AddOrUpdate(2);

// Act
set.Clear();

// Assert
Assert.False(set.Contains(1));
Assert.False(set.Contains(2));
}

[Fact]
public void Contains_ReturnsTrueIfItemInSet()
{
// Arrange
var set = new LruHashSet<int>(maxSize: 2);
set.AddOrUpdate(1);

// Act
var contains = set.Contains(1);

// Assert
Assert.True(contains);
}

[Fact]
public void Contains_ReturnsFalseIfItemNotInSet()
{
// Arrange
var set = new LruHashSet<int>(maxSize: 2);
set.AddOrUpdate(1);

// Act
var contains = set.Contains(2);

// Assert
Assert.False(contains);
}

[Fact]
public void Remove_RemovesItemFromSet()
{
// Arrange
var set = new LruHashSet<int>(maxSize: 2);
set.AddOrUpdate(1);

// Act
set.Remove(1);

// Assert
Assert.False(set.Contains(1));
}
}
}