博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
NSQ笔记 - golang
阅读量:6804 次
发布时间:2019-06-26

本文共 5467 字,大约阅读时间需要 18 分钟。

一.  单机nsqd

nsqd 是一个守护进程,负责接收,排队,投递消息给客户端

简单的说,真正干活的就是这个服务,它主要负责message的收发,队列的维护。nsqd会默认监听一个tcp端口(4150)和一个http端口(4151)以及一个可选的https端口

总的来说,nsqd 具有以下功能或特性

  • 对订阅了同一个topic,同一个channel的消费者使用负载均衡策略(不是轮询)

  • 只要channel存在,即使没有该channel的消费者,也会将生产者的message缓存到队列中(注意消息的过期处理)

  • 保证队列中的message至少会被消费一次,即使nsqd退出,也会将队列中的消息暂存磁盘上(结束进程等意外情况除外)

  • 限定内存占用,能够配置nsqd中每个channel队列在内存中缓存的message数量,一旦超出,message将被缓存到磁盘中

  • topic,channel一旦建立,将会一直存在,要及时在管理台或者用代码清除无效的topic和channel,避免资源的浪费

由于nsq已经有负载均衡等机制,所以在使用过程中,当多个消费者时候,就会触发.下面例子,可以用多个接收者试试.

1. 启动一个nsqd

nsqd --lookupd-tcp-address=127.0.0.1:4160复制代码

显示:

bogon:~ xuanpro$ nsqd --lookupd-tcp-address=127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.170663 nsqd v0.2.31 (built w/go1.3.1)[nsqd] 2018/04/08 14:02:43.170753 ID: 660[nsqd] 2018/04/08 14:02:43.170953 TOPIC(test): created[nsqd] 2018/04/08 14:02:43.172795 TOPIC(test): new channel(test-channel)[nsqd] 2018/04/08 14:02:43.172810 NSQ: persisting topic/channel metadata to nsqd.660.dat[nsqd] 2018/04/08 14:02:43.173508 DISKQUEUE(test:test-channel): readOne() opened test:test-channel.diskqueue.000000.dat[nsqd] 2018/04/08 14:02:43.173599 LOOKUP: adding peer 127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.173610 LOOKUP connecting to 127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.173732 TCP: listening on [::]:4150[nsqd] 2018/04/08 14:02:43.173748 HTTP: listening on [::]:4151[nsqd] 2018/04/08 14:02:43.173819 LOOKUPD(127.0.0.1:4160): topic REGISTER test[nsqd] 2018/04/08 14:02:43.173824 LOOKUP connecting to 127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.173896 NSQ: persisting topic/channel metadata to nsqd.660.dat[nsqd] 2018/04/08 14:02:43.174423 LOOKUPD(127.0.0.1:4160): ERROR REGISTER test - dial tcp 127.0.0.1:4160: connection refused[nsqd] 2018/04/08 14:02:43.174438 LOOKUPD(127.0.0.1:4160): channel REGISTER test test-channel[nsqd] 2018/04/08 14:02:43.174440 LOOKUP connecting to 127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.174515 NSQ: persisting topic/channel metadata to nsqd.660.dat[nsqd] 2018/04/08 14:02:43.175012 LOOKUPD(127.0.0.1:4160): ERROR REGISTER test test-channel - dial tcp 127.0.0.1:4160: connection refused复制代码

2. 一个发送端 send.go

//Nsq发送测试package mainimport (	// "bufio"	"fmt"	"github.com/nsqio/go-nsq"	// "os"	"time")var producer *nsq.Producer// 主函数func main() {	strIP1 := "127.0.0.1:4150"	strIP2 := "127.0.0.1:4152"	InitProducer(strIP1)	running := true	//读取控制台输入	// reader := bufio.NewReader(os.Stdin)	index := 1	for running {		// data, _, _ := reader.ReadLine()		fmt.Println(index)		command := fmt.Sprintf("lzx , i am number %d !", index)		// command := "hello God , i am " + string(index) + " !"		fmt.Println(command)		index++		if command == "stop" {			running = false		}		for err := Publish("test", command); err != nil; err = Publish("test", command) {			//切换IP重连			strIP1, strIP2 = strIP2, strIP1			InitProducer(strIP1)		}		time.Sleep(time.Second)	}	//关闭	producer.Stop()}// 初始化生产者func InitProducer(str string) {	var err error	fmt.Println("address: ", str)	producer, err = nsq.NewProducer(str, nsq.NewConfig())	if err != nil {		panic(err)	}}//发布消息func Publish(topic string, message string) error {	var err error	if producer != nil {		if message == "" { //不能发布空串,否则会导致error			return nil		}		err = producer.Publish(topic, []byte(message)) // 发布消息		return err	}	return fmt.Errorf("producer is nil", err)}复制代码

3. 一个接收端 receive.go

//Nsq接收测试package mainimport (	"fmt"	"time"	"github.com/nsqio/go-nsq")// 消费者type ConsumerT struct{}// 主函数func main() {	InitConsumer("test", "test-channel", "127.0.0.1:4161")	for {		time.Sleep(time.Second * 10)	}}//处理消息func (*ConsumerT) HandleMessage(msg *nsq.Message) error {	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))	return nil}//初始化消费者func InitConsumer(topic string, channel string, address string) {	cfg := nsq.NewConfig()	cfg.LookupdPollInterval = time.Second          //设置重连时间	c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者	if err != nil {		panic(err)	}	c.SetLogger(nil, 0)        //屏蔽系统日志	c.AddHandler(&ConsumerT{}) // 添加消费者接口	//建立NSQLookupd连接	if err := c.ConnectToNSQLookupd(address); err != nil {		panic(err)	}	//建立多个nsqd连接	if err := c.ConnectToNSQDs([]string{
"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil { panic(err) } // 建立一个nsqd连接 // if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
// panic(err) // }}复制代码

4. 运行send.go receive.go  (记得启动nsqd)

send :

1lzx , i am number 1 !2018/04/08 14:01:46 INF    1 (127.0.0.1:4150) connecting to nsqd2lzx , i am number 2 !3lzx , i am number 3 !4lzx , i am number 4 !复制代码

receive :

receive 127.0.0.1:4150 message: lzx , i am number 1 !receive 127.0.0.1:4150 message: lzx , i am number 2 !receive 127.0.0.1:4150 message: lzx , i am number 3 !receive 127.0.0.1:4150 message: lzx , i am number 4 !复制代码

5. 生成一个 nsqd.660.dat 文件,内容是:

{
"topics":[{
"channels":[{
"name":"test-channel","paused":false}],"name":"test","paused":false}],"version":"0.2.31"}复制代码

6. 注意

代码比较简单,实现了一个单机版消息队列的功能.send端每一秒发送一条信息,receive端直接去队列取.receive端会阻塞,需要重启.代码后期自己优化吧.这里是简单的例子.

二.  nsq集群

1.  nsqlookupd

nsqlookupd是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息

简单的说nsqlookupd就是中心管理服务,它使用tcp(默认端口4160)管理nsqd服务,使用http(默认端口4161)管理nsqadmin服务。同时为客户端提供查询功能

总的来说,nsqlookupd具有以下功能或特性

  • 唯一性,在一个Nsq服务中只有一个nsqlookupd服务。当然也可以在集群中部署多个nsqlookupd,但它们之间是没有关联的

  • 去中心化,即使nsqlookupd崩溃,也会不影响正在运行的nsqd服务

  • 充当nsqd和naqadmin信息交互的中间件

  • 提供一个http查询服务,给客户端定时更新nsqd的地址目录

未完....

(以上引用了别的链接,链接出处: https://studygolang.com/articles/9756)

推荐:

NSQ源码解析链接: https://blog.csdn.net/column/details/15449.html

你可能感兴趣的文章
Bitmap
查看>>
(转)arcgis面状文件坐标导出方法
查看>>
LPC824 周立功AM824学习笔记
查看>>
SQL数据库学习之路(三)
查看>>
开发https应用
查看>>
js轮换广告
查看>>
墨菲定律
查看>>
Maven
查看>>
MovieReview—Kingsman THE SECRET SERVICE(王牌特工之特工学院)
查看>>
C语言成长学习题(九)
查看>>
银行里的迷宫
查看>>
codevs——1294 全排列
查看>>
9.13模拟试题
查看>>
自动生成单据编号
查看>>
[noip模拟]画展<队列的基础知识>
查看>>
Java时间转换类实现
查看>>
ios之UITextfield (2)
查看>>
appium自动化测试 环境搭建
查看>>
iOS中的webView加载HTML
查看>>
popupwindow使用之异常:unable to add window -- token null is not valid
查看>>