Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement db-uri-read-replicas #3017

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nix/tools/withTools.nix
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ let
export PGTZ
export PGOPTIONS

# FIXME this should point to a read replica
export PGHOSTREP="$PGHOST"

HBA_FILE="$tmpdir/pg_hba.conf"
echo "local $PGDATABASE some_protected_user password" > "$HBA_FILE"
echo "local $PGDATABASE all trust" >> "$HBA_FILE"
Expand Down
1 change: 1 addition & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ library
, optparse-applicative >= 0.13 && < 0.18
, parsec >= 3.1.11 && < 3.2
, protolude >= 0.3.1 && < 0.4
, random >= 1.2.1 && < 1.3
, regex-tdfa >= 1.2.2 && < 1.4
, retry >= 0.7.4 && < 0.10
, scientific >= 0.3.4 && < 0.4
Expand Down
15 changes: 8 additions & 7 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ module PostgREST.Admin
( runAdmin
) where

import qualified Data.Text as T
import qualified Hasql.Session as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import qualified Data.Text as T
import qualified Hasql.Session as SQL
import qualified Hasql.Transaction.Sessions as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp

import Control.Monad.Extra (whenJust)

Expand All @@ -36,10 +37,10 @@ admin :: AppState.AppState -> AppConfig -> Wai.Application
admin appState appConfig req respond = do
isMainAppReachable <- any isRight <$> reachMainApp appConfig
isSchemaCacheLoaded <- isJust <$> AppState.getSchemaCache appState
isConnectionUp <-
isConnectionUp <- -- FIXME primary / read-replicas
if configDbChannelEnabled appConfig
then AppState.getIsListenerOn appState
else isRight <$> AppState.usePool appState (SQL.sql "SELECT 1")
else isRight <$> AppState.usePool appState SQL.Read (SQL.sql "SELECT 1")

case Wai.pathInfo req of
["ready"] ->
Expand Down
2 changes: 1 addition & 1 deletion src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ runDbHandler :: AppState.AppState -> SQL.IsolationLevel -> SQL.Mode -> Bool -> B
runDbHandler appState isoLvl mode authenticated prepared handler = do
dbResp <- lift $ do
let transaction = if prepared then SQL.transaction else SQL.unpreparedTransaction
AppState.usePool appState . transaction isoLvl mode $ runExceptT handler
AppState.usePool appState mode . transaction isoLvl mode $ runExceptT handler

resp <-
liftEither . mapLeft Error.PgErr $
Expand Down
79 changes: 59 additions & 20 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module PostgREST.AppState
, putSchemaCache
, putPgVersion
, usePool
, usePoolReadWrite
, loadSchemaCache
, reReadConfig
, connectionWorker
Expand All @@ -40,6 +41,7 @@ import qualified Hasql.Session as SQL
import qualified Hasql.Transaction.Sessions as SQL
import qualified PostgREST.Error as Error
import PostgREST.Version (prettyVersion)
import qualified Prelude ((!!))

import Control.AutoUpdate (defaultUpdateSettings, mkAutoUpdate,
updateAction)
Expand All @@ -51,6 +53,7 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef,
import Data.Time (ZonedTime, defaultTimeLocale, formatTime,
getZonedTime)
import Data.Time.Clock (UTCTime, getCurrentTime)
import System.Random (randomRIO)

import PostgREST.Config (AppConfig (..),
addFallbackAppName,
Expand All @@ -74,7 +77,7 @@ data AuthResult = AuthResult

data AppState = AppState
-- | Database connection pool
{ statePool :: SQL.Pool
{ statePool :: Pool
-- | Database server version, will be updated by the connectionWorker
, statePgVersion :: IORef PgVersion
-- | No schema cache at the start. Will be filled in by the connectionWorker
Expand Down Expand Up @@ -104,10 +107,18 @@ data AppState = AppState
init :: AppConfig -> IO AppState
init conf = do
pool <- initPool conf
initWithPool pool conf
initWithCompletePool pool conf

data Pool = Pool
{ poolPrimary :: SQL.Pool
, poolReadReplicas :: [SQL.Pool]
}

initWithPool :: SQL.Pool -> AppConfig -> IO AppState
initWithPool pool conf = do
initWithPool pool = initWithCompletePool (Pool pool [])

initWithCompletePool :: Pool -> AppConfig -> IO AppState
initWithCompletePool pool conf = do
appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
<*> newIORef Nothing
Expand Down Expand Up @@ -144,32 +155,57 @@ initWithPool pool conf = do
destroy :: AppState -> IO ()
destroy = destroyPool

initPool :: AppConfig -> IO SQL.Pool
initPool AppConfig{..} =
SQL.acquire
configDbPoolSize
(fromIntegral configDbPoolAcquisitionTimeout)
(fromIntegral configDbPoolMaxLifetime)
(fromIntegral configDbPoolMaxIdletime)
(toUtf8 $ addFallbackAppName prettyVersion configDbUri)
initPool :: AppConfig -> IO Pool
initPool AppConfig{..} = do
primary <- mkPool configDbUri
readReplicas <- traverse mkPool configDbUriReadReplicas
return $ Pool primary readReplicas
where
mkPool uri = SQL.acquire
configDbPoolSize
(fromIntegral configDbPoolAcquisitionTimeout)
(fromIntegral configDbPoolMaxLifetime)
(fromIntegral configDbPoolMaxIdletime)
(toUtf8 $ addFallbackAppName prettyVersion uri)

releasePool :: Pool -> IO ()
releasePool (Pool primary readReplicas) = do
SQL.release primary
mapM_ SQL.release readReplicas

-- | Run an action with a database connection.
usePool :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
usePool AppState{..} x = do
res <- SQL.use statePool x
usePool :: AppState -> SQL.Mode -> SQL.Session a -> IO (Either SQL.UsageError a)
usePool AppState{..} mode x = do
pool <- pickPool statePool mode
res <- SQL.use pool x
whenLeft res (\case
SQL.AcquisitionTimeoutUsageError -> debounceLogAcquisitionTimeout -- this can happen rapidly for many requests, so we debounce
_ -> pure ())
return res

-- | Run an action with a database connection.
usePoolReadWrite :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
usePoolReadWrite appState = usePool appState SQL.Write

pickPool :: Pool -> SQL.Mode -> IO SQL.Pool
pickPool pool mode =
case mode of
SQL.Write -> return $ poolPrimary pool
SQL.Read ->
case poolReadReplicas pool of
[] -> return $ poolPrimary pool
pools -> do
i <- randomRIO (0, length pools - 1)
return $ pools Prelude.!! i

-- | Flush the connection pool so that any future use of the pool will
-- use connections freshly established after this call.
flushPool :: AppState -> IO ()
flushPool AppState{..} = SQL.release statePool
flushPool AppState{..} = releasePool statePool

-- | Destroy the pool on shutdown.
destroyPool :: AppState -> IO ()
destroyPool AppState{..} = SQL.release statePool
destroyPool AppState{..} = releasePool statePool

getPgVersion :: AppState -> IO PgVersion
getPgVersion = readIORef . statePgVersion
Expand Down Expand Up @@ -245,7 +281,7 @@ loadSchemaCache appState = do
conf@AppConfig{..} <- getConfig appState
result <-
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
usePool appState . transaction SQL.ReadCommitted SQL.Read $
usePool appState SQL.Read . transaction SQL.ReadCommitted SQL.Read $
querySchemaCache conf
case result of
Left e -> do
Expand Down Expand Up @@ -342,7 +378,9 @@ establishConnection appState =

getConnectionStatus :: IO ConnectionStatus
getConnectionStatus = do
pgVersion <- usePool appState $ queryPgVersion False -- No need to prepare the query here, as the connection might not be established
-- FIXME usePoolReadWrite in order to use the master database connection
-- we should probably have ConnectionStatus take into account both master and read replicas
pgVersion <- usePoolReadWrite appState $ queryPgVersion False -- No need to prepare the query here, as the connection might not be established
case pgVersion of
Left e -> do
logPgrstError appState e
Expand Down Expand Up @@ -378,7 +416,8 @@ reReadConfig startingUp appState = do
AppConfig{..} <- getConfig appState
dbSettings <-
if configDbConfig then do
qDbSettings <- usePool appState $ queryDbSettings (dumpQi <$> configDbPreConfig) configDbPreparedStatements
-- FIXME usePoolReadWrite to query the master database, we could also use the replica if any
qDbSettings <- usePoolReadWrite appState $ queryDbSettings (dumpQi <$> configDbPreConfig) configDbPreparedStatements
case qDbSettings of
Left e -> do
logWithZTime appState
Expand All @@ -396,7 +435,7 @@ reReadConfig startingUp appState = do
pure mempty
(roleSettings, roleIsolationLvl) <-
if configDbConfig then do
rSettings <- usePool appState $ queryRoleSettings configDbPreparedStatements
rSettings <- usePoolReadWrite appState $ queryRoleSettings configDbPreparedStatements -- FIXME read-only?
case rSettings of
Left e -> do
logWithZTime appState "An error ocurred when trying to query the role settings"
Expand Down
2 changes: 1 addition & 1 deletion src/PostgREST/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ dumpSchema appState = do
conf@AppConfig{..} <- AppState.getConfig appState
result <-
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
AppState.usePool appState $
AppState.usePool appState SQL.Read $
transaction SQL.ReadCommitted SQL.Read $
querySchemaCache conf
case result of
Expand Down
7 changes: 7 additions & 0 deletions src/PostgREST/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ data AppConfig = AppConfig
, configDbTxAllowOverride :: Bool
, configDbTxRollbackAll :: Bool
, configDbUri :: Text
, configDbUriReadReplicas :: [Text]
, configDbUseLegacyGucs :: Bool
, configFilePath :: Maybe FilePath
, configJWKS :: Maybe JWKSet
Expand Down Expand Up @@ -157,6 +158,7 @@ toText conf =
,("db-pre-config", q . maybe mempty dumpQi . configDbPreConfig)
,("db-tx-end", q . showTxEnd)
,("db-uri", q . configDbUri)
,("db-uri-read-replicas", q . T.intercalate " " . configDbUriReadReplicas)
,("db-use-legacy-gucs", T.toLower . show . configDbUseLegacyGucs)
,("jwt-aud", T.decodeUtf8 . LBS.toStrict . JSON.encode . maybe "" toJSON . configJwtAudience)
,("jwt-role-claim-key", q . T.intercalate mempty . fmap dumpJSPath . configJwtRoleClaimKey)
Expand Down Expand Up @@ -257,6 +259,7 @@ parser optPath env dbSettings roleSettings roleIsolationLvl =
<*> parseTxEnd "db-tx-end" snd
<*> parseTxEnd "db-tx-end" fst
<*> (fromMaybe "postgresql://" <$> optString "db-uri")
<*> (maybe [] split <$> optValue "db-uri-read-replicas")
<*> (fromMaybe True <$> optBool "db-use-legacy-gucs")
<*> pure optPath
<*> pure Nothing
Expand Down Expand Up @@ -403,6 +406,10 @@ parser optPath env dbSettings roleSettings roleIsolationLvl =
Nothing -> (> 0) <$> (readMaybe s :: Maybe Integer)
coerceBool _ = Nothing

split :: C.Value -> [Text]
split (C.String s) = T.words s
split _ = []

splitOnCommas :: C.Value -> [Text]
splitOnCommas (C.String s) = T.strip <$> T.splitOn "," s
splitOnCommas _ = []
Expand Down
14 changes: 13 additions & 1 deletion test/io/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,18 @@ def dburi():
dbname = os.environ["PGDATABASE"]
host = os.environ["PGHOST"]
user = os.environ["PGUSER"]
return f"postgresql://?dbname={dbname}&host={host}&user={user}".encode()
port = os.environ.get("PGPORT", "5432")
return f"postgresql://?dbname={dbname}&host={host}&port={port}&user={user}".encode()


@pytest.fixture
def dburi_replica():
"Postgres database connection URI."
dbname = os.environ["PGDATABASE"]
host = os.environ["PGHOSTREP"]
user = os.environ["PGUSER"]
port = os.environ.get("PGPORTREP", "5432")
return f"postgresql://?dbname={dbname}&host={host}&port={port}&user={user}".encode()


@pytest.fixture
Expand All @@ -29,6 +40,7 @@ def baseenv():
return {
"PGDATABASE": os.environ["PGDATABASE"],
"PGHOST": os.environ["PGHOST"],
"PGPORT": os.environ.get("PGPORT", "5432"),
"PGUSER": os.environ["PGUSER"],
}

Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/aliases.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"aliased\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/boolean-numeric.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/boolean-string.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/defaults.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = false
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"role\""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = "postgrest.pre_config"
db-tx-end = "rollback-allow-override"
db-uri = "postgresql://"
db-uri-read-replicas = "rep_1 rep_2"
db-use-legacy-gucs = false
jwt-aud = "https://otherexample.org"
jwt-role-claim-key = ".\"other\".\"pre_config_role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/no-defaults-with-db.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = "postgrest.preconf"
db-tx-end = "commit-allow-override"
db-uri = "postgresql://"
db-uri-read-replicas = "rep_1 rep_2"
db-use-legacy-gucs = false
jwt-aud = "https://example.org"
jwt-role-claim-key = ".\"a\".\"role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/no-defaults.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = false
db-pre-config = "postgrest.pre_config"
db-tx-end = "rollback-allow-override"
db-uri = "tmp_db"
db-uri-read-replicas = "rep_1 rep_2"
db-use-legacy-gucs = false
jwt-aud = "https://postgrest.org"
jwt-role-claim-key = ".\"user\"[0].\"real-role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/expected/types.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = true
db-pre-config = ""
db-tx-end = "commit"
db-uri = "postgresql://"
db-uri-read-replicas = ""
db-use-legacy-gucs = true
jwt-aud = ""
jwt-role-claim-key = ".\"role\""
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/no-defaults-env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ PGRST_DB_CONFIG: false
PGRST_DB_PRE_CONFIG: "postgrest.pre_config"
PGRST_DB_TX_END: rollback-allow-override
PGRST_DB_URI: tmp_db
PGRST_DB_URI_READ_REPLICAS: rep_1 rep_2
PGRST_DB_USE_LEGACY_GUCS: false
PGRST_JWT_AUD: 'https://postgrest.org'
PGRST_JWT_ROLE_CLAIM_KEY: '.user[0]."real-role"'
Expand Down
1 change: 1 addition & 0 deletions test/io/configs/no-defaults.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ db-config = false
db-pre-config = "postgrest.pre_config"
db-tx-end = "rollback-allow-override"
db-uri = "tmp_db"
db-uri-read-replicas = "rep_1 rep_2"
db-use-legacy-gucs = false
jwt-aud = "https://postgrest.org"
jwt-role-claim-key = ".user[0].\"real-role\""
Expand Down
4 changes: 2 additions & 2 deletions test/io/fixtures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ GRANT USAGE ON SCHEMA v1 TO postgrest_test_anonymous;
CREATE TABLE authors_only ();
GRANT SELECT ON authors_only TO postgrest_test_author;

CREATE TABLE projects AS SELECT FROM generate_series(1,5);
GRANT SELECT ON projects TO postgrest_test_anonymous, postgrest_test_w_superuser_settings;
CREATE TABLE projects AS SELECT * FROM generate_series(1,5) AS id;
GRANT SELECT, INSERT ON projects TO postgrest_test_anonymous, postgrest_test_w_superuser_settings;

create function get_guc_value(name text) returns text as $$
select nullif(current_setting(name), '')::text;
Expand Down
1 change: 1 addition & 0 deletions test/io/postgrest.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def metapostgrest():
env = {
"PGDATABASE": os.environ["PGDATABASE"],
"PGHOST": os.environ["PGHOST"],
"PGPORT": os.environ.get("PGPORT", "5432"),
"PGUSER": role,
"PGRST_DB_ANON_ROLE": role,
"PGRST_DB_CONFIG": "true",
Expand Down
Loading
Loading