Fix duplicate quest event processing and make events idempotent
This commit is contained in:
parent
0427e47ba8
commit
c15e6eac7a
@ -265,23 +265,26 @@ export async function createQuestEvent(data: {
|
|||||||
return rows[0]
|
return rows[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getUnprocessedEvents() {
|
export async function claimUnprocessedEvents(limit = 20) {
|
||||||
const { rows } = await pool.query(
|
const { rows } = await pool.query(
|
||||||
`SELECT * FROM quest_events
|
`WITH next_events AS (
|
||||||
|
SELECT id
|
||||||
|
FROM quest_events
|
||||||
WHERE processed = false
|
WHERE processed = false
|
||||||
ORDER BY created_at ASC
|
ORDER BY created_at ASC
|
||||||
LIMIT 20`
|
FOR UPDATE SKIP LOCKED
|
||||||
|
LIMIT $1
|
||||||
|
)
|
||||||
|
UPDATE quest_events q
|
||||||
|
SET processed = true
|
||||||
|
FROM next_events n
|
||||||
|
WHERE q.id = n.id
|
||||||
|
RETURNING q.*`,
|
||||||
|
[limit]
|
||||||
)
|
)
|
||||||
return rows
|
return rows
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function markEventProcessed(eventId: string) {
|
|
||||||
await pool.query(
|
|
||||||
'UPDATE quest_events SET processed = true WHERE id = $1',
|
|
||||||
[eventId]
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ============================================
|
// ============================================
|
||||||
// Chat History
|
// Chat History
|
||||||
// ============================================
|
// ============================================
|
||||||
|
|||||||
@ -43,31 +43,33 @@ async function handler(req: ApiRequest, res: ApiResponse) {
|
|||||||
return res.status(404).json({ error: 'User not found' })
|
return res.status(404).json({ error: 'User not found' })
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write event to DB (bot will process it asynchronously)
|
|
||||||
await createQuestEvent({
|
|
||||||
quest_id: questId,
|
|
||||||
user_id: user.id,
|
|
||||||
point_id: pointId,
|
|
||||||
event_type: eventType,
|
|
||||||
payload,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Immediate state updates for UI responsiveness
|
// Immediate state updates for UI responsiveness
|
||||||
let nextAction: string = 'wait'
|
let nextAction: string = 'wait'
|
||||||
let updatedPoint = null
|
let updatedPoint = null
|
||||||
|
let shouldCreateEvent = true
|
||||||
|
const currentPoint = pointId ? await getPointById(pointId) : null
|
||||||
|
|
||||||
switch (eventType) {
|
switch (eventType) {
|
||||||
case 'arrived': {
|
case 'arrived': {
|
||||||
if (pointId) {
|
if (pointId) {
|
||||||
|
if (currentPoint?.arrived_at) {
|
||||||
|
shouldCreateEvent = false
|
||||||
|
updatedPoint = currentPoint
|
||||||
|
} else {
|
||||||
await markPointArrived(pointId)
|
await markPointArrived(pointId)
|
||||||
updatedPoint = await getPointById(pointId)
|
updatedPoint = await getPointById(pointId)
|
||||||
|
}
|
||||||
nextAction = 'show_content'
|
nextAction = 'show_content'
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case 'point_completed': {
|
case 'point_completed': {
|
||||||
if (pointId) {
|
if (pointId) {
|
||||||
|
if (currentPoint?.status === 'completed' || currentPoint?.status === 'skipped') {
|
||||||
|
shouldCreateEvent = false
|
||||||
|
} else {
|
||||||
await updatePointStatus(pointId, 'completed')
|
await updatePointStatus(pointId, 'completed')
|
||||||
|
}
|
||||||
// Check if there are more points in the day
|
// Check if there are more points in the day
|
||||||
const quest = await getActiveQuestForUser(user.id)
|
const quest = await getActiveQuestForUser(user.id)
|
||||||
if (quest) {
|
if (quest) {
|
||||||
@ -87,7 +89,11 @@ async function handler(req: ApiRequest, res: ApiResponse) {
|
|||||||
}
|
}
|
||||||
case 'skipped': {
|
case 'skipped': {
|
||||||
if (pointId) {
|
if (pointId) {
|
||||||
|
if (currentPoint?.status === 'completed' || currentPoint?.status === 'skipped') {
|
||||||
|
shouldCreateEvent = false
|
||||||
|
} else {
|
||||||
await updatePointStatus(pointId, 'skipped')
|
await updatePointStatus(pointId, 'skipped')
|
||||||
|
}
|
||||||
nextAction = 'next_point'
|
nextAction = 'next_point'
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
@ -112,6 +118,17 @@ async function handler(req: ApiRequest, res: ApiResponse) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write event to DB (bot will process it asynchronously)
|
||||||
|
if (shouldCreateEvent) {
|
||||||
|
await createQuestEvent({
|
||||||
|
quest_id: questId,
|
||||||
|
user_id: user.id,
|
||||||
|
point_id: pointId,
|
||||||
|
event_type: eventType,
|
||||||
|
payload,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return res.json({
|
return res.json({
|
||||||
success: true,
|
success: true,
|
||||||
updatedPoint,
|
updatedPoint,
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import type { Bot } from 'grammy'
|
import type { Bot } from 'grammy'
|
||||||
import type { BotContext } from '../types.js'
|
import type { BotContext } from '../types.js'
|
||||||
import {
|
import {
|
||||||
getUnprocessedEvents, markEventProcessed,
|
claimUnprocessedEvents,
|
||||||
getPointById,
|
getPointById,
|
||||||
getActiveQuestForUser, getActiveDayForQuest,
|
getActiveQuestForUser, getActiveDayForQuest,
|
||||||
} from '../../api/_lib/db.js'
|
} from '../../api/_lib/db.js'
|
||||||
@ -10,14 +10,25 @@ import { generateNextPoint, generatePointContent } from './quest.service.js'
|
|||||||
import { t, getLang } from '../i18n/index.js'
|
import { t, getLang } from '../i18n/index.js'
|
||||||
|
|
||||||
let processorInterval: ReturnType<typeof setInterval> | null = null
|
let processorInterval: ReturnType<typeof setInterval> | null = null
|
||||||
|
let isProcessing = false
|
||||||
|
|
||||||
export function startEventProcessor(bot: Bot<BotContext>) {
|
export function startEventProcessor(bot: Bot<BotContext>) {
|
||||||
|
if (processorInterval) {
|
||||||
|
console.warn('Event processor already started, skipping duplicate start')
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Poll every 3 seconds for unprocessed events
|
// Poll every 3 seconds for unprocessed events
|
||||||
processorInterval = setInterval(async () => {
|
processorInterval = setInterval(async () => {
|
||||||
|
if (isProcessing) return
|
||||||
|
isProcessing = true
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await processEvents(bot)
|
await processEvents(bot)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Event processor error:', error)
|
console.error('Event processor error:', error)
|
||||||
|
} finally {
|
||||||
|
isProcessing = false
|
||||||
}
|
}
|
||||||
}, 3000)
|
}, 3000)
|
||||||
|
|
||||||
@ -32,16 +43,14 @@ export function stopEventProcessor() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function processEvents(bot: Bot<BotContext>) {
|
async function processEvents(bot: Bot<BotContext>) {
|
||||||
const events = await getUnprocessedEvents()
|
// Atomically claim events to prevent double processing across overlapping ticks/processes.
|
||||||
|
const events = await claimUnprocessedEvents(20)
|
||||||
|
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
try {
|
try {
|
||||||
await processEvent(bot, event)
|
await processEvent(bot, event)
|
||||||
await markEventProcessed(event.id)
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error(`Failed to process event ${event.id}:`, error)
|
console.error(`Failed to process event ${event.id}:`, error)
|
||||||
// Still mark as processed to avoid infinite retry
|
|
||||||
await markEventProcessed(event.id)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user