211 lines
4.5 KiB
Go
211 lines
4.5 KiB
Go
package redisMq
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/go-redis/redis/v8"
|
|
"log"
|
|
)
|
|
|
|
// MsgEntity is one key/value field of a stream message, tagged with the ID of
// the stream entry it was read from. It is a comparable value type and is used
// as a map key when counting failures.
type MsgEntity struct {
	// MsgID is the ID of the stream entry this field belongs to.
	MsgID string
	// Key and Val are the field name and its string value.
	Key, Val string
}
|
|
|
|
// MsgCallback is the handler supplied by the user of a Consumer. It is called
// once per received MsgEntity; returning a non-nil error marks the message as
// failed, so it is not acknowledged and its failure count is incremented.
type MsgCallback func(ctx context.Context, msg *MsgEntity) error
|
|
|
|
type Consumer struct {
|
|
client *redis.Client
|
|
opts *ConsumerOptions
|
|
|
|
// 消费的 topic
|
|
topic string
|
|
// 所属的消费者组
|
|
groupID string
|
|
// 当前节点的消费者 id
|
|
consumerID string
|
|
|
|
// 接收到 msg 时执行的回调函数,由使用方定义
|
|
callbackFunc MsgCallback
|
|
|
|
ctx context.Context
|
|
stopFunc context.CancelFunc
|
|
|
|
failureCnt map[MsgEntity]int
|
|
}
|
|
|
|
func (r *Consumer) createMkStream(flag string) error {
|
|
// 创建消费者组(仅需运行一次)
|
|
err := r.client.XGroupCreateMkStream(r.ctx, r.topic, r.groupID, flag).Err()
|
|
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
|
log.Println(err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *Consumer) deliverDeadLetter(ctx context.Context) {
|
|
for msg, times := range r.failureCnt {
|
|
if times < r.opts.maxRetryLimit {
|
|
continue
|
|
}
|
|
|
|
if err := r.opts.deadLetterMailbox.Deliver(ctx, &msg); err != nil {
|
|
log.Printf("failed to deliver dead letter mailbox (%v)\n", err)
|
|
continue
|
|
}
|
|
|
|
if err := r.ackMsg(ctx, &msg); err != nil {
|
|
log.Printf("failed to ack dead letter mailbox (%v)\n", err)
|
|
continue
|
|
}
|
|
|
|
delete(r.failureCnt, msg)
|
|
}
|
|
}
|
|
|
|
func (r *Consumer) handleMsg(ctx context.Context, msg []*MsgEntity) {
|
|
for _, msg := range msg {
|
|
if r.callbackFunc != nil && r.callbackFunc(ctx, msg) != nil {
|
|
// 惰性创建
|
|
if r.failureCnt == nil {
|
|
r.failureCnt = make(map[MsgEntity]int)
|
|
}
|
|
r.failureCnt[*msg]++
|
|
|
|
continue
|
|
}
|
|
|
|
if err := r.ackMsg(ctx, msg); err != nil {
|
|
log.Println(err)
|
|
continue
|
|
}
|
|
|
|
delete(r.failureCnt, *msg)
|
|
}
|
|
}
|
|
|
|
func (r *Consumer) receiveMsg() ([]*MsgEntity, error) {
|
|
return r.receive(">")
|
|
}
|
|
|
|
func (r *Consumer) receivePendingMsg() ([]*MsgEntity, error) {
|
|
return r.receive("0-0")
|
|
}
|
|
|
|
func (r *Consumer) run() {
|
|
for {
|
|
select {
|
|
case <-r.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
// 尝试获取消息
|
|
msg, err := r.receiveMsg()
|
|
if err != nil {
|
|
log.Printf("consumer receive msg err:%v", err)
|
|
continue
|
|
}
|
|
|
|
// 尝试处理消息
|
|
ctxMsg, _ := context.WithTimeout(r.ctx, r.opts.receiveTimeout)
|
|
r.handleMsg(ctxMsg, msg)
|
|
|
|
// 处理死信队列
|
|
ctxDead, _ := context.WithTimeout(r.ctx, r.opts.deadLetterDeliverTimeout)
|
|
r.deliverDeadLetter(ctxDead)
|
|
|
|
// 处理未回复ack的消息
|
|
msg, err = r.receivePendingMsg()
|
|
if err != nil {
|
|
log.Printf("consumer receive msg err:%v", err)
|
|
continue
|
|
}
|
|
|
|
ctxMsg, _ = context.WithTimeout(r.ctx, r.opts.receiveTimeout)
|
|
r.handleMsg(ctxMsg, msg)
|
|
}
|
|
|
|
}
|
|
|
|
func (r *Consumer) ackMsg(ctx context.Context, msg *MsgEntity) error {
|
|
// 处理完成后,手动确认
|
|
_, err := r.client.XAck(ctx, r.topic, r.groupID, msg.MsgID).Result()
|
|
if err != nil {
|
|
log.Printf("Failed to acknowledge message %s: %v", msg.MsgID, err)
|
|
} else {
|
|
fmt.Printf("Acknowledged message: %s\n", msg.MsgID)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *Consumer) receive(flag string) ([]*MsgEntity, error) {
|
|
streams, err := r.client.XReadGroup(r.ctx, &redis.XReadGroupArgs{
|
|
Streams: []string{r.topic, flag},
|
|
Group: r.groupID,
|
|
Consumer: r.consumerID,
|
|
Count: 3,
|
|
Block: r.opts.receiveTimeout,
|
|
}).Result()
|
|
|
|
if err != nil && !errors.Is(err, redis.Nil) {
|
|
return nil, err
|
|
}
|
|
|
|
var entities []*MsgEntity
|
|
|
|
for _, stream := range streams {
|
|
for _, message := range stream.Messages {
|
|
fmt.Printf("Received message: ID=%s, Values=%v\n", message.ID, message.Values)
|
|
for key, rawValue := range message.Values {
|
|
value, ok := rawValue.(string)
|
|
if !ok {
|
|
return nil, errors.New(fmt.Sprintf("Value for key=%s is %s\n", key, rawValue))
|
|
}
|
|
entities = append(entities, &MsgEntity{
|
|
MsgID: message.ID,
|
|
Key: key,
|
|
Val: value,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
return entities, nil
|
|
}
|
|
|
|
func NewConsumer(client *redis.Client, topic, groupID, consumerID string, callbackFunc MsgCallback, opts ...ConsumerOption) (*Consumer, error) {
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
consumer := &Consumer{
|
|
client: client,
|
|
topic: topic,
|
|
groupID: groupID,
|
|
consumerID: consumerID,
|
|
callbackFunc: callbackFunc,
|
|
ctx: ctx,
|
|
stopFunc: cancelFunc,
|
|
opts: &ConsumerOptions{},
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(consumer.opts)
|
|
}
|
|
|
|
if err := consumer.createMkStream("0-0"); err != nil { //or "$"
|
|
return nil, err
|
|
}
|
|
|
|
go consumer.run()
|
|
|
|
return consumer, nil
|
|
}
|
|
|
|
func (r *Consumer) Stop() {
|
|
r.stopFunc()
|
|
}
|