Skip to content

Commit

Permalink
Added cassandra migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinbennyofficial committed Nov 16, 2024
1 parent 704a8a6 commit 10c9494
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 9 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ RUN mkdir -p /app/logs /app/migrations && chmod 777 /app/logs /app/migrations

# Copy the migrations
COPY src/postgres/migrations ./migrations/postgres
COPY src/cassandra/migrations ./migrations/cassandra

# Expose port
EXPOSE 8080
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ services:
networks:
- backend


postgres:
image: bitnami/postgresql
container_name: mydb_container
Expand Down
12 changes: 8 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gocql/gocql v1.7.0 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/golang-migrate/migrate/v4 v4.18.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
Expand All @@ -50,12 +53,13 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,23 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus=
github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang-migrate/migrate/v4 v4.18.1 h1:JML/k+t4tpHCpQTCAD62Nu43NUFzHY4CV3uAuvHGC+Y=
github.com/golang-migrate/migrate/v4 v4.18.1/go.mod h1:HAX6m3sQgcdO81tdjn5exv20+3Kb13cmGli1hrD6hks=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
Expand Down Expand Up @@ -103,6 +112,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
Expand Down Expand Up @@ -138,10 +148,14 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw=
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
Expand All @@ -151,8 +165,12 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
15 changes: 13 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func main() {
defer pgConn.Close()

if cfg.Postgres.Migrations.Enabled {
logger.Info().Msg("Running database migrations")
logger.Info().Msg("Running postgres migrations")
if err := postgres.Migrate(pgConn, cfg.Postgres.Migrations.Path); err != nil {
logger.Fatal().Err(err).Msg("Failed to run database migrations")
logger.Fatal().Err(err).Msg("Failed to run postgres migrations")
}
logger.Info().Msg("Database migrations completed successfully")
}
Expand All @@ -41,9 +41,20 @@ func main() {
defer cassandraClient.Close()



if cfg.Cassandra.Migrations.Enabled {
err:=cassandra.Migrate(cassandraClient,logger, cfg.Cassandra.Migrations.Path, cfg.Cassandra.Keyspace)
if err!=nil{
logger.Fatal().Err(err).Msg("Failed to run cassandra migrations")
// logger.Error().Err(err)
}
}


redisClient,err := redis.NewRedisClient(cfg.Redis, logger)
if err != nil {
logger.Fatal().Err(err).Msg("Failed to connect to Redis")

}
defer redisClient.Close()

Expand Down
51 changes: 49 additions & 2 deletions src/cassandra/cassandra.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cassandra

import (
"fmt"
"myapp/src/config"
"time"

Expand All @@ -24,13 +25,25 @@ func NewCassandraConnection(config config.CassandraConfig, log zerolog.Logger) (
var session *gocql.Session
var err error

maxRetries := 10
maxRetries := 20
retryDelay := 5 * time.Second

for attempt := 1; attempt <= maxRetries; attempt++ {
session, err = cluster.CreateSession()
if err == nil {
log.Info().Msgf("Connected to Cassandra on attempt %d", attempt)
if err=ensureKeyspace(session, config, log);err!=nil{
return nil,err
}
session.Close()

cluster.Keyspace = config.Keyspace
session, err = cluster.CreateSession()
if err!=nil{
return nil,err
}


return session, nil
}

Expand All @@ -41,4 +54,38 @@ func NewCassandraConnection(config config.CassandraConfig, log zerolog.Logger) (
log.Error().Err(err).Msg("Exhausted all retries to connect to Cassandra")
return nil, err

}
}


// EnsureKeyspace checks if the keyspace exists and creates it if it does not
func ensureKeyspace(session *gocql.Session, config config.CassandraConfig, logger zerolog.Logger) error {
// Query to check if the keyspace exists
query := fmt.Sprintf("SELECT keyspace_name FROM system_schema.keyspaces WHERE keyspace_name='%s'", config.Keyspace)
iter := session.Query(query).Iter()

var name string
found := iter.Scan(&name)
iter.Close()

if found {
logger.Info().Msgf("Cassandra Keyspace '%s' already exists", config.Keyspace)
return nil
}

// Create the keyspace if not found
createKeyspaceQuery := fmt.Sprintf(`
CREATE KEYSPACE %s
WITH replication = {
'class': '%s',
'replication_factor': %d
}`,
config.Keyspace,config.Replication.Strategy,config.Replication.Factor)

if err := session.Query(createKeyspaceQuery).Exec(); err != nil {
logger.Error().Err(err).Msg("Failed to create cassandra keyspace")
return fmt.Errorf("failed to create cassandra keyspace: %w", err)
}

logger.Info().Msgf("Cassandra Keyspace '%s' created successfully", config.Keyspace)
return nil
}
53 changes: 53 additions & 0 deletions src/cassandra/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package cassandra

import (
"fmt"
"path/filepath"

"github.com/gocql/gocql"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/golang-migrate/migrate/v4/database/cassandra"
"github.com/rs/zerolog"
)

// RunMigrations applies Cassandra migrations from a directory
func Migrate(session *gocql.Session, logger zerolog.Logger, migrationPath string, KeySpaceName string) error {
// Convert migration path to absolute path
absPath, err := filepath.Abs(migrationPath)
if err != nil {
logger.Error().Err(err).Msg("Failed to resolve migration path")
return fmt.Errorf("failed to resolve migration path: %w", err)
}

// Initialize Cassandra migration driver
driver, err := cassandra.WithInstance(session, &cassandra.Config{
KeyspaceName : KeySpaceName,
})
if err != nil {
logger.Error().Err(err).Msg("Failed to create Cassandra migration driver")
return fmt.Errorf("failed to create Cassandra driver: %w", err)
}

// Initialize the migrate instance with file source
m, err := migrate.NewWithDatabaseInstance(fmt.Sprintf("file://%s", absPath), "cassandra", driver)
if err != nil {
logger.Error().Err(err).Msg("Failed to initialize migrations")
return fmt.Errorf("failed to initialize migrations: %w", err)
}

// Apply migrations
err = m.Up()
if err != nil && err != migrate.ErrNoChange {
logger.Error().Err(err).Msg("Migration failed")
return fmt.Errorf("migration failed: %w", err)
}

if err == migrate.ErrNoChange {
logger.Info().Msg("No new Cassandra migrations to apply")
return nil
}

logger.Info().Msg("Cassandra migrations applied successfully!")
return nil
}
1 change: 0 additions & 1 deletion src/cassandra/migrations.go

This file was deleted.

1 change: 1 addition & 0 deletions src/cassandra/migrations/0000001_initial.down.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS orders;
5 changes: 5 additions & 0 deletions src/cassandra/migrations/0000001_initial.up.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS orders (
id UUID PRIMARY KEY,
user_id UUID,
total DECIMAL
);
1 change: 1 addition & 0 deletions src/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type CassandraConfig struct {
Consistency string `mapstructure:"consistency"`
Replication CasandraReplication `mapstructure:"replication"`
ProtoVersion int `mapstructure:"proto_version"`
Migrations MigrationsConfig `mapstructure:"migrations"`
}

type CasandraReplication struct{
Expand Down

0 comments on commit 10c9494

Please sign in to comment.