Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- snapshot_cache: rename `body` -> `body_gzip` and add a `content_encoding`
-- column so the encoding contract is explicit at the schema layer and a
-- future migration can add e.g. "zstd" without another rename.
--
-- Both operations are PostgreSQL metadata-only:
-- - RENAME COLUMN is instant (no table rewrite).
-- - ADD COLUMN with a constant DEFAULT in PG 11+ is also instant.
-- Safe under concurrent reads/writes; no maintenance window required.

-- AlterTable
ALTER TABLE "snapshot_cache" RENAME COLUMN "body" TO "body_gzip";
ALTER TABLE "snapshot_cache" ADD COLUMN "content_encoding" TEXT NOT NULL DEFAULT 'gzip';
156 changes: 80 additions & 76 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,35 @@ model CrowdfundingCampaign {
}

model Drep {
drepId String @id @map("drep_id")
userId String? @unique @map("user_id")
name String?
paymentAddr String? @map("payment_addr")
iconUrl String? @map("icon_url")
doNotList Boolean? @map("do_not_list")
votingPower BigInt @default(0) @map("voting_power")
delegatorCount Int? @map("delegator_count")
drepId String @id @map("drep_id")
userId String? @unique @map("user_id")
name String?
paymentAddr String? @map("payment_addr")
iconUrl String? @map("icon_url")
doNotList Boolean? @map("do_not_list")
votingPower BigInt @default(0) @map("voting_power")
delegatorCount Int? @map("delegator_count")
// Denormalised — populated by drep-denorm.service after each epoch sync.
// Read by /dreps LIST + /snapshot/dreps to avoid expensive groupBys at request time.
firstSeenEpoch Int? @map("first_seen_epoch")
proposalParticipationPercent Float? @map("proposal_participation_percent")
registered Boolean? @map("registered")
active Boolean? @map("active")
expiresEpoch Int? @map("expires_epoch")
metaUrl String? @map("meta_url")
metaHash String? @map("meta_hash")
firstSeenEpoch Int? @map("first_seen_epoch")
proposalParticipationPercent Float? @map("proposal_participation_percent")
registered Boolean? @map("registered")
active Boolean? @map("active")
expiresEpoch Int? @map("expires_epoch")
metaUrl String? @map("meta_url")
metaHash String? @map("meta_hash")
// CIP-119 metadata fields (from /drep_updates meta_json.body)
bio String? @map("bio")
motivations String? @map("motivations")
objectives String? @map("objectives")
qualifications String? @map("qualifications")
references String? @map("references") // JSON string of references array
createdAt DateTime? @default(now()) @map("created_at")
updatedAt DateTime? @map("updated_at")
user User? @relation(fields: [userId], references: [id])
onchainVotes OnchainVote[]
stakeDelegationStates StakeDelegationState[]
epochSnapshots DrepEpochSnapshot[]
bio String? @map("bio")
motivations String? @map("motivations")
objectives String? @map("objectives")
qualifications String? @map("qualifications")
references String? @map("references") // JSON string of references array
createdAt DateTime? @default(now()) @map("created_at")
updatedAt DateTime? @map("updated_at")
user User? @relation(fields: [userId], references: [id])
onchainVotes OnchainVote[]
stakeDelegationStates StakeDelegationState[]
epochSnapshots DrepEpochSnapshot[]

@@map("drep")
}
Expand Down Expand Up @@ -150,18 +150,22 @@ model StakeDelegationChange {
}

// Pre-rendered snapshot bodies for /snapshot endpoints.
// Each row holds one composed JSON payload (gzipped) keyed by cacheKey.
// Each row holds one composed JSON payload (encoded per `contentEncoding`,
// currently always "gzip") keyed by cacheKey.
// Final chunks are immutable — written once when the chunk's last epoch finalises.
// Manifest + dreps + current chunk are rewritten by snapshotBuilder.rebuildAfterEpoch.
model SnapshotCache {
cacheKey String @id @map("cache_key")
body Bytes
generatedAt DateTime @map("generated_at")
schemaVersion String @map("schema_version")
isFinal Boolean @default(false) @map("is_final")
byteSize Int @map("byte_size")
etag String
lastAccessedAt DateTime @updatedAt @map("last_accessed_at")
cacheKey String @id @map("cache_key")
bodyGzip Bytes @map("body_gzip")
/// IANA Content-Encoding token. Currently "gzip" for every row; the column
/// exists so a future migration can add e.g. "zstd" without another rename.
contentEncoding String @default("gzip") @map("content_encoding")
generatedAt DateTime @map("generated_at")
schemaVersion String @map("schema_version")
isFinal Boolean @default(false) @map("is_final")
byteSize Int @map("byte_size")
etag String
lastAccessedAt DateTime @updatedAt @map("last_accessed_at")

@@index([lastAccessedAt])
@@map("snapshot_cache")
Expand Down Expand Up @@ -227,15 +231,15 @@ model StakeDelegationStaging {
// Tracks per-epoch completion state for the governance analytics sync job.
// This lets the job run frequently but only do heavy work once per epoch.
model EpochAnalyticsSync {
epoch Int @id @map("epoch_no")
drepsSyncedAt DateTime? @map("dreps_synced_at")
drepInfoSyncedAt DateTime? @map("drep_info_synced_at")
drepSnapshotSyncedAt DateTime? @map("drep_snapshot_synced_at")
totalsSyncedAt DateTime? @map("totals_synced_at")
drepLifecycleSyncedAt DateTime? @map("drep_lifecycle_synced_at")
poolGroupsSyncedAt DateTime? @map("pool_groups_synced_at")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
epoch Int @id @map("epoch_no")
drepsSyncedAt DateTime? @map("dreps_synced_at")
drepInfoSyncedAt DateTime? @map("drep_info_synced_at")
drepSnapshotSyncedAt DateTime? @map("drep_snapshot_synced_at")
totalsSyncedAt DateTime? @map("totals_synced_at")
drepLifecycleSyncedAt DateTime? @map("drep_lifecycle_synced_at")
poolGroupsSyncedAt DateTime? @map("pool_groups_synced_at")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")

@@map("epoch_analytics_sync")
}
Expand Down Expand Up @@ -348,25 +352,25 @@ model PoolGroup {
}

model OnchainVote {
id String @id
txHash String @map("tx_hash")
proposalId String @map("proposal_id")
vote VoteType?
voterType VoterType @map("voter_type")
votingPower BigInt? @map("voting_power")
responseEpoch Int? @map("response_epoch")
anchorUrl String? @map("anchor_url")
anchorHash String? @map("anchor_hash")
rationale String?
id String @id
txHash String @map("tx_hash")
proposalId String @map("proposal_id")
vote VoteType?
voterType VoterType @map("voter_type")
votingPower BigInt? @map("voting_power")
responseEpoch Int? @map("response_epoch")
anchorUrl String? @map("anchor_url")
anchorHash String? @map("anchor_hash")
rationale String?
surveyResponse String? @map("survey_response")
surveyResponseSurveyTxId String? @map("survey_response_survey_tx_id")
surveyResponseResponderRole String? @map("survey_response_responder_role")
votedAt DateTime? @default(now()) @map("voted_at")
createdAt DateTime? @default(now()) @map("created_at")
updatedAt DateTime? @map("updated_at")
drepId String? @map("drep_id")
spoId String? @map("spo_id")
ccId String? @map("cc_id")
votedAt DateTime? @default(now()) @map("voted_at")
createdAt DateTime? @default(now()) @map("created_at")
updatedAt DateTime? @map("updated_at")
drepId String? @map("drep_id")
spoId String? @map("spo_id")
ccId String? @map("cc_id")

cc CC? @relation(fields: [ccId], references: [ccId])
drep Drep? @relation(fields: [drepId], references: [drepId])
Expand Down Expand Up @@ -524,22 +528,22 @@ model ActivityRecent {
}

model ActivityHistorical {
id String @id @default(cuid())
repoId String @map("repo_id")
date DateTime @db.Date
commitCount Int @default(0) @map("commit_count")
prOpened Int @default(0) @map("pr_opened")
prMerged Int @default(0) @map("pr_merged")
prClosed Int @default(0) @map("pr_closed")
issuesOpened Int @default(0) @map("issues_opened")
issuesClosed Int @default(0) @map("issues_closed")
additions Int @default(0)
deletions Int @default(0)
uniqueContributors Int @default(0) @map("unique_contributors")
avgPrMergeHours Float? @map("avg_pr_merge_hours")
avgIssueResolutionHours Float? @map("avg_issue_resolution_hours")
releasesPublished Int @default(0) @map("releases_published")
createdAt DateTime @default(now()) @map("created_at")
id String @id @default(cuid())
repoId String @map("repo_id")
date DateTime @db.Date
commitCount Int @default(0) @map("commit_count")
prOpened Int @default(0) @map("pr_opened")
prMerged Int @default(0) @map("pr_merged")
prClosed Int @default(0) @map("pr_closed")
issuesOpened Int @default(0) @map("issues_opened")
issuesClosed Int @default(0) @map("issues_closed")
additions Int @default(0)
deletions Int @default(0)
uniqueContributors Int @default(0) @map("unique_contributors")
avgPrMergeHours Float? @map("avg_pr_merge_hours")
avgIssueResolutionHours Float? @map("avg_issue_resolution_hours")
releasesPublished Int @default(0) @map("releases_published")
createdAt DateTime @default(now()) @map("created_at")

repository GithubRepository @relation(fields: [repoId], references: [id])

Expand Down
28 changes: 24 additions & 4 deletions src/controllers/data/triggerEpochTotalsSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,34 @@ export const postTriggerEpochTotalsSync = async (
try {
const result = await syncEpochTotalsStep(prisma);

// Mirror the cron path: a downstream best-effort hook failure (denorm
// refresh, snapshot rebuild) should propagate to SyncStatus.lastResult
// as "partial" so monitoring can distinguish a green run from one that
// missed an invalidation.
const partialFailure =
Boolean(result.denormRefreshError)
|| Boolean(result.snapshotRebuildError);
const partialError = [
result.denormRefreshError && `denorm: ${result.denormRefreshError}`,
result.snapshotRebuildError
&& `snapshot-rebuild: ${result.snapshotRebuildError}`,
]
.filter(Boolean)
.join("; ");

await releaseJobLock(
JOB_NAME,
"success",
result.skippedPrevious ? 1 : 2
partialFailure ? "partial" : "success",
result.skippedPrevious ? 1 : 2,
partialFailure ? partialError : null
);

console.log("[Epoch Totals Sync] Completed successfully:", {
currentEpoch: result.currentEpoch, epochToSync: result.epochToSync, skippedPrevious: result.skippedPrevious,
console.log("[Epoch Totals Sync] Completed:", {
currentEpoch: result.currentEpoch,
epochToSync: result.epochToSync,
skippedPrevious: result.skippedPrevious,
denormRefreshError: result.denormRefreshError ?? null,
snapshotRebuildError: result.snapshotRebuildError ?? null,
});
} catch (error) {
console.error("[Epoch Totals Sync] Async processing error:", formatAxiosLikeError(error));
Expand Down
36 changes: 24 additions & 12 deletions src/controllers/drep/getDReps.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import { Request, Response } from "express";
import { VoterType } from "@prisma/client";
import { prisma } from "../../services";
import { GetDRepsResponse, DRepSummary } from "../../responses";
import {
DRepSummary,
GetDRepsResponse,
toAdaString,
toLovelaceString,
} from "../../responses";
import { formatAxiosLikeError } from "../../utils/format-http-client-error";
import { parseIntegerQuery } from "../../utils/query-params";

/**
* Converts lovelace (BigInt) to ADA string with 6 decimal places
*/
function lovelaceToAda(lovelace: bigint): string {
const ada = Number(lovelace) / 1_000_000;
return ada.toFixed(6);
}

/**
* GET /dreps
Expand All @@ -25,8 +24,21 @@ function lovelaceToAda(lovelace: bigint): string {
*/
export const getDReps = async (req: Request, res: Response) => {
try {
const page = Math.max(1, parseInt(req.query.page as string) || 1);
const pageSize = Math.min(1000, Math.max(1, parseInt(req.query.pageSize as string) || 20));
const pageR = parseIntegerQuery(req.query.page, "page", {
min: 1,
default: 1,
});
if (!pageR.ok) return res.status(pageR.status).json(pageR);
const page = pageR.value;

const pageSizeR = parseIntegerQuery(req.query.pageSize, "pageSize", {
min: 1,
max: 1000,
default: 20,
});
if (!pageSizeR.ok) return res.status(pageSizeR.status).json(pageSizeR);
const pageSize = pageSizeR.value;

const sortBy = (req.query.sortBy as string) || "votingPower";
const sortOrder = (req.query.sortOrder as string) === "asc" ? "asc" : "desc";
const search = (req.query.search as string) || "";
Expand Down Expand Up @@ -150,8 +162,8 @@ export const getDReps = async (req: Request, res: Response) => {
drepId: drep.drepId,
name: drep.name,
iconUrl: drep.iconUrl,
votingPower: drep.votingPower.toString(),
votingPowerAda: lovelaceToAda(drep.votingPower),
votingPower: toLovelaceString(drep.votingPower),
votingPowerAda: toAdaString(drep.votingPower),
totalVotesCast: voteCountMap.get(drep.drepId) || 0,
delegatorCount: drep.delegatorCount,
firstSeenEpoch:
Expand Down
52 changes: 52 additions & 0 deletions src/controllers/healthz/getHealthz.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Request, Response } from "express";
import { getSnapshotBootRecoveryStatus } from "../../services/ingestion/snapshot-builder.service";

/**
* GET /healthz — liveness + snapshot boot readiness signal.
*
* Public, unauthenticated. Designed for cloud-deploy readiness probes.
*
* Status field:
* - "ok" — process is up, snapshot boot recovery completed (or was
* a no-op because L2 was already fresh).
* - "starting" — boot recover is still running or hasn't started yet.
* - "skipped" — boot recover skipped (no epoch data, or another replica
* is rebuilding). The API will still serve requests but
* the snapshot may be stale until a successful sync.
* - "degraded" — boot recover failed. The API is up but `/snapshot/*` may
* be empty or stale. Probes should mark NOT READY so the
* load balancer drains traffic.
*
* The HTTP status mirrors the readiness signal: 200 for ok/starting/skipped,
* 503 for degraded. Liveness check (process responsive) always returns 200
* when this controller runs at all.
*/
export const getHealthz = (_req: Request, res: Response) => {
const boot = getSnapshotBootRecoveryStatus();

let status: "ok" | "starting" | "skipped" | "degraded";
switch (boot.state) {
case "ok":
case "fresh":
status = "ok";
break;
case "running":
case "not-started":
status = "starting";
break;
case "skipped":
status = "skipped";
break;
case "failed":
status = "degraded";
break;
}

const httpStatus = status === "degraded" ? 503 : 200;

res.status(httpStatus).json({
status,
snapshotBootRecovery: boot,
serverTime: new Date().toISOString(),
});
};
1 change: 1 addition & 0 deletions src/controllers/healthz/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./getHealthz";
Loading