diff --git a/README.md b/README.md index 24ad41c..6a5f818 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,14 @@ [![Sponsor](https://img.shields.io/badge/Sponsor-❤-ea4aaa?style=flat-square&logo=github)](https://github.com/sponsors/mickamy) -Real-time SQL traffic viewer — proxy daemon + TUI client. +Real-time SQL traffic viewer — proxy daemon + TUI / Web client. sql-tap sits between your application and your database (PostgreSQL, MySQL, or TiDB), capturing every query and displaying it in an interactive terminal UI. Inspect queries, view transactions, and run EXPLAIN — all without changing your application code. -![demo](./docs/demo.gif) +![tui](./docs/tui.gif) +![web](./docs/web.png) ## Installation @@ -103,6 +104,7 @@ Flags: -listen client listen address (required) -upstream upstream database address (required) -grpc gRPC server address for TUI (default: ":9091") + -http HTTP server address for web UI (e.g. ":8080") -dsn-env env var holding DSN for EXPLAIN (default: "DATABASE_URL") -version show version and exit ``` @@ -110,6 +112,23 @@ Flags: Set `DATABASE_URL` (or the env var specified by `-dsn-env`) to enable EXPLAIN support. Without it, the proxy still captures queries but EXPLAIN is disabled. +### Web UI + +Add `--http=:8080` to serve a browser-based viewer: + +```bash +DATABASE_URL="postgres://user:pass@localhost:5432/db?sslmode=disable" \ + sql-tapd --driver=postgres --listen=:5433 --upstream=localhost:5432 --http=:8080 +``` + +Open `http://localhost:8080` in your browser to view queries in real-time. The web UI supports: + +- Real-time query stream via SSE +- Click to inspect query details +- EXPLAIN / EXPLAIN ANALYZE +- Text filter +- Copy query (with or without bound args) + ### sql-tap ``` @@ -229,9 +248,10 @@ sql-tap includes a workaround that detects and discards these split sequences. A │ captures queries │ │ via wire protocol │ └───────────┬───────────┘ - │ gRPC stream + │ gRPC stream / SSE ┌───────────▼───────────┐ │ sql-tap (TUI) │ + │ Browser (Web UI) │ └───────────────────────┘ ``` diff --git a/cmd/sql-tapd/main.go b/cmd/sql-tapd/main.go index d3dde89..980f708 100644 --- a/cmd/sql-tapd/main.go +++ b/cmd/sql-tapd/main.go @@ -9,6 +9,7 @@ import ( "os" "os/signal" "syscall" + "time" _ "github.com/go-sql-driver/mysql" _ "github.com/jackc/pgx/v5/stdlib" @@ -20,6 +21,7 @@ import ( "github.com/mickamy/sql-tap/proxy/mysql" "github.com/mickamy/sql-tap/proxy/postgres" "github.com/mickamy/sql-tap/server" + "github.com/mickamy/sql-tap/web" ) var version = "dev" @@ -37,6 +39,7 @@ func main() { upstream := fs.String("upstream", "", "upstream database address (required)") grpcAddr := fs.String("grpc", ":9091", "gRPC server address for TUI") dsnEnv := fs.String("dsn-env", "DATABASE_URL", "environment variable holding DSN for EXPLAIN") + httpAddr := fs.String("http", "", "HTTP server address for web UI (e.g. :8080)") showVersion := fs.Bool("version", false, "show version and exit") _ = fs.Parse(os.Args[1:]) @@ -51,12 +54,12 @@ func main() { os.Exit(1) } - if err := run(*driver, *listen, *upstream, *grpcAddr, *dsnEnv); err != nil { + if err := run(*driver, *listen, *upstream, *grpcAddr, *dsnEnv, *httpAddr); err != nil { log.Fatal(err) } } -func run(driver, listen, upstream, grpcAddr, dsnEnv string) error { +func run(driver, listen, upstream, grpcAddr, dsnEnv, httpAddr string) error { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() @@ -100,6 +103,26 @@ func run(driver, listen, upstream, grpcAddr, dsnEnv string) error { } }() + // HTTP server (optional) + if httpAddr != "" { + httpLis, err := lc.Listen(ctx, "tcp", httpAddr) + if err != nil { + return fmt.Errorf("listen http %s: %w", httpAddr, err) + } + webSrv := web.New(b, explainClient) + go func() { + log.Printf("HTTP server listening on %s", httpAddr) + if err := webSrv.Serve(httpLis); err != nil { + log.Printf("http serve: %v", err) + } + }() + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = webSrv.Shutdown(shutdownCtx) + }() + } + // Proxy var p proxy.Proxy switch driver { diff --git a/docs/demo.gif b/docs/tui.gif similarity index 100% rename from docs/demo.gif rename to docs/tui.gif diff --git a/docs/web.png b/docs/web.png new file mode 100644 index 0000000..e12f4f6 Binary files /dev/null and b/docs/web.png differ diff --git a/example/compose.yaml b/example/compose.yaml index 47ba965..57fafab 100644 --- a/example/compose.yaml +++ b/example/compose.yaml @@ -56,6 +56,7 @@ services: ports: - "5433:5433" - "9091:9091" + - "8080:8080" environment: DATABASE_URL: postgres://postgres:postgres@postgres:5432/db?sslmode=disable command: @@ -63,6 +64,7 @@ services: - --listen=:5433 - --upstream=postgres:5432 - --grpc=:9091 + - --http=:8080 depends_on: postgres: condition: service_healthy @@ -74,6 +76,7 @@ services: ports: - "3307:3307" - "9092:9092" + - "8081:8081" environment: DATABASE_URL: "mysql:mysql@tcp(mysql:3306)/db" command: @@ -81,6 +84,7 @@ services: - --listen=:3307 - --upstream=mysql:3306 - --grpc=:9092 + - --http=:8081 depends_on: mysql: condition: service_healthy @@ -92,6 +96,7 @@ services: ports: - "3309:3309" - "9093:9093" + - "8082:8082" environment: DATABASE_URL: "mysql:mysql@tcp(mysql9:3306)/db" command: @@ -99,6 +104,7 @@ services: - --listen=:3309 - --upstream=mysql9:3306 - --grpc=:9093 + - --http=:8082 depends_on: mysql9: condition: service_healthy @@ -129,6 +135,7 @@ services: ports: - "4001:4001" - "9094:9094" + - "8083:8083" environment: DATABASE_URL: "root@tcp(tidb:4000)/db" command: @@ -136,6 +143,7 @@ services: - --listen=:4001 - --upstream=tidb:4000 - --grpc=:9094 + - --http=:8083 depends_on: tidb-init: condition: service_completed_successfully diff --git a/web/static/app.js b/web/static/app.js new file mode 100644 index 0000000..aa31e39 --- /dev/null +++ b/web/static/app.js @@ -0,0 +1,224 @@ +const events = []; +let selectedIdx = -1; +let filterText = ''; +let autoScroll = true; + +const tbody = document.getElementById('tbody'); +const tableWrap = document.getElementById('table-wrap'); +const statsEl = document.getElementById('stats'); +const statusEl = document.getElementById('status'); +const filterEl = document.getElementById('filter'); +const detailEl = document.getElementById('detail'); +const explainOutput = document.getElementById('explain-output'); + +filterEl.addEventListener('input', () => { + filterText = filterEl.value.toLowerCase(); + renderTable(); +}); + +tableWrap.addEventListener('scroll', () => { + const el = tableWrap; + autoScroll = el.scrollTop + el.clientHeight >= el.scrollHeight - 20; +}); + +function getFiltered() { + if (!filterText) return events.map((ev, i) => ({ev, idx: i})); + return events.reduce((acc, ev, i) => { + if (ev.query.toLowerCase().includes(filterText) || + ev.op.toLowerCase().includes(filterText) || + (ev.error && ev.error.toLowerCase().includes(filterText))) + acc.push({ev, idx: i}); + return acc; + }, []); +} + +function fmtDur(ms) { + if (ms < 1) return (ms * 1000).toFixed(0) + '\u00b5s'; + if (ms < 1000) return ms.toFixed(1) + 'ms'; + return (ms / 1000).toFixed(2) + 's'; +} + +function fmtTime(iso) { + const d = new Date(iso); + return d.toLocaleTimeString('en-GB', {hour12: false}) + '.' + String(d.getMilliseconds()).padStart(3, '0'); +} + +function escapeHTML(s) { + const el = document.createElement('span'); + el.textContent = s; + return el.innerHTML; +} + +function renderTable() { + const filtered = getFiltered(); + statsEl.textContent = filterText + ? `${filtered.length}/${events.length} queries` + : `${events.length} queries`; + + const fragment = document.createDocumentFragment(); + for (const {ev, idx} of filtered) { + const tr = document.createElement('tr'); + tr.className = 'row' + (idx === selectedIdx ? ' selected' : '') + (ev.error ? ' has-error' : ''); + tr.dataset.idx = idx; + tr.onclick = () => selectRow(idx); + tr.innerHTML = + `${escapeHTML(fmtTime(ev.start_time))}` + + `${escapeHTML(ev.op)}` + + `${escapeHTML(ev.query || '-')}` + + `${escapeHTML(fmtDur(ev.duration_ms))}` + + `${ev.error ? '\u26a0' : ''}`; + fragment.appendChild(tr); + } + tbody.replaceChildren(fragment); + + if (autoScroll) { + tableWrap.scrollTop = tableWrap.scrollHeight; + } +} + +function selectRow(idx) { + selectedIdx = idx; + const ev = events[idx]; + document.getElementById('d-op').textContent = ev.op; + document.getElementById('d-time').textContent = fmtTime(ev.start_time); + document.getElementById('d-dur').textContent = fmtDur(ev.duration_ms); + + const rowsRow = document.getElementById('d-rows-row'); + if (ev.rows_affected > 0) { + document.getElementById('d-rows').textContent = ev.rows_affected; + rowsRow.style.display = ''; + } else { + rowsRow.style.display = 'none'; + } + + const txRow = document.getElementById('d-tx-row'); + if (ev.tx_id) { + document.getElementById('d-tx').textContent = ev.tx_id; + txRow.style.display = ''; + } else { + txRow.style.display = 'none'; + } + + const errRow = document.getElementById('d-err-row'); + if (ev.error) { + document.getElementById('d-err').textContent = ev.error; + errRow.style.display = ''; + } else { + errRow.style.display = 'none'; + } + + document.getElementById('d-query').textContent = ev.query || '-'; + document.getElementById('d-args').textContent = ev.args && ev.args.length > 0 + ? 'Args: [' + ev.args.map(a => "'" + a + "'").join(', ') + ']' + : ''; + + const isQueryOp = ['Query', 'Exec', 'Execute'].includes(ev.op); + document.getElementById('btn-explain').disabled = !isQueryOp || !ev.query; + document.getElementById('btn-analyze').disabled = !isQueryOp || !ev.query; + + explainOutput.className = ''; + detailEl.className = 'open'; + renderTable(); +} + +async function runExplain(analyze) { + if (selectedIdx < 0) return; + const ev = events[selectedIdx]; + const pre = document.getElementById('explain-pre'); + pre.textContent = 'Running...'; + pre.className = ''; + explainOutput.className = 'open'; + + try { + const resp = await fetch('/api/explain', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({query: ev.query, args: ev.args, analyze}), + }); + const data = await resp.json(); + if (data.error) { + pre.textContent = data.error; + pre.className = 'explain-error'; + } else { + pre.textContent = data.plan; + pre.className = ''; + } + } catch (e) { + pre.textContent = 'Request failed: ' + e.message; + pre.className = 'explain-error'; + } +} + +function copyQuery(withArgs) { + if (selectedIdx < 0) return; + const ev = events[selectedIdx]; + let text = ev.query || ''; + if (withArgs && ev.args && ev.args.length > 0) { + text = embedArgs(text, ev.args); + } + if (navigator.clipboard && navigator.clipboard.writeText) { + navigator.clipboard.writeText(text).then(() => showToast('Copied!')).catch(() => fallbackCopy(text)); + } else { + fallbackCopy(text); + } +} + +function embedArgs(query, args) { + // $1, $2, ... (PostgreSQL style) + if (/\$\d+/.test(query)) { + return query.replace(/\$(\d+)/g, (_, n) => { + const i = parseInt(n, 10) - 1; + return i < args.length ? quoteArg(args[i]) : '$' + n; + }); + } + // ? (MySQL style) + let i = 0; + return query.replace(/\?/g, () => i < args.length ? quoteArg(args[i++]) : '?'); +} + +function quoteArg(v) { + if (v === null || v === undefined || v === '') return "''"; + if (!isNaN(v) && v.trim() !== '') return v; + return "'" + v.replace(/'/g, "''") + "'"; +} + +function fallbackCopy(text) { + const ta = document.createElement('textarea'); + ta.value = text; + ta.style.position = 'fixed'; + ta.style.opacity = '0'; + document.body.appendChild(ta); + ta.select(); + document.execCommand('copy'); + document.body.removeChild(ta); + showToast('Copied!'); +} + +function showToast(msg) { + const t = document.getElementById('toast'); + t.textContent = msg; + t.classList.add('show'); + setTimeout(() => t.classList.remove('show'), 2000); +} + +// SSE +function connectSSE() { + const es = new EventSource('/api/events'); + es.onopen = () => { + statusEl.textContent = 'connected'; + statusEl.className = 'status connected'; + }; + es.onmessage = (e) => { + const ev = JSON.parse(e.data); + events.push(ev); + renderTable(); + }; + es.onerror = () => { + statusEl.textContent = 'disconnected'; + statusEl.className = 'status disconnected'; + es.close(); + setTimeout(connectSSE, 2000); + }; +} + +connectSSE(); diff --git a/web/static/index.html b/web/static/index.html new file mode 100644 index 0000000..fbd5301 --- /dev/null +++ b/web/static/index.html @@ -0,0 +1,57 @@ + + + + + +sql-tap + + + +
+
+

sql-tap

+ 0 queries + + connecting... +
+
+ + + + + + + + + + + +
TimeOpQueryDurationError
+
+
+
+
Op:
+
Time:
+
Duration:
+
Rows:
+
Tx:
+
Error:
+
Query:
+
+
+
+ + + + +
+
+

+      
+
+
+
+
+ + + diff --git a/web/static/style.css b/web/static/style.css new file mode 100644 index 0000000..c0cb1cb --- /dev/null +++ b/web/static/style.css @@ -0,0 +1,154 @@ +* { margin: 0; padding: 0; box-sizing: border-box; } + +body { + background: #1e1e1e; + color: #d4d4d4; + font-family: 'SF Mono', 'Cascadia Code', 'Fira Code', monospace; + font-size: 13px; +} + +#app { + display: flex; + flex-direction: column; + height: 100vh; +} + +header { + display: flex; + align-items: center; + gap: 12px; + padding: 8px 16px; + background: #252526; + border-bottom: 1px solid #3c3c3c; +} + +header h1 { font-size: 15px; font-weight: 600; color: #e0e0e0; } +header .stats { color: #888; font-size: 12px; } +header .status { margin-left: auto; font-size: 12px; } +header .status.connected { color: #4ec9b0; } +header .status.disconnected { color: #f44747; } + +#filter { + background: #3c3c3c; + border: 1px solid #555; + color: #d4d4d4; + padding: 4px 8px; + border-radius: 3px; + font-family: inherit; + font-size: 12px; + width: 220px; +} + +#filter:focus { outline: none; border-color: #007acc; } + +#table-wrap { flex: 1; overflow-y: auto; min-height: 0; } + +table { width: 100%; border-collapse: collapse; table-layout: fixed; } + +thead { position: sticky; top: 0; background: #2d2d2d; z-index: 1; } + +th { + text-align: left; + padding: 4px 8px; + color: #888; + font-weight: 600; + border-bottom: 1px solid #3c3c3c; + font-size: 12px; + white-space: nowrap; +} + +td { + padding: 4px 8px; + border-bottom: 1px solid #2a2a2a; + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; +} + +tr.row { cursor: pointer; } +tr.row:hover { background: #2a2d2e; } +tr.row.selected { background: #094771; } +tr.row.has-error td { color: #f44747; } + +.col-time { width: 110px; } +.col-op { width: 70px; } +.col-dur { width: 80px; text-align: right; } +.col-err { width: 60px; } +.col-query { overflow: hidden; text-overflow: ellipsis; } + +#detail { + border-top: 1px solid #3c3c3c; + background: #252526; + max-height: 45vh; + overflow-y: auto; + display: none; +} + +#detail.open { display: block; } +#detail-content { padding: 12px 16px; } + +.detail-row { margin-bottom: 4px; } +.detail-label { color: #888; display: inline-block; width: 80px; } +.detail-value { color: #d4d4d4; } + +.detail-query { + background: #1e1e1e; + padding: 8px; + border-radius: 3px; + margin: 4px 0 8px; + white-space: pre-wrap; + word-break: break-all; + font-size: 12px; + max-height: 120px; + overflow-y: auto; +} + +.detail-args { color: #ce9178; font-size: 12px; margin-bottom: 8px; } +.detail-buttons { display: flex; gap: 8px; margin-top: 8px; } + +.btn { + background: #3c3c3c; + border: 1px solid #555; + color: #d4d4d4; + padding: 4px 12px; + border-radius: 3px; + cursor: pointer; + font-family: inherit; + font-size: 12px; +} + +.btn:hover { background: #505050; } +.btn:disabled { opacity: .5; cursor: default; } + +#explain-output { margin-top: 12px; display: none; } +#explain-output.open { display: block; } + +#explain-pre { + background: #1e1e1e; + padding: 8px; + border-radius: 3px; + white-space: pre; + overflow-x: auto; + font-size: 12px; + max-height: 200px; + overflow-y: auto; +} + +.explain-error { color: #f44747; } + +#toast { + position: fixed; + top: -40px; + left: 50%; + transform: translateX(-50%); + background: #4ec9b0; + color: #1e1e1e; + padding: 6px 20px; + border-radius: 0 0 6px 6px; + font-size: 12px; + font-weight: 600; + transition: top .3s ease; + z-index: 100; +} + +#toast.show { top: 0; } diff --git a/web/web.go b/web/web.go new file mode 100644 index 0000000..eec75f2 --- /dev/null +++ b/web/web.go @@ -0,0 +1,187 @@ +package web + +import ( + "context" + "embed" + "encoding/json" + "fmt" + "io/fs" + "net" + "net/http" + "time" + + "github.com/mickamy/sql-tap/broker" + "github.com/mickamy/sql-tap/explain" + "github.com/mickamy/sql-tap/proxy" +) + +//go:embed static +var staticFS embed.FS + +// Server serves the sql-tap web UI and API endpoints. +type Server struct { + httpServer *http.Server + broker *broker.Broker + explain *explain.Client +} + +// New creates a new web Server backed by the given Broker. +// explainClient may be nil if EXPLAIN is not configured. +func New(b *broker.Broker, explainClient *explain.Client) *Server { + s := &Server{ + broker: b, + explain: explainClient, + } + + mux := http.NewServeMux() + + sub, _ := fs.Sub(staticFS, "static") + mux.Handle("GET /", http.FileServer(http.FS(sub))) + mux.HandleFunc("GET /api/events", s.handleSSE) + mux.HandleFunc("POST /api/explain", s.handleExplain) + + s.httpServer = &http.Server{ + Handler: mux, + ReadHeaderTimeout: 10 * time.Second, + } + return s +} + +// Serve starts the HTTP server on the given listener. +func (s *Server) Serve(lis net.Listener) error { + if err := s.httpServer.Serve(lis); err != nil && err != http.ErrServerClosed { + return fmt.Errorf("web: serve: %w", err) + } + return nil +} + +// Shutdown gracefully stops the HTTP server. +func (s *Server) Shutdown(ctx context.Context) error { + if err := s.httpServer.Shutdown(ctx); err != nil { + return fmt.Errorf("web: shutdown: %w", err) + } + return nil +} + +// Handler returns the HTTP handler for testing. +func (s *Server) Handler() http.Handler { + return s.httpServer.Handler +} + +type eventJSON struct { + ID string `json:"id"` + Op string `json:"op"` + Query string `json:"query"` + Args []string `json:"args"` + StartTime string `json:"start_time"` + DurationMs float64 `json:"duration_ms"` + RowsAffected int64 `json:"rows_affected"` + Error string `json:"error,omitempty"` + TxID string `json:"tx_id,omitempty"` +} + +func eventToJSON(ev proxy.Event) eventJSON { + args := make([]string, len(ev.Args)) + copy(args, ev.Args) + return eventJSON{ + ID: ev.ID, + Op: ev.Op.String(), + Query: ev.Query, + Args: args, + StartTime: ev.StartTime.Format(time.RFC3339Nano), + DurationMs: float64(ev.Duration.Microseconds()) / 1000, + RowsAffected: ev.RowsAffected, + Error: ev.Error, + TxID: ev.TxID, + } +} + +func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + flusher.Flush() // send headers immediately + + ch, unsub := s.broker.Subscribe() + defer unsub() + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-ch: + if !ok { + return + } + data, err := json.Marshal(eventToJSON(ev)) + if err != nil { + continue + } + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + } + } +} + +type explainRequest struct { + Query string `json:"query"` + Args []string `json:"args"` + Analyze bool `json:"analyze"` +} + +type explainResponse struct { + Plan string `json:"plan,omitempty"` + Error string `json:"error,omitempty"` +} + +func (s *Server) handleExplain(w http.ResponseWriter, r *http.Request) { + if s.explain == nil { + writeJSON(w, http.StatusServiceUnavailable, &explainResponse{ + Error: "EXPLAIN is not configured (set DATABASE_URL)", + }) + return + } + + var req explainRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeJSON(w, http.StatusBadRequest, &explainResponse{ + Error: "invalid request body: " + err.Error(), + }) + return + } + + mode := explain.Explain + if req.Analyze { + mode = explain.Analyze + } + + result, err := s.explain.Run(r.Context(), mode, req.Query, req.Args) + if err != nil { + writeJSON(w, http.StatusInternalServerError, &explainResponse{ + Error: err.Error(), + }) + return + } + + writeJSON(w, http.StatusOK, &explainResponse{Plan: result.Plan}) +} + +func writeJSON(w http.ResponseWriter, status int, v *explainResponse) { + b, err := json.Marshal(v) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = w.Write(b) + _, _ = w.Write([]byte("\n")) +} diff --git a/web/web_test.go b/web/web_test.go new file mode 100644 index 0000000..abdc101 --- /dev/null +++ b/web/web_test.go @@ -0,0 +1,169 @@ +package web_test + +import ( + "bufio" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/mickamy/sql-tap/broker" + "github.com/mickamy/sql-tap/proxy" + "github.com/mickamy/sql-tap/web" +) + +func TestStaticFiles(t *testing.T) { + t.Parallel() + + srv := web.New(broker.New(8), nil) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + ctx := context.Background() + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, ts.URL+"/", nil) + resp, err := http.DefaultClient.Do(req) //nolint:gosec // test URL + if err != nil { + t.Fatal(err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("got status %d, want 200", resp.StatusCode) + } + ct := resp.Header.Get("Content-Type") + if !strings.Contains(ct, "text/html") { + t.Fatalf("got Content-Type %q, want text/html", ct) + } +} + +func TestSSE_ReceivesEvents(t *testing.T) { + t.Parallel() + + b := broker.New(8) + srv := web.New(b, nil) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, ts.URL+"/api/events", nil) + resp, err := http.DefaultClient.Do(req) //nolint:gosec // test URL + if err != nil { + t.Fatal(err) + } + defer func() { _ = resp.Body.Close() }() + + if ct := resp.Header.Get("Content-Type"); ct != "text/event-stream" { + t.Fatalf("got Content-Type %q, want text/event-stream", ct) + } + + // Wait for subscription to be registered. + time.Sleep(50 * time.Millisecond) + + b.Publish(proxy.Event{ + ID: "test-1", + Op: proxy.OpQuery, + Query: "SELECT 1", + StartTime: time.Date(2026, 2, 20, 15, 4, 5, 0, time.UTC), + Duration: 5 * time.Millisecond, + }) + + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, "data: ") { + continue + } + data := strings.TrimPrefix(line, "data: ") + + var ev struct { + ID string `json:"id"` + Op string `json:"op"` + Query string `json:"query"` + DurationMs float64 `json:"duration_ms"` + } + if err := json.Unmarshal([]byte(data), &ev); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if ev.ID != "test-1" { + t.Fatalf("got ID %q, want test-1", ev.ID) + } + if ev.Op != "Query" { + t.Fatalf("got Op %q, want Query", ev.Op) + } + if ev.Query != "SELECT 1" { + t.Fatalf("got Query %q, want SELECT 1", ev.Query) + } + if ev.DurationMs < 4.9 || ev.DurationMs > 5.1 { + t.Fatalf("got DurationMs %f, want ~5.0", ev.DurationMs) + } + return // success + } + t.Fatal("no SSE data received") +} + +func TestSSE_DisconnectUnsubscribes(t *testing.T) { + t.Parallel() + + b := broker.New(8) + srv := web.New(b, nil) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + ctx, cancel := context.WithCancel(context.Background()) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, ts.URL+"/api/events", nil) + resp, err := http.DefaultClient.Do(req) //nolint:gosec // test URL + if err != nil { + t.Fatal(err) + } + + time.Sleep(50 * time.Millisecond) + if n := b.SubscriberCount(); n != 1 { + t.Fatalf("got %d subscribers, want 1", n) + } + + cancel() + _ = resp.Body.Close() + + // Wait for cleanup. + time.Sleep(100 * time.Millisecond) + if n := b.SubscriberCount(); n != 0 { + t.Fatalf("got %d subscribers after disconnect, want 0", n) + } +} + +func TestExplain_NotConfigured(t *testing.T) { + t.Parallel() + + srv := web.New(broker.New(8), nil) + ts := httptest.NewServer(srv.Handler()) + defer ts.Close() + + body := strings.NewReader(`{"query":"SELECT 1","args":[],"analyze":false}`) + ctx := context.Background() + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, ts.URL+"/api/explain", body) + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) //nolint:gosec // test URL + if err != nil { + t.Fatal(err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("got status %d, want 503", resp.StatusCode) + } + + var result struct { + Error string `json:"error"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + t.Fatal(err) + } + if !strings.Contains(result.Error, "not configured") { + t.Fatalf("got error %q, want contains 'not configured'", result.Error) + } +}