Skip to content

Commit a7f96ef

Browse files
committed
refactor: implement connection tracking in metrics
DISCLAIMER: This commit was authored entirely by a human without the assistance of LLMs. Right now metrics observation handler does not track database connections but updates a single Gauge based on HasqlPoolObs events. This is problematic because Hasql pool reports various connection events in multiple phases. The connection state machine is not simple and to precisely report the number of connections in various states, it is necessary to track their lifecycles. This change adds a ConnTrack data structure and logic to track database connections lifecycles. At the moment it supports "connected" and "inUse" connection counts precisely. The "pgrst_db_pool_available" metric is implemented on top of ConnTrack instead of a simple Gauge.
1 parent bd5de88 commit a7f96ef

File tree

3 files changed

+86
-19
lines changed

3 files changed

+86
-19
lines changed

postgrest.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ library
159159
, stm-hamt >= 1.2 && < 2
160160
, focus >= 1.0 && < 2
161161
, some >= 1.0.4.1 && < 2
162+
, uuid >= 1.3 && < 2
162163
-- -fno-spec-constr may help keep compile time memory use in check,
163164
-- see https://gitlab.haskell.org/ghc/ghc/issues/16017#note_219304
164165
-- -optP-Wno-nonportable-include-path

src/PostgREST/Metrics.hs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ Description : Metrics based on the Observation module. See Observation.hs.
55
-}
66
module PostgREST.Metrics
77
( init
8+
, ConnTrack
9+
, ConnStats (..)
810
, MetricsState (..)
11+
, connectionCounts
912
, observationMetrics
1013
, metricsToText
1114
) where
@@ -17,12 +20,18 @@ import Prometheus
1720

1821
import PostgREST.Observation
1922

20-
import Protolude
23+
import Control.Arrow ((&&&))
24+
import Data.Bitraversable (bisequenceA)
25+
import Data.Tuple.Extra (both)
26+
import Data.UUID (UUID)
27+
import qualified Focus
28+
import Protolude
29+
import qualified StmHamt.SizedHamt as SH
2130

2231
data MetricsState =
2332
MetricsState {
2433
poolTimeouts :: Counter,
25-
poolAvailable :: Gauge,
34+
connTrack :: ConnTrack,
2635
poolWaiting :: Gauge,
2736
poolMaxSize :: Gauge,
2837
schemaCacheLoads :: Vector Label1 Counter,
@@ -36,7 +45,7 @@ init :: Int -> IO MetricsState
3645
init configDbPoolSize = do
3746
metricState <- MetricsState <$>
3847
register (counter (Info "pgrst_db_pool_timeouts_total" "The total number of pool connection timeouts")) <*>
39-
register (gauge (Info "pgrst_db_pool_available" "Available connections in the pool")) <*>
48+
register (Metric ((identity &&& dbPoolAvailable) <$> connectionTracker)) <*>
4049
register (gauge (Info "pgrst_db_pool_waiting" "Requests waiting to acquire a pool connection")) <*>
4150
register (gauge (Info "pgrst_db_pool_max" "Max pool connections")) <*>
4251
register (vector "status" $ counter (Info "pgrst_schema_cache_loads_total" "The total number of times the schema cache was loaded")) <*>
@@ -46,20 +55,19 @@ init configDbPoolSize = do
4655
register (counter (Info "pgrst_jwt_cache_evictions_total" "The total number of JWT cache evictions"))
4756
setGauge (poolMaxSize metricState) (fromIntegral configDbPoolSize)
4857
pure metricState
58+
where
59+
dbPoolAvailable = (pure . noLabelsGroup (Info "pgrst_db_pool_available" "Available connections in the pool") GaugeType . calcAvailable <$>) . connectionCounts
60+
where
61+
calcAvailable = (configDbPoolSize -) . inUse
62+
toSample name labels = Sample name labels . encodeUtf8 . show
63+
noLabelsGroup info sampleType = SampleGroup info sampleType . pure . toSample (metricName info) mempty
4964

5065
-- Only some observations are used as metrics
5166
observationMetrics :: MetricsState -> ObservationHandler
5267
observationMetrics MetricsState{..} obs = case obs of
5368
PoolAcqTimeoutObs -> do
5469
incCounter poolTimeouts
55-
(HasqlPoolObs (SQL.ConnectionObservation _ status)) -> case status of
56-
SQL.ReadyForUseConnectionStatus -> do
57-
incGauge poolAvailable
58-
SQL.InUseConnectionStatus -> do
59-
decGauge poolAvailable
60-
SQL.TerminatedConnectionStatus _ -> do
61-
decGauge poolAvailable
62-
SQL.ConnectingConnectionStatus -> pure ()
70+
(HasqlPoolObs sqlObs) -> trackConnections connTrack sqlObs
6371
PoolRequest ->
6472
incGauge poolWaiting
6573
PoolRequestFullfilled ->
@@ -77,3 +85,28 @@ observationMetrics MetricsState{..} obs = case obs of
7785

7886
metricsToText :: IO LBS.ByteString
7987
metricsToText = exportMetricsAsText
88+
89+
data ConnStats = ConnStats {
90+
connected :: Int,
91+
inUse :: Int
92+
} deriving (Eq, Show)
93+
94+
data ConnTrack = ConnTrack { connTrackConnected :: SH.SizedHamt UUID, connTrackInUse :: SH.SizedHamt UUID }
95+
96+
connectionTracker :: IO ConnTrack
97+
connectionTracker = ConnTrack <$> SH.newIO <*> SH.newIO
98+
99+
trackConnections :: ConnTrack -> SQL.Observation -> IO ()
100+
trackConnections ConnTrack{..} (SQL.ConnectionObservation uuid status) = case status of
101+
SQL.ReadyForUseConnectionStatus -> atomically $
102+
SH.insert identity uuid connTrackConnected *>
103+
SH.focus Focus.delete identity uuid connTrackInUse
104+
SQL.TerminatedConnectionStatus _ -> atomically $
105+
SH.focus Focus.delete identity uuid connTrackConnected *>
106+
SH.focus Focus.delete identity uuid connTrackInUse
107+
SQL.InUseConnectionStatus -> atomically $
108+
SH.insert identity uuid connTrackInUse
109+
_ -> mempty
110+
111+
connectionCounts :: ConnTrack -> IO ConnStats
112+
connectionCounts = atomically . fmap (uncurry ConnStats) . bisequenceA . both SH.size . (connTrackConnected &&& connTrackInUse)

test/observability/Observation/MetricsSpec.hs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,20 @@
66

77
module Observation.MetricsSpec where
88

9-
import Data.List (lookup)
10-
import Network.Wai (Application)
9+
import Data.List (lookup)
10+
import qualified Hasql.Pool.Observation as SQL
11+
import Network.Wai (Application)
1112
import ObsHelper
12-
import qualified PostgREST.AppState as AppState
13-
import PostgREST.Config (AppConfig (configDbSchemas))
14-
import qualified PostgREST.Metrics as Metrics
13+
import qualified PostgREST.AppState as AppState
14+
import PostgREST.Config (AppConfig (configDbSchemas))
15+
import PostgREST.Metrics (ConnStats (..),
16+
MetricsState (..),
17+
connectionCounts)
1518
import PostgREST.Observation
16-
import Prometheus (getCounter, getVectorWith)
19+
import Prometheus (getCounter, getVectorWith)
1720
import Protolude
18-
import Test.Hspec (SpecWith, describe, it)
19-
import Test.Hspec.Wai (getState)
21+
import Test.Hspec (SpecWith, describe, it)
22+
import Test.Hspec.Wai (getState)
2023

2124
spec :: SpecWith (SpecState, Application)
2225
spec = describe "Server started with metrics enabled" $ do
@@ -47,8 +50,38 @@ spec = describe "Server started with metrics enabled" $ do
4750
-- wait up to 2 secs so that retry can happen
4851
waitFor (2 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@(SchemaCacheLoadedObs{}) <- pure x]
4952

53+
it "Should track in use connections" $ do
54+
SpecState{specAppState = appState, specMetrics = metrics, specObsChan} <- getState
55+
let waitFor = waitForObs specObsChan
56+
57+
-- we expect in use connections to be the same once finished
58+
liftIO $ checkState' metrics
59+
[
60+
inUseConnections (+ 0)
61+
] $ do
62+
signal <- newEmptyMVar
63+
-- make sure waiting thread is signaled
64+
bracket_ (pure ()) (putMVar signal ()) $
65+
-- expecting one more connection in use
66+
-- expecting one more connection in use
67+
68+
-- expecting one more connection in use
69+
checkState' metrics
70+
[
71+
inUseConnections (+ 1)
72+
] $ do
73+
-- start a thread hanging on a single connection until signaled
74+
void $ forkIO $ void $ AppState.usePool appState $ liftIO (readMVar signal)
75+
-- main thread waits for ConnectionObservation with InUseConnectionStatus
76+
-- after which used connections count should be incremented
77+
waitFor (1 * sec) "InUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ SQL.InUseConnectionStatus)) <- pure x]
78+
79+
-- hanging thread was signaled and should return the connection
80+
waitFor (1 * sec) "ReadyForUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ SQL.ReadyForUseConnectionStatus)) <- pure x]
5081
where
5182
-- prometheus-client api to handle vectors is convoluted
5283
schemaCacheLoads label = expectField @"schemaCacheLoads" $
5384
fmap (maybe (0::Int) round . lookup label) . (`getVectorWith` getCounter)
85+
86+
inUseConnections = expectField @"connTrack" ((inUse <$>) . connectionCounts)
5487
sec = 1000000

0 commit comments

Comments
 (0)