一. 单机nsqd
nsqd 是一个守护进程,负责接收,排队,投递消息给客户端
简单的说,真正干活的就是这个服务,它主要负责message的收发,队列的维护。nsqd会默认监听一个tcp端口(4150)和一个http端口(4151)以及一个可选的https端口
总的来说,nsqd 具有以下功能或特性
对订阅了同一个topic,同一个channel的消费者使用负载均衡策略(不是轮询)
只要channel存在,即使没有该channel的消费者,也会将生产者的message缓存到队列中(注意消息的过期处理)
保证队列中的message至少会被消费一次,即使nsqd退出,也会将队列中的消息暂存磁盘上(结束进程等意外情况除外)
限定内存占用,能够配置nsqd中每个channel队列在内存中缓存的message数量,一旦超出,message将被缓存到磁盘中
topic,channel一旦建立,将会一直存在,要及时在管理台或者用代码清除无效的topic和channel,避免资源的浪费
由于nsq已经有负载均衡等机制,所以在使用过程中,当多个消费者时候,就会触发.下面例子,可以用多个接收者试试.
1. 启动一个nsqd
nsqd --lookupd-tcp-address=127.0.0.1:4160复制代码
显示:
bogon:~ xuanpro$ nsqd --lookupd-tcp-address=127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.170663 nsqd v0.2.31 (built w/go1.3.1)[nsqd] 2018/04/08 14:02:43.170753 ID: 660[nsqd] 2018/04/08 14:02:43.170953 TOPIC(test): created[nsqd] 2018/04/08 14:02:43.172795 TOPIC(test): new channel(test-channel)[nsqd] 2018/04/08 14:02:43.172810 NSQ: persisting topic/channel metadata to nsqd.660.dat[nsqd] 2018/04/08 14:02:43.173508 DISKQUEUE(test:test-channel): readOne() opened test:test-channel.diskqueue.000000.dat[nsqd] 2018/04/08 14:02:43.173599 LOOKUP: adding peer 127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.173610 LOOKUP connecting to 127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.173732 TCP: listening on [::]:4150[nsqd] 2018/04/08 14:02:43.173748 HTTP: listening on [::]:4151[nsqd] 2018/04/08 14:02:43.173819 LOOKUPD(127.0.0.1:4160): topic REGISTER test[nsqd] 2018/04/08 14:02:43.173824 LOOKUP connecting to 127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.173896 NSQ: persisting topic/channel metadata to nsqd.660.dat[nsqd] 2018/04/08 14:02:43.174423 LOOKUPD(127.0.0.1:4160): ERROR REGISTER test - dial tcp 127.0.0.1:4160: connection refused[nsqd] 2018/04/08 14:02:43.174438 LOOKUPD(127.0.0.1:4160): channel REGISTER test test-channel[nsqd] 2018/04/08 14:02:43.174440 LOOKUP connecting to 127.0.0.1:4160[nsqd] 2018/04/08 14:02:43.174515 NSQ: persisting topic/channel metadata to nsqd.660.dat[nsqd] 2018/04/08 14:02:43.175012 LOOKUPD(127.0.0.1:4160): ERROR REGISTER test test-channel - dial tcp 127.0.0.1:4160: connection refused复制代码
2. 一个发送端 send.go
//Nsq发送测试package mainimport ( // "bufio" "fmt" "github.com/nsqio/go-nsq" // "os" "time")var producer *nsq.Producer// 主函数func main() { strIP1 := "127.0.0.1:4150" strIP2 := "127.0.0.1:4152" InitProducer(strIP1) running := true //读取控制台输入 // reader := bufio.NewReader(os.Stdin) index := 1 for running { // data, _, _ := reader.ReadLine() fmt.Println(index) command := fmt.Sprintf("lzx , i am number %d !", index) // command := "hello God , i am " + string(index) + " !" fmt.Println(command) index++ if command == "stop" { running = false } for err := Publish("test", command); err != nil; err = Publish("test", command) { //切换IP重连 strIP1, strIP2 = strIP2, strIP1 InitProducer(strIP1) } time.Sleep(time.Second) } //关闭 producer.Stop()}// 初始化生产者func InitProducer(str string) { var err error fmt.Println("address: ", str) producer, err = nsq.NewProducer(str, nsq.NewConfig()) if err != nil { panic(err) }}//发布消息func Publish(topic string, message string) error { var err error if producer != nil { if message == "" { //不能发布空串,否则会导致error return nil } err = producer.Publish(topic, []byte(message)) // 发布消息 return err } return fmt.Errorf("producer is nil", err)}复制代码
3. 一个接收端 receive.go
//Nsq接收测试package mainimport ( "fmt" "time" "github.com/nsqio/go-nsq")// 消费者type ConsumerT struct{}// 主函数func main() { InitConsumer("test", "test-channel", "127.0.0.1:4161") for { time.Sleep(time.Second * 10) }}//处理消息func (*ConsumerT) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil}//初始化消费者func InitConsumer(topic string, channel string, address string) { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second //设置重连时间 c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者 if err != nil { panic(err) } c.SetLogger(nil, 0) //屏蔽系统日志 c.AddHandler(&ConsumerT{}) // 添加消费者接口 //建立NSQLookupd连接 if err := c.ConnectToNSQLookupd(address); err != nil { panic(err) } //建立多个nsqd连接 if err := c.ConnectToNSQDs([]string{ "127.0.0.1:4150", "127.0.0.1:4152"}); err != nil { panic(err) } // 建立一个nsqd连接 // if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil { // panic(err) // }}复制代码
4. 运行send.go receive.go (记得启动nsqd)
send :
1lzx , i am number 1 !2018/04/08 14:01:46 INF 1 (127.0.0.1:4150) connecting to nsqd2lzx , i am number 2 !3lzx , i am number 3 !4lzx , i am number 4 !复制代码
receive :
receive 127.0.0.1:4150 message: lzx , i am number 1 !receive 127.0.0.1:4150 message: lzx , i am number 2 !receive 127.0.0.1:4150 message: lzx , i am number 3 !receive 127.0.0.1:4150 message: lzx , i am number 4 !复制代码
5. 生成一个 nsqd.660.dat 文件,内容是:
{ "topics":[{ "channels":[{ "name":"test-channel","paused":false}],"name":"test","paused":false}],"version":"0.2.31"}复制代码
6. 注意
代码比较简单,实现了一个单机版消息队列的功能.send端每一秒发送一条信息,receive端直接去队列取.receive端会阻塞,需要重启.代码后期自己优化吧.这里是简单的例子.
二. nsq集群
1. nsqlookupd
nsqlookupd是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息简单的说nsqlookupd就是中心管理服务,它使用tcp(默认端口4160)管理nsqd服务,使用http(默认端口4161)管理nsqadmin服务。同时为客户端提供查询功能
总的来说,nsqlookupd具有以下功能或特性
唯一性,在一个Nsq服务中只有一个nsqlookupd服务。当然也可以在集群中部署多个nsqlookupd,但它们之间是没有关联的
去中心化,即使nsqlookupd崩溃,也会不影响正在运行的nsqd服务
充当nsqd和naqadmin信息交互的中间件
提供一个http查询服务,给客户端定时更新nsqd的地址目录
未完....
(以上引用了别的链接,链接出处: https://studygolang.com/articles/9756)
推荐:
NSQ源码解析链接: https://blog.csdn.net/column/details/15449.html