Skip to content
8 changes: 5 additions & 3 deletions lib/db/sqlite/firo_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:sqlite3/sqlite3.dart';
import 'package:uuid/uuid.dart';

import '../../electrumx_rpc/electrumx_client.dart';
import '../../models/electrumx_response/spark_models.dart';
import '../../utilities/extensions/extensions.dart';
import '../../utilities/logger.dart';
import '../../utilities/stack_file_system.dart';
Expand All @@ -30,7 +31,7 @@ void _debugLog(Object? object) {
}

abstract class _FiroCache {
static const int _setCacheVersion = 1;
static const int _setCacheVersion = 2;
static const int _tagsCacheVersion = 2;

static final networks = [
Expand Down Expand Up @@ -134,7 +135,7 @@ abstract class _FiroCache {
blockHash TEXT NOT NULL,
setHash TEXT NOT NULL,
groupId INTEGER NOT NULL,
timestampUTC INTEGER NOT NULL,
size INTEGER NOT NULL,
UNIQUE (blockHash, setHash, groupId)
);

Expand All @@ -143,7 +144,8 @@ abstract class _FiroCache {
serialized TEXT NOT NULL,
txHash TEXT NOT NULL,
context TEXT NOT NULL,
UNIQUE(serialized, txHash, context)
groupId INTEGER NOT NULL,
UNIQUE(serialized, txHash, context, groupId)
);

CREATE TABLE SparkSetCoins (
Expand Down
174 changes: 120 additions & 54 deletions lib/db/sqlite/firo_cache_coordinator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ typedef LTagPair = ({String tag, String txid});
/// background isolate and [FiroCacheCoordinator] should manage that isolate
abstract class FiroCacheCoordinator {
static final Map<CryptoCurrencyNetwork, _FiroCacheWorker> _workers = {};
static final Map<CryptoCurrencyNetwork, Mutex> _tagLocks = {};
static final Map<CryptoCurrencyNetwork, Mutex> _setLocks = {};

static bool _init = false;
static Future<void> init() async {
Expand All @@ -15,6 +17,8 @@ abstract class FiroCacheCoordinator {
_init = true;
await _FiroCache.init();
for (final network in _FiroCache.networks) {
_tagLocks[network] = Mutex();
_setLocks[network] = Mutex();
_workers[network] = await _FiroCacheWorker.spawn(network);
}
}
Expand All @@ -31,11 +35,23 @@ abstract class FiroCacheCoordinator {
final usedTagsCacheFile = File(
"${dir.path}/${_FiroCache.sparkUsedTagsCacheFileName(network)}",
);
final int bytes =
((await setCacheFile.exists()) ? await setCacheFile.length() : 0) +
((await usedTagsCacheFile.exists())
? await usedTagsCacheFile.length()
: 0);

final setSize =
(await setCacheFile.exists()) ? await setCacheFile.length() : 0;
final tagsSize = (await usedTagsCacheFile.exists())
? await usedTagsCacheFile.length()
: 0;

Logging.instance.log(
"Spark cache used tags size: $tagsSize",
level: LogLevel.Debug,
);
Logging.instance.log(
"Spark cache anon set size: $setSize",
level: LogLevel.Debug,
);

final int bytes = tagsSize + setSize;

if (bytes < 1024) {
return '$bytes B';
Expand All @@ -55,43 +71,96 @@ abstract class FiroCacheCoordinator {
ElectrumXClient client,
CryptoCurrencyNetwork network,
) async {
final count = await FiroCacheCoordinator.getUsedCoinTagsCount(network);
final unhashedTags = await client.getSparkUnhashedUsedCoinsTagsWithTxHashes(
startNumber: count,
);
if (unhashedTags.isNotEmpty) {
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._updateSparkUsedTagsWith,
data: unhashedTags,
),
await _tagLocks[network]!.protect(() async {
final count = await FiroCacheCoordinator.getUsedCoinTagsCount(network);
final unhashedTags =
await client.getSparkUnhashedUsedCoinsTagsWithTxHashes(
startNumber: count,
);
}
if (unhashedTags.isNotEmpty) {
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._updateSparkUsedTagsWith,
data: unhashedTags,
),
);
}
});
}

static Future<void> runFetchAndUpdateSparkAnonSetCacheForGroupId(
int groupId,
ElectrumXClient client,
CryptoCurrencyNetwork network,
void Function(int countFetched, int totalCount)? progressUpdated,
) async {
final blockhashResult =
await FiroCacheCoordinator.getLatestSetInfoForGroupId(
groupId,
network,
);
final blockHash = blockhashResult?.blockHash ?? "";
await _setLocks[network]!.protect(() async {
const sectorSize =
1500; // chosen as a somewhat decent value. Could be changed in the future if wanted/needed
final prevMeta = await FiroCacheCoordinator.getLatestSetInfoForGroupId(
groupId,
network,
);

final json = await client.getSparkAnonymitySet(
coinGroupId: groupId.toString(),
startBlockHash: blockHash.toHexReversedFromBase64,
);
final prevSize = prevMeta?.size ?? 0;

await _workers[network]!.runTask(
FCTask(
func: FCFuncName._updateSparkAnonSetCoinsWith,
data: (groupId, json),
),
);
final meta = await client.getSparkAnonymitySetMeta(
coinGroupId: groupId,
);

progressUpdated?.call(prevSize, meta.size);

if (prevMeta?.blockHash == meta.blockHash) {
Logging.instance.log(
"prevMeta?.blockHash == meta.blockHash",
level: LogLevel.Debug,
);
return;
}

final numberOfCoinsToFetch = meta.size - prevSize;

final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize;
final remainder = numberOfCoinsToFetch % sectorSize;

final List<dynamic> coins = [];

for (int i = 0; i < fullSectorCount; i++) {
final start = (i * sectorSize);
final data = await client.getSparkAnonymitySetBySector(
coinGroupId: groupId,
latestBlock: meta.blockHash,
startIndex: start,
endIndex: start + sectorSize,
);
progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch);

coins.addAll(data);
}

if (remainder > 0) {
final data = await client.getSparkAnonymitySetBySector(
coinGroupId: groupId,
latestBlock: meta.blockHash,
startIndex: numberOfCoinsToFetch - remainder,
endIndex: numberOfCoinsToFetch,
);
progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch);

coins.addAll(data);
}

final result = coins
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
.toList();

await _workers[network]!.runTask(
FCTask(
func: FCFuncName._updateSparkAnonSetCoinsWith,
data: (meta, result),
),
);
});
}

// ===========================================================================
Expand Down Expand Up @@ -165,41 +234,37 @@ abstract class FiroCacheCoordinator {
);
}

static Future<
List<
({
String serialized,
String txHash,
String context,
})>> getSetCoinsForGroupId(
static Future<List<RawSparkCoin>> getSetCoinsForGroupId(
int groupId, {
int? newerThanTimeStamp,
String? afterBlockHash,
required CryptoCurrencyNetwork network,
}) async {
final resultSet = await _Reader._getSetCoinsForGroupId(
groupId,
db: _FiroCache.setCacheDB(network),
newerThanTimeStamp: newerThanTimeStamp,
);
final resultSet = afterBlockHash == null
? await _Reader._getSetCoinsForGroupId(
groupId,
db: _FiroCache.setCacheDB(network),
)
: await _Reader._getSetCoinsForGroupIdAndBlockHash(
groupId,
afterBlockHash,
db: _FiroCache.setCacheDB(network),
);

return resultSet
.map(
(row) => (
(row) => RawSparkCoin(
serialized: row["serialized"] as String,
txHash: row["txHash"] as String,
context: row["context"] as String,
groupId: groupId,
),
)
.toList()
.reversed
.toList();
}

static Future<
({
String blockHash,
String setHash,
int timestampUTC,
})?> getLatestSetInfoForGroupId(
static Future<SparkAnonymitySetMeta?> getLatestSetInfoForGroupId(
int groupId,
CryptoCurrencyNetwork network,
) async {
Expand All @@ -212,10 +277,11 @@ abstract class FiroCacheCoordinator {
return null;
}

return (
return SparkAnonymitySetMeta(
coinGroupId: groupId,
blockHash: result.first["blockHash"] as String,
setHash: result.first["setHash"] as String,
timestampUTC: result.first["timestampUTC"] as int,
size: result.first["size"] as int,
);
}

Expand Down
45 changes: 34 additions & 11 deletions lib/db/sqlite/firo_cache_reader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,15 @@ abstract class _Reader {
static Future<ResultSet> _getSetCoinsForGroupId(
int groupId, {
required Database db,
int? newerThanTimeStamp,
}) async {
String query = """
SELECT sc.serialized, sc.txHash, sc.context
final query = """
SELECT sc.serialized, sc.txHash, sc.context, sc.groupId
FROM SparkSet AS ss
JOIN SparkSetCoins AS ssc ON ss.id = ssc.setId
JOIN SparkCoin AS sc ON ssc.coinId = sc.id
WHERE ss.groupId = $groupId
WHERE ss.groupId = $groupId;
""";

if (newerThanTimeStamp != null) {
query += " AND ss.timestampUTC"
" > $newerThanTimeStamp";
}

return db.select("$query;");
}

Expand All @@ -31,16 +25,45 @@ abstract class _Reader {
required Database db,
}) async {
final query = """
SELECT ss.blockHash, ss.setHash, ss.timestampUTC
SELECT ss.blockHash, ss.setHash, ss.size
FROM SparkSet ss
WHERE ss.groupId = $groupId
ORDER BY ss.timestampUTC DESC
ORDER BY ss.size DESC
LIMIT 1;
""";

return db.select("$query;");
}

static Future<ResultSet> _getSetCoinsForGroupIdAndBlockHash(
int groupId,
String blockHash, {
required Database db,
}) async {
const query = """
WITH TargetBlock AS (
SELECT id
FROM SparkSet
WHERE blockHash = ?
),
TargetSets AS (
SELECT id AS setId
FROM SparkSet
WHERE groupId = ? AND id > (SELECT id FROM TargetBlock)
)
SELECT
SparkCoin.serialized,
SparkCoin.txHash,
SparkCoin.context,
SparkCoin.groupId
FROM SparkSetCoins
JOIN SparkCoin ON SparkSetCoins.coinId = SparkCoin.id
WHERE SparkSetCoins.setId IN (SELECT setId FROM TargetSets);
""";

return db.select("$query;", [blockHash, groupId]);
}

static Future<bool> _checkSetInfoForGroupIdExists(
int groupId, {
required Database db,
Expand Down
9 changes: 7 additions & 2 deletions lib/db/sqlite/firo_cache_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ class _FiroCacheWorker {
try {
await Isolate.spawn(
_startWorkerIsolate,
(initPort.sendPort, setCacheFilePath, usedTagsCacheFilePath),
(
initPort.sendPort,
setCacheFilePath,
usedTagsCacheFilePath,
),
);
} catch (_) {
initPort.close();
Expand Down Expand Up @@ -90,7 +94,8 @@ class _FiroCacheWorker {
final FCResult result;
switch (task.func) {
case FCFuncName._updateSparkAnonSetCoinsWith:
final data = task.data as (int, Map<String, dynamic>);
final data =
task.data as (SparkAnonymitySetMeta, List<RawSparkCoin>);
result = _updateSparkAnonSetCoinsWith(
setCacheDb,
data.$2,
Expand Down
Loading
Loading