Skip to content

PoC: Introduce read-only postgres replicas for accelerated scheduling queries #152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ k8s_resource('cortex-mqtt', port_forwards=[
########### Postgres DB for Cortex Core Service
local('sh helm/sync.sh helm/cortex-postgres')
k8s_yaml(helm('./helm/cortex-postgres', name='cortex-postgres'))
k8s_resource('cortex-postgresql', port_forwards=[
k8s_resource('cortex-postgresql-primary', port_forwards=[
port_forward(5432, 5432),
], labels=['Core-Services'])
k8s_resource('cortex-postgresql-read', labels=['Core-Services'])

########### Monitoring
local('sh helm/sync.sh helm/cortex-prometheus-operator')
Expand Down
2 changes: 1 addition & 1 deletion commands/fillup/fillup.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {
os.Exit(0)
}

db := db.NewPostgresDB(conf.DBConfig{
db := db.NewPostgresDB(conf.DBConnectionConfig{
Host: "localhost",
Port: 5432,
User: "postgres",
Expand Down
3 changes: 1 addition & 2 deletions helm/cortex-postgres/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ apiVersion: v2
name: cortex-postgres
description: Postgres setup for Cortex.
type: application
version: 0.1.1
version: 0.2.0
dependencies:
- name: postgresql
repository: oci://registry-1.docker.io/bitnamicharts
# Use a specific version of this chart which contains compliant images.
version: 15.5.27
# Owner info adds a configmap to the kubernetes cluster with information on
# the service owner. This makes it easier to find out who to contact in case
Expand Down
5 changes: 5 additions & 0 deletions helm/cortex-postgres/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@ owner-info:
enabled: true

postgresql:
architecture: replication
readReplicas:
replicaCount: 2

fullnameOverride: cortex-postgresql
volumePermissions:
enabled: true
auth:
postgresPassword: secret
replicationPassword: secret
service:
ports:
postgresql: 5432
2 changes: 1 addition & 1 deletion helm/cortex/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ apiVersion: v2
name: cortex
description: A Helm chart for deploying Cortex.
type: application
version: 0.19.0
version: 0.20.0
appVersion: 0.1.0
dependencies:
# Owner info adds a configmap to the kubernetes cluster with information on
Expand Down
24 changes: 15 additions & 9 deletions helm/cortex/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,21 @@ conf:

# Connection parameters for the database where data is stored.
db:
host: cortex-postgresql
port: 5432
user: postgres
password: secret
database: postgres
reconnect:
maxRetries: 20
retryIntervalSeconds: 1
livenessPingIntervalSeconds: 5
sharedDBConf: &sharedDBConf
port: 5432
user: postgres
password: secret
database: postgres
reconnect:
maxRetries: 20
retryIntervalSeconds: 1
livenessPingIntervalSeconds: 5
primary:
<<: *sharedDBConf
host: cortex-postgresql-primary
readonly:
<<: *sharedDBConf
host: cortex-postgresql-read

# Sync plugins config.
sync:
Expand Down
9 changes: 7 additions & 2 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ type DBReconnectConfig struct {
MaxRetries int `json:"maxRetries"`
}

// Database configuration.
type DBConfig struct {
type DBConnectionConfig struct {
Host string `json:"host"`
Port int `json:"port"`
Database string `json:"database"`
Expand All @@ -45,6 +44,12 @@ type DBConfig struct {
Reconnect DBReconnectConfig `json:"reconnect"`
}

// Database configuration.
type DBConfig struct {
Primary DBConnectionConfig `json:"primary"`
ReadOnly DBConnectionConfig `json:"readonly"`
}

// Metric configuration for the sync/prometheus module.
type SyncPrometheusMetricConfig struct {
// The query to use to fetch the metric.
Expand Down
47 changes: 36 additions & 11 deletions internal/conf/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,21 @@ func TestNewConfig(t *testing.T) {
"format": "text"
},
"db": {
"host": "cortex-postgresql",
"port": 5432,
"user": "postgres",
"password": "secret",
"database": "postgres"
},
"primary": {
"host": "cortex-postgresql-primary",
"port": 5432,
"user": "postgres",
"password": "secret",
"database": "postgres"
},
"readonly": {
"host": "cortex-postgresql-readonly",
"port": 5432,
"user": "postgres",
"password": "secret",
"database": "postgres"
}
},
"monitoring": {
"port": 2112,
"labels": {
Expand Down Expand Up @@ -176,21 +185,37 @@ func TestNewConfig(t *testing.T) {

// Test DBConfig
dbConfig := config.GetDBConfig()
if dbConfig.Host == "" {
if dbConfig.Primary.Host == "" {
t.Errorf("Expected non-empty DB host, got empty string")
}
if dbConfig.Port == 0 {
if dbConfig.Primary.Port == 0 {
t.Errorf("Expected non-zero DB port, got 0")
}
if dbConfig.Database == "" {
if dbConfig.Primary.Database == "" {
t.Errorf("Expected non-empty DB name, got empty string")
}
if dbConfig.User == "" {
if dbConfig.Primary.User == "" {
t.Errorf("Expected non-empty DB user, got empty string")
}
if dbConfig.Password == "" {
if dbConfig.Primary.Password == "" {
t.Errorf("Expected non-empty DB password, got empty string")
}
// Check the read-only DB configuration as well
if dbConfig.ReadOnly.Host == "" {
t.Errorf("Expected non-empty read-only DB host, got empty string")
}
if dbConfig.ReadOnly.Port == 0 {
t.Errorf("Expected non-zero read-only DB port, got 0")
}
if dbConfig.ReadOnly.Database == "" {
t.Errorf("Expected non-empty read-only DB name, got empty string")
}
if dbConfig.ReadOnly.User == "" {
t.Errorf("Expected non-empty read-only DB user, got empty string")
}
if dbConfig.ReadOnly.Password == "" {
t.Errorf("Expected non-empty read-only DB password, got empty string")
}

// Test MonitoringConfig
monitoringConfig := config.GetMonitoringConfig()
Expand Down
4 changes: 2 additions & 2 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// Wrapper around gorp.DbMap that adds some convenience functions.
type DB struct {
*gorp.DbMap
conf conf.DBConfig
conf conf.DBConnectionConfig
// Monitor for database related metrics like connection attempts.
monitor Monitor
}
Expand All @@ -39,7 +39,7 @@ type Index struct {
}

// Create a new postgres database and wait until it is connected.
func NewPostgresDB(c conf.DBConfig, registry *monitoring.Registry, monitor Monitor) DB {
func NewPostgresDB(c conf.DBConnectionConfig, registry *monitoring.Registry, monitor Monitor) DB {
strip := func(s string) string { return strings.ReplaceAll(s, "\n", "") }
dbURL, err := easypg.URLFrom(easypg.URLParts{
HostName: strip(c.Host),
Expand Down
4 changes: 2 additions & 2 deletions internal/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestNewDB(t *testing.T) {
if err != nil {
t.Fatalf("failed to convert port: %v", err)
}
config := conf.DBConfig{
config := conf.DBConnectionConfig{
Host: "localhost",
Port: port,
User: "postgres",
Expand Down Expand Up @@ -290,7 +290,7 @@ func TestUnexpectedConnectionLoss(t *testing.T) {
if err != nil {
t.Fatalf("failed to convert port: %v", err)
}
config := conf.DBConfig{
config := conf.DBConnectionConfig{
Host: "localhost",
Port: port,
User: "postgres",
Expand Down
25 changes: 16 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,23 @@ func main() {
// Set up the monitoring registry and database connection.
monitoringConfig := config.GetMonitoringConfig()
registry := monitoring.NewRegistry(monitoringConfig)

database := db.NewPostgresDB(config.GetDBConfig(), registry, db.NewDBMonitor(registry))
defer database.Close()
dbMonitor := db.NewDBMonitor(registry)
dbConf := config.GetDBConfig()

// Check if we want to perform one-time tasks like checks or migrations.
switch taskName {
case "checks":
checks.RunChecks(ctx, config)
return
case "migrate":
migrater := db.NewMigrater(database)
jobDB := db.NewPostgresDB(dbConf.Primary, registry, dbMonitor)
defer jobDB.Close()
migrater := db.NewMigrater(jobDB)
migrater.Migrate(true)
slog.Info("migrations executed")
return
}

go database.CheckLivenessPeriodically()
go runMonitoringServer(ctx, registry, monitoringConfig)

// Run an api server that serves some basic endpoints and can be extended.
Expand All @@ -184,18 +184,25 @@ func main() {
w.WriteHeader(http.StatusOK)
})

var serviceDB db.DB
switch taskName {
case "syncer":
runSyncer(ctx, registry, config.GetSyncConfig(), database)
serviceDB = db.NewPostgresDB(dbConf.Primary, registry, dbMonitor)
runSyncer(ctx, registry, config.GetSyncConfig(), serviceDB)
case "extractor":
runExtractor(registry, config.GetExtractorConfig(), database)
serviceDB = db.NewPostgresDB(dbConf.Primary, registry, dbMonitor)
runExtractor(registry, config.GetExtractorConfig(), serviceDB)
case "scheduler-nova":
runSchedulerNova(mux, registry, config.GetSchedulerConfig(), database)
serviceDB = db.NewPostgresDB(dbConf.ReadOnly, registry, dbMonitor)
runSchedulerNova(mux, registry, config.GetSchedulerConfig(), serviceDB)
case "kpis":
runKPIService(registry, config.GetKPIsConfig(), database)
serviceDB = db.NewPostgresDB(dbConf.ReadOnly, registry, dbMonitor)
runKPIService(registry, config.GetKPIsConfig(), serviceDB)
default:
panic("unknown task")
}
defer serviceDB.Close()
go serviceDB.CheckLivenessPeriodically()

// Run the api server after all other tasks have been started and
// all http handlers have been registered to the mux.
Expand Down
Loading