diff --git a/apps/sim/lib/copilot/request/session/buffer.test.ts b/apps/sim/lib/copilot/request/session/buffer.test.ts index 951cdbc648d..6aa9bc0752a 100644 --- a/apps/sim/lib/copilot/request/session/buffer.test.ts +++ b/apps/sim/lib/copilot/request/session/buffer.test.ts @@ -149,7 +149,7 @@ describe('mothership-stream-outbox', () => { expect(replayed.map((entry) => entry.payload.text)).toEqual(['world']) }) - it('does not trim active stream history while appending events', async () => { + it('trims active stream history to eventLimit on every append', async () => { const cursor = await allocateCursor('stream-1') await appendEvent( @@ -163,7 +163,11 @@ describe('mothership-stream-outbox', () => { }) ) - expect(mockRedis.zremrangebyrank).not.toHaveBeenCalled() + expect(mockRedis.zremrangebyrank).toHaveBeenCalledWith( + 'mothership_stream:stream-1:events', + 0, + -5_001 + ) }) it('clears persisted stream state during teardown cleanup', async () => { diff --git a/apps/sim/lib/copilot/request/session/buffer.ts b/apps/sim/lib/copilot/request/session/buffer.ts index 6ee42bedc97..810b0cd5a88 100644 --- a/apps/sim/lib/copilot/request/session/buffer.ts +++ b/apps/sim/lib/copilot/request/session/buffer.ts @@ -144,6 +144,7 @@ export async function appendEvents( zaddArgs.push(envelope.seq, JSON.stringify(envelope)) } pipeline.zadd(key, ...(zaddArgs as [number, string, ...Array])) + pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1) pipeline.expire(key, config.ttlSeconds) pipeline.set(seqKey, String(envelopes[envelopes.length - 1].seq), 'EX', config.ttlSeconds) await pipeline.exec() diff --git a/apps/sim/lib/core/idempotency/service.ts b/apps/sim/lib/core/idempotency/service.ts index b5428075a07..c4a0e9f25fb 100644 --- a/apps/sim/lib/core/idempotency/service.ts +++ b/apps/sim/lib/core/idempotency/service.ts @@ -16,6 +16,14 @@ export interface IdempotencyConfig { namespace?: string /** When true, failed keys are deleted rather than stored so the operation is retried on the next attempt. */ retryFailures?: boolean + /** + * When false, only `{ success, status, error? }` is persisted — not the + * operation's return value. Duplicate calls still short-circuit but + * resolve to `undefined`. Use when callers don't consume the cached + * body (e.g. webhook receivers, where the provider just wants a 2xx). + * Defaults to true. + */ + storeResultBody?: boolean /** * Force a specific storage backend regardless of the environment's * auto-detection. Use `'database'` for correctness-critical flows @@ -77,6 +85,7 @@ export class IdempotencyService { ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL, namespace: config.namespace ?? 'default', retryFailures: config.retryFailures ?? false, + storeResultBody: config.storeResultBody ?? true, } this.storageMethod = config.forceStorage ?? getStorageMethod() logger.info(`IdempotencyService using ${this.storageMethod} storage`, { @@ -441,7 +450,9 @@ export class IdempotencyService { await this.storeResult( claimResult.normalizedKey, - { success: true, result, status: 'completed' }, + this.config.storeResultBody + ? { success: true, result, status: 'completed' } + : { success: true, status: 'completed' }, claimResult.storageMethod ) @@ -510,15 +521,22 @@ export class IdempotencyService { } } +/** + * As a webhook receiver we only need a "we saw this delivery" marker — + * the provider's retry just needs a 2xx, not our cached response body. + * TTL must exceed the longest provider retry window (Gmail / Pub-Sub: 7d). + */ export const webhookIdempotency = new IdempotencyService({ namespace: 'webhook', ttlSeconds: 60 * 60 * 24 * 7, // 7 days + storeResultBody: false, }) export const pollingIdempotency = new IdempotencyService({ namespace: 'polling', ttlSeconds: 60 * 60 * 24 * 3, // 3 days retryFailures: true, + storeResultBody: false, }) /**