23 changes: 23 additions & 0 deletions migrations/1766519554009_principal-tx-counts.js
@@ -0,0 +1,23 @@
/** @param { import("node-pg-migrate").MigrationBuilder } pgm */
exports.up = pgm => {
pgm.createTable('principal_tx_counts', {
principal: {
type: 'text',
notNull: true,
primaryKey: true,
},
count: {
type: 'integer',
notNull: true,
default: 0,
},
});
pgm.sql(`
INSERT INTO principal_tx_counts (principal, count)
(SELECT principal, COUNT(*) AS count FROM principal_txs GROUP BY principal)
`);
};

exports.down = pgm => {
pgm.dropTable('principal_tx_counts');
};
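For reference, here is a minimal sketch (not part of this change) of how the backfilled counts could be sanity-checked against the source table. It assumes a postgres.js client and the schema created by the migration above; the helper name and connection string are illustrative only.

import postgres from 'postgres';

// Illustrative connection string; point this at the API's database.
const sql = postgres('postgres://localhost:5432/stacks_blockchain_api');

// Compare the materialized count for one principal against a live COUNT(*),
// mirroring the backfill query used by the migration above.
async function verifyPrincipalCount(principal: string): Promise<boolean> {
  const [{ cached }] = await sql<{ cached: number }[]>`
    SELECT COALESCE(
      (SELECT count FROM principal_tx_counts WHERE principal = ${principal}), 0
    )::int AS cached
  `;
  const [{ live }] = await sql<{ live: number }[]>`
    SELECT COUNT(*)::int AS live FROM principal_txs WHERE principal = ${principal}
  `;
  return cached === live;
}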
4 changes: 3 additions & 1 deletion src/datastore/pg-store-v2.ts
@@ -547,7 +547,9 @@ export class PgStoreV2 extends BasePgStoreModule {
p.nft_burn_event_count AS nft_burn,
p.stx_sent,
p.stx_received,
COUNT(*) OVER()::int AS count
(
SELECT COALESCE(count, 0) FROM principal_tx_counts WHERE principal = ${args.address}
) AS count
FROM principal_txs AS p
INNER JOIN txs AS t USING (tx_id, index_block_hash, microblock_hash)
WHERE p.principal = ${args.address}
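With this change the per-page total comes from the precomputed principal_tx_counts row via a scalar subquery instead of a COUNT(*) OVER() window over every matching row. A rough sketch of the resulting read pattern, reusing the sql client from the sketch above; the selected columns, canonical filters, ordering, and limit/offset parameters are illustrative assumptions, not the API's actual query:

async function getAddressTxPage(principal: string, limit: number, offset: number) {
  // One round trip returns the page rows plus the precomputed total for the principal.
  return await sql`
    SELECT
      t.tx_id,
      t.block_height,
      COALESCE(
        (SELECT count FROM principal_tx_counts WHERE principal = ${principal}), 0
      ) AS count
    FROM principal_txs AS p
    INNER JOIN txs AS t USING (tx_id, index_block_hash, microblock_hash)
    WHERE p.principal = ${principal}
    ORDER BY t.block_height DESC, t.tx_index DESC
    LIMIT ${limit} OFFSET ${offset}
  `;
}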
310 changes: 31 additions & 279 deletions src/datastore/pg-write-store.ts
@@ -1525,8 +1525,20 @@ export class PgWriteStore extends PgStore {
}
for (const batch of batchIterate(values, INSERT_BATCH_SIZE)) {
await sql`
INSERT INTO principal_txs ${sql(batch)}
ON CONFLICT ON CONSTRAINT principal_txs_unique DO NOTHING
WITH inserts AS (
INSERT INTO principal_txs ${sql(batch)}
ON CONFLICT ON CONSTRAINT principal_txs_unique DO NOTHING
RETURNING principal, canonical
),
count_deltas AS (
SELECT principal, COUNT(*) AS count
FROM inserts
WHERE canonical = true
GROUP BY principal
)
INSERT INTO principal_tx_counts (principal, count)
(SELECT principal, count FROM count_deltas)
ON CONFLICT (principal) DO UPDATE SET count = principal_tx_counts.count + EXCLUDED.count
`;
}
}
@@ -3377,10 +3389,23 @@ export class PgWriteStore extends PgStore {
}
if (txResult.count) {
await sql`
UPDATE principal_txs
SET canonical = ${canonical}
WHERE tx_id IN ${sql(txs.map(t => t.txId))}
AND index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
WITH updates AS (
UPDATE principal_txs
SET canonical = ${canonical}
WHERE tx_id IN ${sql(txs.map(t => t.txId))}
AND index_block_hash = ${indexBlockHash}
AND canonical != ${canonical}
RETURNING principal
),
count_deltas AS (
SELECT principal, COUNT(*) AS count
FROM updates
GROUP BY principal
)
UPDATE principal_tx_counts AS pc
SET count = ${canonical ? sql`pc.count + cd.count` : sql`pc.count - cd.count`}
FROM count_deltas AS cd
WHERE pc.principal = cd.principal
`;
}
});
@@ -3873,279 +3898,6 @@ export class PgWriteStore extends PgStore {
return updatedEntities;
}

/**
* batch operations (mainly for event-replay)
*/

async insertBlockBatch(sql: PgSqlClient, blocks: DbBlock[]) {
const values: BlockInsertValues[] = blocks.map(block => ({
block_hash: block.block_hash,
block_time: block.block_time,
index_block_hash: block.index_block_hash,
parent_index_block_hash: block.parent_index_block_hash,
parent_block_hash: block.parent_block_hash,
parent_microblock_hash: block.parent_microblock_hash,
parent_microblock_sequence: block.parent_microblock_sequence,
block_height: block.block_height,
burn_block_time: block.burn_block_time,
burn_block_hash: block.burn_block_hash,
burn_block_height: block.burn_block_height,
miner_txid: block.miner_txid,
canonical: block.canonical,
execution_cost_read_count: block.execution_cost_read_count,
execution_cost_read_length: block.execution_cost_read_length,
execution_cost_runtime: block.execution_cost_runtime,
execution_cost_write_count: block.execution_cost_write_count,
execution_cost_write_length: block.execution_cost_write_length,
tx_total_size: block.tx_total_size,
tx_count: block.tx_count,
signer_bitvec: block.signer_bitvec,
signer_signatures: block.signer_signatures,
tenure_height: block.tenure_height,
}));
await sql`
INSERT INTO blocks ${sql(values)}
`;
}

async insertMicroblock(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
const values: MicroblockInsertValues[] = microblocks.map(mb => ({
canonical: mb.canonical,
microblock_canonical: mb.microblock_canonical,
microblock_hash: mb.microblock_hash,
microblock_sequence: mb.microblock_sequence,
microblock_parent_hash: mb.microblock_parent_hash,
parent_index_block_hash: mb.parent_index_block_hash,
block_height: mb.block_height,
parent_block_height: mb.parent_block_height,
parent_block_hash: mb.parent_block_hash,
index_block_hash: mb.index_block_hash,
block_hash: mb.block_hash,
parent_burn_block_height: mb.parent_burn_block_height,
parent_burn_block_hash: mb.parent_burn_block_hash,
parent_burn_block_time: mb.parent_burn_block_time,
}));
const mbResult = await sql`
INSERT INTO microblocks ${sql(values)}
`;
if (mbResult.count !== microblocks.length) {
throw new Error(
`Unexpected row count after inserting microblocks: ${mbResult.count} vs ${values.length}`
);
}
}

// alias to insertMicroblock
async insertMicroblockBatch(sql: PgSqlClient, microblocks: DbMicroblock[]): Promise<void> {
return this.insertMicroblock(sql, microblocks);
}

async insertTxBatch(sql: PgSqlClient, txs: DbTx[]): Promise<void> {
const values: TxInsertValues[] = txs.map(tx => ({
tx_id: tx.tx_id,
raw_tx: tx.raw_result,
tx_index: tx.tx_index,
index_block_hash: tx.index_block_hash,
parent_index_block_hash: tx.parent_index_block_hash,
block_hash: tx.block_hash,
parent_block_hash: tx.parent_block_hash,
block_height: tx.block_height,
block_time: tx.block_time ?? 0,
burn_block_height: tx.burn_block_height,
burn_block_time: tx.burn_block_time,
parent_burn_block_time: tx.parent_burn_block_time,
type_id: tx.type_id,
anchor_mode: tx.anchor_mode,
status: tx.status,
canonical: tx.canonical,
post_conditions: tx.post_conditions,
nonce: tx.nonce,
fee_rate: tx.fee_rate,
sponsored: tx.sponsored,
sponsor_nonce: tx.sponsor_nonce ?? null,
sponsor_address: tx.sponsor_address ?? null,
sender_address: tx.sender_address,
origin_hash_mode: tx.origin_hash_mode,
microblock_canonical: tx.microblock_canonical,
microblock_sequence: tx.microblock_sequence,
microblock_hash: tx.microblock_hash,
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
token_transfer_amount: tx.token_transfer_amount ?? null,
token_transfer_memo: tx.token_transfer_memo ?? null,
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
smart_contract_source_code: tx.smart_contract_source_code ?? null,
contract_call_contract_id: tx.contract_call_contract_id ?? null,
contract_call_function_name: tx.contract_call_function_name ?? null,
contract_call_function_args: tx.contract_call_function_args ?? null,
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
coinbase_vrf_proof: tx.coinbase_vrf_proof ?? null,
tenure_change_tenure_consensus_hash: tx.tenure_change_tenure_consensus_hash ?? null,
tenure_change_prev_tenure_consensus_hash: tx.tenure_change_prev_tenure_consensus_hash ?? null,
tenure_change_burn_view_consensus_hash: tx.tenure_change_burn_view_consensus_hash ?? null,
tenure_change_previous_tenure_end: tx.tenure_change_previous_tenure_end ?? null,
tenure_change_previous_tenure_blocks: tx.tenure_change_previous_tenure_blocks ?? null,
tenure_change_cause: tx.tenure_change_cause ?? null,
tenure_change_pubkey_hash: tx.tenure_change_pubkey_hash ?? null,
raw_result: tx.raw_result,
event_count: tx.event_count,
execution_cost_read_count: tx.execution_cost_read_count,
execution_cost_read_length: tx.execution_cost_read_length,
execution_cost_runtime: tx.execution_cost_runtime,
execution_cost_write_count: tx.execution_cost_write_count,
execution_cost_write_length: tx.execution_cost_write_length,
vm_error: tx.vm_error ?? null,
}));
await sql`INSERT INTO txs ${sql(values)}`;
}

async insertPrincipalTxsBatch(sql: PgSqlClient, values: PrincipalTxsInsertValues[]) {
await sql`
INSERT INTO principal_txs ${sql(values)}
`;
}

async insertContractEventBatch(sql: PgSqlClient, values: SmartContractEventInsertValues[]) {
await sql`
INSERT INTO contract_logs ${sql(values)}
`;
}

async insertFtEventBatch(sql: PgSqlClient, values: FtEventInsertValues[]) {
await sql`
INSERT INTO ft_events ${sql(values)}
`;
}

async insertNftEventBatch(sql: PgSqlClient, values: NftEventInsertValues[]) {
await sql`INSERT INTO nft_events ${sql(values)}`;
}

async insertNameBatch(sql: PgSqlClient, values: BnsNameInsertValues[]) {
await sql`
INSERT INTO names ${sql(values)}
`;
}

async insertNamespace(
sql: PgSqlClient,
blockData: {
index_block_hash: string;
parent_index_block_hash: string;
microblock_hash: string;
microblock_sequence: number;
microblock_canonical: boolean;
},
bnsNamespace: DbBnsNamespace
) {
const values: BnsNamespaceInsertValues = {
namespace_id: bnsNamespace.namespace_id,
launched_at: bnsNamespace.launched_at ?? null,
address: bnsNamespace.address,
reveal_block: bnsNamespace.reveal_block,
ready_block: bnsNamespace.ready_block,
buckets: bnsNamespace.buckets,
base: bnsNamespace.base.toString(),
coeff: bnsNamespace.coeff.toString(),
nonalpha_discount: bnsNamespace.nonalpha_discount.toString(),
no_vowel_discount: bnsNamespace.no_vowel_discount.toString(),
lifetime: bnsNamespace.lifetime,
status: bnsNamespace.status ?? null,
tx_index: bnsNamespace.tx_index,
tx_id: bnsNamespace.tx_id,
canonical: bnsNamespace.canonical,
index_block_hash: blockData.index_block_hash,
parent_index_block_hash: blockData.parent_index_block_hash,
microblock_hash: blockData.microblock_hash,
microblock_sequence: blockData.microblock_sequence,
microblock_canonical: blockData.microblock_canonical,
};
await sql`
INSERT INTO namespaces ${sql(values)}
`;
}

async insertZonefileBatch(sql: PgSqlClient, values: BnsZonefileInsertValues[]) {
await sql`
INSERT INTO zonefiles ${sql(values)}
`;
}

async insertRawEventRequestBatch(
sql: PgSqlClient,
events: RawEventRequestInsertValues[]
): Promise<void> {
await sql`
INSERT INTO event_observer_requests ${this.sql(events)}
`;
}

/**
* (event-replay) Enable or disable indexes for DB tables.
*/
async toggleAllTableIndexes(sql: PgSqlClient, state: IndexesState): Promise<void> {
const enable: boolean = Boolean(state);
const dbName = sql.options.database;
const tableSchema = sql.options.connection.search_path ?? 'public';
const tablesQuery = await sql<{ tablename: string }[]>`
SELECT tablename FROM pg_catalog.pg_tables
WHERE tablename != ${MIGRATIONS_TABLE}
AND schemaname = ${tableSchema}`;
if (tablesQuery.length === 0) {
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
console.error(errorMsg);
throw new Error(errorMsg);
}
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

// Exclude subdomains table since its constraints
// are needed to handle the ingestion of attachments_new events.
const filtered = tables.filter(item => item !== 'subdomains');

const result = await sql`
UPDATE pg_index
SET ${sql({ indisready: enable, indisvalid: enable })}
WHERE indrelid = ANY (
SELECT oid FROM pg_class
WHERE relname IN ${sql(filtered)}
AND relnamespace = (
SELECT oid FROM pg_namespace WHERE nspname = ${tableSchema}
)
)
`;
if (result.count === 0) {
throw new Error(`No updates made while toggling table indexes`);
}
}

/**
* (event-replay) Reindex all DB tables.
*/
async reindexAllTables(sql: PgSqlClient): Promise<void> {
const dbName = sql.options.database;
const tableSchema = sql.options.connection.search_path ?? 'public';
const tablesQuery = await sql<{ tablename: string }[]>`
SELECT tablename FROM pg_catalog.pg_tables
WHERE tablename != ${MIGRATIONS_TABLE}
AND schemaname = ${tableSchema}`;
if (tablesQuery.length === 0) {
const errorMsg = `No tables found in database '${dbName}', schema '${tableSchema}'`;
console.error(errorMsg);
throw new Error(errorMsg);
}
const tables: string[] = tablesQuery.map((r: { tablename: string }) => r.tablename);

for (const table of tables) {
const result = await sql`REINDEX TABLE ${sql(table)}`;
if (result.count === 0) {
throw new Error(`No updates made while toggling table indexes`);
}
}
}

async getLastIngestedSnpRedisMsgId(): Promise<string> {
const [{ last_redis_msg_id }] = await this.sql<
{ last_redis_msg_id: string }[]