关于非阻塞模式下的sendmsg()

之前的服务程序使用了多进程模式,创建一对socketpair然后主进程用sendmsg向各个工作进程分发连接fd,工作进程中所有其他的连接都是非阻塞的,但由于传递套接字的消息很小且是单向的,就简单使用了阻塞模式,也没有处理异步读写事件。

最近要主进程和工作进程之间互相传递大量数据,需要把socketpair改成非阻塞的。于是问题就是:socketpair既要传递fd又要传输大量数据,在非阻塞模式下如果缓冲区满了,fd还能否传递?如果发送失败,fd会不会丢失?再如果缓冲区重新有了可写空间,是否需要重新发送fd?总之对sendmsg函数在非阻塞模式下的行为不甚了解,网上搜了一圈也没有发现说明此类问题的文章。

于是就试一下,代码如下:

#include "msgio.h"
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>

/*
 * Switch fd into non-blocking mode by OR-ing O_NONBLOCK into its file
 * status flags, preserving the flags already set.
 * Returns 0 on success, -1 on an invalid descriptor or a failed fcntl().
 */
static int setnonblocking( int fd )
{
    int opts;

    /* fd 0 (stdin) is a valid descriptor; only negative values are invalid */
    if( fd<0 )
        return -1;

    opts = fcntl( fd, F_GETFL );
    if( opts<0 )
        return -1;

    if( fcntl( fd, F_SETFL, opts|O_NONBLOCK ) < 0 )
        return -1;

    return 0;
}

/*
 * Demo driver: create a socketpair, fork, and let the parent run sender()
 * on one end while the child runs recver() on the other, both ends set to
 * non-blocking mode.  The parent gives the child two seconds, then
 * terminates and reaps it.
 */
int main()
{
    int status;
    int fds[2];
    int child;

    if( socketpair( AF_UNIX, SOCK_STREAM, 0, fds ) != 0 )
    {
        perror( "create socket pair failed" );
        return 1;
    }

    child = fork();
    if( child == -1 )
    {
        perror( "fork failed" );
        return 2;
    }

    if( child == 0 )
    {
        /* child: receive on fds[1] */
        close( fds[0] );
        setnonblocking( fds[1] );
        recver( fds[1] );
    }
    else
    {
        /* parent: send on fds[0], then stop the child and collect it */
        close( fds[1] );
        setnonblocking( fds[0] );
        sender( fds[0] );

        sleep( 2 );
        kill( child, SIGTERM );
        wait( &status );
    }

    return 0;
}
#include "msgio.h"
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>

#define MSG_DATA "xyz"
/* sizeof counts the terminating NUL, so each iovec carries 4 bytes */
#define MSG_LEN sizeof(MSG_DATA)

#define LOOP_TIMES 10000    /* unused in this experiment; kept for reference */

/* log one line to stderr, appending the newline */
#define print( fmt, ... ) \
    fprintf( stderr, fmt "\n", ##__VA_ARGS__ )


/*
 * Send two sendmsg() messages on fd.  Each message carries four iovecs of
 * MSG_DATA (16 bytes of payload) plus one SCM_RIGHTS control message
 * passing a descriptor opened on /dev/zero.
 * Returns 0 on success, -1 if /dev/zero cannot be opened.
 */
int sender( int fd )
{
    int ret;
    int i;
    int xfd;

    struct msghdr msghdr;
    struct iovec iov[4];
    union {
        struct cmsghdr cm;
        char data[CMSG_SPACE(sizeof(int))];  /* sized and aligned for one int */
    } cmsg;

    xfd = open( "/dev/zero", O_RDONLY );
    if( xfd < 0 )
    {
        perror( "open xfd failed" );
        return -1;
    }

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;
    *(int*)CMSG_DATA(&(cmsg.cm)) = xfd;

    iov[0].iov_base = MSG_DATA;
    iov[0].iov_len = MSG_LEN;
    iov[1].iov_base = MSG_DATA;
    iov[1].iov_len = MSG_LEN;
    iov[2].iov_base = MSG_DATA;
    iov[2].iov_len = MSG_LEN;
    iov[3].iov_base = MSG_DATA;
    iov[3].iov_len = MSG_LEN;

    msghdr.msg_name = NULL;
    msghdr.msg_namelen = 0;
    msghdr.msg_iov = iov;
    msghdr.msg_iovlen = 4;
    msghdr.msg_control = &cmsg;           /* fix: drop legacy caddr_t cast */
    msghdr.msg_controllen = sizeof(cmsg);
    msghdr.msg_flags = 0;                 /* fix: field was left uninitialized */

    /* fix: MSG_LEN is a size_t, so %zu, not %d */
    print( "to send %zu", MSG_LEN );

    for( i=0; i<2; i++ )
    {
        ret = sendmsg( fd, &msghdr, MSG_DONTWAIT );
        if( ret < 0 )
        {
            perror( "sendmsg failed" );
            continue;
        }

        print( "sendmsg %5d sent %d, fd %d", i, ret, xfd );
    }

    close( xfd );    /* fix: the /dev/zero descriptor was leaked */
    return 0;
}


/*
 * Receive up to four messages on fd into a 10-byte buffer, reporting the
 * payload size and any file descriptor delivered via SCM_RIGHTS.
 * Sleeps one second first so the sender can queue its messages.
 * Returns 0.
 */
int recver( int fd )
{
    int ret;
    int i;
    unsigned char buf[10];
    int xfd;

    struct msghdr msghdr;
    struct iovec iov[1];
    struct cmsghdr *cmp;
    union {
        struct cmsghdr cm;
        char data[CMSG_SPACE(sizeof(int))];  /* sized and aligned for one int */
    } cmsg;

    iov[0].iov_base = buf;
    iov[0].iov_len = sizeof(buf);

    msghdr.msg_name = NULL;
    msghdr.msg_namelen = 0;
    msghdr.msg_iov = iov;
    msghdr.msg_iovlen = 1;
    msghdr.msg_flags = 0;

    sleep(1);

    for( i=0; i<4; i++ )
    {
        /* fix: recvmsg() overwrites msg_controllen with the amount of
         * control data actually received, so the control buffer must be
         * re-armed before every call; otherwise later control data can be
         * truncated and a stale fd re-read from the previous message. */
        msghdr.msg_control = &cmsg;
        msghdr.msg_controllen = sizeof(cmsg);

        ret = recvmsg( fd, &msghdr, 0 );
        if( ret < 0 )
        {
            perror( "recvmsg failed" );
            continue;
        }
        if( ret == 0 )
        {
            /* fix: orderly shutdown by the peer was previously unhandled */
            print( "peer closed connection" );
            break;
        }

        /* fix: use CMSG_FIRSTHDR so a message without ancillary data is
         * detected instead of reading leftovers from the union */
        cmp = CMSG_FIRSTHDR( &msghdr );
        if( cmp == NULL )
        {
            print( "recvmsg %5d recv %d, no control data", i, ret );
            continue;
        }

        if( cmp->cmsg_len < CMSG_LEN(sizeof(int)) )
        {
            /* fix: cmsg_len is a size_t, so %zu, not %u */
            print( "msg control data len %zu", cmp->cmsg_len );
            continue;
        }
        if( cmp->cmsg_level != SOL_SOCKET || cmp->cmsg_type != SCM_RIGHTS )
        {
            print( "msg control level %d, type %d", cmp->cmsg_level, cmp->cmsg_type );
            continue;
        }

        xfd = *(int*)CMSG_DATA( cmp );
        print( "recvmsg %5d recv %d, fd %d", i, ret, xfd );
        /* received descriptors should eventually be closed by the caller */
        //close( xfd );
    }

    return 0;
}

测试结果为:

to send 4
sendmsg     0 sent 16, fd 4
sendmsg     1 sent 16, fd 4
recvmsg     0 recv 10, fd 3
recvmsg     1 recv 10, fd 5
recvmsg     2 recv 10, fd 5
recvmsg     3 recv 2, fd 5

父进程分两个发送了32字节数据,每次附带一个fd。子进程分四次接收,数据接收完整,也只收到了两个fd。

特别是第二次的接收,数据中同时包含了父进程两次消息中的数据,但附带的fd是第二个消息的fd。随后几次接收显示的都是同一个fd。

需要注意的是,recver()没有在每次recvmsg()之前重置msg_control和msg_controllen,而recvmsg()会把msg_controllen改写为实际收到的控制数据长度;当某次接收没有附带新的控制数据时,cmsg缓冲区里残留的仍是上一次的内容。因此后续接收中反复出现的fd 5很可能只是残留值,而不是内核重复传递了fd(可用CMSG_FIRSTHDR检查本次是否真的收到了控制消息来确认)。由此大致可以认为,fd的传递和普通数据类似于两条通道;由于fd编号在一个进程内是唯一的,子进程可以根据收到的fd编号判断它是否是新传入的fd。

将recver()函数中的buf大小改为 10*MSG_LEN 进行测试,结果为:

to send 4
sendmsg     0 sent 16, fd 4
sendmsg     1 sent 16, fd 4
recvmsg     0 recv 16, fd 3
recvmsg     1 recv 16, fd 5
recvmsg failed: Resource temporarily unavailable
recvmsg failed: Resource temporarily unavailable
可见两次发送的消息没有被合并接收:内核不会把带有控制消息(辅助数据)的数据与后续数据跨边界合并,一次recvmsg最多返回一条控制消息所对应的那段数据,因此一次接收只会得到一个消息附带的fd。

over