Skip to content

Commit

Permalink
refactoring streams pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
speier committed Oct 11, 2024
1 parent f50e510 commit adbfbae
Show file tree
Hide file tree
Showing 1,000 changed files with 508,940 additions and 254 deletions.
8 changes: 6 additions & 2 deletions pkg/rules/repo/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ type jetstreamRuleRepo struct {
}

func NewJetstreamRuleRepo(natsURL string) (RuleRepo, error) {
jsInst := &jetstreamRuleRepo{STREAM_NAME, RULE_SUBJECT, streams.NewStream(natsURL, STREAM_NAME)}
err := jsInst.init()
stream, err := streams.NewStream(natsURL, STREAM_NAME)
if err != nil {
return nil, err
}
jsInst := &jetstreamRuleRepo{STREAM_NAME, RULE_SUBJECT, stream}
err = jsInst.init()
return jsInst, err
}

Expand Down
66 changes: 66 additions & 0 deletions pkg/streams/connpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package streams

import (
"sync"

"github.com/nats-io/nats.go"
)

const (
CONN_POOL_SIZE = 100
)

type NatsConnPool struct {
url string
mutex *sync.RWMutex
pool chan *nats.Conn
options []nats.Option
}

func NewNatsConnPool(url string, options ...nats.Option) *NatsConnPool {
return &NatsConnPool{
url: url,
mutex: new(sync.RWMutex),
pool: make(chan *nats.Conn, CONN_POOL_SIZE),
options: options,
}
}

func (this *NatsConnPool) GetConnection() (*nats.Conn, error) {
connect := func() (*nats.Conn, error) {
return nats.Connect(this.url, this.options...)
}

this.mutex.RLock()
defer this.mutex.RUnlock()

var nc *nats.Conn
var err error
select {
case nc = <-this.pool:
// reuse exists pool
if nc.IsConnected() != true {
// close to be sure
nc.Close()
// disconnected conn, create new *nats.Conn
nc, err = connect()
}
default:
// create *nats.Conn
nc, err = connect()
}

return nc, err
}

func (this *NatsConnPool) Close() {
this.mutex.Lock()
defer this.mutex.Unlock()

close(this.pool)
for nc := range this.pool {
nc.Close()
}

this.pool = make(chan *nats.Conn, CONN_POOL_SIZE)
}
97 changes: 14 additions & 83 deletions pkg/streams/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,113 +2,44 @@ package streams

import (
"context"
"fmt"
"log/slog"
"os"
"strings"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

type jetstreamConsumer struct {
StreamName string
jStream *Stream
stream jetstream.Stream
conn *nats.Conn
cc jetstream.ConsumeContext
handlers map[string]handlerFunc
}

type handlerFunc func(context.Context, *Stream, string, []byte, string) error

func NewJetstreamConsumer(jStream *Stream, streamName string) *jetstreamConsumer {
handlers := make(map[string]handlerFunc)
return &jetstreamConsumer{handlers: handlers, StreamName: streamName, jStream: jStream}
}

func (c *jetstreamConsumer) init() error {
var err error
c.conn, err = c.jStream.natsConnect()
if err != nil {
return err
}
type handlerFunc func(context.Context, string, map[string][]string, []byte) error

js, err := jetstream.New(c.conn)
if err != nil {
return err
}
func ConsumeMessages(nc *nats.Conn, streamName string, subject string, durable string, handler handlerFunc) (jetstream.ConsumeContext, error) {
ctx := context.Background()

c.stream, err = js.Stream(context.Background(), c.StreamName)
js, err := jetstream.New(nc)
if err != nil {
return err
return nil, err
}

return nil
}

func (c *jetstreamConsumer) Handle(subject string, handler handlerFunc) {
c.handlers[subject] = handler
}

func (c *jetstreamConsumer) ReceiveMessages(durable string) error {
// initialise connection, jetstream
err := c.init()
stream, err := js.Stream(ctx, streamName)
if err != nil {
return err
return nil, err
}

ctx := context.Background()
cons, err := c.stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
FilterSubject: subject,
Durable: durable,
DeliverPolicy: jetstream.DeliverLastPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
slog.Error(err.Error())
os.Exit(1)
return nil, err
}

c.cc, err = cons.Consume(func(msg jetstream.Msg) {
s := strings.Split(msg.Subject(), ".")
handler := c.handlers[s[0]]
return cons.Consume(func(msg jetstream.Msg) {
if handler != nil {
// fmt.Println("ReceiveMessages", msg.Subject(), string(msg.Data()), msg.Headers())
publishedBy := getHeader(msg.Headers(), "publishedBy")
err = handler(ctx, c.jStream, msg.Subject(), msg.Data(), publishedBy)
err := handler(ctx, msg.Subject(), msg.Headers(), msg.Data())
if err != nil {
//msg.Nak()
msg.Ack()
fmt.Println(err.Error())
msg.Nak()
} else {
msg.Ack()
}
} else {
fmt.Println("no consumer for subject", s)
}
}, jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
fmt.Println("consumer error", err.Error())
}))
if err != nil {
slog.Error(err.Error())
os.Exit(1)
}

return nil
}

func (c *jetstreamConsumer) Close() {
defer c.conn.Close()
c.cc.Stop()
}

func getHeader(headers map[string][]string, name string) string {
for hn, hv := range headers {
if hn == name {
if len(hv) > 0 {
return hv[0]
}
}
}
return ""
})
}
25 changes: 24 additions & 1 deletion pkg/streams/helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,29 @@
package streams

import "time"
import (
"time"

"github.com/nats-io/nkeys"
)

func NKeySignatureHandler(seed string, b []byte) ([]byte, error) {
sk, err := nkeys.FromSeed([]byte(seed))
if err != nil {
return nil, err
}
return sk.Sign(b)
}

func GetHeader(headers map[string][]string, name string) string {
for hn, hv := range headers {
if hn == name {
if len(hv) > 0 {
return hv[0]
}
}
}
return ""
}

func getElapsed(start time.Time) string {
return time.Since(start).String()
Expand Down
Loading

0 comments on commit adbfbae

Please sign in to comment.