-
-
Notifications
You must be signed in to change notification settings - Fork 46
fix: capture ParameterDescription to resolve timestamp OIDs for EXPLAIN #25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
99b7c20
a0f8e19
1ab151e
147ca42
3a55269
6e43e3d
e6682d9
709ef51
e53db2f
c2c23f7
f56daf3
631015d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| module github.com/mickamy/sql-tap | ||
|
|
||
| go 1.25.0 | ||
| go 1.26.1 | ||
|
|
||
| require ( | ||
| github.com/alecthomas/chroma/v2 v2.23.1 | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -64,7 +64,7 @@ func startProxy(t *testing.T, upstream string) (*mproxy.Proxy, string) { | |||||
| _ = lis.Close() | ||||||
|
|
||||||
| p := mproxy.New(addr, upstream) | ||||||
| ctx, cancel := context.WithCancel(t.Context()) | ||||||
| ctx, cancel := context.WithCancel(t.Context()) //nolint:gosec // cancel is deferred below via t.Cleanup | ||||||
|
||||||
| ctx, cancel := context.WithCancel(t.Context()) //nolint:gosec // cancel is deferred below via t.Cleanup | |
| ctx, cancel := context.WithCancel(t.Context()) //nolint:govet // cancel is deferred below via t.Cleanup |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,12 +42,22 @@ type conn struct { | |
| events chan<- proxy.Event | ||
|
|
||
| // Extended query state. | ||
| // preparedStmts is only accessed by the client→upstream goroutine. | ||
| preparedStmts map[string]string // stmt name -> query | ||
| preparedStmtOIDs map[string][]uint32 // stmt name -> parameter OIDs | ||
| lastParse string // query from most recent Parse | ||
| lastParamOIDs []uint32 // parameter OIDs from most recent Parse | ||
| lastBindArgs []string // args from most recent Bind | ||
| lastBindStmt string // stmt name from most recent Bind | ||
| // pendingDescribes is a FIFO queue of statement names from Describe('S') | ||
| // messages. ParameterDescription responses arrive in the same order, so | ||
| // we pop from the front to match each response to its request. | ||
| pendingDescribes []string | ||
|
|
||
| // stmtMu protects OID-related fields that are written by | ||
| // handleParameterDescription (upstream→client goroutine) and read by | ||
| // handleBind / written by handleParse (client→upstream goroutine). | ||
| stmtMu sync.Mutex | ||
|
|
||
| // Transaction tracking. | ||
| activeTxID string | ||
|
|
@@ -275,6 +285,8 @@ func (c *conn) captureClientMsg(msg pgproto.FrontendMessage) { | |
| c.handleSimpleQuery(m) | ||
| case *pgproto.Parse: | ||
| c.handleParse(m) | ||
| case *pgproto.Describe: | ||
| c.handleDescribe(m) | ||
| case *pgproto.Bind: | ||
| c.handleBind(m) | ||
| case *pgproto.Execute: | ||
|
|
@@ -284,10 +296,14 @@ func (c *conn) captureClientMsg(msg pgproto.FrontendMessage) { | |
|
|
||
| func (c *conn) captureUpstreamMsg(msg pgproto.BackendMessage) { | ||
| switch m := msg.(type) { | ||
| case *pgproto.ParameterDescription: | ||
| c.handleParameterDescription(m) | ||
| case *pgproto.CommandComplete: | ||
| c.handleCommandComplete(m) | ||
| case *pgproto.ErrorResponse: | ||
| c.handleErrorResponse(m) | ||
| case *pgproto.ReadyForQuery: | ||
| c.drainPendingDescribes() | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -309,21 +325,69 @@ func (c *conn) handleSimpleQuery(m *pgproto.Query) { | |
|
|
||
| func (c *conn) handleParse(m *pgproto.Parse) { | ||
| c.lastParse = m.Query | ||
| c.stmtMu.Lock() | ||
| c.lastParamOIDs = m.ParameterOIDs | ||
| if m.Name != "" { | ||
| c.preparedStmts[m.Name] = m.Query | ||
| c.preparedStmtOIDs[m.Name] = m.ParameterOIDs | ||
| } | ||
| c.stmtMu.Unlock() | ||
| if m.Name != "" { | ||
| c.preparedStmts[m.Name] = m.Query | ||
| } | ||
| } | ||
|
|
||
| func (c *conn) handleDescribe(m *pgproto.Describe) { | ||
| if m.ObjectType == 'S' { | ||
| c.stmtMu.Lock() | ||
| c.pendingDescribes = append(c.pendingDescribes, m.Name) | ||
| c.stmtMu.Unlock() | ||
|
Comment on lines
+339
to
+343
|
||
| } | ||
| } | ||
|
|
||
| // handleParameterDescription captures the server-resolved parameter OIDs | ||
| // returned by the upstream in response to a Describe(Statement) message. | ||
| // These OIDs are authoritative — they override the OIDs from Parse, which | ||
| // are often all zeros (meaning "let the server decide"). | ||
| // Responses arrive in the same order as the corresponding Describe requests, | ||
| // so we pop from the front of pendingDescribes to match them. | ||
| func (c *conn) handleParameterDescription(m *pgproto.ParameterDescription) { | ||
| c.stmtMu.Lock() | ||
| defer c.stmtMu.Unlock() | ||
|
|
||
| if len(c.pendingDescribes) == 0 { | ||
| return | ||
| } | ||
| name := c.pendingDescribes[0] | ||
| c.pendingDescribes = c.pendingDescribes[1:] | ||
|
|
||
| if name == "" { | ||
| // Unnamed statement: update the fallback OIDs used by unnamed binds. | ||
| c.lastParamOIDs = m.ParameterOIDs | ||
| } else { | ||
| // Named statement: only update its entry without touching lastParamOIDs. | ||
| c.preparedStmtOIDs[name] = m.ParameterOIDs | ||
| } | ||
|
Comment on lines
+339
to
+369
|
||
| } | ||
|
|
||
| // drainPendingDescribes clears any unmatched Describe entries from the queue. | ||
| // Called on ReadyForQuery, which marks the end of a query cycle — any pending | ||
| // entries at this point were skipped by the server due to an earlier error. | ||
| func (c *conn) drainPendingDescribes() { | ||
| c.stmtMu.Lock() | ||
| c.pendingDescribes = nil | ||
| c.stmtMu.Unlock() | ||
| } | ||
|
|
||
| func (c *conn) handleBind(m *pgproto.Bind) { | ||
| c.lastBindStmt = m.PreparedStatement | ||
| c.stmtMu.Lock() | ||
| paramOIDs := c.lastParamOIDs | ||
| if m.PreparedStatement != "" { | ||
| if oids, ok := c.preparedStmtOIDs[m.PreparedStatement]; ok { | ||
| paramOIDs = oids | ||
| } | ||
| } | ||
| c.stmtMu.Unlock() | ||
| c.lastBindArgs = make([]string, len(m.Parameters)) | ||
| for i, p := range m.Parameters { | ||
| oid := uint32(0) | ||
|
|
@@ -363,7 +427,7 @@ func decodeBinaryParam(p []byte, oid uint32) string { | |
| switch len(p) { | ||
| case 1: | ||
| // bool or int8 | ||
| return strconv.Itoa(int(int8(p[0]))) | ||
| return strconv.Itoa(int(int8(p[0]))) //nolint:gosec // interpreting as signed int8 | ||
| case 2: | ||
| return strconv.Itoa(int(int16(binary.BigEndian.Uint16(p)))) //nolint:gosec // interpreting as signed int16 | ||
| case 4: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR title is specific to PostgreSQL
ParameterDescriptionhandling, but this change also bumps the module Go version (and separately modifies CI/docs). This makes the PR harder to review and can complicate rollback. Consider splitting the Go/tooling/docs changes into separate PR(s) unless they are required for the protocol fix.