diff --git a/docs/docs/api/appkit/Class.Plugin.md b/docs/docs/api/appkit/Class.Plugin.md index 34034537f..a5002a097 100644 --- a/docs/docs/api/appkit/Class.Plugin.md +++ b/docs/docs/api/appkit/Class.Plugin.md @@ -211,6 +211,11 @@ Plugin initialization phase. abortActiveOperations(): void; ``` +Cancel in-flight work (abort signals, SSE streams). Runs in the first +phase of graceful shutdown, BEFORE any plugin's `shutdown()` hook — +so it must not tear down shared resources (e.g. connection pools) +that other plugins' hooks may still need. Put teardown in `shutdown()`. + #### Returns `void` diff --git a/packages/appkit/src/core/plugin-context.ts b/packages/appkit/src/core/plugin-context.ts index eda3f05a5..5ce6d22c8 100644 --- a/packages/appkit/src/core/plugin-context.ts +++ b/packages/appkit/src/core/plugin-context.ts @@ -24,6 +24,18 @@ interface RouteTarget { type ToolProviderPlugin = BasePlugin & ToolProvider & { asUser: (req: IAppRequest) => ToolProvider }; +/** + * Lifecycle events emitted through {@link PluginContext.emitLifecycle}. + * + * - `"setup:complete"` — emitted by AppKit core after every plugin's + * `setup()` has finished. + * - `"server:ready"` — emitted when the HTTP server is listening. + * - `"shutdown"` — emitted by the server plugin during graceful shutdown, + * AFTER all plugin `shutdown()` hooks have completed and BEFORE remaining + * connections are force-closed. The emit is bounded by a short timeout + * (see the server plugin's shutdown budget), so subscribers must not + * start long-running async work — finish quickly or be cut off. + */ type LifecycleEvent = "setup:complete" | "server:ready" | "shutdown"; /** @@ -222,6 +234,10 @@ export class PluginContext { /** * Register a lifecycle hook callback. + * + * See {@link LifecycleEvent} for event semantics. In particular, + * `"shutdown"` subscribers run inside a bounded shutdown phase and must + * not start long-running async work. */ onLifecycle(event: LifecycleEvent, fn: () => void | Promise): void { let hooks = this.lifecycleHooks.get(event); @@ -237,7 +253,8 @@ export class PluginContext { * Errors in individual callbacks are logged but do not prevent * other callbacks from running. * - * @internal Called by AppKit core only. + * @internal Called by AppKit core (`setup:complete`) and the server + * plugin (`shutdown`, during graceful shutdown). */ async emitLifecycle(event: LifecycleEvent): Promise { const hooks = this.lifecycleHooks.get(event); diff --git a/packages/appkit/src/plugins/lakebase/lakebase.ts b/packages/appkit/src/plugins/lakebase/lakebase.ts index b8b1b16be..1c4edd404 100644 --- a/packages/appkit/src/plugins/lakebase/lakebase.ts +++ b/packages/appkit/src/plugins/lakebase/lakebase.ts @@ -169,15 +169,23 @@ export class LakebasePlugin extends Plugin implements ToolProvider { /** * Gracefully drains and closes all connection pools (SP + OBO). - * Called automatically by AppKit during shutdown. + * + * Runs as the plugin's `shutdown()` hook (phase 3 of the server's graceful + * shutdown), NOT in `abortActiveOperations()` (phase 1): other plugins' + * `shutdown()` hooks may still need database connections to drain state, + * so the pools must outlive the abort phase. `pg.Pool#end()` waits for + * checked-out clients to be released, so hooks running concurrently with + * this one can still finish their in-flight queries. Errors are caught + * and logged; this hook never throws. */ - abortActiveOperations(): void { - super.abortActiveOperations(); + async shutdown(): Promise { if (this.pool) { logger.info("Closing Lakebase SP pool"); - this.pool.end().catch((err) => { + try { + await this.pool.end(); + } catch (err) { logger.error("Error closing Lakebase SP pool: %O", err); - }); + } this.pool = null; } if (this.oboPoolManager) { @@ -185,9 +193,11 @@ export class LakebasePlugin extends Plugin implements ToolProvider { "Closing all Lakebase OBO pools (%d)", this.oboPoolManager.size, ); - this.oboPoolManager.closeAll().catch((err) => { + try { + await this.oboPoolManager.closeAll(); + } catch (err) { logger.error("Error closing Lakebase OBO pools: %O", err); - }); + } this.oboPoolManager = null; } } diff --git a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts index 5f96d6efe..b8564ff8e 100644 --- a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts +++ b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts @@ -212,6 +212,53 @@ describe("LakebasePlugin — readOnly enforcement", () => { }); }); +describe("LakebasePlugin — shutdown", () => { + test("closes the SP pool and all OBO pools via shutdown()", async () => { + const { createLakebasePool, createLakebasePoolManager } = await import( + "../../../connectors/lakebase" + ); + const plugin = makePlugin({}); + await plugin.setup(); + + const spPool = vi.mocked(createLakebasePool).mock.results.at(-1)?.value as { + end: ReturnType; + }; + const oboManager = vi.mocked(createLakebasePoolManager).mock.results.at(-1) + ?.value as { closeAll: ReturnType }; + + await plugin.shutdown(); + + expect(spPool.end).toHaveBeenCalledTimes(1); + expect(oboManager.closeAll).toHaveBeenCalledTimes(1); + + // Idempotent: a second call has nothing left to close. + await plugin.shutdown(); + expect(spPool.end).toHaveBeenCalledTimes(1); + expect(oboManager.closeAll).toHaveBeenCalledTimes(1); + }); + + test("abortActiveOperations() does NOT tear down the pools (teardown is shutdown()'s job)", async () => { + const { createLakebasePool, createLakebasePoolManager } = await import( + "../../../connectors/lakebase" + ); + const plugin = makePlugin({}); + await plugin.setup(); + + const spPool = vi.mocked(createLakebasePool).mock.results.at(-1)?.value as { + end: ReturnType; + }; + const oboManager = vi.mocked(createLakebasePoolManager).mock.results.at(-1) + ?.value as { closeAll: ReturnType }; + + plugin.abortActiveOperations(); + + // Other plugins' shutdown() hooks may still need database connections + // to drain state — the pools must survive the abort phase. + expect(spPool.end).not.toHaveBeenCalled(); + expect(oboManager.closeAll).not.toHaveBeenCalled(); + }); +}); + describe("LakebasePlugin — destructive mode", () => { test("does NOT wrap in read-only transaction when readOnly: false", async () => { const queryMock = vi.fn((_text: string, _values?: unknown[]) => diff --git a/packages/appkit/src/plugins/server/index.ts b/packages/appkit/src/plugins/server/index.ts index e66abf5ad..c1024bed0 100644 --- a/packages/appkit/src/plugins/server/index.ts +++ b/packages/appkit/src/plugins/server/index.ts @@ -4,13 +4,14 @@ import path from "node:path"; import dotenv from "dotenv"; import express from "express"; import getPort, { portNumbers } from "get-port"; -import type { PluginClientConfigs, PluginPhase } from "shared"; +import type { BasePlugin, PluginClientConfigs, PluginPhase } from "shared"; +import { CacheManager } from "../../cache"; import { ServerError } from "../../errors"; import { TelemetryReporter } from "../../internal-telemetry"; import { createLogger } from "../../logging/logger"; import { Plugin, toPlugin } from "../../plugin"; import type { PluginManifest } from "../../registry"; -import { instrumentations } from "../../telemetry"; +import { instrumentations, TelemetryManager } from "../../telemetry"; import { sanitizeClientConfig } from "./client-config-sanitizer"; import manifest from "./manifest.json"; import { RemoteTunnelController } from "./remote-tunnel/remote-tunnel-controller"; @@ -54,6 +55,36 @@ export class ServerPlugin extends Plugin { port: Number(process.env.DATABRICKS_APP_PORT) || 8000, }; + /** + * Overall graceful-shutdown budget before the process is force-exited. + * + * Budget arithmetic: plugin `shutdown()` hooks run concurrently and are + * bounded by {@link PLUGIN_SHUTDOWN_TIMEOUT_MS} (10s); the lifecycle emit + * is bounded by {@link PHASE_SHUTDOWN_TIMEOUT_MS} (2s); the cache storage + * close and the telemetry flush run concurrently, each bounded by + * {@link PHASE_SHUTDOWN_TIMEOUT_MS} (2s). Worst case is + * 10s + 2s + max(2s, 2s) = 14s, leaving ~1s of margin for the remaining + * steps (aborts, socket teardown) before this timer force-exits. + * + * The `server.close()` await (after `closeAllConnections()`) is unbounded + * by design: `closeAllConnections()` runs immediately before it, so it is + * expected to resolve promptly, and the force-exit timer is the backstop + * if it does not. + */ + private static readonly SHUTDOWN_TIMEOUT_MS = 15_000; + /** + * Per-plugin budget for `shutdown()` hooks. Sized to cover the longest + * built-in drain (the files plugin waits up to 10s for in-flight writes). + */ + private static readonly PLUGIN_SHUTDOWN_TIMEOUT_MS = 10_000; + /** + * Budget for each non-plugin shutdown phase (the `"shutdown"` lifecycle + * emit, the cache storage close, and the telemetry flush). Keeps the + * worst-case total under {@link SHUTDOWN_TIMEOUT_MS} — see the arithmetic + * there. + */ + private static readonly PHASE_SHUTDOWN_TIMEOUT_MS = 2_000; + /** Plugin manifest declaring metadata and resource requirements */ static manifest = manifest as PluginManifest<"server">; private serverApplication: express.Application; @@ -65,6 +96,18 @@ export class ServerPlugin extends Plugin { protected declare config: ServerConfig; private serverExtensions: ((app: express.Application) => void)[] = []; private rawBodyPaths: Set = new Set(); + /** + * Guards against re-entrant shutdown (e.g. SIGTERM followed by SIGINT). + * The flag set in `_gracefulShutdown` must remain synchronous and first — + * any `await` before it would open a window for a second signal to + * re-enter the sequence. + */ + private isShuttingDown = false; + /** + * Name of the shutdown phase currently in flight, so the force-exit log + * can say where shutdown got stuck without extra bookkeeping. + */ + private shutdownPhase = "not started"; static phase: PluginPhase = "deferred"; constructor(config: ServerConfig) { @@ -160,8 +203,13 @@ export class ServerPlugin extends Plugin { // attach server to remote tunnel controller this.remoteTunnelController.setServer(server); - process.on("SIGTERM", () => this._gracefulShutdown()); - process.on("SIGINT", () => this._gracefulShutdown()); + // With a server present, this plugin owns the telemetry flush: it is + // awaited inside _gracefulShutdown() after plugin hooks have run. + // Remove the TelemetryManager's standalone signal handlers so they + // cannot start the flush early (see TelemetryManager.disownSignalHandlers). + TelemetryManager.getInstance().disownSignalHandlers(); + process.once("SIGTERM", () => this._gracefulShutdown()); + process.once("SIGINT", () => this._gracefulShutdown()); if (process.env.NODE_ENV === "development") { const allRoutes = getRoutes(this.serverApplication._router.stack); @@ -399,22 +447,50 @@ export class ServerPlugin extends Plugin { } private async _gracefulShutdown() { + // Must stay synchronous and first: any await before the flag is set + // would let a second signal re-enter the shutdown sequence. + if (this.isShuttingDown) return; + this.isShuttingDown = true; + logger.info("Starting graceful shutdown..."); - if (this.viteDevServer) { - await this.viteDevServer.close(); - } + // Force exit once the overall budget is spent. Exit 0 is deliberate: + // a force-timeout still happens on a routine deploy (deliberate + // shutdown, not a crash), and orchestrators record nonzero exits on + // deploys as crashes. The error log below is the stuck-shutdown + // signal instead of the exit code. + const forceExitTimer = setTimeout(() => { + logger.error( + "Graceful shutdown did NOT complete within the %dms budget (phase in flight: %s); force-exiting with code 0.", + ServerPlugin.SHUTDOWN_TIMEOUT_MS, + this.shutdownPhase, + ); + process.exit(0); + }, ServerPlugin.SHUTDOWN_TIMEOUT_MS); + forceExitTimer.unref(); - if (this.remoteTunnelController) { - this.remoteTunnelController.cleanup(); - } + try { + this.shutdownPhase = "dev servers and tunnel cleanup"; + if (this.viteDevServer) { + await this.viteDevServer.close(); + } + + if (this.remoteTunnelController) { + this.remoteTunnelController.cleanup(); + } + + TelemetryReporter.getInstance()?.stop(); - TelemetryReporter.getInstance()?.stop(); + const plugins = this.context + ? Array.from(this.context.getPlugins().values()) + : []; - // 1. abort active operations from plugins - const shutdownPlugins = this.context?.getPlugins(); - if (shutdownPlugins) { - for (const plugin of shutdownPlugins.values()) { + // 1. abort active operations from plugins (in-flight executions, + // SSE streams). Cancellation only — resource teardown (e.g. the + // lakebase pools) belongs in plugin shutdown() hooks so other + // plugins can still drain state through them. + this.shutdownPhase = "aborting active operations"; + for (const plugin of plugins) { if (plugin.abortActiveOperations) { try { plugin.abortActiveOperations(); @@ -427,22 +503,149 @@ export class ServerPlugin extends Plugin { } } } - } - // 2. close the server - if (this.server) { - this.server.close(() => { + // 2. stop accepting new connections and drop idle keep-alive sockets. + // Without this, any connected browser pins `server.close()` open + // until the force-exit timeout fires. + let serverClosed: Promise | undefined; + if (this.server) { + const server = this.server; + serverClosed = new Promise((resolve) => { + server.close(() => resolve()); + }); + server.closeIdleConnections(); + } + + // 3. run every plugin's shutdown() hook concurrently, each bounded + // by a per-plugin timeout so one hung plugin cannot stall exit. + this.shutdownPhase = "plugin shutdown() hooks"; + await Promise.all( + plugins + .filter((plugin) => typeof plugin.shutdown === "function") + .map((plugin) => this.runPluginShutdown(plugin)), + ); + + // 4. notify lifecycle subscribers, bounded so a slow subscriber + // cannot eat the remaining budget. + this.shutdownPhase = "shutdown lifecycle emit"; + try { + await this.raceWithTimeout( + this.context?.emitLifecycle("shutdown"), + ServerPlugin.PHASE_SHUTDOWN_TIMEOUT_MS, + "shutdown lifecycle emit", + ); + } catch (err) { + logger.error("Error emitting shutdown lifecycle event: %O", err); + } + + // 5. force-close whatever sockets remain (aborted SSE responses, + // keep-alive connections) so `server.close()` can complete. + this.shutdownPhase = "closing remaining connections"; + if (this.server) { + this.server.closeAllConnections(); + } + if (serverClosed) { + await serverClosed; logger.debug("Server closed gracefully"); - process.exit(0); - }); - - // 3. timeout to force shutdown after 15 seconds - setTimeout(() => { - logger.debug("Force shutdown after timeout"); - process.exit(1); - }, 15000); - } else { - process.exit(0); + } + + // 6. close the cache manager's storage (drains the persistent + // Lakebase pool; no-op for in-memory storage) and flush telemetry. + // Runs after the lifecycle emit so subscribers can still read the + // cache. The two are independent (the flush never touches the + // cache), so they run concurrently — each bounded so a stuck pool + // drain or stalled OTLP export cannot eat the remaining budget. + // The flush runs inside the orchestrated shutdown instead of + // racing a standalone TelemetryManager signal handler against + // process.exit (see disownSignalHandlers in start()). + this.shutdownPhase = "cache storage close + telemetry flush"; + const closeCacheStorage = async () => { + let cache: CacheManager; + try { + cache = CacheManager.getInstanceSync(); + } catch { + // Cache was never initialized — nothing to close. + return; + } + try { + await this.raceWithTimeout( + cache.close(), + ServerPlugin.PHASE_SHUTDOWN_TIMEOUT_MS, + "cache storage close", + ); + } catch (err) { + logger.error("Error closing cache storage during shutdown: %O", err); + } + }; + const flushTelemetry = async () => { + try { + await this.raceWithTimeout( + TelemetryManager.getInstance().shutdown(), + ServerPlugin.PHASE_SHUTDOWN_TIMEOUT_MS, + "telemetry flush", + ); + } catch (err) { + logger.error("Error flushing telemetry during shutdown: %O", err); + } + }; + await Promise.all([closeCacheStorage(), flushTelemetry()]); + } catch (err) { + logger.error("Error during graceful shutdown: %O", err); + clearTimeout(forceExitTimer); + process.exit(1); + return; + } + + clearTimeout(forceExitTimer); + logger.info("Graceful shutdown complete"); + process.exit(0); + } + + /** + * Race `work` against a timeout. Rejects with a labeled error when the + * timeout wins. A no-op rejection handler is attached to the work promise + * before racing so a branch that rejects after the timeout already won + * does not surface as an unhandledRejection. + */ + private async raceWithTimeout( + work: Promise | T, + timeoutMs: number, + label: string, + ): Promise { + const promise = Promise.resolve(work); + promise.catch(() => {}); + let timer: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timer = setTimeout( + () => reject(new Error(`${label} timed out after ${timeoutMs}ms`)), + timeoutMs, + ); + timer.unref(); + }), + ]); + } finally { + if (timer) clearTimeout(timer); + } + } + + /** + * Run a single plugin's `shutdown()` hook bounded by + * {@link ServerPlugin.PLUGIN_SHUTDOWN_TIMEOUT_MS}. Errors and timeouts + * are logged but never thrown so one misbehaving plugin cannot block + * the rest of the shutdown sequence. + */ + private async runPluginShutdown(plugin: BasePlugin): Promise { + try { + await this.raceWithTimeout( + plugin.shutdown?.(), + ServerPlugin.PLUGIN_SHUTDOWN_TIMEOUT_MS, + "shutdown()", + ); + } catch (err) { + logger.error("Error shutting down plugin %s: %O", plugin.name, err); } } diff --git a/packages/appkit/src/plugins/server/tests/server.test.ts b/packages/appkit/src/plugins/server/tests/server.test.ts index bbc961723..552e23e66 100644 --- a/packages/appkit/src/plugins/server/tests/server.test.ts +++ b/packages/appkit/src/plugins/server/tests/server.test.ts @@ -1,5 +1,13 @@ import type { BasePlugin } from "shared"; -import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { + afterEach, + beforeEach, + describe, + expect, + type MockInstance, + test, + vi, +} from "vitest"; import { PluginContext } from "../../../core/plugin-context"; // Use vi.hoisted for mocks that need to be available before module loading @@ -12,6 +20,8 @@ const { } = vi.hoisted(() => { const httpServer = { close: vi.fn((cb: any) => cb?.()), + closeIdleConnections: vi.fn(), + closeAllConnections: vi.fn(), on: vi.fn(), address: vi.fn().mockReturnValue({ port: 8000 }), }; @@ -85,6 +95,10 @@ vi.mock("express", () => { // Mock dependencies before imports vi.mock("../../../telemetry", () => ({ TelemetryManager: { + getInstance: vi.fn().mockReturnValue({ + shutdown: vi.fn().mockResolvedValue(undefined), + disownSignalHandlers: vi.fn(), + }), getProvider: vi.fn().mockReturnValue({ getTracer: vi.fn().mockReturnValue({ startActiveSpan: vi.fn() }), getMeter: vi.fn().mockReturnValue({ @@ -107,6 +121,7 @@ vi.mock("../../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), + close: vi.fn().mockResolvedValue(undefined), }), }, })); @@ -186,6 +201,8 @@ vi.mock("../client-config-sanitizer", () => ({ import fs from "node:fs"; import express from "express"; +import { CacheManager } from "../../../cache"; +import { TelemetryManager } from "../../../telemetry"; import { sanitizeClientConfig } from "../client-config-sanitizer"; import { ServerPlugin } from "../index"; import { RemoteTunnelController } from "../remote-tunnel/remote-tunnel-controller"; @@ -695,12 +712,22 @@ describe("ServerPlugin", () => { }); describe("_gracefulShutdown", () => { - test("aborts plugin operations (with error isolation) and closes server", async () => { - vi.useFakeTimers(); + let exitSpy: MockInstance; + + beforeEach(() => { mockLoggerError.mockClear(); - const exitSpy = vi + exitSpy = vi .spyOn(process, "exit") .mockImplementation(((_code?: number) => undefined) as any); + }); + + afterEach(() => { + exitSpy.mockRestore(); + vi.useRealTimers(); + }); + + test("aborts plugin operations (with error isolation) and closes server", async () => { + vi.useFakeTimers(); const plugin = new ServerPlugin({ context: createContextWithPlugins({ @@ -725,10 +752,296 @@ describe("ServerPlugin", () => { expect(mockLoggerError).toHaveBeenCalled(); expect(mockHttpServer.close).toHaveBeenCalled(); - expect(exitSpy).toHaveBeenCalled(); + expect(exitSpy).toHaveBeenCalledWith(0); + }); - exitSpy.mockRestore(); - vi.useRealTimers(); + test("calls plugin shutdown() hooks concurrently during graceful shutdown", async () => { + const shutdownA = vi.fn().mockResolvedValue(undefined); + const shutdownB = vi.fn().mockResolvedValue(undefined); + const noHooks = { name: "no-hooks" }; + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({ + a: { name: "a", shutdown: shutdownA }, + b: { name: "b", shutdown: shutdownB }, + "no-hooks": noHooks, + }), + } as any); + (plugin as any).server = mockHttpServer; + + await (plugin as any)._gracefulShutdown(); + + expect(shutdownA).toHaveBeenCalledTimes(1); + expect(shutdownB).toHaveBeenCalledTimes(1); + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("a failing plugin shutdown() is logged and does not abort shutdown", async () => { + const failing = vi.fn().mockRejectedValue(new Error("drain failed")); + const healthy = vi.fn().mockResolvedValue(undefined); + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({ + failing: { name: "failing", shutdown: failing }, + healthy: { name: "healthy", shutdown: healthy }, + }), + } as any); + (plugin as any).server = mockHttpServer; + + await (plugin as any)._gracefulShutdown(); + + expect(failing).toHaveBeenCalledTimes(1); + expect(healthy).toHaveBeenCalledTimes(1); + expect( + mockLoggerError.mock.calls.some((c) => + String(c[0]).includes("Error shutting down plugin"), + ), + ).toBe(true); + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("a hanging plugin shutdown() does not block past its timeout", async () => { + vi.useFakeTimers(); + + const hanging = vi.fn(() => new Promise(() => {})); + const fast = vi.fn().mockResolvedValue(undefined); + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({ + hanging: { name: "hanging", shutdown: hanging }, + fast: { name: "fast", shutdown: fast }, + }), + } as any); + (plugin as any).server = mockHttpServer; + + const done = (plugin as any)._gracefulShutdown(); + await vi.advanceTimersByTimeAsync(10_000); + await done; + + expect(hanging).toHaveBeenCalledTimes(1); + expect(fast).toHaveBeenCalledTimes(1); + expect( + mockLoggerError.mock.calls.some( + (c) => + String(c[0]).includes("Error shutting down plugin") && + c[1] === "hanging" && + String(c[2]).includes("timed out"), + ), + ).toBe(true); + // graceful path completed before the 15s force-exit timer + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("emits the 'shutdown' lifecycle event", async () => { + const ctx = createContextWithPlugins({}); + const hook = vi.fn(); + ctx.onLifecycle("shutdown", hook); + + const plugin = new ServerPlugin({ context: ctx } as any); + (plugin as any).server = mockHttpServer; + + await (plugin as any)._gracefulShutdown(); + + expect(hook).toHaveBeenCalledTimes(1); + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("closes idle and remaining connections so close() can complete", async () => { + const plugin = new ServerPlugin({ + context: createContextWithPlugins({}), + } as any); + (plugin as any).server = mockHttpServer; + + await (plugin as any)._gracefulShutdown(); + + expect(mockHttpServer.closeIdleConnections).toHaveBeenCalledTimes(1); + expect(mockHttpServer.closeAllConnections).toHaveBeenCalledTimes(1); + expect(mockHttpServer.close).toHaveBeenCalledTimes(1); + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("is not re-entrant — a second signal is a no-op", async () => { + const shutdownHook = vi.fn().mockResolvedValue(undefined); + const plugin = new ServerPlugin({ + context: createContextWithPlugins({ + a: { name: "a", shutdown: shutdownHook }, + }), + } as any); + (plugin as any).server = mockHttpServer; + + await Promise.all([ + (plugin as any)._gracefulShutdown(), + (plugin as any)._gracefulShutdown(), + ]); + await (plugin as any)._gracefulShutdown(); + + expect(shutdownHook).toHaveBeenCalledTimes(1); + expect(mockHttpServer.close).toHaveBeenCalledTimes(1); + }); + + test("exits 0 without a server instance", async () => { + const plugin = new ServerPlugin({ + context: createContextWithPlugins({}), + } as any); + + await (plugin as any)._gracefulShutdown(); + + expect(exitSpy).toHaveBeenCalledWith(0); + expect(mockHttpServer.close).not.toHaveBeenCalled(); + }); + + test("a hanging telemetry flush cannot hang shutdown — the flush timeout still exits 0", async () => { + vi.useFakeTimers(); + + const hangingFlush = vi.fn(() => new Promise(() => {})); + vi.mocked(TelemetryManager.getInstance).mockReturnValueOnce({ + shutdown: hangingFlush, + disownSignalHandlers: vi.fn(), + } as any); + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({}), + } as any); + (plugin as any).server = mockHttpServer; + + const done = (plugin as any)._gracefulShutdown(); + // The per-phase budget (2s) is what unblocks the flush — well before + // the 15s force-exit timer. + await vi.advanceTimersByTimeAsync(2_000); + await done; + + expect(hangingFlush).toHaveBeenCalledTimes(1); + expect( + mockLoggerError.mock.calls.some( + (c) => + String(c[0]).includes("Error flushing telemetry") && + String(c[1]).includes("timed out"), + ), + ).toBe(true); + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("runs phases in order: abort → closeIdle → plugin hooks → lifecycle emit → closeAll → cache close + flush (concurrent) → exit", async () => { + const order: string[] = []; + + const ctx = createContextWithPlugins({ + a: { + name: "a", + abortActiveOperations: vi.fn(() => { + order.push("abort"); + }), + shutdown: vi.fn(async () => { + order.push("plugin-shutdown"); + }), + }, + }); + ctx.onLifecycle("shutdown", () => { + order.push("lifecycle"); + }); + + mockHttpServer.closeIdleConnections.mockImplementationOnce(() => { + order.push("closeIdle"); + }); + mockHttpServer.closeAllConnections.mockImplementationOnce(() => { + order.push("closeAll"); + }); + vi.mocked(TelemetryManager.getInstance).mockReturnValueOnce({ + shutdown: vi.fn(async () => { + order.push("flush"); + }), + disownSignalHandlers: vi.fn(), + } as any); + exitSpy.mockImplementationOnce(((_code?: number) => { + order.push("exit"); + }) as any); + + const plugin = new ServerPlugin({ context: ctx } as any); + (plugin as any).server = mockHttpServer; + // Set after construction: the Plugin constructor itself consumes one + // getInstanceSync() call when binding the cache eagerly. + vi.mocked(CacheManager.getInstanceSync).mockReturnValueOnce({ + close: vi.fn(async () => { + order.push("cache-close"); + }), + } as any); + + await (plugin as any)._gracefulShutdown(); + + expect(order.slice(0, 5)).toEqual([ + "abort", + "closeIdle", + "plugin-shutdown", + "lifecycle", + "closeAll", + ]); + // The cache close and the telemetry flush run concurrently, so only + // assert that both happen after closeAll and before exit — their + // relative order is unspecified. + expect(order.slice(5, 7).sort()).toEqual(["cache-close", "flush"]); + expect(order[7]).toBe("exit"); + expect(order).toHaveLength(8); + }); + + test("a hanging cache close cannot hang shutdown — the close timeout still exits 0", async () => { + vi.useFakeTimers(); + + const hangingClose = vi.fn(() => new Promise(() => {})); + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({}), + } as any); + (plugin as any).server = mockHttpServer; + // Set after construction: the Plugin constructor itself consumes one + // getInstanceSync() call when binding the cache eagerly. + vi.mocked(CacheManager.getInstanceSync).mockReturnValueOnce({ + close: hangingClose, + } as any); + + const done = (plugin as any)._gracefulShutdown(); + // The per-phase budget (2s) is what unblocks the close — well before + // the 15s force-exit timer. + await vi.advanceTimersByTimeAsync(2_000); + await done; + + expect(hangingClose).toHaveBeenCalledTimes(1); + expect( + mockLoggerError.mock.calls.some( + (c) => + String(c[0]).includes("Error closing cache storage") && + String(c[1]).includes("timed out"), + ), + ).toBe(true); + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("a plugin shutdown() that rejects after its timeout already won does not crash", async () => { + vi.useFakeTimers(); + + let rejectLate: ((err: Error) => void) | undefined; + const lateRejecting = vi.fn( + () => + new Promise((_, reject) => { + rejectLate = reject; + }), + ); + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({ + late: { name: "late", shutdown: lateRejecting }, + }), + } as any); + (plugin as any).server = mockHttpServer; + + const done = (plugin as any)._gracefulShutdown(); + await vi.advanceTimersByTimeAsync(10_000); + await done; + + // The hook loses the race, then rejects afterwards — must be + // swallowed by the pre-attached no-op handler, not crash the process. + rejectLate?.(new Error("late rejection")); + await vi.advanceTimersByTimeAsync(0); + + expect(exitSpy).toHaveBeenCalledWith(0); }); }); }); diff --git a/packages/appkit/src/telemetry/telemetry-manager.ts b/packages/appkit/src/telemetry/telemetry-manager.ts index 6660b6b2c..65c0a4a9e 100644 --- a/packages/appkit/src/telemetry/telemetry-manager.ts +++ b/packages/appkit/src/telemetry/telemetry-manager.ts @@ -35,6 +35,9 @@ export class TelemetryManager { private static instance?: TelemetryManager; private sdk?: NodeSDK; + private shutdownPromise?: Promise; + /** Signal handler installed by {@link registerShutdown}, kept so {@link disownSignalHandlers} can remove it. */ + private signalShutdownFn?: () => Promise; /** * Create a scoped telemetry provider for a specific plugin. @@ -158,24 +161,62 @@ export class TelemetryManager { ]; } + /** + * Install standalone signal handlers that flush telemetry on + * SIGTERM/SIGINT. These handlers own the flush only in server-less usage: + * when a ServerPlugin starts, it calls {@link disownSignalHandlers} and + * its orchestrated graceful shutdown becomes the single owner of the + * flush (awaiting {@link shutdown} after plugin hooks have run). + */ private registerShutdown() { const shutdownFn = async () => { await TelemetryManager.getInstance().shutdown(); }; + this.signalShutdownFn = shutdownFn; process.once("SIGTERM", shutdownFn); process.once("SIGINT", shutdownFn); } - private async shutdown(): Promise { - if (!this.sdk) { - return; - } + /** + * Remove the signal handlers installed by {@link registerShutdown}. + * + * Called by the ServerPlugin when it starts: with a server present, the + * server's graceful shutdown owns the telemetry flush and awaits + * {@link shutdown} after plugin shutdown() hooks have run. Left in + * place, the standalone handler would start the flush at signal time — + * before plugin hooks have produced their final telemetry — defeating + * the orchestration. Server-less apps never call this, so the + * standalone handlers keep working for them. Idempotent. + */ + disownSignalHandlers(): void { + if (!this.signalShutdownFn) return; + process.removeListener("SIGTERM", this.signalShutdownFn); + process.removeListener("SIGINT", this.signalShutdownFn); + this.signalShutdownFn = undefined; + } - try { - await this.sdk.shutdown(); + /** + * Flush and shut down the OpenTelemetry SDK. + * + * Idempotent: the SDK reference is cleared synchronously and concurrent + * or repeated calls await the same in-flight flush. Public so the server + * plugin can await the flush inside its orchestrated graceful shutdown + * instead of racing the signal handler registered in + * {@link registerShutdown} against `process.exit`. + */ + async shutdown(): Promise { + if (this.sdk) { + const sdk = this.sdk; this.sdk = undefined; - } catch (error) { - logger.error("Error shutting down: %O", error); + this.shutdownPromise = (async () => { + try { + await sdk.shutdown(); + } catch (error) { + logger.error("Error shutting down: %O", error); + } + })(); } + + return this.shutdownPromise; } } diff --git a/packages/shared/src/plugin.ts b/packages/shared/src/plugin.ts index da4800548..68c11b408 100644 --- a/packages/shared/src/plugin.ts +++ b/packages/shared/src/plugin.ts @@ -16,8 +16,26 @@ export type { ResourceFieldEntry, DiscoveryDescriptor, PluginScaffoldingRules }; export interface BasePlugin { name: string; + /** + * Cancel in-flight work (abort signals, SSE streams). Runs in the first + * phase of graceful shutdown, BEFORE any plugin's `shutdown()` hook — + * so it must not tear down shared resources (e.g. connection pools) + * that other plugins' hooks may still need. Put teardown in `shutdown()`. + */ abortActiveOperations?(): void; + /** + * Optional graceful-shutdown hook. + * + * Invoked by the server plugin during graceful shutdown (SIGTERM/SIGINT), + * after `abortActiveOperations()` has run and the server has stopped + * accepting new connections. Use it to drain in-flight work or release + * resources. Each plugin's hook is bounded by a per-plugin timeout and + * runs concurrently with other plugins' hooks; errors are logged and + * never abort the shutdown. + */ + shutdown?(): Promise | void; + setup(): Promise; injectRoutes(router: express.Router): void;