first commit
This commit is contained in:
commit
43e4fb042a
8
.idea/.gitignore
generated
vendored
Normal file
8
.idea/.gitignore
generated
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
# 默认忽略的文件
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# 基于编辑器的 HTTP 客户端请求
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
8
.idea/modules.xml
generated
Normal file
8
.idea/modules.xml
generated
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/redisMq.iml" filepath="$PROJECT_DIR$/.idea/redisMq.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
9
.idea/redisMq.iml
generated
Normal file
9
.idea/redisMq.iml
generated
Normal file
@ -0,0 +1,9 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="WEB_MODULE" version="4">
|
||||
<component name="Go" enabled="true" />
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$" />
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
210
consumer.go
Normal file
210
consumer.go
Normal file
@ -0,0 +1,210 @@
|
||||
package redisMq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"log"
|
||||
)
|
||||
|
||||
type MsgEntity struct {
|
||||
MsgID string
|
||||
Key string
|
||||
Val string
|
||||
}
|
||||
|
||||
type MsgCallback func(ctx context.Context, msg *MsgEntity) error
|
||||
|
||||
type Consumer struct {
|
||||
client *redis.Client
|
||||
opts *ConsumerOptions
|
||||
|
||||
// 消费的 topic
|
||||
topic string
|
||||
// 所属的消费者组
|
||||
groupID string
|
||||
// 当前节点的消费者 id
|
||||
consumerID string
|
||||
|
||||
// 接收到 msg 时执行的回调函数,由使用方定义
|
||||
callbackFunc MsgCallback
|
||||
|
||||
ctx context.Context
|
||||
stopFunc context.CancelFunc
|
||||
|
||||
failureCnt map[MsgEntity]int
|
||||
}
|
||||
|
||||
func (r *Consumer) createMkStream(flag string) error {
|
||||
// 创建消费者组(仅需运行一次)
|
||||
err := r.client.XGroupCreateMkStream(r.ctx, r.topic, r.groupID, flag).Err()
|
||||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Consumer) deliverDeadLetter(ctx context.Context) {
|
||||
for msg, times := range r.failureCnt {
|
||||
if times < r.opts.maxRetryLimit {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.opts.deadLetterMailbox.Deliver(ctx, &msg); err != nil {
|
||||
log.Printf("failed to deliver dead letter mailbox (%v)\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.ackMsg(ctx, &msg); err != nil {
|
||||
log.Printf("failed to ack dead letter mailbox (%v)\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
delete(r.failureCnt, msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Consumer) handleMsg(ctx context.Context, msg []*MsgEntity) {
|
||||
for _, msg := range msg {
|
||||
if r.callbackFunc != nil && r.callbackFunc(ctx, msg) != nil {
|
||||
// 惰性创建
|
||||
if r.failureCnt == nil {
|
||||
r.failureCnt = make(map[MsgEntity]int)
|
||||
}
|
||||
r.failureCnt[*msg]++
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.ackMsg(ctx, msg); err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
delete(r.failureCnt, *msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Consumer) receiveMsg() ([]*MsgEntity, error) {
|
||||
return r.receive(">")
|
||||
}
|
||||
|
||||
func (r *Consumer) receivePendingMsg() ([]*MsgEntity, error) {
|
||||
return r.receive("0-0")
|
||||
}
|
||||
|
||||
func (r *Consumer) run() {
|
||||
for {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// 尝试获取消息
|
||||
msg, err := r.receiveMsg()
|
||||
if err != nil {
|
||||
log.Printf("consumer receive msg err:%v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// 尝试处理消息
|
||||
ctxMsg, _ := context.WithTimeout(r.ctx, r.opts.receiveTimeout)
|
||||
r.handleMsg(ctxMsg, msg)
|
||||
|
||||
// 处理死信队列
|
||||
ctxDead, _ := context.WithTimeout(r.ctx, r.opts.deadLetterDeliverTimeout)
|
||||
r.deliverDeadLetter(ctxDead)
|
||||
|
||||
// 处理未回复ack的消息
|
||||
msg, err = r.receivePendingMsg()
|
||||
if err != nil {
|
||||
log.Printf("consumer receive msg err:%v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
ctxMsg, _ = context.WithTimeout(r.ctx, r.opts.receiveTimeout)
|
||||
r.handleMsg(ctxMsg, msg)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (r *Consumer) ackMsg(ctx context.Context, msg *MsgEntity) error {
|
||||
// 处理完成后,手动确认
|
||||
_, err := r.client.XAck(ctx, r.topic, r.groupID, msg.MsgID).Result()
|
||||
if err != nil {
|
||||
log.Printf("Failed to acknowledge message %s: %v", msg.MsgID, err)
|
||||
} else {
|
||||
fmt.Printf("Acknowledged message: %s\n", msg.MsgID)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Consumer) receive(flag string) ([]*MsgEntity, error) {
|
||||
streams, err := r.client.XReadGroup(r.ctx, &redis.XReadGroupArgs{
|
||||
Streams: []string{r.topic, flag},
|
||||
Group: r.groupID,
|
||||
Consumer: r.consumerID,
|
||||
Count: 3,
|
||||
Block: r.opts.receiveTimeout,
|
||||
}).Result()
|
||||
|
||||
if err != nil && !errors.Is(err, redis.Nil) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var entities []*MsgEntity
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, message := range stream.Messages {
|
||||
fmt.Printf("Received message: ID=%s, Values=%v\n", message.ID, message.Values)
|
||||
for key, rawValue := range message.Values {
|
||||
value, ok := rawValue.(string)
|
||||
if !ok {
|
||||
return nil, errors.New(fmt.Sprintf("Value for key=%s is %s\n", key, rawValue))
|
||||
}
|
||||
entities = append(entities, &MsgEntity{
|
||||
MsgID: message.ID,
|
||||
Key: key,
|
||||
Val: value,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return entities, nil
|
||||
}
|
||||
|
||||
func NewConsumer(client *redis.Client, topic, groupID, consumerID string, callbackFunc MsgCallback, opts ...ConsumerOption) (*Consumer, error) {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
consumer := &Consumer{
|
||||
client: client,
|
||||
topic: topic,
|
||||
groupID: groupID,
|
||||
consumerID: consumerID,
|
||||
callbackFunc: callbackFunc,
|
||||
ctx: ctx,
|
||||
stopFunc: cancelFunc,
|
||||
opts: &ConsumerOptions{},
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(consumer.opts)
|
||||
}
|
||||
|
||||
if err := consumer.createMkStream("0-0"); err != nil { //or "$"
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go consumer.run()
|
||||
|
||||
return consumer, nil
|
||||
}
|
||||
|
||||
func (r *Consumer) Stop() {
|
||||
r.stopFunc()
|
||||
}
|
17
deadleater.go
Normal file
17
deadleater.go
Normal file
@ -0,0 +1,17 @@
|
||||
package redisMq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
)
|
||||
|
||||
type DeadLetterMailbox interface {
|
||||
Deliver(ctx context.Context, msg *MsgEntity) error
|
||||
}
|
||||
|
||||
type DeadLetterMailboxImpl struct{}
|
||||
|
||||
func (d *DeadLetterMailboxImpl) Deliver(ctx context.Context, msg *MsgEntity) error {
|
||||
log.Printf("Deliver msg: %v\n", msg)
|
||||
return nil
|
||||
}
|
42
example/consumer_test.go
Normal file
42
example/consumer_test.go
Normal file
@ -0,0 +1,42 @@
|
||||
package example
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"log"
|
||||
"redisMq"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func callBack(ctx context.Context, msg *redisMq.MsgEntity) error {
|
||||
log.Println(msg)
|
||||
return errors.New("error_test")
|
||||
}
|
||||
|
||||
func TestConsumer(t *testing.T) {
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "192.168.8.1:6379",
|
||||
Password: "",
|
||||
})
|
||||
|
||||
topic := "test_stream_topic"
|
||||
groupId := "my_test_group"
|
||||
consumerId := "consumer_1"
|
||||
|
||||
consumer, err := redisMq.NewConsumer(client, topic, groupId, consumerId, callBack,
|
||||
redisMq.WithReceiveTimeout(time.Second),
|
||||
redisMq.WithMaxRetryLimit(2),
|
||||
redisMq.WithHandleMsgTimeout(time.Second),
|
||||
redisMq.WithDeadLetterDeliverTimeout(time.Second),
|
||||
redisMq.WithDeadLetterMailbox(&redisMq.DeadLetterMailboxImpl{}))
|
||||
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
defer consumer.Stop()
|
||||
|
||||
time.Sleep(time.Second * 10)
|
||||
}
|
24
example/producer_test.go
Normal file
24
example/producer_test.go
Normal file
@ -0,0 +1,24 @@
|
||||
package example
|
||||
|
||||
import (
|
||||
"github.com/go-redis/redis/v8"
|
||||
"redisMq"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_Producer(t *testing.T) {
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "192.168.8.1:6379",
|
||||
Password: "",
|
||||
})
|
||||
|
||||
topic := "test_stream_topic"
|
||||
|
||||
producer := redisMq.NewProducer(rdb, redisMq.WithMsgQueueLen(6))
|
||||
msg, err := producer.SendMsg(topic, "key1", "value1")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
t.Log(msg)
|
||||
}
|
10
go.mod
Normal file
10
go.mod
Normal file
@ -0,0 +1,10 @@
|
||||
module redisMq
|
||||
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/go-redis/redis/v8 v8.11.5 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
)
|
8
go.sum
Normal file
8
go.sum
Normal file
@ -0,0 +1,8 @@
|
||||
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
|
||||
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
|
||||
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
59
option.go
Normal file
59
option.go
Normal file
@ -0,0 +1,59 @@
|
||||
package redisMq
|
||||
|
||||
import "time"
|
||||
|
||||
type ProducerOptions struct {
|
||||
msgQueueLen int64
|
||||
}
|
||||
|
||||
type ProducerOption func(*ProducerOptions)
|
||||
|
||||
func WithMsgQueueLen(msgQueueLen int64) ProducerOption {
|
||||
return func(o *ProducerOptions) {
|
||||
o.msgQueueLen = msgQueueLen
|
||||
}
|
||||
}
|
||||
|
||||
type ConsumerOptions struct {
|
||||
// 每轮接收消息的超时时长
|
||||
receiveTimeout time.Duration
|
||||
// 处理消息的最大重试次数,超过此次数时,消息会被投递到死信队列
|
||||
maxRetryLimit int
|
||||
// 死信队列,可以由使用方自定义实现
|
||||
deadLetterMailbox DeadLetterMailbox
|
||||
// 投递死信流程超时阈值
|
||||
deadLetterDeliverTimeout time.Duration
|
||||
// 处理消息流程超时阈值
|
||||
handleMsgTimeout time.Duration
|
||||
}
|
||||
type ConsumerOption func(*ConsumerOptions)
|
||||
|
||||
func WithReceiveTimeout(receiveTimeout time.Duration) ConsumerOption {
|
||||
return func(o *ConsumerOptions) {
|
||||
o.receiveTimeout = receiveTimeout
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxRetryLimit(maxRetryLimit int) ConsumerOption {
|
||||
return func(o *ConsumerOptions) {
|
||||
o.maxRetryLimit = maxRetryLimit
|
||||
}
|
||||
}
|
||||
|
||||
func WithDeadLetterMailbox(deadLetterMailbox DeadLetterMailbox) ConsumerOption {
|
||||
return func(o *ConsumerOptions) {
|
||||
o.deadLetterMailbox = deadLetterMailbox
|
||||
}
|
||||
}
|
||||
|
||||
func WithDeadLetterDeliverTimeout(deadLetterDeliverTimeout time.Duration) ConsumerOption {
|
||||
return func(o *ConsumerOptions) {
|
||||
o.deadLetterDeliverTimeout = deadLetterDeliverTimeout
|
||||
}
|
||||
}
|
||||
|
||||
func WithHandleMsgTimeout(handleMsgTimeout time.Duration) ConsumerOption {
|
||||
return func(o *ConsumerOptions) {
|
||||
o.handleMsgTimeout = handleMsgTimeout
|
||||
}
|
||||
}
|
38
producer.go
Normal file
38
producer.go
Normal file
@ -0,0 +1,38 @@
|
||||
package redisMq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
type Producer struct {
|
||||
client *redis.Client
|
||||
opts *ProducerOptions
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (p *Producer) SendMsg(topic string, key, val string) (string, error) {
|
||||
add := p.client.XAdd(p.ctx, &redis.XAddArgs{
|
||||
Stream: topic,
|
||||
MaxLen: p.opts.msgQueueLen,
|
||||
Values: map[string]interface{}{
|
||||
key: val,
|
||||
},
|
||||
})
|
||||
|
||||
return add.Result()
|
||||
}
|
||||
|
||||
func NewProducer(client *redis.Client, opts ...ProducerOption) *Producer {
|
||||
p := &Producer{
|
||||
client: client,
|
||||
opts: &ProducerOptions{},
|
||||
ctx: context.Background(),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(p.opts)
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
Loading…
Reference in New Issue
Block a user