diff --git a/config/samples/app.yml b/config/samples/app.yml index c948d49..98ddff7 100644 --- a/config/samples/app.yml +++ b/config/samples/app.yml @@ -2,6 +2,15 @@ name: 'auxo' debug: true banner: true +global: + mq: + nsq: + nsqd_addr: + - "127.0.0.1:4150" + max_in_flight: 5 + concurrent: 3 + max_attempt: 2 + channel_name: test.nsq log: loggers: - level: info @@ -9,6 +18,9 @@ log: - name: 'auxo.net.web' level: debug writers: text + - name: 'auxo.mq.nsq' + level: debug + writers: nsq - name: 'benchmark1' level: debug writers: drop1 @@ -32,6 +44,11 @@ log: format: json options: colorized: true + - name: nsq + type: nsq + layout: '{level: a=b},{time: 2006-01-02T15:04:05.000Z07:00},{msg},{file:short},{newline}' + options: + topic: "mq_nsq_writer" web: mode: develop diff --git a/db/mq/mynsq/consumer.go b/db/mq/mynsq/consumer.go new file mode 100644 index 0000000..2feb8f0 --- /dev/null +++ b/db/mq/mynsq/consumer.go @@ -0,0 +1,256 @@ +package mynsq + +import ( + "errors" + "fmt" + "os" + "time" + + "github.com/cuigh/auxo/util/lazy" + gonsq "github.com/nsqio/go-nsq" +) + +// max_retry=5 +// 模型说明 +// 系统中单一程序体只可以订阅一个通道,所有人都是同一个通道 +// 但是可以对应不同的topic,topic(m)->channel(1) +// 所以channel定义在global config单一app为一个channel +// consumer 消费者结构体 +// global: +// mq: +// nsq: +// nsqd_addr: +// - "127.0.0.1:4150" +// max_in_flight: 5 +// concurrent: 3 +// max_attempt: 2 +// channel_name: test.nsq +type myConsumer struct { + isInit bool + Debug bool + channelName string + concurrent int + maxInFlight int + maxAttempt uint16 + //addr 连接地址 + nsqdAddr []string + // 各个topic的worker + topics map[string]*topicInfo +} + +// topicInfo topic 信息结构体 +type topicInfo struct { + topic string + maxInFlight int + concurrentNum int + config *gonsq.Config + handler gonsq.HandlerFunc + consumer *gonsq.Consumer +} + +// 失败消息处理函数类型 +type FailMessageFunc func(message FailMessage) (err error) + +func (f FailMessageFunc) HandleFailMessage(message FailMessage) (err error) { + err = f(message) + return +} + +// 失败消息处理接口,继承了该接口的接口都会调用该接口 +type FailMessageHandler interface { + HandleFailMessage(message FailMessage) (err error) +} + +type FailMessage struct { + Body []byte + Attempt uint16 + Timestamp int64 + MessageID string + FailMsg string +} + +var ( + mynsqConsumerValue = lazy.Value{New: consumerCreate} +) + +// 不定义close +func MustGetConsumer() *myConsumer { + v, err := mynsqConsumerValue.Get() + if err != nil { + fmt.Println("MustGetComsumer | must open comsumer failed") + os.Exit(-1) + } + return v.(*myConsumer) +} + +func consumerCreate() (d interface{}, err error) { + options, err := loadOptions() + if err != nil { + fmt.Printf("consumerCreate | loadOptions| err=%v\n", err) + return nil, err + } + ret := &myConsumer{ + nsqdAddr: make([]string, 0), + topics: make(map[string]*topicInfo), + } + err = ret.init(options, true) + if err != nil { + fmt.Printf("consumerCreate | ret.init | err=%v\n", err) + return nil, err + } + d = interface{}(ret) + return d, err +} + +// Connect 连接 +func (t *topicInfo) connect(channelName string, nsqdAddr []string, debug bool) { + if len(nsqdAddr) == 0 { + fmt.Println("nsqd地址为空,跳过连接,topic:", t.topic) + return + } + var ( + retryNum = 0 + sleepSeconds = 0 + err error + ) + t.consumer, err = gonsq.NewConsumer(t.topic, channelName, t.config) + if err != nil { + fmt.Printf("新建nsq consumer失败,err:%s,topic:%s,channel:%s\n", err.Error(), t.topic, channelName) + return + } + t.consumer.ChangeMaxInFlight(t.maxInFlight) + t.consumer.AddConcurrentHandlers(gonsq.Handler(t.handler), t.concurrentNum) + // t.consumer.AddHandler(gonsq.Handler(t.handler)) + // 不断进行重连,直到连接成功 + for { + // 只要连上了就不会退出的, 为空判断由入口保证 + if len(nsqdAddr) == 1 { + err = t.consumer.ConnectToNSQD(nsqdAddr[0]) + } else { + err = t.consumer.ConnectToNSQDs(nsqdAddr) + } + if err != nil { + fmt.Printf("连接nsqd(addr:%v)失败,err:%s\n", nsqdAddr, err.Error()) + retryNum++ + sleepSeconds = 5 + if retryNum%6 == 0 { + sleepSeconds = 30 + } + time.Sleep(time.Duration(sleepSeconds) * time.Second) + continue + } + if debug { + t.consumer.SetLogger(&interLog{}, gonsq.LogLevelDebug) + } else { + t.consumer.SetLogger(&interLog{}, gonsq.LogLevelWarning) + } + fmt.Printf("连接nsqd(%v)成功\n", nsqdAddr) + break + } + <-t.consumer.StopChan + err = nil + return +} + +// AddHandler 添加handler +func (c *myConsumer) AddHandler(topic string, handler gonsq.HandlerFunc, failHandler FailMessageFunc) *myConsumer { + var ( + t = &topicInfo{} + ok bool + ) + if t, ok = c.topics[topic]; !ok { + t = &topicInfo{} + t.concurrentNum = c.concurrent + t.maxInFlight = c.maxInFlight + t.config = gonsq.NewConfig() + t.config.MaxAttempts = c.maxAttempt + } + + t.topic = topic + // 自定义 handler + t.handler = func(nm *gonsq.Message) (err error) { + err = handler(nm) + if err != nil && c.topics[topic].config.MaxAttempts > 0 && c.topics[topic].config.MaxAttempts == nm.Attempts && failHandler != nil { + messageID := make([]byte, 0) + for _, v := range nm.ID { + messageID = append(messageID, v) + } + failHandler(FailMessage{ + MessageID: string(messageID), + Body: nm.Body, + Timestamp: nm.Timestamp, + FailMsg: err.Error(), + }) + err = nil + } + return + } + c.topics[topic] = t + return c +} + +// StopAll 停止 +func (c *myConsumer) stop() { + for k := range c.topics { + c.topics[k].consumer.Stop() + } +} + +// Run 运行 +func (c *myConsumer) Run() (err error) { + defer c.stop() + if !c.isInit { + err = errors.New("consumer not init") + return + } + if len(c.nsqdAddr) == 0 { + err = errors.New("nsqd addr address required") + return + } + for _, topicInfo := range c.topics { + topicInfo.config.MaxAttempts = c.maxAttempt + topicInfo.config.MaxInFlight = c.maxInFlight + go topicInfo.connect(c.channelName, c.nsqdAddr, c.Debug) + } + neverBack := make(chan int) + <-neverBack + return +} + +// Init 初始化 +func (c *myConsumer) init(configSection *Options, debug bool) (err error) { + if len(configSection.NsqdAddr) > 0 { + c.nsqdAddr = configSection.NsqdAddr + } + if configSection.MaxInFlight > 0 { + c.maxInFlight = configSection.MaxInFlight + } + if configSection.Concurrent > 0 { + c.concurrent = configSection.Concurrent + } + if configSection.ChannelName != "" { + c.channelName = configSection.ChannelName + } + if c.channelName == "" { + err = errors.New("config channelName not found") + return + } + if configSection.MaxAttempt > 0 { + c.maxAttempt = uint16(configSection.MaxAttempt) + } + + if c.maxInFlight < 1 { + c.maxInFlight = 1 + } + if c.concurrent < 1 { + c.concurrent = 1 + } + + if c.maxInFlight < c.concurrent { + err = errors.New("max_in_flight should exceed than concurrent") + return + } + c.isInit = true + + return +} diff --git a/db/mq/mynsq/nsq.go b/db/mq/mynsq/nsq.go new file mode 100644 index 0000000..82cd760 --- /dev/null +++ b/db/mq/mynsq/nsq.go @@ -0,0 +1,41 @@ +package mynsq + +import ( + "fmt" + + "github.com/cuigh/auxo/config" + "github.com/cuigh/auxo/errors" +) + +const PkgName = "auxo.mq.nsq" + +type Options struct { + NsqdAddr []string + NsqlookupdAddr []string + MaxInFlight int + Concurrent int + MaxAttempt int + ChannelName string +} + +type interLog struct { +} + +func (il *interLog) Output(calldepth int, s string) error { + fmt.Println(s) + return nil +} + +func loadOptions() (*Options, error) { + key := "global.mq.nsq" + if !config.Exist(key) { + return nil, errors.Format("can't find nsq config for [%s]", key) + } + + opts := &Options{} + err := config.UnmarshalOption(key, opts) + if err != nil { + return nil, err + } + return opts, nil +} diff --git a/db/mq/mynsq/nsq_test.go b/db/mq/mynsq/nsq_test.go new file mode 100644 index 0000000..e787b41 --- /dev/null +++ b/db/mq/mynsq/nsq_test.go @@ -0,0 +1,38 @@ +package mynsq_test + +import ( + "fmt" + "testing" + + "github.com/cuigh/auxo/config" + "github.com/cuigh/auxo/db/mq/mynsq" + gonsq "github.com/nsqio/go-nsq" +) + +func init() { + config.AddFolder("../../../config/samples") +} + +func Test_Pulish(t *testing.T) { + producer := mynsq.MustGetProducer() + producer.Publish("test_mynsq", "hello world!") +} + +func Test_Consumer(t *testing.T) { + mynsq.MustGetConsumer().AddHandler("test_mynsq", testHandler(), testFailHandler()).Run() +} + +func testHandler() gonsq.HandlerFunc { + return func(nm *gonsq.Message) error { + fmt.Println(string(nm.Body)) + return nil + } +} + +func testFailHandler() mynsq.FailMessageFunc { + return func(message mynsq.FailMessage) (err error) { + fmt.Println("error msg trigger,msg:", string(message.Body), ",messageid:", message.MessageID) + err = nil + return + } +} diff --git a/db/mq/mynsq/producer.go b/db/mq/mynsq/producer.go new file mode 100644 index 0000000..3090a89 --- /dev/null +++ b/db/mq/mynsq/producer.go @@ -0,0 +1,118 @@ +package mynsq + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "time" + + "github.com/cuigh/auxo/errors" + "github.com/cuigh/auxo/util/lazy" + gonsq "github.com/nsqio/go-nsq" +) + +var ( + mynsqProducerValue = lazy.Value{New: create} +) + +type myProducer struct { + producer *gonsq.Producer +} + +// 不定义close +func MustGetProducer() *myProducer { + v, err := mynsqProducerValue.Get() + if err != nil { + fmt.Println("MustGetProducer | must open producer failed") + os.Exit(-1) + } + return v.(*myProducer) +} + +func create() (d interface{}, err error) { + options, err := loadOptions() + if err != nil { + return nil, err + } + if len(options.NsqdAddr) <= 0 { + return nil, errors.New("create | options.NsqdAddr is empty") + } + config := gonsq.NewConfig() + w, err := gonsq.NewProducer(options.NsqdAddr[0], config) + if err != nil { + err = errors.New("初始化 nsq producer 失败, err:" + err.Error()) + return + } + w.SetLogger(&interLog{}, gonsq.LogLevelDebug) + ret := &myProducer{producer: w} + d = interface{}(ret) + return d, err +} + +// marshalMsg 将消息解析成[]byte,如果出错,第二个参数返回 error +func (p *myProducer) marshalMsg(msg interface{}) (m []byte, err error) { + switch t := msg.(type) { + case []byte: + m = t + case float64: + m = []byte(strconv.FormatFloat(t, 'f', -1, 64)) + case int64: + m = []byte(strconv.FormatInt(t, 10)) + case string: + m = []byte(t) + default: + m, err = json.Marshal(msg) + } + + return +} + +// Publish 投递消息,如果失败,返回 error +func (p *myProducer) Publish(topic string, msg interface{}) (err error) { + var ( + m []byte + ) + if m, err = p.marshalMsg(msg); err != nil { + return + } + err = p.producer.Publish(topic, m) + + return +} + +// Publish 投递消息,如果失败,返回 error +func (p *myProducer) PublishRaw(topic string, m []byte) (err error) { + + err = p.producer.Publish(topic, m) + + return +} + +// MultiPublish 批量发布消息,如果失败,返回 error +func (p *myProducer) MultiPublish(topic string, msg [][]interface{}) (err error) { + var ( + m = make([][]byte, 0) + tmp []byte + ) + for _, v := range msg { + if tmp, err = p.marshalMsg(v); err != nil { + return + } + m = append(m, tmp) + } + err = p.producer.MultiPublish(topic, m) + + return +} + +func (p *myProducer) DeferPublish(topic string, msg interface{}, deferSecond int64) (err error) { + var ( + m []byte + ) + if m, err = p.marshalMsg(msg); err != nil { + return + } + err = p.producer.DeferredPublish(topic, time.Second*time.Duration(deferSecond), m) + return +} diff --git a/log/log_test.go b/log/log_test.go index d206488..4057e78 100755 --- a/log/log_test.go +++ b/log/log_test.go @@ -24,6 +24,15 @@ func TestLogger(t *testing.T) { l.Error("error") } +func TestNsqLogger(t *testing.T) { + l := log.Get("auxo.mq.nsq") + assert.NotNil(t, l) + + l.Debug("debug") + l.Info("info") + l.Warn("warn") + l.Error("error") +} func TestFileLogger(t *testing.T) { l := log.Get("test") for i := 0; i < 100; i++ { diff --git a/log/nsq/nsq.go b/log/nsq/nsq.go new file mode 100644 index 0000000..f33d378 --- /dev/null +++ b/log/nsq/nsq.go @@ -0,0 +1,32 @@ +package nsq + +import ( + "errors" + "io" + + "github.com/cuigh/auxo/data" + "github.com/cuigh/auxo/db/mq/mynsq" + "github.com/cuigh/auxo/util/cast" +) + +type writer struct { + topic string +} + +func New(options data.Map) (io.Writer, error) { + topic := cast.ToString(options.Get("topic")) + if topic == "" { + return nil, errors.New("missing required option: topic name") + } + w := &writer{topic: topic} + return w, nil +} + +func (w *writer) Write(p []byte) (n int, err error) { + + err = mynsq.MustGetProducer().PublishRaw(w.topic, p) + if err != nil { + return 0, err + } + return len(p), nil +} diff --git a/log/writer.go b/log/writer.go index e748daa..c56b037 100755 --- a/log/writer.go +++ b/log/writer.go @@ -10,6 +10,7 @@ import ( "github.com/cuigh/auxo/data" "github.com/cuigh/auxo/log/console" "github.com/cuigh/auxo/log/file" + "github.com/cuigh/auxo/log/nsq" ) var writerBuilders = WriterBuilders{ @@ -22,6 +23,9 @@ var writerBuilders = WriterBuilders{ "file": func(options data.Map) (io.Writer, error) { return file.New(options) }, + "nsq": func(options data.Map) (io.Writer, error) { + return nsq.New(options) + }, } type WriterBuilder func(options data.Map) (io.Writer, error)