最新下载
热门教程
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
nanomsg通信库的pubsub及survey
时间:2022-06-24 20:43:27 编辑:袖梨 来源:一聚教程网
nanomsg实验——pubsub
发布订阅模式是很多消息中间件提供的常见功能。通过消息机制,能够将消息发布者和消息接收(消费)者
进行解耦。pubsub模式也是nanomsg直接支持的一直消息模型之一,因此通过pubsub模式实验,
同时也大致了解了下nanomsg的基础用法。
服务端
代码如下 | 复制代码 |
#include <stdio.h> #include <stdlib.h> #include <time.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pubsub.h> void usage(const char *name) { fprintf(stderr, "%s [ bind url]n", name); } int main(int argc, char **argv) { if(argc != 2) { usage(argv[0]); exit(-1); } const char *url = argv[1]; int sock = nn_socket(AF_SP, NN_PUB); if(sock < 0) { fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno)); exit(-1); } if(nn_bind(sock, url) < 0) { fprintf(stderr, "nn_bind failed: %sn", nn_strerror(errno)); exit(-1); } while(1) { time_t rawtime; struct tm * timeinfo; time (&rawtime); timeinfo = localtime (&rawtime); char *text = asctime (timeinfo); int textLen = strlen(text); text[textLen - 1] = ''; printf ("SERVER: PUBLISHING DATE %sn", text); nn_send(sock, text, textLen, 0); sleep(1); } return 0; } |
nanomsg使用非常简单,只要直接include nanomsg/nn.h,即可使用基本API。使用内置的通信模式,
需要引入对应的头文件,如pubsub模式,引入nonomsg/pubsub.h即可。
pubsub server,需要首先通过nn_socket调用创建socket,这里模仿了POSIX接口,
函数返回一个文件描述符。因此直接通过判断返回值是否大于0,判断是否创建成功。注意第二个参数为协议,
在协议相关头文件中会定义对应的宏。然后所有操作都将基于这个文件描述符。
和berkeley sockets一样,server需要bind一个端口,nanomsg需要bind一个url。目前nanomsg支持的格式有:
* 进程内通信(inproc):url格式为inproc://test
* 进程间同in想(ipc):url格式为ipc:///tmp/test.ipc
* tcp通信:url格式为tcp://*:5555
github上源码貌似已经支持websocket了。
nanomsg的错误和UNIX相同,失败之后会设置errno,可以通过nn_strerror获取对应的错误文本。
bind完了之后,就可以通过nn_send函数向socket发送消息了。这个函数参数和berkeley sockets api接口类似。
这里直接获取当前时间,然后发出给所有订阅者。
客户端
代码如下 | 复制代码 |
#include <stdio.h> #include <stdlib.h> #include <nanomsg/nn.h> #include <nanomsg/pubsub.h> int main(int argc, char **argv) { if(argc != 3) { fprintf(stderr, "usage: %s NAME BIND_URLn", argv[0]); exit(-1); } const char *name = argv[1]; const char *url = argv[2]; int sock = nn_socket (AF_SP, NN_SUB); if(sock < 0) { fprintf(stderr, "fail to create socket: %sn", nn_strerror(errno)); exit(-1); } if(nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) { fprintf(stderr, "fail to set sorket opts: %sn", nn_strerror(errno)); exit(-1); } if (nn_connect(sock, url) < 0) { fprintf(stderr, "fail to connect to %s : %sn", url, nn_strerror(errno)); exit(-1); } while ( 1 ) { char *buf = NULL; int bytes = nn_recv (sock, &buf, NN_MSG, 0); printf ("CLIENT (%s): RECEIVED %sn", name, buf); nn_freemsg (buf); } nn_shutdown(sock, 0); return 0; } |
客户端初始化和服务端差不多,在连接服务端之前,需要通过nn_setsockopt将当前socket设置成消息订阅者。
然后通过nn_connect连接发布者,参数和服务端bind的差不多,也是一个socket、一个url。
这里的url要和服务端bind的url相同。之后就是一个死循环不停的接收发布者的消息。
测试
首先是编译,和普通c程序相同,只是增加链接nanomsg。
gcc -o pubserver pubserver.c -lnanomsg
gcc -o pubclient pubclient.c -lnanomsg
为了方便测试,写了一个简单的shell脚本:
代码如下 | 复制代码 |
#!/bin/bash BASE="$( cd "$( dirname "$0" )" && pwd )" PUB=$BASE/pubserver SUB=$BASE/pubclient URL="tcp://127.0.0.1:1234" echo "start pubserver to bind tcp: $URL" $PUB tcp://127.0.0.1:1234 & echo "start to start pubclient" for((i = 0; i < 10; i++)) do echo "start client$i" $SUB client$i $URL & sleep 1 done sleep 20 echo "kill all process and exit" for pid in `jobs -p` do echo "kill $pid" kill $pid done wait |
脚本很简单,首先启动一个消息发布者,然后每秒启动一个消息接受者。等待20s之后,kill掉所有子进程。
脚本的输出:
代码如下 | 复制代码 |
start pubserver to bind tcp: tcp://127.0.0.1:1234 start to start pubclient start client0 SERVER: PUBLISHING DATE Tue Feb 17 15:12:11 2015 start client1 SERVER: PUBLISHING DATE Tue Feb 17 15:12:12 2015 CLIENT (client0): RECEIVED Tue Feb 17 15:12:12 2015 CLIENT (client1): RECEIVED Tue Feb 17 15:12:12 2015 start client2 SERVER: PUBLISHING DATE Tue Feb 17 15:12:13 2015 CLIENT (client0): RECEIVED Tue Feb 17 15:12:13 2015 CLIENT (client1): RECEIVED Tue Feb 17 15:12:13 2015 CLIENT (client2): RECEIVED Tue Feb 17 15:12:13 2015 start client3 SERVER: PUBLISHING DATE Tue Feb 17 15:12:14 2015 CLIENT (client0): RECEIVED Tue Feb 17 15:12:14 2015 CLIENT (client1): RECEIVED Tue Feb 17 15:12:14 2015 CLIENT (client2): RECEIVED Tue Feb 17 15:12:14 2015 ... SERVER: PUBLISHING DATE Tue Feb 17 15:12:41 2015 CLIENT (client0): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client1): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client2): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client3): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client4): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client5): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client6): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client7): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client8): RECEIVED Tue Feb 17 15:12:41 2015 CLIENT (client9): RECEIVED Tue Feb 17 15:12:41 2015 kill all process and exit |
可以看见每次启动一个新的订阅者,每个订阅者都能够收到发布者发布的当前时间。
nanomsg实验——survey
survey模式是由server发出询问,client针对请求回复响应的一种模式。这种模式在分布式系统中非常有用,
可以用来做服务发现、分布式事物等分布式询问。
客户端
客户端实现比较方便,除了基础调用(创建socket、连接url)之外,就是先接收服务端询问
(例子中比较简单,服务端询问是固定的,所以没有对内容进行检查)针对询问发送响应
(例子中是发送服务端当前时间)
代码如下 | 复制代码 |
#include <cstdio> #include <cstdlib> #include <cstring> #include <ctime> #include <nanomsg/nn.h> #include <nanomsg/survey.h> using namespace std; int main(int argc, const char **argv) { if(argc != 3) { fprintf(stderr, "usage: %s NAME URLn", argv[0]); exit(-1); } const char *name = argv[1]; const char *url = argv[2]; int sock = nn_socket(AF_SP, NN_RESPONDENT); if(sock < 0){ fprintf(stderr, "nn_socket fail: %sn", nn_strerror(errno)); exit(-1); } if(nn_connect(sock, url) < 0) { fprintf(stderr, "nn_connect fail: %sn", nn_strerror(errno)); exit(-1); } while(1){ char *buf = NULL; int bytes = nn_recv (sock, &buf, NN_MSG, 0); if(bytes > 0) { printf ("CLIENT (%s): RECEIVED "%s" SURVEY REQUESTn", name, buf); nn_freemsg (buf); char sendBuffer[128]; time_t rawtime; struct tm * timeinfo; time (&rawtime); timeinfo = localtime (&rawtime); char *timeText = asctime (timeinfo); int textLen = strlen(timeText); timeText[textLen - 1] = ''; sprintf(sendBuffer, "[ %s ] %s", name, timeText); int sendSize = strlen(sendBuffer) + 1; int actualSendSize = nn_send(sock, sendBuffer, sendSize, 0); if(actualSendSize != sendSize) { fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize); continue; } } } nn_shutdown(sock, 0); return 0; } |
这里收到消息后,就简单的打印,然后将响应数据写会给服务端。
服务端
服务端有个问题,之前搜索了几个例子都不太正常。经过尝试和简单查看代码之后发现,通过nanomsg基础api,
无法获取当前有多少客户端。但是,如果当前所有连接的客户端的响应都已经收到,再次调用nn_recv之后,
会直接返回-1,表示读取失败,同时errno(通过errno函数获取)被设置为EFSM,表示当前状态机状态不正确。
代码如下 | 复制代码 |
#include <cstdio> #include <cstdlib> #include <cstring> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/survey.h> using namespace std; const char *SURVEY_TYPE = "DATE"; int main(int argc, char** argv) { if ( argc != 2 ) { fprintf(stderr, "usage: %s URLn", argv[0]); exit(-1); } const char *url = argv[1]; int sock = nn_socket(AF_SP, NN_SURVEYOR); if(sock < 0) { fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno)); exit(-1); } if(nn_bind(sock, url) < 0) { fprintf(stderr, "nn_bind fail: %sn", nn_strerror(errno)); exit(-1); } while(1) { int sendSize = strlen(SURVEY_TYPE) + 1; int actualSendSize; printf ("SERVER: SENDING DATE SURVEY REQUESTn"); if ((actualSendSize = nn_send(sock, SURVEY_TYPE, sendSize, 0)) != sendSize) { fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize); continue; } int count = 0; while(1) { char *buf = NULL; int bytes = nn_recv (sock, &buf, NN_MSG, 0); if (bytes < 0 && nn_errno() == ETIMEDOUT) break; if (bytes >= 0) { printf ("SERVER: RECEIVED "%s" SURVEY RESPONSEn", buf); ++count; nn_freemsg (buf); } else { fprintf(stderr, "nn_recv fail: %sn", nn_strerror(errno)); break; } } printf("SERVER: current receive %d survey response.n", count); sleep(1); } nn_shutdown(sock, 0); return 0; } |
这里用了两个死循环,外层循环不停尝试向客户端发起询问。完成询问后,通过另外一个死循环读取所有的客户端响应,
当读取失败时退出循环。
之前找到的源码是直接判断错误是否ETIMEDOUT,经过打印会发现每次都没有超时,而是状态机错误:
代码如下 | 复制代码 |
/* If no survey is going on return EFSM error. */ if (nn_slow (!nn_surveyor_inprogress (surveyor))) return -EFSM; |
测试
测试和前文差不多,先启动一个server,然后再一个个启动client:
代码如下 | 复制代码 |
#!/bin/bash BASE="$( cd "$( dirname "$0" )" && pwd )" SERVER=$BASE/surveyserver CLIENT=$BASE/surveyclient URL="tcp://127.0.0.1:1234" echo "start surveyserver to bind tcp: $URL" $SERVER tcp://127.0.0.1:1234 & echo "start to start surveyclient" for((i = 0; i < 10; i++)) do echo "start client$i" $CLIENT client$i $URL & sleep 1 done sleep 20 echo "kill all process and exit" for pid in `jobs -p` do echo "kill $pid" kill $pid done wait |
输出为:
代码如下 | 复制代码 |
start surveyserver to bind tcp: tcp://127.0.0.1:1234 start to start surveyclient start client0 SERVER: SENDING DATE SURVEY REQUEST start client1 nn_recv fail: Operation cannot be performed in this state SERVER: current receive 0 survey response. start client2 SERVER: SENDING DATE SURVEY REQUEST CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE nn_recv fail: Operation cannot be performed in this state SERVER: current receive 2 survey response. start client3 SERVER: SENDING DATE SURVEY REQUEST CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST ... SERVER: SENDING DATE SURVEY REQUEST CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST CLIENT (client3): RECEIVED "DATE" SURVEY REQUEST CLIENT (client4): RECEIVED "DATE" SURVEY REQUEST CLIENT (client5): RECEIVED "DATE" SURVEY REQUEST CLIENT (client6): RECEIVED "DATE" SURVEY REQUEST CLIENT (client7): RECEIVED "DATE" SURVEY REQUEST CLIENT (client9): RECEIVED "DATE" SURVEY REQUEST CLIENT (client8): RECEIVED "DATE" SURVEY REQUEST SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client2 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client3 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client4 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client5 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client6 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client7 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client9 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE SERVER: RECEIVED "[ client8 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE nn_recv fail: Operation cannot be performed in this state SERVER: current receive 10 survey response. |
从输出可以看见,每次最后一个接收完成之后,都会有一个“Operation cannot be performed in this state”
错误,也就是EFSM错误。
相关文章
- 《彩色点点战争》推图常用三大主c玩法详解 01-23
- 《燕云十六声》池鱼林木任务攻略 01-23
- 《大连地铁e出行》查看行程记录方法 01-23
- 《明日方舟》2025春节限定干员余角色介绍 01-23
- 《崩坏:星穹铁道》万敌光锥搭配攻略 01-23
- 《燕云十六声》一药千金任务攻略 01-23