Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 141 additions & 5 deletions apps/task-poster/src/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@ import { validateNumericParams } from "./util.js";
import { KVKey, KVQuery, KVTransactionResult } from "@cross/kv";
import { getTemplate, getTemplates, renderTemplate, escapeHTML } from "./templates.js";

export type TimeSlot = { from: string; to: string };
export type DayName = 'mon'|'tue'|'wed'|'thu'|'fri'|'sat'|'sun';
export type Schedule = Partial<Record<DayName, TimeSlot[]>>;

const DAYS: DayName[] = ['sun','mon','tue','wed','thu','fri','sat'];

const toMinutes = (t: string) => { const [h, m] = t.split(':').map(Number); return h * 60 + m; };

export const isWithinSchedule = (schedule?: Schedule): boolean => {
if (!schedule || Object.keys(schedule).length === 0) return true;
const now = new Date();
const slots = schedule[DAYS[now.getDay()]];
if (!slots?.length) return false;
const mins = now.getHours() * 60 + now.getMinutes();
return slots.some(s => (!s.from && !s.to) || (mins >= toMinutes(s.from) && mins < toMinutes(s.to)));
};

// TODO: can this come out of the protocol package?

type APIResponse = {
Expand Down Expand Up @@ -60,6 +77,9 @@ export type Fetcher = {

// fields for previous step fetchers
previousIndex?: number;

// posting schedule: whitelist of days/time windows when tasks can be posted
schedule?: Schedule;
};

const api = axios.create({
Expand Down Expand Up @@ -219,6 +239,97 @@ export const fetcherForm = async (
</section>
</fieldset>

<fieldset>
<legend>posting schedule</legend>
<p><small>Leave empty to post continuously. Enable day and add time windows to whitelist certain times during the week.</small></p>
<input type="hidden" name="schedule" id="scheduleJson"
value="${escapeHTML(JSON.stringify(values.schedule || {}))}" />
<div id="schedule-editor">
${(['mon','tue','wed','thu','fri','sat','sun'] as DayName[]).map(day => {
const slots: TimeSlot[] = (values.schedule as Schedule)?.[day] || [];
const enabled = slots.length > 0;
return `
<div class="schedule-day" data-day="${day}" style="margin-bottom:0.75rem">
<label class="checkbox">
<input type="checkbox" class="day-toggle" data-day="${day}"
${enabled ? 'checked' : ''} />
<strong>${day.charAt(0).toUpperCase() + day.slice(1)}</strong>
</label>
<div class="day-slots" ${!enabled ? 'style="display:none"' : ''}>
${(enabled ? slots : []).map((slot, i) => `
<div class="time-slot" style="display:flex;gap:0.5rem;align-items:center;margin:0.25rem 0">
<input type="time" class="slot-from" value="${slot.from || ''}" style="margin:0" />
<span>to</span>
<input type="time" class="slot-to" value="${slot.to || ''}" style="margin:0" />
<button type="button" class="remove-slot" style="padding:0.25rem 0.5rem;font-size:1.2rem;line-height:1">&times;</button>
</div>`).join('')}
<button type="button" class="add-slot" data-day="${day}"
style="margin:0.25rem 0;padding:0.25rem 0.5rem">+ Add slot</button>
</div>
</div>`;
}).join('')}
</div>
<script>
(function() {
const editor = document.getElementById('schedule-editor');
const hidden = document.getElementById('scheduleJson');

function syncSchedule() {
const schedule = {};
editor.querySelectorAll('.schedule-day').forEach(dayEl => {
const day = dayEl.dataset.day;
const toggle = dayEl.querySelector('.day-toggle');
if (!toggle.checked) return;
const slots = [];
dayEl.querySelectorAll('.time-slot').forEach(slotEl => {
slots.push({
from: slotEl.querySelector('.slot-from').value,
to: slotEl.querySelector('.slot-to').value
});
});
schedule[day] = slots.length ? slots : [{ from: '', to: '' }];
});
hidden.value = JSON.stringify(schedule);
}

editor.addEventListener('change', syncSchedule);
editor.addEventListener('input', syncSchedule);

editor.addEventListener('click', function(e) {
if (e.target.classList.contains('day-toggle')) {
const dayEl = e.target.closest('.schedule-day');
const slotsDiv = dayEl.querySelector('.day-slots');
slotsDiv.style.display = e.target.checked ? '' : 'none';
if (e.target.checked && !dayEl.querySelector('.time-slot')) {
addSlot(dayEl);
}
syncSchedule();
}
if (e.target.classList.contains('add-slot')) {
addSlot(e.target.closest('.schedule-day'));
syncSchedule();
}
if (e.target.classList.contains('remove-slot')) {
e.target.closest('.time-slot').remove();
syncSchedule();
}
});

function addSlot(dayEl) {
const btn = dayEl.querySelector('.add-slot');
const div = document.createElement('div');
div.className = 'time-slot';
div.style = 'display:flex;gap:0.5rem;align-items:center;margin:0.25rem 0';
div.innerHTML = '<input type="time" class="slot-from" style="margin:0" />'
+ '<span>to</span>'
+ '<input type="time" class="slot-to" style="margin:0" />'
+ '<button type="button" class="remove-slot" style="padding:0.25rem 0.5rem;font-size:1.2rem;line-height:1">&times;</button>';
btn.before(div);
}
})();
</script>
</fieldset>

<fieldset>
<legend>visibility</legend>
<div class="checkbox">
Expand Down Expand Up @@ -369,6 +480,14 @@ export const createFetcher = async (

status: oldFetcher?.status ?? "active",
hidden: fields.hidden === "on" || fields.hidden === true,

schedule: (() => {
try {
const s = typeof fields.schedule === 'string'
? JSON.parse(fields.schedule) : fields.schedule;
return s && Object.keys(s).length > 0 ? s : undefined;
} catch { return undefined; }
})(),
};

// if we are updating an older fetcher, copy over any optional
Expand Down Expand Up @@ -579,6 +698,16 @@ export const processFetcher = async (fetcher: Fetcher) => {
if (fetcher.status !== "active")
return 0;

// always check for results, even outside schedule
if (fetcher.engine === "effectai")
await processResults(fetcher, 20);

const withinSchedule = isWithinSchedule(fetcher.schedule);
if (!withinSchedule) {
console.log(`Fetcher [${fetcher.datasetId}, ${fetcher.index}] outside scheduled time, skipping`);
return 0;
}

publishProgress[fetcher.datasetId] ??= {};

const fid = [fetcher.datasetId, fetcher.index];
Expand All @@ -602,12 +731,7 @@ export const processFetcher = async (fetcher: Fetcher) => {
// process the tasks using the engine
let imported = 0;
if (fetcher.engine === "effectai") {
// import to effect ai
imported = await importTasks(fetcher);

// fetch results from effectai
// TODO: put proper value for result batch size
await processResults(fetcher, 20);
}

// release lock
Expand Down Expand Up @@ -921,6 +1045,18 @@ export const addFetcherRoutes = (app: Express): void => {
<li>Failed: ${failedSize}</li>
<li>Batch / Freq: ${f.batchSize} / ${f.frequency}</li>
<li>Time Limit: ${f.timeLimitSeconds}s</li>
<li>Schedule: ${f.schedule && Object.keys(f.schedule).length > 0
? Object.entries(f.schedule).map(([day, slots]) =>
`${day}: ${(slots as TimeSlot[]).map(s => s.from && s.to ? `${s.from}-${s.to}` : 'all day').join(', ')}`
).join('; ')
: 'continuous (no restrictions)'}

${f.schedule && Object.keys(f.schedule).length > 0
? isWithinSchedule(f.schedule)
? ' <small style="color:green">(active now)</small>'
: ' <small style="color:orange">(outside schedule)</small>'
: ''}
</li>
</ul>

<section style="display: flex; gap: 1.0rem">
Expand Down