All Articles

NSQ Note

1. 前言

说到消息队列一般就会直接想起Kafka、Rocket、Rabbit之类的全功能消息队列。但实际上在应用程序制作的过程中,我们也很需要一些小而精巧的消息队列用来进行某些业务上的解耦,或者降低主业务的消耗、耗时等。 类似于redis作者的这个:antirez / disque。其中Nsq就是一款非常小巧而实用的消息队列,特别是它还基于Go语言编写,特别符合性能要求以及当前云原生的大方向。

2. 简介

Nsq的官方网站在:nsq.io

一些比较关键的文档位置:

此外,还有一份PPT,虽然年代比较久远了(2012年),但基本上把所有Nsq中比较重要的点都提及到了,如果需要快速过一遍的话,看这篇PPT也是个不错的选择:NSQ - NYC Golang Meetup

3. 入门使用

快速入门可以查看官方文档:Quick Start。Node的npm包在:nsqjs

3.1 组件

Nsq主要有3个组件:

  • nsqd:真正的消息队列服务进程,消息收发都是由客户端直接和nsqd沟通完成,没有任何第三方中间成员
  • nsqlookupd:消息队列的拓扑管理,主要负责:
    • nsqd的注册,存活管理
    • topic和channel的管理,所有的topic有哪些,分别分布在哪几个nsqd上lookupd都知道
    • 客户端会先到lookupd上查询目标的topic,找到之后再与某个匹配的nsqd进行连接,最后收发消息
  • nsqadmin:管理界面,功能非常丰富

关于各组件的启动参数,以及HTTP的API,都可以在官方文档中找到。需要记住的只有一点,末尾为0的端口都是TCP端口,1的端口都是HTTP端口。

3.2 监控

作为go语言编写的组件,理所当然的Nsq的各组件会暴露HTTP的/debug/pprof终端,作为性能查询的输出终端。

4. 细节深入

接下来的章节,我会把所有我个人觉得比较重要的Nsq相关内容都列进去。

nsqd消息不持久

默认行为是所有收到的消息都存储在nsqd的内存中,且没有任何内建的replication机制。所以默认情况下,节点挂掉的话,消息会丢失,如果这种情况不可接受,就需要自己做一些机制来规避消息丢失。

--mem-queue-size参数可以限定存储在内存中的消息数量,如果超量,则会将后续的消息存储在磁盘上。将该值设置为0可以将消息全部存储在磁盘上。

at least once

消息消费失败或超时会requeue,并在后续重新进行分发,所以一条消息可能会被多次投递到client进行消费,以保证该消息确定被消费完毕。消费者需要做好幂等,否则一条消息可能会别重复消费多次。

消息的投递是无序的

nsqd只管将消息按顺序投递出去,但消息可能因为网络原因等导致最终消费的顺序和投递的顺序不一致。

nsqlookupd延迟

服务发现被设计为最终一致。在扩散的过程中,可能会有时间差。客户端会最终从nsqlookupd获得所有的topic信息。

测试应用中很可能出现的一情况就是:nsqlookupd上并没有任何topic消息,producer创建,nsqd上创建出topic,并在之后同步topic信息到nsqlookupd。而在这之前早得多consumer就已经连上nsqlookupd并没有收到任何topic的信息,这中间会有一个时间差。consumer会静止在那里,直到配置的lookup间隔时间到了,然后consumer会再次查询nsqlookupd,并最终或得到topic信息以及所属的nsqd。

可以看下这几个comment:

性能

看官方文档:Performance

topic和channel

估计这是使用Nsq的程序员最初最难搞懂的东西。一般来说其他的消息队列都是将消息发布到topic中,然后多个consumer消费一个topic。但Nsq在topic和consumer之间还插入了一个channel的概念。

这里主要是看一个图:

需要理解的最重要的概念是,同一个topic下的所有channel,都会得到进入topic的消息的拷贝。同样一份消息,进入topic后会分别拷贝并进入每一个channel,每个channel都需要各自的consumer把消息消费掉,否则就会造成消息积压。

简单来说:如果你希望和一般的消息队列一样的 topic => consumers,那么就让所有该topic下的消费者使用同一个channel即可。

channel的设计让Nsq可以做一些其他消息队列做不到的用途,最为常见的是:消息广播。多个consumer都需要在某个业务触发的时候得到通知,那么就在某个topic下,让所有的consumer都定义一个自己的channel连接进去(topic_channel_node_1/…),这样所有的consumer都各自占据了一个channel,而所有的channel都会得到topic内消息的拷贝,所以他们会都得到同样的消息通知。

自动消失的topic和channel

topic和channel都是持久化的,默认情况下在创建成功后,他们都是不会自动消失的,时间长了之后清理就会比较麻烦。而上面的例子中那种类似消息通知的情况,其实topic和channel都是生命周期非常短暂的,一般我们希望他们在没用了之后就被销毁掉。

在topic和channel的名字后面添加#ephemeral可以做到这点,当所有连接在topic或channel上的producer/consumer都断开连接之后,nsqd就会自动删除该topic或channel。

需要注意的是,#ephemeral命名的topic/channel在超过--mem-queue-size的限制之后不会将消息写入磁盘,而是直接丢弃。这点行为上和普通的topic/channel非常不同,一定要小心注意。

max-in-flight

客户端可以设置该值(比如说10),然后客户端在连接到nsqd的时候会发送RDY 10过去,nsqd就会知道该client的同时处理能力。在接下来的消息推送过程中,nsqd会不断向该client推送消息,直到目前已分发且尚未反馈结果的消息数量达到该数值(10)为止。这是为了更好的性能,client程序如果实现了多线程的话,可以充分利用线程和CPU来消费消息,而不是只有一个线程在工作,其他的都在闲置。

对于node程序来说,这个数值就没有意义了,因为node进程只有一个工作线程。

还可以看下这个comment:Question about concurrency and Max in Flight #221

减轻GC压力

见官方文档:Reducing GC Pressure

消息生命周期

一条消息的生命周期(消费端)主要是控制在client手中,这点和普通的消息队列的设计理念完全不同,这里需要花点时间稍微整理下。

消息的创建是producer连接到nsqd,并进行publish的操作,这点没有任何问题。创建成功消息就收到了,失败的话,则消息在nsqd上不存在。

消息的消费由nsqd向连接的consumer进行推送:

  • 消息已分发且尚未反馈(未到超时时间),状态:In-Flight
  • 消息已分发且超时没有反馈(该超时的时长是在客户端设置的!),状态:Time Out,该消息会被nsqd重新requeue(REQ)
    • 消息在处理中,且耗时很长,可以由客户端进行 Touch,这样该消息就不会Time Out
  • 消息发送后,可以由客户端主动进行requeue(REQ)
    • requeue行为可以附带设置延迟(Deferred)
  • 消息发送后,如果客户端出错被认为是有问题的,客户端自身需要恢复,则客户端可以将该消息反馈为backoff,这个操作会导致:
    • nsqd会将该消息requeue,消息本身会再次(如果设置延迟的话,也会Deferred)分发
    • 此外这个操作会导致客户端将自身的RDY设置为0,意味着这个客户端短时间内将拒绝处理任何消息
  • 消息发送后,如果该消息之前被requeue则该消息会附带Attempts(重试)数,客户端在尝试失败后,会匹配失败次数,如果超过限制,则该消息会被DISCARD,即客户端会直接将该消息标记为Finished(FIN)
    • 如果客户端的max attempts设置为0,则可以无限次重试,不会被discard
  • 消息发送后,可以由客户端标记为完成Finished(FIN)

可以看到Nsq的客户端有很大的权利,特别是失败重试的决策是由客户端做的,如果超限消息就会被丢弃,标记为完成。

更多关于Timeout相关,请见comment:Timeout options #1028

关于requeue和backoff的详细分析,可以看这篇:NSQ Requeue and Backoff

Admin Metrics

Admin统计数据里的一些词汇可以简单解释下:

Message Queues:

  • Depth:当前存储在内存和磁盘上的消息总量(尚未被分发)
  • In-Flight:目前已经被分发,但尚未被标记为 完成(FIN)、重新入队(REQ)或是超时的消息总量
  • Deferred:目前被重新入队且显式标记为推迟处理并且尚未到重新分发阶段的消息总量

Statistics:

  • Requeued:消息因超时或被显式要求而重新入队的次数
  • Timed Out:消息因超过设定的超时时长未响应,而被标记为超时的次数
  • Messages:在节点启动之后收到的消息总数
  • Rate:过去两次statsd间隔间,每秒新收到的消息数量
  • Connections:连接总数

Client Connections:

  • NSQd Host:Address of the nsqd node this client is connected to.
  • In-flight:当前发送给该client等待返回的消息数量
  • Ready Count:当前client节点最高in-flight可能的消息数量
  • Finished:所有当前client节点标记为FIN的消息数量
  • Requeued:同上,REQ
  • Messages:所有发送给该client节点的消息数量

5. 使用范例

光说理论上的东西比较枯燥,实践操作也是比较重要的,下面可以通过几个例子来看下Nsq的一些行为。

pubBeforeSub

If there is no topic on nsqd (first execution), reader will suck for a long time. Since at the first time reader connect to lookupd, there hasn’t been updated with the published topics info yet. So it will suck until readers retrieve topics info next time.

pubBeforeSub:
    Two writers initialized
    Two writers publish 3 messages each into topic "TOPIC1" (total 6)
    One of the writer publish 1 message into topic "TOPIC2_UNI"
    Two readers created for topic "TOPIC1" with the same channel "TOPIC1_READER_CHANNEL"
    One reader created for topic "TOPIC2_UNI" with channel "TOPIC2_UNI#ephemeral"
    -
    Two readers share the total 6 messages in topic "TOPIC1"
    One reader consume the message in topic "TOPIC2_UNI"
➜  couchnsq git:(master) ✗ ./bash/nsq_run.sh
  Nsq:Writer:bgF6l bgF6l: ready +0ms
  Nsq:Writer:Plwtx Plwtx: ready +0ms
  Nsq:Reader:oiFQg oiFQg: nsqd connected: "10.10.2.126:4250" +0ms
  Nsq:Reader:oiFQg oiFQg: nsqd connected: "10.10.2.126:4150" +0ms
  Nsq:Reader:MmLbX MmLbX: nsqd connected: "10.10.2.126:4250" +0ms
  Nsq:Reader:MmLbX MmLbX: nsqd connected: "10.10.2.126:4150" +0ms
  Nsq:Reader:8hpju 8hpju: nsqd connected: "10.10.2.126:4250" +0ms
  Nsq:Reader:oiFQg oiFQg: message 0e38318eeb01e000 got: "{"msg":"TOPIC1:jQ77FfY8lS"}" +16ms
  Nsq:Message oiFQg: message 0e38318eeb01e000 respond: "0", "FIN 0e38318eeb01e000
  Nsq:Message " +0ms
  Nsq:Reader:MmLbX MmLbX: message 0e38318eeb01e001 got: "{"msg":"TOPIC1:ONvD8mH4Jq"}" +16ms
  Nsq:Message MmLbX: message 0e38318eeb01e001 respond: "0", "FIN 0e38318eeb01e001
  Nsq:Message " +1ms
  Nsq:Reader:8hpju 8hpju: ready +17ms
  Nsq:Reader:MmLbX MmLbX: message 0e38318eeb01e002 got: "{"msg":"TOPIC1:7xjankTlzU"}" +3ms
  Nsq:Message MmLbX: message 0e38318eeb01e002 respond: "0", "FIN 0e38318eeb01e002
  Nsq:Message " +3ms
  Nsq:Reader:oiFQg oiFQg: message 0e38318eeb435000 got: "{"msg":"TOPIC1:bQOiIt7BLE"}" +5ms
  Nsq:Message oiFQg: message 0e38318eeb435000 respond: "0", "FIN 0e38318eeb435000
  Nsq:Message " +1ms
  Nsq:Reader:8hpju 8hpju: message 0e38318eec435000 got: "{"msg":"TOPIC2_UNI:Ms10V0jtpc"}" +3ms
  Nsq:Message 8hpju: message 0e38318eec435000 respond: "0", "FIN 0e38318eec435000
  Nsq:Message " +1ms
  Nsq:Reader:MmLbX MmLbX: message 0e38318eeb435001 got: "{"msg":"TOPIC1:APZJDBiVP9"}" +3ms
  Nsq:Message MmLbX: message 0e38318eeb435001 respond: "0", "FIN 0e38318eeb435001
  Nsq:Message " +1ms
  Nsq:Reader:MmLbX MmLbX: message 0e38318eeb435002 got: "{"msg":"TOPIC1:CXF6AQVBDr"}" +53ms
  Nsq:Message MmLbX: message 0e38318eeb435002 respond: "0", "FIN 0e38318eeb435002
  Nsq:Message " +53ms
^C  Nsq:Writer:bgF6l bgF6l: closed +1m
  Nsq:Writer:Plwtx Plwtx: closed +1m
  Nsq:Reader:oiFQg oiFQg: notReady +26s
  Nsq:Reader:MmLbX MmLbX: notReady +25s
  Nsq:Reader:8hpju 8hpju: notReady +26s
NsqTest: all cleared

sameMessageMultiChannel

Multiple channels subscribe the same topic will receive duplicate messages.

sameMessageMultiChannel:
    Two writers initialized
    One of the writer publish 1 message into topic "TOPIC_SAME"
    Two readers created for topic "TOPIC_SAME" with two channels "TOPIC_SAME_CHANNEL_1/2" subscribe the same topic
    -
    Two readers receive the same message
➜  couchnsq git:(master) ✗ ./bash/nsq_run.sh
  Nsq:Writer:oeAOC oeAOC: ready +0ms
  Nsq:Writer:EdW9U EdW9U: ready +0ms
  Nsq:Reader:SuqpY SuqpY: nsqd connected: "10.10.2.126:4250" +0ms
  Nsq:Reader:vKCk9 vKCk9: nsqd connected: "10.10.2.126:4250" +0ms
  Nsq:Reader:vKCk9 vKCk9: ready +5ms
  Nsq:Reader:SuqpY SuqpY: ready +6ms
  Nsq:Reader:vKCk9 vKCk9: message 0e38322140035000 got: "{"msg":"TOPIC_SAME:Hn4tB5UA0N"}" +3ms
  Nsq:Message vKCk9: message 0e38322140035000 respond: "0", "FIN 0e38322140035000
  Nsq:Message " +0ms
  Nsq:Reader:SuqpY SuqpY: message 0e38322140035000 got: "{"msg":"TOPIC_SAME:Hn4tB5UA0N"}" +3ms
  Nsq:Message SuqpY: message 0e38322140035000 respond: "0", "FIN 0e38322140035000
  Nsq:Message " +0ms
^C  Nsq:Writer:oeAOC oeAOC: closed +3s
  Nsq:Writer:EdW9U EdW9U: closed +3s
  Nsq:Reader:vKCk9 vKCk9: notReady +3s
  Nsq:Reader:SuqpY SuqpY: notReady +3s
NsqTest: all cleared

subBeforePub

If the topic not deleted before execution (or first run). Reader will suck for a long time, the reason have been explained previously.

subBeforePub:
    Two writers initialized
    One reader create to subscribe an non-existing topic "TOPIC_UNKNOWN"
    One of the writer wait 1000ms, then publish 1 message into topic "TOPIC_UNKNOWN"
    -
    One reader wait for a long time then receive the message
➜  couchnsq git:(master) ✗ ./bash/nsq_run.sh
  Nsq:Writer:2Fr6p 2Fr6p: ready +0ms
  Nsq:Writer:vFIw5 vFIw5: ready +0ms
  Nsq:Reader:uR6Lp uR6Lp: nsqd connected: "10.10.2.126:4150" +0ms
  Nsq:Reader:uR6Lp uR6Lp: ready +5ms
  Nsq:Reader:uR6Lp uR6Lp: message 0e38323df741e000 got: "{"msg":"TOPIC_UNKNOWN:kCwBySUMY0"}" +984ms
  Nsq:Message uR6Lp: message 0e38323df741e000 respond: "0", "FIN 0e38323df741e000
  Nsq:Message " +0ms
^C  Nsq:Writer:2Fr6p 2Fr6p: closed +3s
  Nsq:Writer:vFIw5 vFIw5: closed +3s
  Nsq:Reader:uR6Lp uR6Lp: notReady +2s
NsqTest: all cleared

errorMessage

Send error message, and message would be requeue with backoff in the reader, finally it would be discarded (finished).

{
  "maxAttempts": 3
}
➜  couchnsq git:(master) ✗ ./bash/nsq_run.sh
  Nsq:Writer:5P9rd 5P9rd: ready +0ms
  Nsq:Writer:KKhyz KKhyz: ready +0ms
  Nsq:Reader:hSWmK hSWmK: nsqd connected: "10.10.2.126:4150" +0ms
  Nsq:Reader:hSWmK hSWmK: ready +5ms
  Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 got: "{"msg":"should throw error:WGZ9VlLLPa"}" +4ms
  Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 error & requeue, attempts: 1 +1ms
  Nsq:Message hSWmK: message 0e3832a5bb01e000 backoff +0ms
  Nsq:Message hSWmK: message 0e3832a5bb01e000 respond: "1", "REQ 0e3832a5bb01e000 2000
  Nsq:Message " +1ms
  Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 got: "{"msg":"should throw error:WGZ9VlLLPa"}" +4s
  Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 error & requeue, attempts: 2 +0ms
  Nsq:Message hSWmK: message 0e3832a5bb01e000 backoff +4s
  Nsq:Message hSWmK: message 0e3832a5bb01e000 respond: "1", "REQ 0e3832a5bb01e000 2000
  Nsq:Message " +0ms
  Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 got: "{"msg":"should throw error:WGZ9VlLLPa"}" +7s
  Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 error & requeue, attempts: 3 +0ms
  Nsq:Message hSWmK: message 0e3832a5bb01e000 backoff +7s
  Nsq:Message hSWmK: message 0e3832a5bb01e000 respond: "1", "REQ 0e3832a5bb01e000 2000
  Nsq:Message " +0ms
  Nsq:Reader:hSWmK hSWmK: message discarded: "{"msg":"should throw error:WGZ9VlLLPa"}" +11s
^C  Nsq:Writer:5P9rd 5P9rd: closed +23s
  Nsq:Writer:KKhyz KKhyz: closed +23s
  Nsq:Reader:hSWmK hSWmK: notReady +2s
NsqTest: all cleared

EOF

Published 2020/12/31

Some tech & personal blog posts