Skip to content

Commit

Permalink
(fix): use Schema Registry
Browse files Browse the repository at this point in the history
- use schema registry to register the todo protobuf
- use sr to encode/decode messages
  • Loading branch information
kameshsampath committed Dec 8, 2023
1 parent a053454 commit 1795902
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@

dist/
cmd/scratches
2 changes: 0 additions & 2 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
)

func main() {
//Jai Guru

logger, _ := zap.NewDevelopment()
defer logger.Sync()
log := logger.Sugar()
Expand Down
110 changes: 104 additions & 6 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,132 @@
package main

import (
"context"
"fmt"
"io"
"net/http"

"github.com/kameshsampath/demo-protos/golang/todo"
"github.com/kameshsampath/todo-app/config"
"github.com/kameshsampath/todo-app/internal/impl"
"github.com/kameshsampath/todo-app/impl"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sr"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

func main() {
//Jai Guru
type serviceSchemas struct {
id int
typeValue any
index int
}

var (
protoMarshallFn = func(a any) ([]byte, error) {
return proto.Marshal(a.(proto.Message))
}
protoUnMarshallFn = func(b []byte, a any) error {
return proto.Unmarshal(b, a.(proto.Message))
}
log *zap.SugaredLogger
)

func init() {
logger, _ := zap.NewDevelopment()
defer logger.Sync()
log := logger.Sugar()
log = logger.Sugar()
}

config := config.New()
func main() {

config := config.New()
client, err := kgo.NewClient(
kgo.SeedBrokers(config.Seeds...),
kgo.ConsumeTopics(config.Topics...),
kgo.DefaultProduceTopic(config.DefaultProducerTopic()),
kgo.ConsumerGroup(config.ConsumerGroupID),
kgo.AllowAutoTopicCreation(),
)
if err != nil {
log.Fatal(err)
}

ss, err := createSchema(*config)
if err != nil {
log.Fatal(err)
}
// Add all schema type that needs to be registered with srede
// and used when encoding and decoding proto messages
serde := registerSchemas(
[]serviceSchemas{
{
id: ss.ID,
index: 0,
typeValue: &todo.Task{},
},
},
)

server := impl.New(
impl.WithClient(client),
impl.WithConfig(config),
impl.WithSerde(serde),
)

server := impl.New(client, config)
if err := server.Run(); err != nil {
log.Fatal(err)
}
}

// createSchema creates(registers) the schemas with the SchemaRegistry
func createSchema(config config.Config) (*sr.SubjectSchema, error) {
rcl, err := sr.NewClient(sr.URLs(config.SchemaRegistry))
if err != nil {
return nil, err
}

res, err := http.Get(config.SchemaURL)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.StatusCode == http.StatusOK {
sb, err := io.ReadAll(res.Body)
log.Debugf("%s", string(sb))
if err != nil {
return nil, err
}

// if err := rcl.DeleteSchema(context.Background(), fmt.Sprintf("%s-value", config.DefaultProducerTopic()), -1, sr.HardDelete); err != nil {
// log.Fatal(err)
// }
// os.Exit(1)

ss, err := rcl.CreateSchema(context.Background(), fmt.Sprintf("%s-value", config.DefaultProducerTopic()), sr.Schema{
Type: sr.TypeProtobuf,
Schema: string(sb),
})
if err != nil {
return nil, err
}

return &ss, nil
}
return nil, fmt.Errorf("unable to read schema from url, '%s'", config.SchemaURL)
}

// registerSchemas registers all the schema and its corresponding encoding/decoding function
func registerSchemas(serviceSchemas []serviceSchemas) *sr.Serde {
serde := new(sr.Serde)
for _, schema := range serviceSchemas {
serde.Register(
schema.id,
schema.typeValue,
sr.DecodeFn(protoUnMarshallFn),
sr.EncodeFn(protoMarshallFn),
sr.Index(schema.index),
)
}
return serde
}
7 changes: 5 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"go.uber.org/zap"
)

// Config sets the configuration for the gRPC service
// Config sets the configuration for the gRPC server
type Config struct {
Env string `env:"ENV" envDefault:"dev"`
ConsumerGroupID string `env:"CONSUMER_GROUP_ID"`
Port uint16 `env:"PORT" envDefault:"9090"`
Seeds []string `env:"BROKERS" envSeparator:","`
Seeds []string `env:"BROKERS" envSeparator:"," envDefault:"localhost:19092"`
Topics []string `env:"TOPICS" envSeparator:","`
SchemaRegistry string `env:"SCHEMA_REGISTRY" envDefault:"localhost:18081"`
SchemaURL string `env:"SCHEMA_URL" envDefault:"https://raw.githubusercontent.com/kameshsampath/demo-protos/main/todo/todo.proto"`
}

var log *zap.SugaredLogger
Expand All @@ -32,6 +34,7 @@ func New() *Config {
return config
}

// DefaultProducerTopic gets the default topic that will be used as the producer topic
func (c *Config) DefaultProducerTopic() string {
return c.Topics[0]
}
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/caarlos0/env/v10 v10.0.0
github.com/kameshsampath/demo-protos/golang/todo v0.0.0-20231206022715-1afc44aab8bf
github.com/twmb/franz-go v1.15.2
github.com/twmb/franz-go/pkg/sr v0.0.0-20231206062516-c09dc92d2db1
go.uber.org/zap v1.26.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
Expand All @@ -15,10 +16,11 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
)
22 changes: 12 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,26 @@ github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/twmb/franz-go v1.15.2 h1:mt3i7bTAp4GH/kMJiGAikJQUlG+UsCwxCmEy1CcAKYo=
github.com/twmb/franz-go v1.15.2/go.mod h1:aos+d/UBuigWkOs+6WoqEPto47EvC2jipLAO5qrAu48=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/franz-go/pkg/sr v0.0.0-20231206062516-c09dc92d2db1 h1:Ork4lD/g4uG5LYVNSiZ5XY4oy/k0c2do3brj0TxcU6w=
github.com/twmb/franz-go/pkg/sr v0.0.0-20231206062516-c09dc92d2db1/go.mod h1:egX+kicq83hpztv3PRCXKLNO132Ol9JTAJOCRZcqUxI=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
Expand Down
36 changes: 36 additions & 0 deletions impl/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package impl

import (
"fmt"
"net"

"github.com/kameshsampath/demo-protos/golang/todo"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

// Run runs the gRPC server
func (s *Server) Run() error {
logger, _ := zap.NewDevelopment()
defer logger.Sync()
log := logger.Sugar()

config := s.config
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Port))
if err != nil {
return fmt.Errorf("error starting server with port %d,%v", config.Port, err)
}
server := grpc.NewServer()
todo.RegisterTodoServer(server, s)
log.Infof("Server started on port %d", config.Port)
// required for grpcurl
if config.Env == "dev" {
reflection.Register(server)
}

if err := server.Serve(listen); err != nil {
return fmt.Errorf("error starting server,%v", err)
}
return nil
}
Loading

0 comments on commit 1795902

Please sign in to comment.