-
Notifications
You must be signed in to change notification settings - Fork 197
feat(pm-notifier): read order filled events from subgraph #4900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -76,9 +76,11 @@ export interface MonitoringParams { | |
| fillEventsProposalGapSeconds: number; | ||
| httpClient: ReturnType<typeof createHttpClient>; | ||
| orderBookBatchSize: number; | ||
| orderBookSubgraphEndpoint: string; | ||
| ooV2Addresses: string[]; | ||
| ooV1Addresses: string[]; | ||
| aiConfig?: AIConfig; | ||
| subgraphSyncTolerance: number; | ||
| } | ||
| interface PolymarketMarketGraphql { | ||
| question: string; | ||
|
|
@@ -374,6 +376,38 @@ export const getPolymarketMarketInformation = async ( | |
| }); | ||
| }; | ||
|
|
||
| interface OrderFilledEventSubgraph { | ||
| id: string; | ||
| transactionHash: string; | ||
| makerAssetId: string; | ||
| takerAssetId: string; | ||
| maker: string; | ||
| taker: string; | ||
| makerAmountFilled: string; | ||
| takerAmountFilled: string; | ||
| fee: string; | ||
| timestamp: string; | ||
| orderHash: string; | ||
| } | ||
|
|
||
| interface SubgraphOrderFilledResponse { | ||
| data?: { | ||
| orderFilledEvents: OrderFilledEventSubgraph[]; | ||
| }; | ||
| errors?: { message: string }[]; | ||
| } | ||
|
|
||
| interface SubgraphMetaResponse { | ||
| data?: { | ||
| _meta: { | ||
| block: { | ||
| number: number; | ||
| }; | ||
| }; | ||
| }; | ||
| errors?: { message: string }[]; | ||
| } | ||
|
|
||
| const getTradeInfoFromOrderFilledEvent = async ( | ||
| provider: Provider, | ||
| event: any | ||
|
|
@@ -392,7 +426,186 @@ const getTradeInfoFromOrderFilledEvent = async ( | |
| }; | ||
| }; | ||
|
|
||
| export const getOrderFilledEvents = async ( | ||
| const getTradeInfoFromSubgraphEvent = (event: OrderFilledEventSubgraph): PolymarketTradeInformation => { | ||
| const isBuy = event.makerAssetId === "0"; | ||
| const makerAmountFilled = BigNumber.from(event.makerAmountFilled); | ||
| const takerAmountFilled = BigNumber.from(event.takerAmountFilled); | ||
|
|
||
| const numerator = (isBuy ? makerAmountFilled : takerAmountFilled).mul(1000); | ||
| const denominator = isBuy ? takerAmountFilled : makerAmountFilled; | ||
| const price = numerator.div(denominator).toNumber() / 1000; | ||
|
|
||
| return { | ||
| price, | ||
| type: isBuy ? "buy" : "sell", | ||
| timestamp: parseInt(event.timestamp), | ||
| // Convert to decimal value with 2 decimals | ||
| amount: (isBuy ? takerAmountFilled : makerAmountFilled).div(10_000).toNumber() / 100, | ||
| }; | ||
| }; | ||
|
|
||
| const querySubgraphOrderFilledEvents = async ( | ||
| httpClient: AxiosInstance, | ||
| subgraphEndpoint: string, | ||
| whereField: "takerAssetId" | "makerAssetId", | ||
| assetId: string, | ||
| pageSize = 1000, | ||
| startTimestamp?: number | ||
| ): Promise<OrderFilledEventSubgraph[]> => { | ||
| const allEvents: OrderFilledEventSubgraph[] = []; | ||
| let skip = 0; | ||
| let hasMore = true; | ||
|
|
||
| while (hasMore) { | ||
| // Build where clause with optional timestamp filter | ||
| const whereClause = startTimestamp | ||
| ? `{timestamp_gt: ${startTimestamp}, ${whereField}: "${assetId}"}` | ||
| : `{${whereField}: "${assetId}"}`; | ||
|
|
||
| const query = ` | ||
| { | ||
| orderFilledEvents( | ||
| where: ${whereClause}, | ||
| first: ${pageSize}, | ||
| skip: ${skip}, | ||
| orderBy: timestamp, | ||
| orderDirection: asc | ||
| ) { | ||
| id | ||
| transactionHash | ||
| makerAssetId | ||
| takerAssetId | ||
| maker | ||
| taker | ||
| makerAmountFilled | ||
| takerAmountFilled | ||
| fee | ||
| timestamp | ||
| orderHash | ||
| } | ||
| } | ||
| `; | ||
|
|
||
| const response = await httpClient.post<SubgraphOrderFilledResponse>(subgraphEndpoint, { query }); | ||
|
|
||
| if (response.data.errors?.length) { | ||
| throw new Error(response.data.errors.map((e) => e.message).join("; ")); | ||
| } | ||
|
|
||
| if (!response.data.data?.orderFilledEvents) { | ||
| throw new Error("Invalid response from subgraph"); | ||
| } | ||
|
|
||
| const events = response.data.data.orderFilledEvents; | ||
| allEvents.push(...events); | ||
|
|
||
| // If we got fewer events than pageSize, we've reached the end | ||
| hasMore = events.length === pageSize; | ||
| skip += pageSize; | ||
| } | ||
|
|
||
| return allEvents; | ||
| }; | ||
|
|
||
| const getOrderFilledEventsFromSubgraph = async ( | ||
| params: MonitoringParams, | ||
| clobTokenIds: [string, string], | ||
| startTimestamp?: number | ||
| ): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => { | ||
| // Query 4 combinations: takerAssetId for both tokens, makerAssetId for both tokens | ||
| const queries = [ | ||
| { whereField: "takerAssetId" as const, assetId: clobTokenIds[0], tokenIndex: 0 }, | ||
| { whereField: "takerAssetId" as const, assetId: clobTokenIds[1], tokenIndex: 1 }, | ||
| { whereField: "makerAssetId" as const, assetId: clobTokenIds[0], tokenIndex: 0 }, | ||
| { whereField: "makerAssetId" as const, assetId: clobTokenIds[1], tokenIndex: 1 }, | ||
| ]; | ||
|
|
||
| // Execute all queries in parallel | ||
| const queryResults = await Promise.all( | ||
| queries.map((q) => | ||
| querySubgraphOrderFilledEvents( | ||
| params.httpClient, | ||
| params.orderBookSubgraphEndpoint, | ||
| q.whereField, | ||
| q.assetId, | ||
| 1000, | ||
| startTimestamp | ||
| ) | ||
| ) | ||
| ); | ||
|
|
||
| // Group events by token index, deduplicating per token (same event can appear for both tokens) | ||
| const tokenOneEventIds = new Set<string>(); | ||
| const tokenTwoEventIds = new Set<string>(); | ||
| const tokenOneEvents: PolymarketTradeInformation[] = []; | ||
| const tokenTwoEvents: PolymarketTradeInformation[] = []; | ||
|
|
||
| // Process takerAssetId queries (index 0 and 1) | ||
| queryResults[0].forEach((event) => { | ||
| if (!tokenOneEventIds.has(event.id)) { | ||
| tokenOneEventIds.add(event.id); | ||
| tokenOneEvents.push(getTradeInfoFromSubgraphEvent(event)); | ||
| } | ||
| }); | ||
| queryResults[1].forEach((event) => { | ||
| if (!tokenTwoEventIds.has(event.id)) { | ||
| tokenTwoEventIds.add(event.id); | ||
| tokenTwoEvents.push(getTradeInfoFromSubgraphEvent(event)); | ||
| } | ||
| }); | ||
|
|
||
| // Process makerAssetId queries (index 2 and 3) | ||
| queryResults[2].forEach((event) => { | ||
| if (!tokenOneEventIds.has(event.id)) { | ||
| tokenOneEventIds.add(event.id); | ||
| tokenOneEvents.push(getTradeInfoFromSubgraphEvent(event)); | ||
| } | ||
| }); | ||
| queryResults[3].forEach((event) => { | ||
| if (!tokenTwoEventIds.has(event.id)) { | ||
| tokenTwoEventIds.add(event.id); | ||
| tokenTwoEvents.push(getTradeInfoFromSubgraphEvent(event)); | ||
| } | ||
| }); | ||
|
|
||
| // Sort by timestamp | ||
| const sortByTimestamp = (events: PolymarketTradeInformation[]): PolymarketTradeInformation[] => { | ||
| return events.sort((a, b) => a.timestamp - b.timestamp); | ||
| }; | ||
|
|
||
| return [sortByTimestamp(tokenOneEvents), sortByTimestamp(tokenTwoEvents)]; | ||
| }; | ||
|
|
||
| const checkSubgraphSyncStatus = async (httpClient: AxiosInstance, subgraphEndpoint: string): Promise<number | null> => { | ||
| const query = ` | ||
| { | ||
| _meta { | ||
| block { | ||
| number | ||
| } | ||
| } | ||
| } | ||
| `; | ||
|
|
||
| try { | ||
| const response = await httpClient.post<SubgraphMetaResponse>(subgraphEndpoint, { query }); | ||
|
|
||
| if (response.data.errors?.length) { | ||
| throw new Error(response.data.errors.map((e) => e.message).join("; ")); | ||
| } | ||
|
|
||
| if (!response.data.data?._meta?.block?.number) { | ||
| throw new Error("Invalid response from subgraph meta query"); | ||
| } | ||
|
|
||
| return response.data.data._meta.block.number; | ||
| } catch (error) { | ||
| // Return null if we can't check sync status, caller should handle gracefully | ||
| return null; | ||
| } | ||
| }; | ||
|
|
||
| const getOrderFilledEventsSlow = async ( | ||
| params: MonitoringParams, | ||
| clobTokenIds: [string, string], | ||
| startBlockNumber: number | ||
|
|
@@ -439,6 +652,58 @@ export const getOrderFilledEvents = async ( | |
| return [outcomeTokenOne, outcomeTokenTwo]; | ||
| }; | ||
|
|
||
| export const getOrderFilledEvents = async ( | ||
| params: MonitoringParams, | ||
| clobTokenIds: [string, string], | ||
| startBlockNumber: number, | ||
| logger: typeof Logger | ||
| ): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => { | ||
| try { | ||
| // Check subgraph sync status first | ||
| const subgraphBlockNumber = await checkSubgraphSyncStatus(params.httpClient, params.orderBookSubgraphEndpoint); | ||
|
|
||
| if (subgraphBlockNumber !== null) { | ||
| // Get current block from provider | ||
| const currentBlockNumber = await params.provider.getBlockNumber(); | ||
| const blockDifference = currentBlockNumber - subgraphBlockNumber; | ||
|
|
||
| // If subgraph is behind by more than tolerance, use slow method | ||
| if (blockDifference > params.subgraphSyncTolerance) { | ||
| logger.debug({ | ||
| at: "getOrderFilledEvents", | ||
| message: `Falling back to slow method: subgraph is ${blockDifference} blocks behind (tolerance: ${params.subgraphSyncTolerance})`, | ||
| }); | ||
| return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also might want to debug log the fallback |
||
| } | ||
|
|
||
| // Get the block timestamp from startBlockNumber | ||
| const startBlock = await params.provider.getBlock(startBlockNumber); | ||
| const startTimestamp = startBlock.timestamp; | ||
|
|
||
| // Try the fast subgraph version | ||
| logger.debug({ | ||
| at: "getOrderFilledEvents", | ||
| message: `Using fast subgraph method: subgraph is ${blockDifference} blocks behind (tolerance: ${params.subgraphSyncTolerance})`, | ||
| }); | ||
| return await getOrderFilledEventsFromSubgraph(params, clobTokenIds, startTimestamp); | ||
| } else { | ||
| // If subgraphBlockNumber is null, we cannot evaluate sync tolerance, so fallback to slow method | ||
| logger.debug({ | ||
| at: "getOrderFilledEvents", | ||
| message: "Falling back to slow method: subgraph block number is null, cannot evaluate sync tolerance", | ||
| }); | ||
| return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber); | ||
| } | ||
| } catch (error) { | ||
| // Fallback to the slow version if subgraph fails | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we debug log this for better observability |
||
| logger.debug({ | ||
| at: "getOrderFilledEvents", | ||
| message: `Falling back to slow method due to error: ${error instanceof Error ? error.message : String(error)}`, | ||
| }); | ||
| return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber); | ||
| } | ||
| }; | ||
|
|
||
| export const calculatePolymarketQuestionID = (ancillaryData: string): string => { | ||
| return ethers.utils.keccak256(ancillaryData); | ||
| }; | ||
|
|
@@ -907,6 +1172,12 @@ export const initMonitoringParams = async ( | |
|
|
||
| const orderBookBatchSize = env.ORDER_BOOK_BATCH_SIZE ? Number(env.ORDER_BOOK_BATCH_SIZE) : 499; | ||
|
|
||
| const orderBookSubgraphEndpoint = | ||
| env.ORDER_BOOK_SUBGRAPH_ENDPOINT || | ||
| "https://api.goldsky.com/api/public/project_cl6mb8i9h0003e201j6li0diw/subgraphs/orderbook-subgraph/0.0.1/gn"; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Defaults to the public version (https://docs.polymarket.com/developers/subgraph/overview#hosted-version) |
||
|
|
||
| const subgraphSyncTolerance = env.SUBGRAPH_SYNC_TOLERANCE ? Number(env.SUBGRAPH_SYNC_TOLERANCE) : 1; | ||
|
|
||
| // Rate limit and retry with exponential backoff and jitter to handle rate limiting and errors from the APIs. | ||
| const httpClient = createHttpClient({ | ||
| axios: { timeout: httpTimeout }, | ||
|
|
@@ -948,9 +1219,11 @@ export const initMonitoringParams = async ( | |
| fillEventsProposalGapSeconds, | ||
| httpClient, | ||
| orderBookBatchSize, | ||
| orderBookSubgraphEndpoint, | ||
| ooV2Addresses, | ||
| ooV1Addresses, | ||
| aiConfig, | ||
| subgraphSyncTolerance, | ||
| }; | ||
| }; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when
subgraphBlockNumber === nullI don't think we should even trygetOrderFilledEventsFromSubgraphbelow and instead fallback togetOrderFilledEventsSlowassubgraphSyncTolerancecannot be evaluated in this case.