From f0351cd04091815c8db8571328e197b80fa23519 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 15 May 2026 17:16:50 -0700 Subject: [PATCH 1/3] improvement(redis): strip idempotency body and cap mothership stream zsets --- .../copilot/request/session/buffer.test.ts | 8 +++-- .../sim/lib/copilot/request/session/buffer.ts | 1 + apps/sim/lib/core/idempotency/service.ts | 31 +++++++++++++++++-- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/apps/sim/lib/copilot/request/session/buffer.test.ts b/apps/sim/lib/copilot/request/session/buffer.test.ts index 951cdbc648d..8cfafa7e943 100644 --- a/apps/sim/lib/copilot/request/session/buffer.test.ts +++ b/apps/sim/lib/copilot/request/session/buffer.test.ts @@ -149,7 +149,7 @@ describe('mothership-stream-outbox', () => { expect(replayed.map((entry) => entry.payload.text)).toEqual(['world']) }) - it('does not trim active stream history while appending events', async () => { + it('trims active stream history to eventLimit on every append', async () => { const cursor = await allocateCursor('stream-1') await appendEvent( @@ -163,7 +163,11 @@ describe('mothership-stream-outbox', () => { }) ) - expect(mockRedis.zremrangebyrank).not.toHaveBeenCalled() + expect(mockRedis.zremrangebyrank).toHaveBeenCalledWith( + 'mothership_stream:stream-1:events', + 0, + expect.any(Number) + ) }) it('clears persisted stream state during teardown cleanup', async () => { diff --git a/apps/sim/lib/copilot/request/session/buffer.ts b/apps/sim/lib/copilot/request/session/buffer.ts index 6ee42bedc97..810b0cd5a88 100644 --- a/apps/sim/lib/copilot/request/session/buffer.ts +++ b/apps/sim/lib/copilot/request/session/buffer.ts @@ -144,6 +144,7 @@ export async function appendEvents( zaddArgs.push(envelope.seq, JSON.stringify(envelope)) } pipeline.zadd(key, ...(zaddArgs as [number, string, ...Array<number | string>])) + pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1) pipeline.expire(key, config.ttlSeconds) pipeline.set(seqKey, 
String(envelopes[envelopes.length - 1].seq), 'EX', config.ttlSeconds) await pipeline.exec() diff --git a/apps/sim/lib/core/idempotency/service.ts b/apps/sim/lib/core/idempotency/service.ts index b5428075a07..23a202140de 100644 --- a/apps/sim/lib/core/idempotency/service.ts +++ b/apps/sim/lib/core/idempotency/service.ts @@ -16,6 +16,16 @@ export interface IdempotencyConfig { namespace?: string /** When true, failed keys are deleted rather than stored so the operation is retried on the next attempt. */ retryFailures?: boolean + /** + * When false, the operation's return value is not persisted alongside + * the dedupe marker — only `{ success, status, error? }` is stored. + * Duplicate calls still short-circuit, but `executeWithIdempotency` + * resolves to `undefined` on the dedupe path. Use for webhook/polling + * flows where the cached body is large (multi-KB execution results) + * and callers don't consume the value of a duplicated delivery. + * Defaults to true. + */ + storeResultBody?: boolean /** * Force a specific storage backend regardless of the environment's * auto-detection. Use `'database'` for correctness-critical flows @@ -77,6 +87,7 @@ export class IdempotencyService { ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL, namespace: config.namespace ?? 'default', retryFailures: config.retryFailures ?? false, + storeResultBody: config.storeResultBody ?? true, } this.storageMethod = config.forceStorage ?? getStorageMethod() logger.info(`IdempotencyService using ${this.storageMethod} storage`, { @@ -441,7 +452,9 @@ export class IdempotencyService { await this.storeResult( claimResult.normalizedKey, - { success: true, result, status: 'completed' }, + this.config.storeResultBody + ? { success: true, result, status: 'completed' } + : { success: true, status: 'completed' }, claimResult.storageMethod ) @@ -510,15 +523,29 @@ export class IdempotencyService { } } +/** + * Webhook idempotency. 
We're the receiver of provider-initiated webhooks, + * not the originator — duplicate deliveries from the provider's retry + * machinery just need a "we saw this" marker, not a replayable response + * body. `storeResultBody: false` drops the cached workflow result from + * each key, eliminating the long tail of large gmail/outlook payloads + * that pushed Redis Cloud into OOM on 2026-05-15. + * + * TTL stays at 7 days because that's the longest provider retry window + * we care about (Gmail / Pub/Sub). With body-stripping the per-key cost + * is ~150 bytes, so the long TTL is essentially free. + */ export const webhookIdempotency = new IdempotencyService({ namespace: 'webhook', - ttlSeconds: 60 * 60 * 24 * 7, // 7 days + ttlSeconds: 60 * 60 * 24 * 7, // 7 days — must exceed Gmail/Pub-Sub retry window + storeResultBody: false, }) export const pollingIdempotency = new IdempotencyService({ namespace: 'polling', ttlSeconds: 60 * 60 * 24 * 3, // 3 days retryFailures: true, + storeResultBody: false, }) /** From 26ffeb958fe17c7022beb52b0f92408b465ed601 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 15 May 2026 17:17:49 -0700 Subject: [PATCH 2/3] chore(redis): trim verbose comments on idempotency body-strip --- apps/sim/lib/core/idempotency/service.ts | 25 ++++++++---------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/apps/sim/lib/core/idempotency/service.ts b/apps/sim/lib/core/idempotency/service.ts index 23a202140de..c4a0e9f25fb 100644 --- a/apps/sim/lib/core/idempotency/service.ts +++ b/apps/sim/lib/core/idempotency/service.ts @@ -17,12 +17,10 @@ export interface IdempotencyConfig { /** When true, failed keys are deleted rather than stored so the operation is retried on the next attempt. */ retryFailures?: boolean /** - * When false, the operation's return value is not persisted alongside - * the dedupe marker — only `{ success, status, error? }` is stored. 
- * Duplicate calls still short-circuit, but `executeWithIdempotency` - * resolves to `undefined` on the dedupe path. Use for webhook/polling - * flows where the cached body is large (multi-KB execution results) - * and callers don't consume the value of a duplicated delivery. + * When false, only `{ success, status, error? }` is persisted — not the + * operation's return value. Duplicate calls still short-circuit but + * resolve to `undefined`. Use when callers don't consume the cached + * body (e.g. webhook receivers, where the provider just wants a 2xx). * Defaults to true. */ storeResultBody?: boolean @@ -524,20 +522,13 @@ export class IdempotencyService { } /** - * Webhook idempotency. We're the receiver of provider-initiated webhooks, - * not the originator — duplicate deliveries from the provider's retry - * machinery just need a "we saw this" marker, not a replayable response - * body. `storeResultBody: false` drops the cached workflow result from - * each key, eliminating the long tail of large gmail/outlook payloads - * that pushed Redis Cloud into OOM on 2026-05-15. - * - * TTL stays at 7 days because that's the longest provider retry window - * we care about (Gmail / Pub/Sub). With body-stripping the per-key cost - * is ~150 bytes, so the long TTL is essentially free. + * As a webhook receiver we only need a "we saw this delivery" marker — + * the provider's retry just needs a 2xx, not our cached response body. + * TTL must exceed the longest provider retry window (Gmail / Pub-Sub: 7d). 
*/ export const webhookIdempotency = new IdempotencyService({ namespace: 'webhook', - ttlSeconds: 60 * 60 * 24 * 7, // 7 days — must exceed Gmail/Pub-Sub retry window + ttlSeconds: 60 * 60 * 24 * 7, // 7 days storeResultBody: false, }) From 78b3d96d772fc50224a6d33dd1ef4e7ea8e6a3b9 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 15 May 2026 17:28:23 -0700 Subject: [PATCH 3/3] test(buffer): pin exact ZREMRANGEBYRANK stop arg Pinning -5_001 (= -(DEFAULT_EVENT_LIMIT) - 1) so the off-by-one boundary is directly validated; expect.any(Number) would have passed a wrong formula like -eventLimit. Co-Authored-By: Claude Opus 4.7 --- apps/sim/lib/copilot/request/session/buffer.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/lib/copilot/request/session/buffer.test.ts b/apps/sim/lib/copilot/request/session/buffer.test.ts index 8cfafa7e943..6aa9bc0752a 100644 --- a/apps/sim/lib/copilot/request/session/buffer.test.ts +++ b/apps/sim/lib/copilot/request/session/buffer.test.ts @@ -166,7 +166,7 @@ describe('mothership-stream-outbox', () => { expect(mockRedis.zremrangebyrank).toHaveBeenCalledWith( 'mothership_stream:stream-1:events', 0, - expect.any(Number) + -5_001 ) })