邓嘉文Jarvan

V1

2022/10/08阅读:259主题:橙心

nats 简介和使用

nats 简介和使用

nats 有 3 个产品

  • core-nats: 不做持久化的及时信息传输系统
  • nats-streaming: 基于 nats 的持久化消息队列(已弃用)
  • nats-jetstream: 基于 nats 的持久化消息队列

这里主要讨论 core-nats 和 nats-jetstream

nats

nats 快速开始

  • 启动 nats
# 启动 nats
docker run --network host -p 4222:4222 nats
  • Connect 连接
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
  log.Fatal("NATS 连接失败")
}
defer nc.Close()
  • Publish 发布/生产消息
// 生产消息
err := nc.Publish("foo", []byte("Hello World"))
if err != nil {
  log.Fatal("NATS 发布失败")
}
// Flush 发布缓冲区
err = nc.Flush()
if err != nil {
  log.Fatal("NATS Flush 失败")
}

出于性能考虑, 发布的消息先写入到类似 Buffer 缓存的地方, 然后再一次性发送到 nats 服务器

参考官方文档: https://docs.nats.io/using-nats/developer/sending/caches

  • Subscribe 订阅/消费消息
// 消费消息
_, err = nc.Subscribe("foo"func(msg *nats.Msg) {
  fmt.Printf("收到消息: %s\n", msg.Data)
})
if err != nil {
  log.Fatal("NATS 订阅失败")
}

nats 提供 发布订阅, 请求响应, 和队列模型 3 种 API. 分别是发布订阅模型, 请求响应模型, 和队列模型, 下面展开介绍

发布订阅

发布订阅
发布订阅

发布订阅模型, 一个发布者, 多个订阅者, 多个订阅者都可以收到同一个消息

// 消费消息
_, err = nc.Subscribe("foo"func(msg *nats.Msg) {
  fmt.Printf("收到消息: %s\n", msg.Data)
})
if err != nil {
  log.Fatal("NATS 订阅失败")
}

队列模型

队列模型
队列模型

队列模型, 一个发布者, 多个订阅者, 消息在多个消息中负载均衡分配, 分配给 A 消费者, 这个消息就不会再分配给其他消费者了

// 消费消息
// queue 是队列组的名称, 同一组队列最多只有一个接收者能成功接收
_, _ = nc.QueueSubscribe("foo""queue"func(msg *nats.Msg) {
 fmt.Printf("收到消息: %s\n"string(msg.Data))
})

请求响应

生产者能收到消费者的回复

请求响应
请求响应
// 消费消息
nc.Subscribe("help"func(m *nats.Msg) {
 fmt.Printf("收到消息: %s\n"string(m.Data))
 nc.Publish(m.Reply, []byte("I can help!"))
})

// 生产消息
go func() {
 msg, _ := nc.Request("help", []byte("help me"), 100*time.Millisecond)
 fmt.Printf("收到回复: %s\n"string(msg.Data))
}()

select {}

应用1-保证消息可靠性

nats 本身不做任何的消息的持久化, 是 "最多一次" 交付模型

举个例子, 如果生产的消息没有消费者接, 消息就丢掉了

但是请求响应机制可以通过业务代码保证消息的可靠性, 在业务层面实现常见消息队列的 ACK 机制

举个例子, 生产者发送消息, 消费者接受消息后处理, 成功返回 OK, 失败返回 error, 生产者如果收到 error 或者超时就可以补发消息

应用2-解耦 PRC 调用

请求响应模型和 RPC 调用是一致的, 我们可以用这个实现一个基于事件驱动的 RPC 总线

nats-jetstream

架构

jetstream 提供持久化 nats 服务, 客户端支持实时推送的 push 模式和自定义拉取的 pull 模式, 架构图如下

nats-jetstream架构
nats-jetstream架构
  • subject: 和 nats 一样, 用来区分不同的消息
  • stream: 定义了消息的储存方式, 保留规则, 丢弃规则 (stream 和 subject 是 1:n 的关系)
  • consumer: 定义了消息接受方式并记录接受到的位置, 有 2 种消费方式及时推送 push 和自定义拉取 pull (consumer 和 stream 是 1:n 的关系)

用支持 jetstream 的方式启动 nats

# 启动 nats jetStream (同时支持 nats API 和 jetStream API)
docker run --network host -p 4222:4222 nats -js

架构示例

贴一个来自于官方文档的 push pull 混合使用的架构示例图

image-20221008162524128
image-20221008162524128

基于 pull 的 worker 消费者和基于 push 的 monitor 消费者同时存在

Stream

JetStream 中 Stream 定义了消息的储存方式, 保留规则, 丢弃规则.

一个 Stream 可以对应多个 Subject, 如果一条消息符合 Stream 的保留规则, 就会被保留下来

注意 JetStream 所有生产和消费的消息的 Subject 都需要有 Stream 对应, 不然报错

贴一个 Stream 的核心配置 (v1.15.0)

// jsm.go

// StreamConfig 用于定义一个流, 大多数参数都有合理的默认值
// 如果 subject 没写, 就会分配一个随机的 subject
type StreamConfig struct {
    // 名称
 Name        string   
    // 描述
 Description string
    // 对应的多个 Subject
 Subjects    []string 
 // 消息 3 种保留策略
 // RetentionPolicy 最大消息数, 最大存储空间或者最大存活时间达到限制, 就可以删除消息
 // InterestPolicy 需要所有 consumer 确认可以删除消息
 // WorkQueuePolicy 只需要一个 consumer 确认可以删除消息
 Retention RetentionPolicy 
 // 最大 Consumer 数量
 MaxConsumers int 
 // 最大存储 Mgs 数量
 MaxMsgs int64 
 // 最大储存占用
 MaxBytes int64 
 // 消息 2 种淘汰策略
 // DiscardOld 消息达到限制后, 丢弃最早的消息
 // DiscardNew 消息达到限制后, 信息消息新推送会失败
 Discard DiscardPolicy 
 // 消息存活时间
 MaxAge time.Duration 
 // 每个 subject 最大消息数量
 MaxMsgsPerSubject int64 
 // 每个消息最大大小
 MaxMsgSize int32 
 // 支持文件储存和内存储存 2 种类型
 Storage StorageType 
 // 消息分片数量
 Replicas int 
 // 不需要 ack
 NoAck    bool   
    // ...         
}

Consumer

Consumer 定义了消息接受方式并记录接受到的位置

举个例子如果消费者在 Sub 消息的时候指定了 Consumer, 就会从记录的位置开始推送消息, 而不是从头开始

贴一个 Consumer 的核心配置 (v1.15.0)

// jsm.go

type ConsumerConfig struct {
 // 名称
 Durable string `json:"durable_name,omitempty"`
 // 描述
 Description string `json:"description,omitempty"`
 // 交付 Subject
 DeliverSubject string `json:"deliver_subject,omitempty"`
 // 交付 Group
 DeliverGroup string `json:"deliver_group,omitempty"`
 // 交付策略
 // 交付所有 (默认), 交付最后一个, 交付最新, 自定义开始序号, 自定义开始时间
 DeliverPolicy DeliverPolicy `json:"deliver_policy"`
 // 开始序号
 OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
 // 开始时间
 OptStartTime *time.Time `json:"opt_start_time,omitempty"`
 // ack 策略
 // 不需要ack (默认), 隐式ack All , 每个都需要显示ack
 AckPolicy AckPolicy `json:"ack_policy"`
 // ack等待时间
 AckWait    time.Duration   `json:"ack_wait,omitempty"`
 MaxDeliver int             `json:"max_deliver,omitempty"`
 BackOff    []time.Duration `json:"backoff,omitempty"`
 // 过滤的Subject
 FilterSubject string `json:"filter_subject,omitempty"`
 // 重试策略
 // 尽快重试, ReplayOriginalPolicy 相同时间重试
 ReplayPolicy ReplayPolicy `json:"replay_policy"`
 // 限速
 RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
 // 采样频率
 SampleFrequency string `json:"sample_freq,omitempty"`
 // 最大等待数量
 MaxWaiting int `json:"max_waiting,omitempty"`
 //最大Pending ack数量
 MaxAckPending int `json:"max_ack_pending,omitempty"`
 // flow 控制
 FlowControl bool `json:"flow_control,omitempty"`
 // 心跳时间
 Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
    // ...
}

代码示例

Core Publish-Subcribe

package main


import (
 "fmt"
 "os"
 "time"


 "github.com/nats-io/nats.go"
)


func main() {
 // 环境变量中获取 NATS 服务器地址
 url := os.Getenv("NATS_URL")
 if url == "" {
  url = nats.DefaultURL
 }

 // 连接 NATS 服务器
 nc, _ := nats.Connect(url)


 defer nc.Drain()

 // 生产消息 1, 因为没有消费者, 这个消息会丢失
 nc.Publish("greet.1", []byte("hello"))

 // 订阅消息, 异步接受, 这个时候有消费者了
 sub, _ := nc.SubscribeSync("greet.*")

 // 第一个消息因为没有消费者所以会丢失
 msg, _ := sub.NextMsg(10 * time.Millisecond)
 fmt.Println("subscribed after a publish...")
 fmt.Printf("msg is nil? %v\n", msg == nil)

 // 生产消息 2, 3
 nc.Publish("greet.2", []byte("hello"))
 nc.Publish("greet.3", []byte("hello"))


 msg, _ = sub.NextMsg(10 * time.Millisecond)
 fmt.Printf("msg data: %q on subject %q\n"string(msg.Data), msg.Subject)


 msg, _ = sub.NextMsg(10 * time.Millisecond)
 fmt.Printf("msg data: %q on subject %q\n"string(msg.Data), msg.Subject)


 nc.Publish("greet.4", []byte("hello"))


 msg, _ = sub.NextMsg(10 * time.Millisecond)
 fmt.Printf("msg data: %q on subject %q\n"string(msg.Data), msg.Subject)
}

output:

subscribed after a publish...
msg is nil? true
msg data: "hello" on subject "greet.2"
msg data: "hello" on subject "greet.3"
msg data: "hello" on subject "greet.4"

Request-Reply

package main


import (
 "fmt"
 "os"
 "time"


 "github.com/nats-io/nats.go"
)


func main() {

 url := os.Getenv("NATS_URL")
 if url == "" {
  url = nats.DefaultURL
 }


 nc, _ := nats.Connect(url)
 defer nc.Drain()


 sub, _ := nc.Subscribe("greet.*"func(msg *nats.Msg) {

  name := msg.Subject[6:]
  msg.Respond([]byte("hello, " + name))
 })


 rep, _ := nc.Request("greet.joe"nil, time.Second)
 fmt.Println(string(rep.Data))


 rep, _ = nc.Request("greet.sue"nil, time.Second)
 fmt.Println(string(rep.Data))


 rep, _ = nc.Request("greet.bob"nil, time.Second)
 fmt.Println(string(rep.Data))


 sub.Unsubscribe()


 _, err := nc.Request("greet.joe"nil, time.Second)
 fmt.Println(err)
}

output

hello, joe
hello, sue
hello, bob
nats: no responders available for request

Limits-based Stream

package main


import (
 "encoding/json"
 "fmt"
 "log"
 "os"
 "time"


 "github.com/nats-io/nats.go"
)


func main() {

 url := os.Getenv("NATS_URL")
 if url == "" {
  url = nats.DefaultURL
 }


 nc, _ := nats.Connect(url)


 defer nc.Drain()


 js, _ := nc.JetStream()


 cfg := nats.StreamConfig{
  Name:     "EVENTS",
  Subjects: []string{"events.>"},
 }


 cfg.Storage = nats.FileStorage


 js.AddStream(&cfg)
 fmt.Println("created the stream")


 js.Publish("events.page_loaded"nil)
 js.Publish("events.mouse_clicked"nil)
 js.Publish("events.mouse_clicked"nil)
 js.Publish("events.page_loaded"nil)
 js.Publish("events.mouse_clicked"nil)
 js.Publish("events.input_focused"nil)
 fmt.Println("published 6 messages")


 js.PublishAsync("events.input_changed"nil)
 js.PublishAsync("events.input_blurred"nil)
 js.PublishAsync("events.key_pressed"nil)
 js.PublishAsync("events.input_focused"nil)
 js.PublishAsync("events.input_changed"nil)
 js.PublishAsync("events.input_blurred"nil)


 select {
 case <-js.PublishAsyncComplete():
  fmt.Println("published 6 messages")
 case <-time.After(time.Second):
  log.Fatal("publish took too long")
 }


 printStreamState(js, cfg.Name)


 // 限制消息数量
 cfg.MaxMsgs = 10
 js.UpdateStream(&cfg)
 fmt.Println("set max messages to 10")


 printStreamState(js, cfg.Name)

 // 限制消息大小
 cfg.MaxBytes = 300
 js.UpdateStream(&cfg)
 fmt.Println("set max bytes to 300")


 printStreamState(js, cfg.Name)

 // 限制消息最大存活时间
 cfg.MaxAge = time.Second
 js.UpdateStream(&cfg)
 fmt.Println("set max age to one second")


 printStreamState(js, cfg.Name)


 fmt.Println("sleeping one second...")
 time.Sleep(time.Second)


 printStreamState(js, cfg.Name)
}


func printStreamState(js nats.JetStreamContext, name string) {
 info, _ := js.StreamInfo(name)
 b, _ := json.MarshalIndent(info.State, """ ")
 fmt.Println("inspecting stream info")
 fmt.Println(string(b))
}

output

created the stream
published 6 messages
published 6 messages
inspecting stream info
{
 "messages": 12,
 "bytes": 594,
 "first_seq": 1,
 "first_ts""2022-07-22T13:04:47.814798969Z",
 "last_seq": 12,
 "last_ts""2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}
set max messages to 10
inspecting stream info
{
 "messages": 10,
 "bytes": 496,
 "first_seq": 3,
 "first_ts""2022-07-22T13:04:47.815772395Z",
 "last_seq": 12,
 "last_ts""2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}
set max bytes to 300
inspecting stream info
{
 "messages": 6,
 "bytes": 298,
 "first_seq": 7,
 "first_ts""2022-07-22T13:04:47.817220635Z",
 "last_seq": 12,
 "last_ts""2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}
set max age to one second
inspecting stream info
{
 "messages": 6,
 "bytes": 298,
 "first_seq": 7,
 "first_ts""2022-07-22T13:04:47.817220635Z",
 "last_seq": 12,
 "last_ts""2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}
sleeping one second...
inspecting stream info
{
 "messages": 0,
 "bytes": 0,
 "first_seq": 13,
 "first_ts""1970-01-01T00:00:00Z",
 "last_seq": 12,
 "last_ts""2022-07-22T13:04:47.817297637Z",
 "consumer_count": 0
}

更多示例参考: https://natsbyexample.com/

reference

官方文档: https://docs.nats.io/

官方GitHub: https://github.com/nats-io/nats.go

代码示例: https://natsbyexample.com/

https://marco79423.net/articles/淺談-natsstan-和-jetstream-兩三事

分类:

后端

标签:

后端

作者介绍

邓嘉文Jarvan
V1