说到消息队列一般就会直接想起Kafka、Rocket、Rabbit之类的全功能消息队列。但实际上在应用程序制作的过程中,我们也很需要一些小而精巧的消息队列用来进行某些业务上的解耦,或者降低主业务的消耗、耗时等。 类似于redis作者的这个:antirez / disque。其中Nsq就是一款非常小巧而实用的消息队列,特别是它还基于Go语言编写,特别符合性能要求以及当前云原生的大方向。
Nsq的官方网站在:nsq.io。
一些比较关键的文档位置:
此外,还有一份PPT,虽然年代比较久远了(2012年),但基本上把所有Nsq中比较重要的点都提及到了,如果需要快速过一遍的话,看这篇PPT也是个不错的选择:NSQ - NYC Golang Meetup。
快速入门可以查看官方文档:Quick Start。Node的npm包在:nsqjs。
Nsq主要有3个组件:
关于各组件的启动参数,以及HTTP的API,都可以在官方文档中找到。需要记住的只有一点,末尾为0的端口都是TCP端口,1的端口都是HTTP端口。
作为go语言编写的组件,理所当然的Nsq的各组件会暴露HTTP的/debug/pprof
终端,作为性能查询的输出终端。
接下来的章节,我会把所有我个人觉得比较重要的Nsq相关内容都列进去。
默认行为是所有收到的消息都存储在nsqd的内存中,且没有任何内建的replication机制。所以默认情况下,节点挂掉的话,消息会丢失,如果这种情况不可接受,就需要自己做一些机制来规避消息丢失。
--mem-queue-size
参数可以限定存储在内存中的消息数量,如果超量,则会将后续的消息存储在磁盘上。将该值设置为0可以将消息全部存储在磁盘上。
消息消费失败或超时会requeue,并在后续重新进行分发,所以一条消息可能会被多次投递到client进行消费,以保证该消息确定被消费完毕。消费者需要做好幂等,否则一条消息可能会别重复消费多次。
nsqd只管将消息按顺序投递出去,但消息可能因为网络原因等导致最终消费的顺序和投递的顺序不一致。
服务发现被设计为最终一致。在扩散的过程中,可能会有时间差。客户端会最终从nsqlookupd获得所有的topic信息。
测试应用中很可能出现的一情况就是:nsqlookupd上并没有任何topic消息,producer创建,nsqd上创建出topic,并在之后同步topic信息到nsqlookupd。而在这之前早得多consumer就已经连上nsqlookupd并没有收到任何topic的信息,这中间会有一个时间差。consumer会静止在那里,直到配置的lookup间隔时间到了,然后consumer会再次查询nsqlookupd,并最终或得到topic信息以及所属的nsqd。
可以看下这几个comment:
看官方文档:Performance。
估计这是使用Nsq的程序员最初最难搞懂的东西。一般来说其他的消息队列都是将消息发布到topic中,然后多个consumer消费一个topic。但Nsq在topic和consumer之间还插入了一个channel的概念。
这里主要是看一个图:
需要理解的最重要的概念是,同一个topic下的所有channel,都会得到进入topic的消息的拷贝。同样一份消息,进入topic后会分别拷贝并进入每一个channel,每个channel都需要各自的consumer把消息消费掉,否则就会造成消息积压。
简单来说:如果你希望和一般的消息队列一样的 topic => consumers,那么就让所有该topic下的消费者使用同一个channel即可。
channel的设计让Nsq可以做一些其他消息队列做不到的用途,最为常见的是:消息广播。多个consumer都需要在某个业务触发的时候得到通知,那么就在某个topic下,让所有的consumer都定义一个自己的channel连接进去(topic_channel_node_1/…),这样所有的consumer都各自占据了一个channel,而所有的channel都会得到topic内消息的拷贝,所以他们会都得到同样的消息通知。
topic和channel都是持久化的,默认情况下在创建成功后,他们都是不会自动消失的,时间长了之后清理就会比较麻烦。而上面的例子中那种类似消息通知的情况,其实topic和channel都是生命周期非常短暂的,一般我们希望他们在没用了之后就被销毁掉。
在topic和channel的名字后面添加#ephemeral
可以做到这点,当所有连接在topic或channel上的producer/consumer都断开连接之后,nsqd就会自动删除该topic或channel。
需要注意的是,#ephemeral
命名的topic/channel在超过--mem-queue-size
的限制之后不会将消息写入磁盘,而是直接丢弃。这点行为上和普通的topic/channel非常不同,一定要小心注意。
客户端可以设置该值(比如说10),然后客户端在连接到nsqd的时候会发送RDY 10
过去,nsqd就会知道该client的同时处理能力。在接下来的消息推送过程中,nsqd会不断向该client推送消息,直到目前已分发且尚未反馈结果的消息数量达到该数值(10)为止。这是为了更好的性能,client程序如果实现了多线程的话,可以充分利用线程和CPU来消费消息,而不是只有一个线程在工作,其他的都在闲置。
对于node程序来说,这个数值就没有意义了,因为node进程只有一个工作线程。
还可以看下这个comment:Question about concurrency and Max in Flight #221。
见官方文档:Reducing GC Pressure。
一条消息的生命周期(消费端)主要是控制在client手中,这点和普通的消息队列的设计理念完全不同,这里需要花点时间稍微整理下。
消息的创建是producer连接到nsqd,并进行publish的操作,这点没有任何问题。创建成功消息就收到了,失败的话,则消息在nsqd上不存在。
消息的消费由nsqd向连接的consumer进行推送:
可以看到Nsq的客户端有很大的权利,特别是失败重试的决策是由客户端做的,如果超限消息就会被丢弃,标记为完成。
更多关于Timeout相关,请见comment:Timeout options #1028。
关于requeue和backoff的详细分析,可以看这篇:NSQ Requeue and Backoff。
Admin统计数据里的一些词汇可以简单解释下:
Message Queues:
Statistics:
Client Connections:
光说理论上的东西比较枯燥,实践操作也是比较重要的,下面可以通过几个例子来看下Nsq的一些行为。
If there is no topic on nsqd
(first execution), reader will suck for a long time. Since at the first time reader connect to lookupd, there hasn’t been updated with the published topics info yet. So it will suck until readers retrieve topics info next time.
pubBeforeSub:
Two writers initialized
Two writers publish 3 messages each into topic "TOPIC1" (total 6)
One of the writer publish 1 message into topic "TOPIC2_UNI"
Two readers created for topic "TOPIC1" with the same channel "TOPIC1_READER_CHANNEL"
One reader created for topic "TOPIC2_UNI" with channel "TOPIC2_UNI#ephemeral"
-
Two readers share the total 6 messages in topic "TOPIC1"
One reader consume the message in topic "TOPIC2_UNI"
➜ couchnsq git:(master) ✗ ./bash/nsq_run.sh
Nsq:Writer:bgF6l bgF6l: ready +0ms
Nsq:Writer:Plwtx Plwtx: ready +0ms
Nsq:Reader:oiFQg oiFQg: nsqd connected: "10.10.2.126:4250" +0ms
Nsq:Reader:oiFQg oiFQg: nsqd connected: "10.10.2.126:4150" +0ms
Nsq:Reader:MmLbX MmLbX: nsqd connected: "10.10.2.126:4250" +0ms
Nsq:Reader:MmLbX MmLbX: nsqd connected: "10.10.2.126:4150" +0ms
Nsq:Reader:8hpju 8hpju: nsqd connected: "10.10.2.126:4250" +0ms
Nsq:Reader:oiFQg oiFQg: message 0e38318eeb01e000 got: "{"msg":"TOPIC1:jQ77FfY8lS"}" +16ms
Nsq:Message oiFQg: message 0e38318eeb01e000 respond: "0", "FIN 0e38318eeb01e000
Nsq:Message " +0ms
Nsq:Reader:MmLbX MmLbX: message 0e38318eeb01e001 got: "{"msg":"TOPIC1:ONvD8mH4Jq"}" +16ms
Nsq:Message MmLbX: message 0e38318eeb01e001 respond: "0", "FIN 0e38318eeb01e001
Nsq:Message " +1ms
Nsq:Reader:8hpju 8hpju: ready +17ms
Nsq:Reader:MmLbX MmLbX: message 0e38318eeb01e002 got: "{"msg":"TOPIC1:7xjankTlzU"}" +3ms
Nsq:Message MmLbX: message 0e38318eeb01e002 respond: "0", "FIN 0e38318eeb01e002
Nsq:Message " +3ms
Nsq:Reader:oiFQg oiFQg: message 0e38318eeb435000 got: "{"msg":"TOPIC1:bQOiIt7BLE"}" +5ms
Nsq:Message oiFQg: message 0e38318eeb435000 respond: "0", "FIN 0e38318eeb435000
Nsq:Message " +1ms
Nsq:Reader:8hpju 8hpju: message 0e38318eec435000 got: "{"msg":"TOPIC2_UNI:Ms10V0jtpc"}" +3ms
Nsq:Message 8hpju: message 0e38318eec435000 respond: "0", "FIN 0e38318eec435000
Nsq:Message " +1ms
Nsq:Reader:MmLbX MmLbX: message 0e38318eeb435001 got: "{"msg":"TOPIC1:APZJDBiVP9"}" +3ms
Nsq:Message MmLbX: message 0e38318eeb435001 respond: "0", "FIN 0e38318eeb435001
Nsq:Message " +1ms
Nsq:Reader:MmLbX MmLbX: message 0e38318eeb435002 got: "{"msg":"TOPIC1:CXF6AQVBDr"}" +53ms
Nsq:Message MmLbX: message 0e38318eeb435002 respond: "0", "FIN 0e38318eeb435002
Nsq:Message " +53ms
^C Nsq:Writer:bgF6l bgF6l: closed +1m
Nsq:Writer:Plwtx Plwtx: closed +1m
Nsq:Reader:oiFQg oiFQg: notReady +26s
Nsq:Reader:MmLbX MmLbX: notReady +25s
Nsq:Reader:8hpju 8hpju: notReady +26s
NsqTest: all cleared
Multiple channels subscribe the same topic will receive duplicate messages.
sameMessageMultiChannel:
Two writers initialized
One of the writer publish 1 message into topic "TOPIC_SAME"
Two readers created for topic "TOPIC_SAME" with two channels "TOPIC_SAME_CHANNEL_1/2" subscribe the same topic
-
Two readers receive the same message
➜ couchnsq git:(master) ✗ ./bash/nsq_run.sh
Nsq:Writer:oeAOC oeAOC: ready +0ms
Nsq:Writer:EdW9U EdW9U: ready +0ms
Nsq:Reader:SuqpY SuqpY: nsqd connected: "10.10.2.126:4250" +0ms
Nsq:Reader:vKCk9 vKCk9: nsqd connected: "10.10.2.126:4250" +0ms
Nsq:Reader:vKCk9 vKCk9: ready +5ms
Nsq:Reader:SuqpY SuqpY: ready +6ms
Nsq:Reader:vKCk9 vKCk9: message 0e38322140035000 got: "{"msg":"TOPIC_SAME:Hn4tB5UA0N"}" +3ms
Nsq:Message vKCk9: message 0e38322140035000 respond: "0", "FIN 0e38322140035000
Nsq:Message " +0ms
Nsq:Reader:SuqpY SuqpY: message 0e38322140035000 got: "{"msg":"TOPIC_SAME:Hn4tB5UA0N"}" +3ms
Nsq:Message SuqpY: message 0e38322140035000 respond: "0", "FIN 0e38322140035000
Nsq:Message " +0ms
^C Nsq:Writer:oeAOC oeAOC: closed +3s
Nsq:Writer:EdW9U EdW9U: closed +3s
Nsq:Reader:vKCk9 vKCk9: notReady +3s
Nsq:Reader:SuqpY SuqpY: notReady +3s
NsqTest: all cleared
If the topic not deleted before execution (or first run). Reader will suck for a long time, the reason have been explained previously.
subBeforePub:
Two writers initialized
One reader create to subscribe an non-existing topic "TOPIC_UNKNOWN"
One of the writer wait 1000ms, then publish 1 message into topic "TOPIC_UNKNOWN"
-
One reader wait for a long time then receive the message
➜ couchnsq git:(master) ✗ ./bash/nsq_run.sh
Nsq:Writer:2Fr6p 2Fr6p: ready +0ms
Nsq:Writer:vFIw5 vFIw5: ready +0ms
Nsq:Reader:uR6Lp uR6Lp: nsqd connected: "10.10.2.126:4150" +0ms
Nsq:Reader:uR6Lp uR6Lp: ready +5ms
Nsq:Reader:uR6Lp uR6Lp: message 0e38323df741e000 got: "{"msg":"TOPIC_UNKNOWN:kCwBySUMY0"}" +984ms
Nsq:Message uR6Lp: message 0e38323df741e000 respond: "0", "FIN 0e38323df741e000
Nsq:Message " +0ms
^C Nsq:Writer:2Fr6p 2Fr6p: closed +3s
Nsq:Writer:vFIw5 vFIw5: closed +3s
Nsq:Reader:uR6Lp uR6Lp: notReady +2s
NsqTest: all cleared
Send error message, and message would be requeue with backoff in the reader, finally it would be discarded (finished).
{
"maxAttempts": 3
}
➜ couchnsq git:(master) ✗ ./bash/nsq_run.sh
Nsq:Writer:5P9rd 5P9rd: ready +0ms
Nsq:Writer:KKhyz KKhyz: ready +0ms
Nsq:Reader:hSWmK hSWmK: nsqd connected: "10.10.2.126:4150" +0ms
Nsq:Reader:hSWmK hSWmK: ready +5ms
Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 got: "{"msg":"should throw error:WGZ9VlLLPa"}" +4ms
Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 error & requeue, attempts: 1 +1ms
Nsq:Message hSWmK: message 0e3832a5bb01e000 backoff +0ms
Nsq:Message hSWmK: message 0e3832a5bb01e000 respond: "1", "REQ 0e3832a5bb01e000 2000
Nsq:Message " +1ms
Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 got: "{"msg":"should throw error:WGZ9VlLLPa"}" +4s
Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 error & requeue, attempts: 2 +0ms
Nsq:Message hSWmK: message 0e3832a5bb01e000 backoff +4s
Nsq:Message hSWmK: message 0e3832a5bb01e000 respond: "1", "REQ 0e3832a5bb01e000 2000
Nsq:Message " +0ms
Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 got: "{"msg":"should throw error:WGZ9VlLLPa"}" +7s
Nsq:Reader:hSWmK hSWmK: message 0e3832a5bb01e000 error & requeue, attempts: 3 +0ms
Nsq:Message hSWmK: message 0e3832a5bb01e000 backoff +7s
Nsq:Message hSWmK: message 0e3832a5bb01e000 respond: "1", "REQ 0e3832a5bb01e000 2000
Nsq:Message " +0ms
Nsq:Reader:hSWmK hSWmK: message discarded: "{"msg":"should throw error:WGZ9VlLLPa"}" +11s
^C Nsq:Writer:5P9rd 5P9rd: closed +23s
Nsq:Writer:KKhyz KKhyz: closed +23s
Nsq:Reader:hSWmK hSWmK: notReady +2s
NsqTest: all cleared
EOF