From 790f05bdb6e692979857da29d266f12f602cc0d7 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Thu, 5 Mar 2026 12:40:15 -0500 Subject: [PATCH 01/10] Add diagnostics_channel TracingChannel support to pg and pg-pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enables instrumentation libraries (OpenTelemetry, etc.) to subscribe to structured events without monkey-patching. Uses TracingChannel for async context propagation and plain channels for simple events. Channels: - pg:query (TracingChannel) — query lifecycle with result enrichment - pg:connection (TracingChannel) — client connect lifecycle - pg:pool:connect (TracingChannel) — pool checkout lifecycle - pg:pool:release (plain) — client released back to pool - pg:pool:remove (plain) — client removed from pool All instrumentation is guarded by hasSubscribers for zero overhead when unused. Gracefully degrades to no-ops on Node < 19.9 or non-Node environments. Closes #3619 --- packages/pg-pool/diagnostics.js | 27 +++ packages/pg-pool/index.js | 34 +++ packages/pg-pool/test/diagnostics.js | 192 +++++++++++++++++ packages/pg/lib/client.js | 72 +++++-- packages/pg/lib/diagnostics.js | 23 ++ .../pg/test/unit/client/diagnostics-tests.js | 197 ++++++++++++++++++ 6 files changed, 532 insertions(+), 13 deletions(-) create mode 100644 packages/pg-pool/diagnostics.js create mode 100644 packages/pg-pool/test/diagnostics.js create mode 100644 packages/pg/lib/diagnostics.js create mode 100644 packages/pg/test/unit/client/diagnostics-tests.js diff --git a/packages/pg-pool/diagnostics.js b/packages/pg-pool/diagnostics.js new file mode 100644 index 000000000..99c4ce4d9 --- /dev/null +++ b/packages/pg-pool/diagnostics.js @@ -0,0 +1,27 @@ +'use strict' + +const noopChannel = { hasSubscribers: false } + +let poolConnectChannel = noopChannel +let poolReleaseChannel = noopChannel +let poolRemoveChannel = noopChannel + +try { + let dc + if (typeof process.getBuiltInModule === 'function') { + dc = process.getBuiltInModule('diagnostics_channel') + } else { + dc = require('diagnostics_channel') + } + if (typeof dc.tracingChannel === 'function') { + poolConnectChannel = dc.tracingChannel('pg:pool:connect') + } + if (typeof dc.channel === 'function') { + poolReleaseChannel = dc.channel('pg:pool:release') + poolRemoveChannel = dc.channel('pg:pool:remove') + } +} catch (e) { + // diagnostics_channel not available (non-Node environment) +} + +module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel } diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index ab514fa88..42c02fb6b 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -1,5 +1,6 @@ 'use strict' const EventEmitter = require('events').EventEmitter +const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel } = require('./diagnostics') const NOOP = function () {} @@ -178,6 +179,10 @@ class Pool extends EventEmitter { this._clients = this._clients.filter((c) => c !== client) const context = this + if (poolRemoveChannel.hasSubscribers) { + poolRemoveChannel.publish({ client: { processID: client.processID } }) + } + client.end(() => { context.emit('remove', client) @@ -196,6 +201,31 @@ class Pool extends EventEmitter { const response = promisify(this.Promise, cb) const result = response.result + if (poolConnectChannel.hasSubscribers) { + const context = { + pool: { + totalCount: this.totalCount, + idleCount: this.idleCount, + waitingCount: this.waitingCount, + maxSize: this.options.max, + }, + } + const origCb = response.callback + const enrichedCb = (err, client, done) => { + if (client) context.client = { processID: client.processID, reused: !!client._poolUseCount } + return origCb(err, client, done) + } + poolConnectChannel.traceCallback( + (tracedCb) => { + response.callback = tracedCb + }, + 0, + context, + null, + enrichedCb + ) + } + // if we don't have to connect a new client, don't do so if (this._isFull() || this._idle.length) { // if we have idle clients schedule a pulse immediately @@ -388,6 +418,10 @@ class Pool extends EventEmitter { this.emit('release', err, client) + if (poolReleaseChannel.hasSubscribers) { + poolReleaseChannel.publish({ client: { processID: client.processID }, error: err || undefined }) + } + // TODO(bmc): expose a proper, public interface _queryable and _ending if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) { if (client._poolUseCount >= this.options.maxUses) { diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js new file mode 100644 index 000000000..ab0b344ac --- /dev/null +++ b/packages/pg-pool/test/diagnostics.js @@ -0,0 +1,192 @@ +'use strict' + +const expect = require('expect.js') +const EventEmitter = require('events').EventEmitter +const describe = require('mocha').describe +const it = require('mocha').it +const dc = require('diagnostics_channel') +const Pool = require('../') + +function mockClient(methods) { + return function () { + const client = new EventEmitter() + client.end = function (cb) { + if (cb) process.nextTick(cb) + } + client._queryable = true + client._ending = false + client.processID = 12345 + Object.assign(client, methods) + return client + } +} + +describe('diagnostics channels', function () { + describe('pg:pool:connect', function () { + it('publishes start event when connect is called', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let capturedContext + const channel = dc.tracingChannel('pg:pool:connect') + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => {}, + error: () => {}, + } + + channel.subscribe(subs) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end(() => { + expect(capturedContext).to.be.ok() + expect(capturedContext.pool).to.be.ok() + expect(capturedContext.pool.maxSize).to.be(10) + expect(capturedContext.pool.totalCount).to.be.a('number') + + channel.unsubscribe(subs) + done() + }) + }) + }) + + it('enriches context with client info on asyncEnd', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + const channel = dc.tracingChannel('pg:pool:connect') + const subs = { + start: () => {}, + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + expect(ctx.client).to.be.ok() + expect(ctx.client.processID).to.be(12345) + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end() + }) + }) + }) + + describe('pg:pool:release', function () { + it('publishes when a client is released', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let releaseMessage + const channel = dc.channel('pg:pool:release') + const onMessage = (msg) => { + releaseMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end(() => { + expect(releaseMessage).to.be.ok() + expect(releaseMessage.client).to.be.ok() + expect(releaseMessage.client.processID).to.be(12345) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + + it('includes error when released with error', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let releaseMessage + const channel = dc.channel('pg:pool:release') + const onMessage = (msg) => { + releaseMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + const releaseError = new Error('test error') + release(releaseError) + pool.end(() => { + expect(releaseMessage).to.be.ok() + expect(releaseMessage.error).to.be(releaseError) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + }) + + describe('pg:pool:remove', function () { + it('publishes when a client is removed', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let removeMessage + const channel = dc.channel('pg:pool:remove') + const onMessage = (msg) => { + removeMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + // release with error to trigger removal + release(new Error('force remove')) + pool.end(() => { + expect(removeMessage).to.be.ok() + expect(removeMessage.client).to.be.ok() + expect(removeMessage.client.processID).to.be(12345) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + }) +}) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index d6c57194c..70d300638 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -9,6 +9,7 @@ const Query = require('./query') const defaults = require('./defaults') const Connection = require('./connection') const crypto = require('./crypto/utils') +const { queryChannel, connectionChannel } = require('./diagnostics') const activeQueryDeprecationNotice = nodeUtils.deprecate( () => {}, @@ -220,19 +221,33 @@ class Client extends EventEmitter { connect(callback) { if (callback) { - this._connect(callback) + if (connectionChannel.hasSubscribers) { + const context = { + connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, + } + connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback) + } else { + this._connect(callback) + } return } - return new this._Promise((resolve, reject) => { - this._connect((error) => { - if (error) { - reject(error) - } else { - resolve(this) - } + const connectPromise = () => + new this._Promise((resolve, reject) => { + this._connect((error) => { + if (error) reject(error) + else resolve(this) + }) }) - }) + + if (connectionChannel.hasSubscribers) { + const context = { + connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, + } + return connectionChannel.tracePromise(connectPromise, context) + } + + return connectPromise() } _attachListeners(con) { @@ -705,11 +720,42 @@ class Client extends EventEmitter { return result } - if (this._queryQueue.length > 0) { - queryQueueLengthDeprecationNotice() + const enqueue = () => { + if (this._queryQueue.length > 0) queryQueueLengthDeprecationNotice() + this._queryQueue.push(query) + this._pulseQueryQueue() + } + + if (queryChannel.hasSubscribers) { + const context = { + query: { text: query.text, name: query.name, rowMode: query._rowMode }, + client: { + database: this.database, + host: this.host, + port: this.port, + user: this.user, + processID: this.processID, + ssl: !!this.ssl, + }, + } + const origCb = query.callback + const enrichedCb = (err, res) => { + if (res) context.result = { rowCount: res.rowCount, command: res.command } + return origCb(err, res) + } + queryChannel.traceCallback( + (tracedCb) => { + query.callback = tracedCb + enqueue() + }, + 0, + context, + null, + enrichedCb + ) + } else { + enqueue() } - this._queryQueue.push(query) - this._pulseQueryQueue() return result } diff --git a/packages/pg/lib/diagnostics.js b/packages/pg/lib/diagnostics.js new file mode 100644 index 000000000..ca991b535 --- /dev/null +++ b/packages/pg/lib/diagnostics.js @@ -0,0 +1,23 @@ +'use strict' + +const noopChannel = { hasSubscribers: false } + +let queryChannel = noopChannel +let connectionChannel = noopChannel + +try { + let dc + if (typeof process.getBuiltInModule === 'function') { + dc = process.getBuiltInModule('diagnostics_channel') + } else { + dc = require('diagnostics_channel') + } + if (typeof dc.tracingChannel === 'function') { + queryChannel = dc.tracingChannel('pg:query') + connectionChannel = dc.tracingChannel('pg:connection') + } +} catch (e) { + // diagnostics_channel not available (non-Node environment) +} + +module.exports = { queryChannel, connectionChannel } diff --git a/packages/pg/test/unit/client/diagnostics-tests.js b/packages/pg/test/unit/client/diagnostics-tests.js new file mode 100644 index 000000000..41bd264fd --- /dev/null +++ b/packages/pg/test/unit/client/diagnostics-tests.js @@ -0,0 +1,197 @@ +'use strict' +const helper = require('./test-helper') +const assert = require('assert') +const dc = require('diagnostics_channel') + +const suite = new helper.Suite() +const test = suite.test.bind(suite) + +test('query diagnostics channel', function () { + test('publishes start and asyncEnd on successful query', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + events.push({ type: 'asyncEnd', context: ctx }) + + // asyncEnd fires after the callback, so check everything here + assert.equal(events.length, 2) + assert.equal(events[0].type, 'start') + assert.equal(events[0].context.query.text, 'SELECT 1') + assert.equal(events[0].context.client.database, client.database) + + assert.equal(events[1].type, 'asyncEnd') + assert.equal(events[1].context.result.command, 'SELECT') + assert.equal(events[1].context.result.rowCount, 1) + + channel.unsubscribe(subs) + done() + }, + error: (ctx) => events.push({ type: 'error', context: ctx }), + } + + channel.subscribe(subs) + + client.query('SELECT 1', (err, res) => { + assert.ifError(err) + }) + + // simulate query execution + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) + + test('publishes error on failed query', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => {}, + error: (ctx) => { + events.push({ type: 'error', context: ctx }) + + const startEvent = events.find((e) => e.type === 'start') + assert.ok(startEvent) + assert.equal(startEvent.context.query.text, 'BAD QUERY') + + channel.unsubscribe(subs) + done() + }, + } + + channel.subscribe(subs) + + client.query('BAD QUERY', (err) => { + assert.ok(err) + }) + + // simulate error + client.connection.emit('errorMessage', { + severity: 'ERROR', + message: 'syntax error', + }) + }) + + test('query context includes client info', function (done) { + const client = helper.client({ database: 'testdb', host: 'localhost', port: 5432, user: 'testuser' }) + client.connection.emit('readyForQuery') + + let capturedContext + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => { + assert.equal(capturedContext.client.host, 'localhost') + assert.equal(capturedContext.client.user, 'testuser') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + client.query('SELECT 1', () => {}) + + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) + + test('promise query publishes diagnostics', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + events.push({ type: 'asyncEnd', context: ctx }) + + assert.ok(events.find((e) => e.type === 'start')) + assert.equal(events[0].context.query.text, 'SELECT 1') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + client.query('SELECT 1').then(() => {}) + + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) +}) + +test('connection diagnostics channel', function () { + test('publishes start on connect with callback', function (done) { + const Connection = require('../../../lib/connection') + const { Client } = helper + + let capturedContext + const channel = dc.tracingChannel('pg:connection') + + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => { + assert.ok(capturedContext) + assert.equal(capturedContext.connection.database, 'testdb') + assert.equal(capturedContext.connection.host, 'myhost') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + const connection = new Connection({ stream: 'no' }) + connection.startup = function () {} + connection.connect = function () {} + const client = new Client({ connection: connection, database: 'testdb', host: 'myhost', port: 5432 }) + + client.connect((err) => { + assert.ifError(err) + }) + + // simulate successful connection + connection.emit('connect') + connection.emit('readyForQuery') + }) +}) From d391ee537c8c515e7a39fcea4f92009a07aa2f85 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Thu, 5 Mar 2026 13:13:29 -0500 Subject: [PATCH 02/10] Skip TracingChannel tests on Node < 19.9 TracingChannel is not available on Node 18 LTS. Skip the tracing-dependent tests gracefully instead of failing. --- packages/pg-pool/test/diagnostics.js | 6 ++++-- .../pg/test/unit/client/diagnostics-tests.js | 18 +++++++++++------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js index ab0b344ac..1ce6cb6c0 100644 --- a/packages/pg-pool/test/diagnostics.js +++ b/packages/pg-pool/test/diagnostics.js @@ -7,6 +7,8 @@ const it = require('mocha').it const dc = require('diagnostics_channel') const Pool = require('../') +const hasTracingChannel = typeof dc.tracingChannel === 'function' + function mockClient(methods) { return function () { const client = new EventEmitter() @@ -23,7 +25,7 @@ function mockClient(methods) { describe('diagnostics channels', function () { describe('pg:pool:connect', function () { - it('publishes start event when connect is called', function (done) { + ;(hasTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) { const pool = new Pool({ Client: mockClient({ connect: function (cb) { @@ -61,7 +63,7 @@ describe('diagnostics channels', function () { }) }) - it('enriches context with client info on asyncEnd', function (done) { + ;(hasTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) { const pool = new Pool({ Client: mockClient({ connect: function (cb) { diff --git a/packages/pg/test/unit/client/diagnostics-tests.js b/packages/pg/test/unit/client/diagnostics-tests.js index 41bd264fd..46aed29f9 100644 --- a/packages/pg/test/unit/client/diagnostics-tests.js +++ b/packages/pg/test/unit/client/diagnostics-tests.js @@ -3,11 +3,15 @@ const helper = require('./test-helper') const assert = require('assert') const dc = require('diagnostics_channel') +const hasTracingChannel = typeof dc.tracingChannel === 'function' + const suite = new helper.Suite() const test = suite.test.bind(suite) +// pass undefined as callback to skip when TracingChannel is unavailable +const testTracing = (name, cb) => test(name, hasTracingChannel ? cb : undefined) -test('query diagnostics channel', function () { - test('publishes start and asyncEnd on successful query', function (done) { +testTracing('query diagnostics channel', function () { + testTracing('publishes start and asyncEnd on successful query', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -50,7 +54,7 @@ test('query diagnostics channel', function () { client.connection.emit('readyForQuery') }) - test('publishes error on failed query', function (done) { + testTracing('publishes error on failed query', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -87,7 +91,7 @@ test('query diagnostics channel', function () { }) }) - test('query context includes client info', function (done) { + testTracing('query context includes client info', function (done) { const client = helper.client({ database: 'testdb', host: 'localhost', port: 5432, user: 'testuser' }) client.connection.emit('readyForQuery') @@ -120,7 +124,7 @@ test('query diagnostics channel', function () { client.connection.emit('readyForQuery') }) - test('promise query publishes diagnostics', function (done) { + testTracing('promise query publishes diagnostics', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -154,8 +158,8 @@ test('query diagnostics channel', function () { }) }) -test('connection diagnostics channel', function () { - test('publishes start on connect with callback', function (done) { +testTracing('connection diagnostics channel', function () { + testTracing('publishes start on connect with callback', function (done) { const Connection = require('../../../lib/connection') const { Client } = helper From ada6cabb972a8c8426014bed4713bbc4a17082df Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Thu, 5 Mar 2026 14:04:40 -0500 Subject: [PATCH 03/10] fix: format --- packages/pg-pool/test/diagnostics.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js index 1ce6cb6c0..357bb7b69 100644 --- a/packages/pg-pool/test/diagnostics.js +++ b/packages/pg-pool/test/diagnostics.js @@ -62,7 +62,6 @@ describe('diagnostics channels', function () { }) }) }) - ;(hasTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) { const pool = new Pool({ Client: mockClient({ From f78909bf7532dd4c97e6bd9fafbb4951f1946ef5 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Sun, 15 Mar 2026 03:36:31 -0400 Subject: [PATCH 04/10] fix: use shouldTrace guard for Node 18 compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Node 18 backported TracingChannel but without the aggregated `hasSubscribers` getter (it returns `undefined` instead of a boolean). Raw truthiness checks treat `undefined` as "no subscribers" which silently disables tracing on Node 18. Replace all `channel.hasSubscribers` guards with `shouldTrace(channel)` which checks `hasSubscribers !== false` — treating `undefined` (Node 18) as "might have subscribers, trace unconditionally" and `false` (Node 20+) as "definitely no subscribers, skip". Also removes the now-unnecessary test skip logic since TracingChannel does exist on Node 18. --- packages/pg-pool/diagnostics.js | 10 +++++++++- packages/pg-pool/index.js | 8 ++++---- packages/pg-pool/test/diagnostics.js | 6 ++---- packages/pg/lib/client.js | 8 ++++---- packages/pg/lib/diagnostics.js | 10 +++++++++- .../pg/test/unit/client/diagnostics-tests.js | 18 +++++++----------- 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/packages/pg-pool/diagnostics.js b/packages/pg-pool/diagnostics.js index 99c4ce4d9..15adc414d 100644 --- a/packages/pg-pool/diagnostics.js +++ b/packages/pg-pool/diagnostics.js @@ -24,4 +24,12 @@ try { // diagnostics_channel not available (non-Node environment) } -module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel } +// Check explicitly for `false` rather than truthiness because the aggregated +// `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which +// backported TracingChannel but not the getter). When `undefined`, we assume +// there may be subscribers and trace unconditionally. +function shouldTrace(channel) { + return channel.hasSubscribers !== false +} + +module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace } diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 42c02fb6b..02740b967 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -1,6 +1,6 @@ 'use strict' const EventEmitter = require('events').EventEmitter -const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel } = require('./diagnostics') +const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace } = require('./diagnostics') const NOOP = function () {} @@ -179,7 +179,7 @@ class Pool extends EventEmitter { this._clients = this._clients.filter((c) => c !== client) const context = this - if (poolRemoveChannel.hasSubscribers) { + if (shouldTrace(poolRemoveChannel)) { poolRemoveChannel.publish({ client: { processID: client.processID } }) } @@ -201,7 +201,7 @@ class Pool extends EventEmitter { const response = promisify(this.Promise, cb) const result = response.result - if (poolConnectChannel.hasSubscribers) { + if (shouldTrace(poolConnectChannel)) { const context = { pool: { totalCount: this.totalCount, @@ -418,7 +418,7 @@ class Pool extends EventEmitter { this.emit('release', err, client) - if (poolReleaseChannel.hasSubscribers) { + if (shouldTrace(poolReleaseChannel)) { poolReleaseChannel.publish({ client: { processID: client.processID }, error: err || undefined }) } diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js index 357bb7b69..436ad3370 100644 --- a/packages/pg-pool/test/diagnostics.js +++ b/packages/pg-pool/test/diagnostics.js @@ -7,8 +7,6 @@ const it = require('mocha').it const dc = require('diagnostics_channel') const Pool = require('../') -const hasTracingChannel = typeof dc.tracingChannel === 'function' - function mockClient(methods) { return function () { const client = new EventEmitter() @@ -25,7 +23,7 @@ function mockClient(methods) { describe('diagnostics channels', function () { describe('pg:pool:connect', function () { - ;(hasTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) { + it('publishes start event when connect is called', function (done) { const pool = new Pool({ Client: mockClient({ connect: function (cb) { @@ -62,7 +60,7 @@ describe('diagnostics channels', function () { }) }) }) - ;(hasTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) { + it('enriches context with client info on asyncEnd', function (done) { const pool = new Pool({ Client: mockClient({ connect: function (cb) { diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 70d300638..b8ed83750 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -9,7 +9,7 @@ const Query = require('./query') const defaults = require('./defaults') const Connection = require('./connection') const crypto = require('./crypto/utils') -const { queryChannel, connectionChannel } = require('./diagnostics') +const { queryChannel, connectionChannel, shouldTrace } = require('./diagnostics') const activeQueryDeprecationNotice = nodeUtils.deprecate( () => {}, @@ -221,7 +221,7 @@ class Client extends EventEmitter { connect(callback) { if (callback) { - if (connectionChannel.hasSubscribers) { + if (shouldTrace(connectionChannel)) { const context = { connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, } @@ -240,7 +240,7 @@ class Client extends EventEmitter { }) }) - if (connectionChannel.hasSubscribers) { + if (shouldTrace(connectionChannel)) { const context = { connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, } @@ -726,7 +726,7 @@ class Client extends EventEmitter { this._pulseQueryQueue() } - if (queryChannel.hasSubscribers) { + if (shouldTrace(queryChannel)) { const context = { query: { text: query.text, name: query.name, rowMode: query._rowMode }, client: { diff --git a/packages/pg/lib/diagnostics.js b/packages/pg/lib/diagnostics.js index ca991b535..03449a04b 100644 --- a/packages/pg/lib/diagnostics.js +++ b/packages/pg/lib/diagnostics.js @@ -20,4 +20,12 @@ try { // diagnostics_channel not available (non-Node environment) } -module.exports = { queryChannel, connectionChannel } +// Check explicitly for `false` rather than truthiness because the aggregated +// `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which +// backported TracingChannel but not the getter). When `undefined`, we assume +// there may be subscribers and trace unconditionally. +function shouldTrace(channel) { + return channel.hasSubscribers !== false +} + +module.exports = { queryChannel, connectionChannel, shouldTrace } diff --git a/packages/pg/test/unit/client/diagnostics-tests.js b/packages/pg/test/unit/client/diagnostics-tests.js index 46aed29f9..41bd264fd 100644 --- a/packages/pg/test/unit/client/diagnostics-tests.js +++ b/packages/pg/test/unit/client/diagnostics-tests.js @@ -3,15 +3,11 @@ const helper = require('./test-helper') const assert = require('assert') const dc = require('diagnostics_channel') -const hasTracingChannel = typeof dc.tracingChannel === 'function' - const suite = new helper.Suite() const test = suite.test.bind(suite) -// pass undefined as callback to skip when TracingChannel is unavailable -const testTracing = (name, cb) => test(name, hasTracingChannel ? cb : undefined) -testTracing('query diagnostics channel', function () { - testTracing('publishes start and asyncEnd on successful query', function (done) { +test('query diagnostics channel', function () { + test('publishes start and asyncEnd on successful query', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -54,7 +50,7 @@ testTracing('query diagnostics channel', function () { client.connection.emit('readyForQuery') }) - testTracing('publishes error on failed query', function (done) { + test('publishes error on failed query', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -91,7 +87,7 @@ testTracing('query diagnostics channel', function () { }) }) - testTracing('query context includes client info', function (done) { + test('query context includes client info', function (done) { const client = helper.client({ database: 'testdb', host: 'localhost', port: 5432, user: 'testuser' }) client.connection.emit('readyForQuery') @@ -124,7 +120,7 @@ testTracing('query diagnostics channel', function () { client.connection.emit('readyForQuery') }) - testTracing('promise query publishes diagnostics', function (done) { + test('promise query publishes diagnostics', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -158,8 +154,8 @@ testTracing('query diagnostics channel', function () { }) }) -testTracing('connection diagnostics channel', function () { - testTracing('publishes start on connect with callback', function (done) { +test('connection diagnostics channel', function () { + test('publishes start on connect with callback', function (done) { const Connection = require('../../../lib/connection') const { Client } = helper From 10f4438b2c059a6c57d99d6e0d41706f8f0cede3 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Sun, 15 Mar 2026 03:46:40 -0400 Subject: [PATCH 05/10] Skip TracingChannel tests on Node < 19.9 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Node 18 backported TracingChannel but with a buggy implementation — unsubscribing and resubscribing to the same channel crashes internally (`_subscribers` becomes undefined). Node 16 has no TracingChannel at all. Gate tests on `hasStableTracingChannel` which checks both that `dc.tracingChannel` exists AND that the aggregated `hasSubscribers` getter returns a boolean (only true on Node 19.9+/20.5+). TracingChannel tests: skipped on Node 16/18, run on Node 20+ Plain channel tests (release/remove): run on all versions --- packages/pg-pool/test/diagnostics.js | 10 +++++++-- .../pg/test/unit/client/diagnostics-tests.js | 22 +++++++++++++------ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js index 436ad3370..1038a1300 100644 --- a/packages/pg-pool/test/diagnostics.js +++ b/packages/pg-pool/test/diagnostics.js @@ -7,6 +7,12 @@ const it = require('mocha').it const dc = require('diagnostics_channel') const Pool = require('../') +// TracingChannel exists on Node 18+ but the aggregated hasSubscribers getter +// and stable unsubscribe behavior require Node 19.9+/20.5+. Skip tracing +// tests on older versions where TracingChannel is missing or has internal bugs. +const hasStableTracingChannel = + typeof dc.tracingChannel === 'function' && typeof dc.tracingChannel('pg:pool:test:probe').hasSubscribers === 'boolean' + function mockClient(methods) { return function () { const client = new EventEmitter() @@ -23,7 +29,7 @@ function mockClient(methods) { describe('diagnostics channels', function () { describe('pg:pool:connect', function () { - it('publishes start event when connect is called', function (done) { + ;(hasStableTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) { const pool = new Pool({ Client: mockClient({ connect: function (cb) { @@ -60,7 +66,7 @@ describe('diagnostics channels', function () { }) }) }) - it('enriches context with client info on asyncEnd', function (done) { + ;(hasStableTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) { const pool = new Pool({ Client: mockClient({ connect: function (cb) { diff --git a/packages/pg/test/unit/client/diagnostics-tests.js b/packages/pg/test/unit/client/diagnostics-tests.js index 41bd264fd..0db837eec 100644 --- a/packages/pg/test/unit/client/diagnostics-tests.js +++ b/packages/pg/test/unit/client/diagnostics-tests.js @@ -3,11 +3,19 @@ const helper = require('./test-helper') const assert = require('assert') const dc = require('diagnostics_channel') +// TracingChannel exists on Node 18+ but the aggregated hasSubscribers getter +// and stable unsubscribe behavior require Node 19.9+/20.5+. Skip tests on +// older versions where TracingChannel is missing or has internal bugs. +const hasStableTracingChannel = + typeof dc.tracingChannel === 'function' && typeof dc.tracingChannel('pg:test:probe').hasSubscribers === 'boolean' + const suite = new helper.Suite() const test = suite.test.bind(suite) +// pass undefined as callback to skip when TracingChannel is unavailable/unstable +const testTracing = (name, cb) => test(name, hasStableTracingChannel ? cb : undefined) -test('query diagnostics channel', function () { - test('publishes start and asyncEnd on successful query', function (done) { +testTracing('query diagnostics channel', function () { + testTracing('publishes start and asyncEnd on successful query', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -50,7 +58,7 @@ test('query diagnostics channel', function () { client.connection.emit('readyForQuery') }) - test('publishes error on failed query', function (done) { + testTracing('publishes error on failed query', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -87,7 +95,7 @@ test('query diagnostics channel', function () { }) }) - test('query context includes client info', function (done) { + testTracing('query context includes client info', function (done) { const client = helper.client({ database: 'testdb', host: 'localhost', port: 5432, user: 'testuser' }) client.connection.emit('readyForQuery') @@ -120,7 +128,7 @@ test('query diagnostics channel', function () { client.connection.emit('readyForQuery') }) - test('promise query publishes diagnostics', function (done) { + testTracing('promise query publishes diagnostics', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -154,8 +162,8 @@ test('query diagnostics channel', function () { }) }) -test('connection diagnostics channel', function () { - test('publishes start on connect with callback', function (done) { +testTracing('connection diagnostics channel', function () { + testTracing('publishes start on connect with callback', function (done) { const Connection = require('../../../lib/connection') const { Client } = helper From 8feb2f55c043491e217c78c77b2dcce9d198c43b Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Sun, 15 Mar 2026 03:57:45 -0400 Subject: [PATCH 06/10] fix: format --- packages/pg/lib/client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index b8ed83750..7b15147c8 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -726,7 +726,7 @@ class Client extends EventEmitter { this._pulseQueryQueue() } - if (shouldTrace(queryChannel)) { + if (shouldTrace(queryChannel) && query.callback) { const context = { query: { text: query.text, name: query.name, rowMode: query._rowMode }, client: { From f722fefa41168596aabac0816704c93a70c5d6a9 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Sun, 15 Mar 2026 04:04:36 -0400 Subject: [PATCH 07/10] fix: use traceCallback for connect promise path to preserve custom Promise type tracePromise wraps the result in a native Promise, which breaks clients configured with a custom Promise implementation (e.g. bluebird). Switch to traceCallback inside the user's this._Promise constructor so the returned promise type is always correct. --- packages/pg/lib/client.js | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 7b15147c8..f8079963b 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -232,22 +232,20 @@ class Client extends EventEmitter { return } - const connectPromise = () => - new this._Promise((resolve, reject) => { - this._connect((error) => { - if (error) reject(error) - else resolve(this) - }) - }) - - if (shouldTrace(connectionChannel)) { - const context = { - connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, + return new this._Promise((resolve, reject) => { + const callback = (error) => { + if (error) reject(error) + else resolve(this) } - return connectionChannel.tracePromise(connectPromise, context) - } - - return connectPromise() + if (shouldTrace(connectionChannel)) { + const context = { + connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, + } + connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback) + } else { + this._connect(callback) + } + }) } _attachListeners(con) { From eee5e19ceafc8031fe13bfcaa1678b785b971211 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 2 Jun 2026 15:27:58 -0400 Subject: [PATCH 08/10] fix: drop internal _rowMode from query tracing context --- packages/pg/lib/client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index f8079963b..4f2cd3fe5 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -726,7 +726,7 @@ class Client extends EventEmitter { if (shouldTrace(queryChannel) && query.callback) { const context = { - query: { text: query.text, name: query.name, rowMode: query._rowMode }, + query: { text: query.text, name: query.name }, client: { database: this.database, host: this.host, From 64193c774b9de1a7b01e40dc1e4e085a8c083728 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 2 Jun 2026 15:36:34 -0400 Subject: [PATCH 09/10] fix: include diagnostics.js in pg-pool published files --- packages/pg-pool/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/pg-pool/package.json b/packages/pg-pool/package.json index 6b9f60155..a075de4fe 100644 --- a/packages/pg-pool/package.json +++ b/packages/pg-pool/package.json @@ -45,6 +45,7 @@ }, "files": [ "index.js", + "diagnostics.js", "esm" ] } From cf9ce13d03faeb6ff4f596338cd0f4a2f9b7b02a Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 2 Jun 2026 15:41:06 -0400 Subject: [PATCH 10/10] docs: add tracing channels documentation --- docs/pages/features/_meta.js | 1 + docs/pages/features/tracing.mdx | 229 ++++++++++++++++++++++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 docs/pages/features/tracing.mdx diff --git a/docs/pages/features/_meta.js b/docs/pages/features/_meta.js index 62f1660ca..52e691126 100644 --- a/docs/pages/features/_meta.js +++ b/docs/pages/features/_meta.js @@ -8,4 +8,5 @@ export default { native: 'Native', esm: 'ESM', callbacks: 'Callbacks', + tracing: 'Tracing Channels', } diff --git a/docs/pages/features/tracing.mdx b/docs/pages/features/tracing.mdx new file mode 100644 index 000000000..43caa6ed7 --- /dev/null +++ b/docs/pages/features/tracing.mdx @@ -0,0 +1,229 @@ +--- +title: Tracing Channels +--- + +node-postgres publishes lifecycle events on a set of named tracing channels so +you can instrument queries, connections, and pool activity without +monkey-patching. If you're building an APM integration, custom tracer, or just +want structured logging of your database calls, you can subscribe to these +channels and node-postgres will tell you when things happen. + +Tracing is built on +[`node:diagnostics_channel`](https://nodejs.org/api/diagnostics_channel.html) +and requires Node.js 20+. On older versions or non-Node runtimes it silently +no-ops. When nothing is listening the overhead is zero since every emission site +is guarded by a `hasSubscribers` check. + +## Quick start + +Here's a minimal example that logs every query with its duration: + +```js +import dc from 'node:diagnostics_channel' + +const timings = new WeakMap() +const channel = dc.tracingChannel('pg:query') + +channel.subscribe({ + start(ctx) { + timings.set(ctx, process.hrtime.bigint()) + }, + asyncEnd(ctx) { + const elapsed = Number(process.hrtime.bigint() - timings.get(ctx)) / 1e6 + console.log(`${ctx.query.text} completed in ${elapsed.toFixed(2)}ms`) + timings.delete(ctx) + }, + error(ctx) { + timings.delete(ctx) + }, + end() {}, + asyncStart() {}, +}) +``` + +You don't need to change how you create clients or pools. Just subscribe to the +channel and node-postgres handles the rest. + +## The channels + +### TracingChannels + +These emit the full lifecycle: `start`, `end`, `asyncStart`, `asyncEnd`, and +`error`. + +| Channel | Fires for | +| --- | --- | +| `pg:query` | Each `client.query()` call | +| `pg:connection` | Each `client.connect()` call | +| `pg:pool:connect` | Each `pool.connect()` call (acquiring a client from the pool) | + +### Plain channels + +These publish a single message. Subscribe with `dc.channel(name).subscribe(cb)`. + +| Channel | Fires for | +| --- | --- | +| `pg:pool:release` | A client is released back to the pool | +| `pg:pool:remove` | A client is removed from the pool | + +### How they fit together + +A typical pooled request moves through the channels in this order: + +```text +pg:pool:connect acquire a client from the pool + └─ pg:connection connect (only if the pool creates a new client) +pg:query execute the query +pg:pool:release release the client back to the pool +``` + +`pg:connection` only fires when the pool has to establish a new connection. For +reused clients it is skipped. `pg:pool:remove` fires separately when a client +is evicted (e.g. after an error or when `maxUses` is exceeded). + +## Lifecycle events + +Each tracing channel exposes five sub-channels that fire in a fixed order +depending on whether the operation completes synchronously or asynchronously: + +- **Synchronous success:** `start` -> `end` +- **Synchronous failure:** `start` -> `error` -> `end` +- **Asynchronous success:** `start` -> `end` -> `asyncStart` -> `asyncEnd` +- **Asynchronous failure:** `start` -> `end` -> `asyncStart` -> `error` -> `asyncEnd` + +In practice queries and connections are always asynchronous, so you'll mostly +work with `start` and `asyncEnd`. The context object is shared across all +events for a single operation, and properties like `result` and `error` are +added as the operation progresses. + +## Context payloads + +### pg:query + +```js +{ + query: { + text: 'SELECT $1::int', // the query text + name: undefined, // prepared statement name, if any + }, + client: { + database: 'mydb', + host: 'localhost', + port: 5432, + user: 'postgres', + processID: 123, // PostgreSQL backend process ID + ssl: false, + }, + // added on asyncEnd: + result: { + rowCount: 1, + command: 'SELECT', + }, +} +``` + +### pg:connection + +```js +{ + connection: { + database: 'mydb', + host: 'localhost', + port: 5432, + user: 'postgres', + ssl: false, + }, +} +``` + +### pg:pool:connect + +```js +{ + pool: { + totalCount: 2, // total clients in the pool + idleCount: 1, // idle clients available + waitingCount: 0, // callers waiting for a client + maxSize: 10, // configured pool maximum + }, + // added on asyncEnd: + client: { + processID: 123, + reused: true, // whether the client was already in the pool + }, +} +``` + +### pg:pool:release + +```js +{ + client: { processID: 123 }, + error: undefined, // the Error passed to release(err), if any +} +``` + +### pg:pool:remove + +```js +{ + client: { processID: 123 }, +} +``` + +## More examples + +### Monitoring pool usage + +```js +import dc from 'node:diagnostics_channel' + +const poolConnect = dc.tracingChannel('pg:pool:connect') +const poolRelease = dc.channel('pg:pool:release') + +poolConnect.subscribe({ + start(ctx) { + console.log('pool checkout:', ctx.pool.idleCount, 'idle,', ctx.pool.waitingCount, 'waiting') + }, + asyncEnd(ctx) { + console.log('checked out client', ctx.client.processID, '(reused:', ctx.client.reused + ')') + }, + error() {}, + end() {}, + asyncStart() {}, +}) + +poolRelease.subscribe((msg) => { + console.log('client', msg.client.processID, 'released') +}) +``` + +### Subscribing to all query events + +```js +import dc from 'node:diagnostics_channel' + +const channel = dc.tracingChannel('pg:query') + +channel.subscribe({ + start(ctx) { + console.log('query started:', ctx.query.text) + }, + asyncEnd(ctx) { + console.log('query completed:', ctx.result.command, ctx.result.rowCount, 'rows') + }, + error(ctx) { + console.error('query failed:', ctx.error) + }, + end() {}, + asyncStart() {}, +}) +``` + +## Notes + +- These channels report observability events. They are not hooks for altering + behavior; mutating a context payload does not change node-postgres internals. +- The plain channels (`pg:pool:release`, `pg:pool:remove`) are not + TracingChannels because they represent point-in-time events with no async + continuation.