From c15e6eac7a277afa8e9d2a0772fcb75a6c820aad Mon Sep 17 00:00:00 2001 From: Jaymiesh Date: Wed, 11 Feb 2026 16:00:59 +0700 Subject: [PATCH] Fix duplicate quest event processing and make events idempotent --- api/_lib/db.ts | 27 ++++++++++++--------- api/quest/events.ts | 43 +++++++++++++++++++++++---------- bot/services/event-processor.ts | 19 +++++++++++---- 3 files changed, 59 insertions(+), 30 deletions(-) diff --git a/api/_lib/db.ts b/api/_lib/db.ts index 5659702..8473f78 100644 --- a/api/_lib/db.ts +++ b/api/_lib/db.ts @@ -265,23 +265,26 @@ export async function createQuestEvent(data: { return rows[0] } -export async function getUnprocessedEvents() { +export async function claimUnprocessedEvents(limit = 20) { const { rows } = await pool.query( - `SELECT * FROM quest_events - WHERE processed = false - ORDER BY created_at ASC - LIMIT 20` + `WITH next_events AS ( + SELECT id + FROM quest_events + WHERE processed = false + ORDER BY created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT $1 + ) + UPDATE quest_events q + SET processed = true + FROM next_events n + WHERE q.id = n.id + RETURNING q.*`, + [limit] ) return rows } -export async function markEventProcessed(eventId: string) { - await pool.query( - 'UPDATE quest_events SET processed = true WHERE id = $1', - [eventId] - ) -} - // ============================================ // Chat History // ============================================ diff --git a/api/quest/events.ts b/api/quest/events.ts index 8dd7031..d4b485b 100644 --- a/api/quest/events.ts +++ b/api/quest/events.ts @@ -43,31 +43,33 @@ async function handler(req: ApiRequest, res: ApiResponse) { return res.status(404).json({ error: 'User not found' }) } - // Write event to DB (bot will process it asynchronously) - await createQuestEvent({ - quest_id: questId, - user_id: user.id, - point_id: pointId, - event_type: eventType, - payload, - }) - // Immediate state updates for UI responsiveness let nextAction: string = 'wait' let updatedPoint = null + let shouldCreateEvent = true + const currentPoint = pointId ? await getPointById(pointId) : null switch (eventType) { case 'arrived': { if (pointId) { - await markPointArrived(pointId) - updatedPoint = await getPointById(pointId) + if (currentPoint?.arrived_at) { + shouldCreateEvent = false + updatedPoint = currentPoint + } else { + await markPointArrived(pointId) + updatedPoint = await getPointById(pointId) + } nextAction = 'show_content' } break } case 'point_completed': { if (pointId) { - await updatePointStatus(pointId, 'completed') + if (currentPoint?.status === 'completed' || currentPoint?.status === 'skipped') { + shouldCreateEvent = false + } else { + await updatePointStatus(pointId, 'completed') + } // Check if there are more points in the day const quest = await getActiveQuestForUser(user.id) if (quest) { @@ -87,7 +89,11 @@ async function handler(req: ApiRequest, res: ApiResponse) { } case 'skipped': { if (pointId) { - await updatePointStatus(pointId, 'skipped') + if (currentPoint?.status === 'completed' || currentPoint?.status === 'skipped') { + shouldCreateEvent = false + } else { + await updatePointStatus(pointId, 'skipped') + } nextAction = 'next_point' } break @@ -112,6 +118,17 @@ async function handler(req: ApiRequest, res: ApiResponse) { } } + // Write event to DB (bot will process it asynchronously) + if (shouldCreateEvent) { + await createQuestEvent({ + quest_id: questId, + user_id: user.id, + point_id: pointId, + event_type: eventType, + payload, + }) + } + return res.json({ success: true, updatedPoint, diff --git a/bot/services/event-processor.ts b/bot/services/event-processor.ts index 40ddd23..da5e60f 100644 --- a/bot/services/event-processor.ts +++ b/bot/services/event-processor.ts @@ -1,7 +1,7 @@ import type { Bot } from 'grammy' import type { BotContext } from '../types.js' import { - getUnprocessedEvents, markEventProcessed, + claimUnprocessedEvents, getPointById, getActiveQuestForUser, getActiveDayForQuest, } from '../../api/_lib/db.js' @@ -10,14 +10,25 @@ import { generateNextPoint, generatePointContent } from './quest.service.js' import { t, getLang } from '../i18n/index.js' let processorInterval: ReturnType | null = null +let isProcessing = false export function startEventProcessor(bot: Bot) { + if (processorInterval) { + console.warn('Event processor already started, skipping duplicate start') + return + } + // Poll every 3 seconds for unprocessed events processorInterval = setInterval(async () => { + if (isProcessing) return + isProcessing = true + try { await processEvents(bot) } catch (error) { console.error('Event processor error:', error) + } finally { + isProcessing = false } }, 3000) @@ -32,16 +43,14 @@ export function stopEventProcessor() { } async function processEvents(bot: Bot) { - const events = await getUnprocessedEvents() + // Atomically claim events to prevent double processing across overlapping ticks/processes. + const events = await claimUnprocessedEvents(20) for (const event of events) { try { await processEvent(bot, event) - await markEventProcessed(event.id) } catch (error) { console.error(`Failed to process event ${event.id}:`, error) - // Still mark as processed to avoid infinite retry - await markEventProcessed(event.id) } } }