关于非阻塞模式下的sendmsg()
之前的服务程序使用了多进程模式,创建一对socketpair然后主进程用sendmsg向各个工作进程分发连接fd,工作进程中所有其他的连接都是非阻塞的,但由于传递套接字的消息很小且是单向的,就简单使用了阻塞模式,也没有处理异步读写事件。
最近要主进程和工作进程之间互相传递大量数据,需要把socketpair改成非阻塞的。于是问题就是,socketpair既要传递fd又要传输大量数据,在非阻塞模式下如果缓冲区满了,那么fd还能否传递失败的话会不会丢失;再如果缓冲又有可写空间,是否得重新发送fd。总之对sendmsg函数在非阻塞模式下的行为不甚了解,网上G了一圈也没有发现有说明此类问题的文章。
于是就试一下,代码如下:
#include "msgio.h" #include <stdio.h> #include <unistd.h> #include <fcntl.h> #include <signal.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/socket.h> static int setnonblocking( int fd ) { int opts; if( fd<=0 ) return -1; opts = fcntl( fd, F_GETFL ); if( opts<0 ) return -1; if( fcntl( fd, F_SETFL, opts|O_NONBLOCK ) < 0 ) return -1; return 0; } int main() { int ret; int ss[2]; int pid; ret = socketpair( AF_UNIX, SOCK_STREAM, 0, ss ); if( ret != 0 ) { perror( "create socket pair failed" ); return 1; } pid = fork(); switch( pid ) { case -1: perror( "fork failed" ); return 2; case 0: close( ss[0] ); setnonblocking( ss[1] ); recver( ss[1] ); break; default: close( ss[1] ); setnonblocking( ss[0] ); sender( ss[0] ); break; } if( pid ) { sleep( 2 ); kill( pid, SIGTERM ); wait( &ret ); } return 0; }
#include "msgio.h" #include <stdio.h> #include <unistd.h> #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> #define MSG_DATA "xyz" #define MSG_LEN sizeof(MSG_DATA) #define LOOP_TIMES 10000 #define print( fmt, ... ) \ fprintf( stderr, fmt "\n", ##__VA_ARGS__ ) int sender( int fd ) { int ret; int i; int xfd; struct msghdr msghdr; struct iovec iov[4]; union { struct cmsghdr cm; char data[CMSG_SPACE(sizeof(int))]; } cmsg; xfd = open( "/dev/zero", O_RDONLY ); if( xfd < 0 ) { perror( "open xfd failed" ); return -1; } cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; *(int*)CMSG_DATA(&(cmsg.cm)) = xfd; iov[0].iov_base = MSG_DATA; iov[0].iov_len = MSG_LEN; iov[1].iov_base = MSG_DATA; iov[1].iov_len = MSG_LEN; iov[2].iov_base = MSG_DATA; iov[2].iov_len = MSG_LEN; iov[3].iov_base = MSG_DATA; iov[3].iov_len = MSG_LEN; msghdr.msg_name = NULL; msghdr.msg_namelen = 0; msghdr.msg_iov = iov; msghdr.msg_iovlen = 4; msghdr.msg_control = (caddr_t)&cmsg; msghdr.msg_controllen = sizeof(cmsg); print( "to send %d", MSG_LEN ); for( i=0; i<2; i++ ) { ret = sendmsg( fd, &msghdr, MSG_DONTWAIT ); if( ret < 0 ) { perror( "sendmsg failed" ); continue; } print( "sendmsg %5d sent %d, fd %d", i, ret, xfd ); } return 0; } int recver( int fd ) { int ret; int i; unsigned char buf[10]; int xfd; struct msghdr msghdr; struct iovec iov[1]; union { struct cmsghdr cm; char data[CMSG_SPACE(sizeof(int))]; } cmsg; iov[0].iov_base = buf; iov[0].iov_len = sizeof(buf); msghdr.msg_name = NULL; msghdr.msg_namelen = 0; msghdr.msg_iov = iov; msghdr.msg_iovlen = 1; msghdr.msg_control = (caddr_t)&cmsg; msghdr.msg_controllen = sizeof(cmsg); sleep(1); for( i=0; i<4; i++ ) { ret = recvmsg( fd, &msghdr, 0 ); if( ret < 0 ) { perror( "recvmsg failed" ); continue; } if( cmsg.cm.cmsg_len < CMSG_LEN(sizeof(int)) ) { print( "msg control data len %u", cmsg.cm.cmsg_len ); continue; } if( cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS ) { print( "msg control level %d, type %d", cmsg.cm.cmsg_level, cmsg.cm.cmsg_type ); continue; } xfd = *(int*)CMSG_DATA( &(cmsg.cm) ); print( "recvmsg %5d recv %d, fd %d", i, ret, xfd ); //close( xfd ); } return 0; }
测试结果为:
to send 4
sendmsg 0 sent 16, fd 4
sendmsg 1 sent 16, fd 4
recvmsg 0 recv 10, fd 3
recvmsg 1 recv 10, fd 5
recvmsg 2 recv 10, fd 5
recvmsg 3 recv 2, fd 5
父进程分两个发送了32字节数据,每次附带一个fd。子进程分四次接收,数据接收完整,也只收到了两个fd。
特别是第二次的接收,数据中包含父进程两次消息中的数据,但fd为第二个消息的fd。随后的接收到的都是同一个fd。
由此可以猜测,fd的传递和数据类似于两个通道。由于一个进程中的fd是唯一的,子进程在收到fd后可以判断出是否是新的fd。
将recver()函数中的buf大小改为 10*MSG_LEN 进行测试,结果为:
to send 4
sendmsg 0 sent 16, fd 4
sendmsg 1 sent 16, fd 4
recvmsg 0 recv 16, fd 3
recvmsg 1 recv 16, fd 5
recvmsg failed: Resource temporarily unavailable
recvmsg failed: Resource temporarily unavailable
可见两次发送的消息没有合并,猜测内核对此进行了处理,只允许一个消息中包含一个fd。
over