Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions app/payment/biz/fsm/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package fsm
import (
"context"
"fmt"

"github.com/doutokk/doutok/app/payment/biz/pay"

//"github.com/doutokk/doutok/app/payment/infra/rpc"
//"github.com/doutokk/doutok/rpc_gen/kitex_gen/order"
"time"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/doutokk/doutok/app/payment/biz/interfaces"
"github.com/doutokk/doutok/app/payment/biz/pay"
"github.com/doutokk/doutok/app/payment/conf"
"github.com/doutokk/doutok/common/lock"

Expand Down Expand Up @@ -86,6 +84,14 @@ func RestoreFromDB(orderId string) (*PayOrderFSM, error) {
// Redis address should come from configuration
var redLock = lock.NewRedLock(conf.GetConf().Redis.Address)

// Global variable for the delayed message sender
var delayedMessageSender interfaces.DelayedMessageSender

// SetDelayedMessageSender sets the implementation for sending delayed messages
func SetDelayedMessageSender(sender interfaces.DelayedMessageSender) {
delayedMessageSender = sender
}

// NewOrder creates a new payment order with distributed locking
func NewOrder(req CreatePayOrderReq) (*PayOrderFSM, error) {
o := &PayOrderFSM{}
Expand Down Expand Up @@ -130,6 +136,14 @@ func NewOrder(req CreatePayOrderReq) (*PayOrderFSM, error) {
return nil, fmt.Errorf("failed to create payment log: %w", err)
}

// Schedule delayed order cancellation if we have a sender configured
if delayedMessageSender != nil {
if err := delayedMessageSender.SendDelayedOrderCancellation(context.Background(), req.OrderId); err != nil {
klog.Warnf("Failed to schedule delayed cancellation for order %s: %v", req.OrderId, err)
// Continue even if scheduling fails, as this is not critical for order creation
}
}

return o, nil
}

Expand Down
13 changes: 13 additions & 0 deletions app/payment/biz/interfaces/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package interfaces

import "context"

// OrderCanceller defines the interface for order cancellation services
type OrderCanceller interface {
CancelOrder(ctx context.Context, orderID string) error
}

// DelayedMessageSender defines the interface for sending delayed messages
type DelayedMessageSender interface {
SendDelayedOrderCancellation(ctx context.Context, orderID string) error
}
205 changes: 205 additions & 0 deletions app/payment/biz/mq/rocketmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package mq

import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/doutokk/doutok/app/payment/biz/interfaces"
"github.com/doutokk/doutok/app/payment/conf"
)

// RocketMQClient encapsulates RocketMQ producer and consumer
type RocketMQClient struct {
producer rocketmq.Producer
consumer rocketmq.PushConsumer
config *conf.Config
once sync.Once
orderCanceller interfaces.OrderCanceller
}

// OrderCancelMessage represents the message for order cancellation
type OrderCancelMessage struct {
OrderID string `json:"order_id"`
}

var (
mqClient *RocketMQClient
mux sync.Mutex
)

// GetMQClient returns a singleton RocketMQ client
func GetMQClient() *RocketMQClient {
if mqClient == nil {
mux.Lock()
defer mux.Unlock()
if mqClient == nil {
mqClient = &RocketMQClient{
config: conf.GetConf(),
}
}
}
return mqClient
}

// SetOrderCanceller sets the handler for order cancellation
func (c *RocketMQClient) SetOrderCanceller(canceller interfaces.OrderCanceller) {
c.orderCanceller = canceller
}

// InitProducer initializes the RocketMQ producer
func (c *RocketMQClient) InitProducer() error {
var err error
c.once.Do(func() {
opts := []producer.Option{
producer.WithNameServer(c.config.RocketMQ.NameServer),
producer.WithGroupName(c.config.RocketMQ.GroupName),
producer.WithRetry(2),
}

// Add ACL if credentials are provided
if c.config.RocketMQ.AccessKey != "" && c.config.RocketMQ.SecretKey != "" {
opts = append(opts, producer.WithCredentials(primitive.Credentials{
AccessKey: c.config.RocketMQ.AccessKey,
SecretKey: c.config.RocketMQ.SecretKey,
}))
}

c.producer, err = rocketmq.NewProducer(opts...)
if err != nil {
klog.Errorf("Failed to create RocketMQ producer: %v", err)
return
}

if err = c.producer.Start(); err != nil {
klog.Errorf("Failed to start RocketMQ producer: %v", err)
return
}

klog.Infof("RocketMQ producer started successfully")
})
return err
}

// InitConsumer initializes the RocketMQ consumer
func (c *RocketMQClient) InitConsumer() error {
var err error
opts := []consumer.Option{
consumer.WithNameServer(c.config.RocketMQ.NameServer),
consumer.WithGroupName(c.config.RocketMQ.GroupName),
consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithMaxReconsumeTimes(int32(c.config.RocketMQ.MaxReconsumeTimes)),
}

// Add ACL if credentials are provided
if c.config.RocketMQ.AccessKey != "" && c.config.RocketMQ.SecretKey != "" {
opts = append(opts, consumer.WithCredentials(primitive.Credentials{
AccessKey: c.config.RocketMQ.AccessKey,
SecretKey: c.config.RocketMQ.SecretKey,
}))
}

c.consumer, err = rocketmq.NewPushConsumer(opts...)
if err != nil {
klog.Errorf("Failed to create RocketMQ consumer: %v", err)
return err
}

// Subscribe to the order cancellation topic
err = c.consumer.Subscribe(c.config.RocketMQ.OrderCancelTopic, consumer.MessageSelector{}, c.handleOrderCancellation)
if err != nil {
klog.Errorf("Failed to subscribe to topic %s: %v", c.config.RocketMQ.OrderCancelTopic, err)
return err
}

if err = c.consumer.Start(); err != nil {
klog.Errorf("Failed to start RocketMQ consumer: %v", err)
return err
}

klog.Infof("RocketMQ consumer started successfully")
return nil
}

// SendDelayedOrderCancellation sends a delayed message for order cancellation
func (c *RocketMQClient) SendDelayedOrderCancellation(ctx context.Context, orderID string) error {
if err := c.InitProducer(); err != nil {
return err
}

message := OrderCancelMessage{
OrderID: orderID,
}

data, err := json.Marshal(message)
if err != nil {
klog.Errorf("Failed to marshal order cancellation message: %v", err)
return err
}

msg := &primitive.Message{
Topic: c.config.RocketMQ.OrderCancelTopic,
Body: data,
}

// Set delay level (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
// Since we need 30 minutes, we can use level 16 (30m)
msg.WithDelayTimeLevel(16)

res, err := c.producer.SendSync(ctx, msg)
if err != nil {
klog.Errorf("Failed to send delayed order cancellation message: %v", err)
return err
}

klog.Infof("Sent delayed cancellation for order %s, result: %s", orderID, res.String())
return nil
}

// handleOrderCancellation processes order cancellation messages
func (c *RocketMQClient) handleOrderCancellation(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
var message OrderCancelMessage
if err := json.Unmarshal(msg.Body, &message); err != nil {
klog.Errorf("Failed to unmarshal order cancellation message: %v", err)
continue
}

klog.Infof("Processing delayed order cancellation for order ID: %s", message.OrderID)

// Check if we have a canceller configured
if c.orderCanceller == nil {
klog.Errorf("No order canceller configured, cannot process cancellation for order %s", message.OrderID)
return consumer.ConsumeRetryLater, fmt.Errorf("no order canceller configured")
}

// Call the order canceller
err := c.orderCanceller.CancelOrder(ctx, message.OrderID)
if err != nil {
klog.Errorf("Failed to cancel order %s: %v", message.OrderID, err)
// If it's a transient error, we might want to retry
return consumer.ConsumeRetryLater, err
}

klog.Infof("Successfully processed cancellation for order %s", message.OrderID)
}

return consumer.ConsumeSuccess, nil
}

// Close shuts down the RocketMQ client
func (c *RocketMQClient) Close() {
if c.producer != nil {
c.producer.Shutdown()
}
if c.consumer != nil {
c.consumer.Shutdown()
}
}
42 changes: 42 additions & 0 deletions app/payment/biz/service/order_canceller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package service

import (
"context"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/doutokk/doutok/app/payment/biz/fsm"
)

// OrderCancellerService provides methods to cancel orders
type OrderCancellerService struct{}

// NewOrderCancellerService creates a new OrderCancellerService
func NewOrderCancellerService() *OrderCancellerService {
return &OrderCancellerService{}
}

// CancelOrder cancels an order by ID
func (s *OrderCancellerService) CancelOrder(ctx context.Context, orderID string) error {
klog.Infof("Cancelling order with ID: %s", orderID)

// Restore order FSM from database
orderFSM, err := fsm.RestoreFromDB(orderID)
if err != nil {
klog.Errorf("Failed to restore order FSM: %v", err)
return err
}

// Check current status - only cancel if not in FINISH state
if orderFSM.GetStatus() != fsm.FINISH {
// Cancel the order
if err := orderFSM.CancelPayment(ctx); err != nil {
klog.Errorf("Failed to cancel order: %v", err)
return err
}
klog.Infof("Successfully cancelled order %s", orderID)
} else {
klog.Infof("Order %s is already in FINISH state, skipping cancellation", orderID)
}

return nil
}
11 changes: 11 additions & 0 deletions app/payment/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ type Config struct {
CallBackUrl string `mapstructure:"callback_url"`
NotifyBackUrl string `mapstructure:"notifyback_url"`
} `mapstructure:"alipay"`

RocketMQ struct {
NameServer []string `mapstructure:"name_server"`
GroupName string `mapstructure:"group_name"`
OrderTopic string `mapstructure:"order_topic"`
OrderCancelTopic string `mapstructure:"order_cancel_topic"`
AccessKey string `mapstructure:"access_key"`
SecretKey string `mapstructure:"secret_key"`
OrderCancelDelay int `mapstructure:"order_cancel_delay"` // in minutes
MaxReconsumeTimes int `mapstructure:"max_reconsume_times"`
} `mapstructure:"rocketmq"`
}

// GetConf gets configuration instance
Expand Down
15 changes: 14 additions & 1 deletion app/payment/conf/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,17 @@ alipay:
app_id: "202100011768"
private_key: "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDQ5Q5Q"
callback_url: "http://localhost:8888/payment/alipay/callback"
notifyback_url: "http://localhost:8888/payment/alipay/notify"
notifyback_url: "http://localhost:8888/payment/alipay/notify"

rocketmq:
namesrv_addr: "10.21.22.54:8808"
group_id: "payment_group"
name_server:
- "10.21.22.54:8808"
group_name: "payment_service"
order_topic: "payment_order"
order_cancel_topic: "payment_order_cancel"
access_key: ""
secret_key: ""
order_cancel_delay: 30 # in minutes
max_reconsume_times: 3
Loading