Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export async function processProposal(
const buyingLoserSide = books[outcome.loser].bids.find((b) => b.price > thresholds.bids);

const fromBlock = Math.max(proposalGapStartBlock, currentBlock - lookbackBlocks);
const fills = await getOrderFilledEvents(params, market.clobTokenIds, fromBlock);
const fills = await getOrderFilledEvents(params, market.clobTokenIds, fromBlock, logger);

const soldWinner = fills[outcome.winner].filter((f) => f.type === "sell" && f.price < thresholds.asks);
const boughtLoser = fills[outcome.loser].filter((f) => f.type === "buy" && f.price > thresholds.bids);
Expand Down Expand Up @@ -324,5 +324,8 @@ export async function monitorTransactionsProposedOrderBook(
})
);

console.log("All proposals have been checked!");
Logger.debug({
at: "PolymarketMonitor",
message: `All ${allProposals.length} proposals have been checked!`,
});
}
275 changes: 274 additions & 1 deletion packages/monitor-v2/src/monitor-polymarket/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ export interface MonitoringParams {
fillEventsProposalGapSeconds: number;
httpClient: ReturnType<typeof createHttpClient>;
orderBookBatchSize: number;
orderBookSubgraphEndpoint: string;
ooV2Addresses: string[];
ooV1Addresses: string[];
aiConfig?: AIConfig;
subgraphSyncTolerance: number;
}
interface PolymarketMarketGraphql {
question: string;
Expand Down Expand Up @@ -374,6 +376,38 @@ export const getPolymarketMarketInformation = async (
});
};

interface OrderFilledEventSubgraph {
id: string;
transactionHash: string;
makerAssetId: string;
takerAssetId: string;
maker: string;
taker: string;
makerAmountFilled: string;
takerAmountFilled: string;
fee: string;
timestamp: string;
orderHash: string;
}

interface SubgraphOrderFilledResponse {
data?: {
orderFilledEvents: OrderFilledEventSubgraph[];
};
errors?: { message: string }[];
}

interface SubgraphMetaResponse {
data?: {
_meta: {
block: {
number: number;
};
};
};
errors?: { message: string }[];
}

const getTradeInfoFromOrderFilledEvent = async (
provider: Provider,
event: any
Expand All @@ -392,7 +426,186 @@ const getTradeInfoFromOrderFilledEvent = async (
};
};

export const getOrderFilledEvents = async (
const getTradeInfoFromSubgraphEvent = (event: OrderFilledEventSubgraph): PolymarketTradeInformation => {
const isBuy = event.makerAssetId === "0";
const makerAmountFilled = BigNumber.from(event.makerAmountFilled);
const takerAmountFilled = BigNumber.from(event.takerAmountFilled);

const numerator = (isBuy ? makerAmountFilled : takerAmountFilled).mul(1000);
const denominator = isBuy ? takerAmountFilled : makerAmountFilled;
const price = numerator.div(denominator).toNumber() / 1000;

return {
price,
type: isBuy ? "buy" : "sell",
timestamp: parseInt(event.timestamp),
// Convert to decimal value with 2 decimals
amount: (isBuy ? takerAmountFilled : makerAmountFilled).div(10_000).toNumber() / 100,
};
};

const querySubgraphOrderFilledEvents = async (
httpClient: AxiosInstance,
subgraphEndpoint: string,
whereField: "takerAssetId" | "makerAssetId",
assetId: string,
pageSize = 1000,
startTimestamp?: number
): Promise<OrderFilledEventSubgraph[]> => {
const allEvents: OrderFilledEventSubgraph[] = [];
let skip = 0;
let hasMore = true;

while (hasMore) {
// Build where clause with optional timestamp filter
const whereClause = startTimestamp
? `{timestamp_gt: ${startTimestamp}, ${whereField}: "${assetId}"}`
: `{${whereField}: "${assetId}"}`;

const query = `
{
orderFilledEvents(
where: ${whereClause},
first: ${pageSize},
skip: ${skip},
orderBy: timestamp,
orderDirection: asc
) {
id
transactionHash
makerAssetId
takerAssetId
maker
taker
makerAmountFilled
takerAmountFilled
fee
timestamp
orderHash
}
}
`;

const response = await httpClient.post<SubgraphOrderFilledResponse>(subgraphEndpoint, { query });

if (response.data.errors?.length) {
throw new Error(response.data.errors.map((e) => e.message).join("; "));
}

if (!response.data.data?.orderFilledEvents) {
throw new Error("Invalid response from subgraph");
}

const events = response.data.data.orderFilledEvents;
allEvents.push(...events);

// If we got fewer events than pageSize, we've reached the end
hasMore = events.length === pageSize;
skip += pageSize;
}

return allEvents;
};

const getOrderFilledEventsFromSubgraph = async (
params: MonitoringParams,
clobTokenIds: [string, string],
startTimestamp?: number
): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => {
// Query 4 combinations: takerAssetId for both tokens, makerAssetId for both tokens
const queries = [
{ whereField: "takerAssetId" as const, assetId: clobTokenIds[0], tokenIndex: 0 },
{ whereField: "takerAssetId" as const, assetId: clobTokenIds[1], tokenIndex: 1 },
{ whereField: "makerAssetId" as const, assetId: clobTokenIds[0], tokenIndex: 0 },
{ whereField: "makerAssetId" as const, assetId: clobTokenIds[1], tokenIndex: 1 },
];

// Execute all queries in parallel
const queryResults = await Promise.all(
queries.map((q) =>
querySubgraphOrderFilledEvents(
params.httpClient,
params.orderBookSubgraphEndpoint,
q.whereField,
q.assetId,
1000,
startTimestamp
)
)
);

// Group events by token index, deduplicating per token (same event can appear for both tokens)
const tokenOneEventIds = new Set<string>();
const tokenTwoEventIds = new Set<string>();
const tokenOneEvents: PolymarketTradeInformation[] = [];
const tokenTwoEvents: PolymarketTradeInformation[] = [];

// Process takerAssetId queries (index 0 and 1)
queryResults[0].forEach((event) => {
if (!tokenOneEventIds.has(event.id)) {
tokenOneEventIds.add(event.id);
tokenOneEvents.push(getTradeInfoFromSubgraphEvent(event));
}
});
queryResults[1].forEach((event) => {
if (!tokenTwoEventIds.has(event.id)) {
tokenTwoEventIds.add(event.id);
tokenTwoEvents.push(getTradeInfoFromSubgraphEvent(event));
}
});

// Process makerAssetId queries (index 2 and 3)
queryResults[2].forEach((event) => {
if (!tokenOneEventIds.has(event.id)) {
tokenOneEventIds.add(event.id);
tokenOneEvents.push(getTradeInfoFromSubgraphEvent(event));
}
});
queryResults[3].forEach((event) => {
if (!tokenTwoEventIds.has(event.id)) {
tokenTwoEventIds.add(event.id);
tokenTwoEvents.push(getTradeInfoFromSubgraphEvent(event));
}
});

// Sort by timestamp
const sortByTimestamp = (events: PolymarketTradeInformation[]): PolymarketTradeInformation[] => {
return events.sort((a, b) => a.timestamp - b.timestamp);
};

return [sortByTimestamp(tokenOneEvents), sortByTimestamp(tokenTwoEvents)];
};

const checkSubgraphSyncStatus = async (httpClient: AxiosInstance, subgraphEndpoint: string): Promise<number | null> => {
const query = `
{
_meta {
block {
number
}
}
}
`;

try {
const response = await httpClient.post<SubgraphMetaResponse>(subgraphEndpoint, { query });

if (response.data.errors?.length) {
throw new Error(response.data.errors.map((e) => e.message).join("; "));
}

if (!response.data.data?._meta?.block?.number) {
throw new Error("Invalid response from subgraph meta query");
}

return response.data.data._meta.block.number;
} catch (error) {
// Return null if we can't check sync status, caller should handle gracefully
return null;
}
};

const getOrderFilledEventsSlow = async (
params: MonitoringParams,
clobTokenIds: [string, string],
startBlockNumber: number
Expand Down Expand Up @@ -439,6 +652,58 @@ export const getOrderFilledEvents = async (
return [outcomeTokenOne, outcomeTokenTwo];
};

export const getOrderFilledEvents = async (
params: MonitoringParams,
clobTokenIds: [string, string],
startBlockNumber: number,
logger: typeof Logger
): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => {
try {
// Check subgraph sync status first
const subgraphBlockNumber = await checkSubgraphSyncStatus(params.httpClient, params.orderBookSubgraphEndpoint);

if (subgraphBlockNumber !== null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when subgraphBlockNumber === null I don't think we should even try getOrderFilledEventsFromSubgraph below and instead fallback to getOrderFilledEventsSlow as subgraphSyncTolerance cannot be evaluated in this case.

// Get current block from provider
const currentBlockNumber = await params.provider.getBlockNumber();
const blockDifference = currentBlockNumber - subgraphBlockNumber;

// If subgraph is behind by more than tolerance, use slow method
if (blockDifference > params.subgraphSyncTolerance) {
logger.debug({
at: "getOrderFilledEvents",
message: `Falling back to slow method: subgraph is ${blockDifference} blocks behind (tolerance: ${params.subgraphSyncTolerance})`,
});
return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also might want to debug log the fallback

}

// Get the block timestamp from startBlockNumber
const startBlock = await params.provider.getBlock(startBlockNumber);
const startTimestamp = startBlock.timestamp;

// Try the fast subgraph version
logger.debug({
at: "getOrderFilledEvents",
message: `Using fast subgraph method: subgraph is ${blockDifference} blocks behind (tolerance: ${params.subgraphSyncTolerance})`,
});
return await getOrderFilledEventsFromSubgraph(params, clobTokenIds, startTimestamp);
} else {
// If subgraphBlockNumber is null, we cannot evaluate sync tolerance, so fallback to slow method
logger.debug({
at: "getOrderFilledEvents",
message: "Falling back to slow method: subgraph block number is null, cannot evaluate sync tolerance",
});
return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber);
}
} catch (error) {
// Fallback to the slow version if subgraph fails
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we debug log this for better observability

logger.debug({
at: "getOrderFilledEvents",
message: `Falling back to slow method due to error: ${error instanceof Error ? error.message : String(error)}`,
});
return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber);
}
};

export const calculatePolymarketQuestionID = (ancillaryData: string): string => {
return ethers.utils.keccak256(ancillaryData);
};
Expand Down Expand Up @@ -907,6 +1172,12 @@ export const initMonitoringParams = async (

const orderBookBatchSize = env.ORDER_BOOK_BATCH_SIZE ? Number(env.ORDER_BOOK_BATCH_SIZE) : 499;

const orderBookSubgraphEndpoint =
env.ORDER_BOOK_SUBGRAPH_ENDPOINT ||
"https://api.goldsky.com/api/public/project_cl6mb8i9h0003e201j6li0diw/subgraphs/orderbook-subgraph/0.0.1/gn";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


const subgraphSyncTolerance = env.SUBGRAPH_SYNC_TOLERANCE ? Number(env.SUBGRAPH_SYNC_TOLERANCE) : 1;

// Rate limit and retry with exponential backoff and jitter to handle rate limiting and errors from the APIs.
const httpClient = createHttpClient({
axios: { timeout: httpTimeout },
Expand Down Expand Up @@ -948,9 +1219,11 @@ export const initMonitoringParams = async (
fillEventsProposalGapSeconds,
httpClient,
orderBookBatchSize,
orderBookSubgraphEndpoint,
ooV2Addresses,
ooV1Addresses,
aiConfig,
subgraphSyncTolerance,
};
};

Expand Down
3 changes: 2 additions & 1 deletion packages/monitor-v2/test/PolymarketMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,8 @@ describe("PolymarketNotifier", function () {

sandbox.stub(ethers, "Contract").returns({ filters: { OrderFilled: () => ({ topics: [] }) } } as any);

await commonModule.getOrderFilledEvents(params, ["0xdeadbeef", "0xfeedface"], fromBlockParam);
const logger = createNewLogger([new SpyTransport({}, { spy: sinon.spy() })]);
await commonModule.getOrderFilledEvents(params, ["0xdeadbeef", "0xfeedface"], fromBlockParam, logger);

assert.equal(
capturedFromBlock,
Expand Down