goRedisMQ/consumer.go
2025-03-27 17:30:19 +08:00

211 lines
4.5 KiB
Go

package redisMq
import (
"context"
"errors"
"fmt"
"github.com/go-redis/redis/v8"
"log"
)
type MsgEntity struct {
MsgID string
Key string
Val string
}
type MsgCallback func(ctx context.Context, msg *MsgEntity) error
type Consumer struct {
client *redis.Client
opts *ConsumerOptions
// 消费的 topic
topic string
// 所属的消费者组
groupID string
// 当前节点的消费者 id
consumerID string
// 接收到 msg 时执行的回调函数,由使用方定义
callbackFunc MsgCallback
ctx context.Context
stopFunc context.CancelFunc
failureCnt map[MsgEntity]int
}
func (r *Consumer) createMkStream(flag string) error {
// 创建消费者组(仅需运行一次)
err := r.client.XGroupCreateMkStream(r.ctx, r.topic, r.groupID, flag).Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
log.Println(err)
return err
}
return nil
}
func (r *Consumer) deliverDeadLetter(ctx context.Context) {
for msg, times := range r.failureCnt {
if times < r.opts.maxRetryLimit {
continue
}
if err := r.opts.deadLetterMailbox.Deliver(ctx, &msg); err != nil {
log.Printf("failed to deliver dead letter mailbox (%v)\n", err)
continue
}
if err := r.ackMsg(ctx, &msg); err != nil {
log.Printf("failed to ack dead letter mailbox (%v)\n", err)
continue
}
delete(r.failureCnt, msg)
}
}
func (r *Consumer) handleMsg(ctx context.Context, msg []*MsgEntity) {
for _, msg := range msg {
if r.callbackFunc != nil && r.callbackFunc(ctx, msg) != nil {
// 惰性创建
if r.failureCnt == nil {
r.failureCnt = make(map[MsgEntity]int)
}
r.failureCnt[*msg]++
continue
}
if err := r.ackMsg(ctx, msg); err != nil {
log.Println(err)
continue
}
delete(r.failureCnt, *msg)
}
}
func (r *Consumer) receiveMsg() ([]*MsgEntity, error) {
return r.receive(">")
}
func (r *Consumer) receivePendingMsg() ([]*MsgEntity, error) {
return r.receive("0-0")
}
func (r *Consumer) run() {
for {
select {
case <-r.ctx.Done():
return
default:
}
// 尝试获取消息
msg, err := r.receiveMsg()
if err != nil {
log.Printf("consumer receive msg err:%v", err)
continue
}
// 尝试处理消息
ctxMsg, _ := context.WithTimeout(r.ctx, r.opts.receiveTimeout)
r.handleMsg(ctxMsg, msg)
// 处理死信队列
ctxDead, _ := context.WithTimeout(r.ctx, r.opts.deadLetterDeliverTimeout)
r.deliverDeadLetter(ctxDead)
// 处理未回复ack的消息
msg, err = r.receivePendingMsg()
if err != nil {
log.Printf("consumer receive msg err:%v", err)
continue
}
ctxMsg, _ = context.WithTimeout(r.ctx, r.opts.receiveTimeout)
r.handleMsg(ctxMsg, msg)
}
}
func (r *Consumer) ackMsg(ctx context.Context, msg *MsgEntity) error {
// 处理完成后,手动确认
_, err := r.client.XAck(ctx, r.topic, r.groupID, msg.MsgID).Result()
if err != nil {
log.Printf("Failed to acknowledge message %s: %v", msg.MsgID, err)
} else {
fmt.Printf("Acknowledged message: %s\n", msg.MsgID)
}
return err
}
func (r *Consumer) receive(flag string) ([]*MsgEntity, error) {
streams, err := r.client.XReadGroup(r.ctx, &redis.XReadGroupArgs{
Streams: []string{r.topic, flag},
Group: r.groupID,
Consumer: r.consumerID,
Count: 3,
Block: r.opts.receiveTimeout,
}).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return nil, err
}
var entities []*MsgEntity
for _, stream := range streams {
for _, message := range stream.Messages {
fmt.Printf("Received message: ID=%s, Values=%v\n", message.ID, message.Values)
for key, rawValue := range message.Values {
value, ok := rawValue.(string)
if !ok {
return nil, errors.New(fmt.Sprintf("Value for key=%s is %s\n", key, rawValue))
}
entities = append(entities, &MsgEntity{
MsgID: message.ID,
Key: key,
Val: value,
})
}
}
}
return entities, nil
}
func NewConsumer(client *redis.Client, topic, groupID, consumerID string, callbackFunc MsgCallback, opts ...ConsumerOption) (*Consumer, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
consumer := &Consumer{
client: client,
topic: topic,
groupID: groupID,
consumerID: consumerID,
callbackFunc: callbackFunc,
ctx: ctx,
stopFunc: cancelFunc,
opts: &ConsumerOptions{},
}
for _, opt := range opts {
opt(consumer.opts)
}
if err := consumer.createMkStream("0-0"); err != nil { //or "$"
return nil, err
}
go consumer.run()
return consumer, nil
}
func (r *Consumer) Stop() {
r.stopFunc()
}