Skip to content
Closed
116 changes: 116 additions & 0 deletions workers/main/src/configs/schedules.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { Client } from '@temporalio/client';

import { logger } from '../logger';
import { workerConfig } from './worker';

const SCHEDULE_ID = 'weekly-financial-report-schedule';

/**
* Checks if an error is a "not found" error
*/
function validateIsScheduleNotFoundError(error: unknown): boolean {
return (
(error as { code?: number }).code === 5 ||
(error instanceof Error &&
error.message.toLowerCase().includes('not found'))
);
}

/**
* Checks if schedule exists, returns true if it exists
*/
async function validateScheduleExists(client: Client): Promise<boolean> {
try {
const scheduleHandle = client.schedule.getHandle(SCHEDULE_ID);

await scheduleHandle.describe();
logger.info(`Schedule ${SCHEDULE_ID} already exists, skipping creation`);

return true;
} catch (error) {
if (!validateIsScheduleNotFoundError(error)) {
throw error;
}
logger.info(`Schedule ${SCHEDULE_ID} not found, creating new schedule`);

return false;
}
}

/**
* Creates schedule with race condition protection
*/
async function createScheduleWithRaceProtection(client: Client): Promise<void> {
try {
await client.schedule.create({
scheduleId: SCHEDULE_ID,
spec: {
cronExpressions: ['0 13 * * 2'],
timezone: 'America/New_York',
},
action: {
type: 'startWorkflow',
workflowType: 'weeklyFinancialReportsWorkflow',
taskQueue: workerConfig.taskQueue,
workflowId: `weekly-financial-report-scheduled`,
},
policies: {
overlap: 'SKIP',
catchupWindow: '1 day',
},
});

logger.info(
`Successfully created schedule ${SCHEDULE_ID} for weekly financial reports`,
);
} catch (createError) {
// Handle race condition: schedule was created by another worker
const isAlreadyExists =
(createError as { code?: number }).code === 6 ||
(createError instanceof Error &&
(createError.message.toLowerCase().includes('already exists') ||
createError.message.toLowerCase().includes('already running')));

if (isAlreadyExists) {
logger.info(
`Schedule ${SCHEDULE_ID} already exists (created by another worker), treating as success`,
);

return;
}

throw createError;
}
}

/**
* Sets up the weekly financial report schedule
* Schedule runs every Tuesday at 1 PM America/New_York time (EST/EDT)
* @param client - Temporal client instance
*/
export async function setupWeeklyReportSchedule(client: Client): Promise<void> {
try {
const isScheduleExists = await validateScheduleExists(client);

if (isScheduleExists) {
return;
}

await createScheduleWithRaceProtection(client);
} catch (error) {
logger.error(
`Failed to setup schedule ${SCHEDULE_ID}: ${error instanceof Error ? error.message : String(error)}`,
);
throw error;
}
}

/**
* Schedule configuration exported for documentation and testing
*/
export const scheduleConfig = {
scheduleId: SCHEDULE_ID,
cronExpression: '0 13 * * 2',
timezone: 'America/New_York',
description: 'Runs every Tuesday at 1 PM EST/EDT',
} as const;
16 changes: 16 additions & 0 deletions workers/main/src/configs/temporal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,26 @@ import { z } from 'zod';

const DEFAULT_TEMPORAL_ADDRESS = 'temporal:7233';

/**
* Temporal connection configuration
* Used by both workers and clients to connect to Temporal server
*/
export const temporalConfig: NativeConnectionOptions = {
address: process.env.TEMPORAL_ADDRESS || DEFAULT_TEMPORAL_ADDRESS,
};

export const temporalSchema = z.object({
TEMPORAL_ADDRESS: z.string().default(DEFAULT_TEMPORAL_ADDRESS),
});

/**
* Schedule Configuration Documentation
*
* Weekly Financial Report Schedule:
* The schedule is automatically created/verified when the worker starts.
*
* For schedule configuration details (schedule ID, cron expression, timezone, etc.),
* see the exported `scheduleConfig` object in ./schedules.ts
*
* Implementation: ./schedules.ts
*/
3 changes: 2 additions & 1 deletion workers/main/src/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

import { handleRunError, logger } from './index';
import { handleRunError } from './index';
import { logger } from './logger';

describe('handleRunError', () => {
let processExitSpy: ReturnType<typeof vi.spyOn>;
Expand Down
23 changes: 20 additions & 3 deletions workers/main/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { DefaultLogger, NativeConnection, Worker } from '@temporalio/worker';
import { Client, Connection } from '@temporalio/client';
import { NativeConnection, Worker } from '@temporalio/worker';

import * as activities from './activities';
import { validateEnv } from './common/utils';
import { setupWeeklyReportSchedule } from './configs/schedules';
import { temporalConfig } from './configs/temporal';
import { workerConfig } from './configs/worker';

export const logger = new DefaultLogger('ERROR');
import { logger } from './logger';

validateEnv();

Expand All @@ -25,6 +26,22 @@ export async function createWorker(connection: NativeConnection) {
}

export async function run(): Promise<void> {
// Setup weekly report schedule before starting worker
const clientConnection = await Connection.connect(temporalConfig);

try {
const client = new Client({ connection: clientConnection });

await setupWeeklyReportSchedule(client);
} catch (err) {
logger.error(
`Failed to setup schedule: ${err instanceof Error ? err.message : String(err)}`,
);
} finally {
await clientConnection.close();
}

// Create and run worker
const connection = await createConnection();

try {
Expand Down
8 changes: 8 additions & 0 deletions workers/main/src/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { DefaultLogger } from '@temporalio/worker';

/**
* Shared logger instance for the worker
* Using INFO level to capture important operational messages
* including schedule setup, errors, and warnings
*/
export const logger = new DefaultLogger('INFO');
Loading