recvmmsg 函数详解 見出しへのリンク

1. 函数介绍 見出しへのリンク

recvmmsg 是Linux 2.6.39引入的高效批量接收消息系统调用。它是 recvmsg 的批量版本,允许应用程序在单次系统调用中接收多个消息,显著减少了系统调用开销,特别适用于高吞吐量的网络服务器和实时应用。

2. 函数原型 見出しへのリンク

#define _GNU_SOURCE
#include <sys/socket.h>
int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
             int flags, struct timespec *timeout);

3. 功能 見出しへのリンク

recvmmsg 允许从套接字批量接收多个消息,每个消息可以包含数据和控制信息。它支持分散缓冲区接收、地址信息获取、控制数据接收等功能,是构建高性能网络应用的关键工具。

4. 参数 見出しへのリンク

  • int sockfd: 套接字文件描述符
  • *struct mmsghdr msgvec: 消息向量数组,描述多个接收消息
  • unsigned int vlen: 消息向量数组的长度(最大可接收的消息数)
  • int flags: 接收标志,与recvmsg相同
  • *struct timespec timeout: 超时时间(NULL表示无限等待)

5. 返回值 見出しへのリンク

  • 成功: 返回实际接收到的消息数量
  • 超时: 返回0
  • 失败: 返回-1,并设置errno

6. 相似函数,或关联函数 見出しへのリンク

  • recvmsg: 单消息接收函数
  • recv: 基本接收函数
  • sendmmsg: 对应的批量发送函数
  • poll/epoll: I/O多路复用函数

7. 示例代码 見出しへのリンク

示例1:基础recvmmsg使用 見出しへのリンク

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>

/**
 * 演示recvmmsg的基本使用方法
 */
int demo_recvmmsg_basic() {
    int server_fd, client_fd;
    struct sockaddr_in server_addr, client_addr;
    socklen_t client_len = sizeof(client_addr);
    struct mmsghdr msgvec[5];  // 批量接收5个消息
    struct iovec iov[5][1];    // 每个消息1个缓冲区
    char buffers[5][256];      // 5个缓冲区
    ssize_t bytes_sent;
    int messages_received;
    
    printf("=== recvmmsg 基本使用示例 ===\n");
    
    // 创建UDP服务器套接字
    server_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (server_fd == -1) {
        perror("创建UDP套接字失败");
        return -1;
    }
    
    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(8080);
    
    // 绑定套接字
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定套接字失败");
        close(server_fd);
        return -1;
    }
    
    printf("UDP服务器监听在端口 8080\n");
    
    // 启动UDP客户端发送多个消息
    if (fork() == 0) {
        // 客户端代码
        sleep(1);  // 等待服务器启动
        int client_sock = socket(AF_INET, SOCK_DGRAM, 0);
        struct sockaddr_in serv_addr;
        
        memset(&serv_addr, 0, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(8080);
        serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
        
        // 发送5个消息
        for (int i = 0; i < 5; i++) {
            char message[100];
            snprintf(message, sizeof(message), "Message %d from client", i + 1);
            bytes_sent = sendto(client_sock, message, strlen(message), 0,
                               (struct sockaddr*)&serv_addr, sizeof(serv_addr));
            if (bytes_sent != -1) {
                printf("客户端发送消息 %d: %s\n", i + 1, message);
            }
            usleep(100000);  // 100ms间隔
        }
        
        close(client_sock);
        exit(0);
    }
    
    // 准备批量接收消息结构
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < 5; i++) {
        // 设置每个消息的缓冲区
        iov[i][0].iov_base = buffers[i];
        iov[i][0].iov_len = sizeof(buffers[i]) - 1;
        
        // 设置消息头
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
        msgvec[i].msg_hdr.msg_name = &client_addr;
        msgvec[i].msg_hdr.msg_namelen = client_len;
    }
    
    printf("准备批量接收最多5个消息...\n");
    
    // 批量接收消息(设置5秒超时)
    struct timespec timeout = {5, 0};  // 5秒超时
    messages_received = recvmmsg(server_fd, msgvec, 5, 0, &timeout);
    
    if (messages_received == -1) {
        perror("recvmmsg 失败");
        close(server_fd);
        return -1;
    }
    
    printf("成功接收 %d 个消息:\n", messages_received);
    
    // 处理接收到的消息
    for (int i = 0; i < messages_received; i++) {
        buffers[i][msgvec[i].msg_len] = '\0';  // 添加字符串终止符
        
        // 获取发送者地址信息
        struct sockaddr_in *sender_addr = (struct sockaddr_in*)msgvec[i].msg_hdr.msg_name;
        printf("  消息 %d: %s (来自 %s:%d, 长度: %u 字节)\n", 
               i + 1, buffers[i],
               inet_ntoa(sender_addr->sin_addr), ntohs(sender_addr->sin_port),
               msgvec[i].msg_len);
    }
    
    close(server_fd);
    
    // 等待客户端结束
    int status;
    wait(&status);
    
    return 0;
}

int main() {
    return demo_recvmmsg_basic();
}

示例2:高性能UDP服务器 見出しへのリンク

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <poll.h>

/**
 * 高性能UDP服务器结构
 */
typedef struct {
    int sockfd;
    int port;
    unsigned long total_messages;
    unsigned long total_bytes;
    time_t start_time;
} high_perf_udp_server_t;

/**
 * 初始化高性能UDP服务器
 */
int server_init(high_perf_udp_server_t *server, int port) {
    struct sockaddr_in server_addr;
    
    memset(server, 0, sizeof(high_perf_udp_server_t));
    server->port = port;
    server->start_time = time(NULL);
    
    // 创建UDP套接字
    server->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (server->sockfd == -1) {
        perror("创建UDP套接字失败");
        return -1;
    }
    
    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(port);
    
    // 绑定套接字
    if (bind(server->sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定套接字失败");
        close(server->sockfd);
        return -1;
    }
    
    printf("高性能UDP服务器启动,监听端口 %d\n", port);
    return 0;
}

/**
 * 使用recvmmsg处理批量消息
 */
int server_process_batch(high_perf_udp_server_t *server) {
    const int BATCH_SIZE = 32;  // 每次批量处理32个消息
    struct mmsghdr msgvec[BATCH_SIZE];
    struct iovec iov[BATCH_SIZE][2];  // 每个消息2个缓冲区
    char buffers[BATCH_SIZE][2][256]; // 双缓冲区
    struct sockaddr_in client_addrs[BATCH_SIZE];
    int messages_received;
    
    // 准备批量接收结构
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < BATCH_SIZE; i++) {
        // 设置分散缓冲区
        iov[i][0].iov_base = buffers[i][0];
        iov[i][0].iov_len = sizeof(buffers[i][0]) - 1;
        iov[i][1].iov_base = buffers[i][1];
        iov[i][1].iov_len = sizeof(buffers[i][1]) - 1;
        
        // 设置消息头
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 2;
        msgvec[i].msg_hdr.msg_name = &client_addrs[i];
        msgvec[i].msg_hdr.msg_namelen = sizeof(client_addrs[i]);
    }
    
    // 批量接收消息(非阻塞,100ms超时)
    struct timespec timeout = {0, 100000000};  // 100ms超时
    messages_received = recvmmsg(server->sockfd, msgvec, BATCH_SIZE, MSG_DONTWAIT, &timeout);
    
    if (messages_received == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;  // 没有数据可读
        }
        perror("recvmmsg 失败");
        return -1;
    }
    
    if (messages_received == 0) {
        return 0;  // 超时
    }
    
    // 处理接收到的消息
    for (int i = 0; i < messages_received; i++) {
        // 合并分散缓冲区的内容
        size_t total_len = msgvec[i].msg_len;
        size_t buf1_len = (total_len > sizeof(buffers[i][0]) - 1) ? 
                         sizeof(buffers[i][0]) - 1 : total_len;
        size_t buf2_len = (total_len > buf1_len) ? total_len - buf1_len : 0;
        
        buffers[i][0][buf1_len] = '\0';
        if (buf2_len > 0) {
            buffers[i][1][buf2_len] = '\0';
        }
        
        // 统计信息
        server->total_messages++;
        server->total_bytes += msgvec[i].msg_len;
        
        // 简单回显处理
        if (strncmp(buffers[i][0], "STATS", 5) == 0) {
            char response[256];
            snprintf(response, sizeof(response), 
                    "Messages: %lu, Bytes: %lu, Uptime: %lds",
                    server->total_messages, server->total_bytes,
                    time(NULL) - server->start_time);
            
            sendto(server->sockfd, response, strlen(response), 0,
                   (struct sockaddr*)&client_addrs[i], sizeof(client_addrs[i]));
        } else {
            // 回显原始消息
            sendto(server->sockfd, buffers[i][0], buf1_len, 0,
                   (struct sockaddr*)&client_addrs[i], sizeof(client_addrs[i]));
            if (buf2_len > 0) {
                sendto(server->sockfd, buffers[i][1], buf2_len, 0,
                       (struct sockaddr*)&client_addrs[i], sizeof(client_addrs[i]));
            }
        }
        
        if (server->total_messages % 1000 == 0) {
            printf("已处理 %lu 条消息\n", server->total_messages);
        }
    }
    
    return messages_received;
}

/**
 * 运行高性能服务器
 */
int server_run(high_perf_udp_server_t *server, int duration_seconds) {
    time_t start_time = time(NULL);
    time_t current_time;
    int total_batches = 0;
    
    printf("服务器开始运行 %d 秒...\n", duration_seconds);
    
    while ((current_time = time(NULL)) - start_time < duration_seconds) {
        int batch_result = server_process_batch(server);
        if (batch_result > 0) {
            total_batches++;
        } else if (batch_result == -1) {
            break;
        }
        
        // 短暂休眠以避免过度占用CPU
        usleep(1000);  // 1ms
    }
    
    printf("服务器运行结束\n");
    printf("统计信息:\n");
    printf("  总消息数: %lu\n", server->total_messages);
    printf("  总字节数: %lu\n", server->total_bytes);
    printf("  平均吞吐量: %.2f 消息/秒\n", 
           server->total_messages / (double)duration_seconds);
    printf("  处理批次: %d\n", total_batches);
    
    return 0;
}

/**
 * 演示高性能UDP服务器
 */
int demo_high_performance_udp_server() {
    high_perf_udp_server_t server;
    const int SERVER_PORT = 8081;
    const int TEST_DURATION = 30;  // 30秒测试
    
    printf("=== 高性能UDP服务器示例 ===\n");
    
    // 初始化服务器
    if (server_init(&server, SERVER_PORT) != 0) {
        return -1;
    }
    
    // 启动压力测试客户端
    if (fork() == 0) {
        sleep(1);  // 等待服务器启动
        
        // 创建多个并发客户端
        for (int client_id = 0; client_id < 3; client_id++) {
            if (fork() == 0) {
                int client_sock = socket(AF_INET, SOCK_DGRAM, 0);
                struct sockaddr_in serv_addr;
                
                memset(&serv_addr, 0, sizeof(serv_addr));
                serv_addr.sin_family = AF_INET;
                serv_addr.sin_port = htons(SERVER_PORT);
                serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
                
                printf("客户端 %d 开始发送测试消息...\n", client_id + 1);
                
                // 发送测试消息
                for (int i = 0; i < 1000; i++) {
                    char message[128];
                    snprintf(message, sizeof(message), 
                            "Client %d Test Message %d", client_id + 1, i + 1);
                    
                    sendto(client_sock, message, strlen(message), 0,
                           (struct sockaddr*)&serv_addr, sizeof(serv_addr));
                    
                    // 随机间隔发送
                    usleep(1000 + (rand() % 5000));  // 1-6ms间隔
                }
                
                // 请求统计信息
                const char *stats_cmd = "STATS";
                sendto(client_sock, stats_cmd, strlen(stats_cmd), 0,
                       (struct sockaddr*)&serv_addr, sizeof(serv_addr));
                
                close(client_sock);
                exit(0);
            }
        }
        
        // 等待所有客户端完成
        for (int i = 0; i < 3; i++) {
            int status;
            wait(&status);
        }
        
        exit(0);
    }
    
    // 运行服务器
    server_run(&server, TEST_DURATION);
    
    // 清理资源
    close(server.sockfd);
    
    // 等待测试客户端结束
    int status;
    wait(&status);
    
    return 0;
}

int main() {
    return demo_high_performance_udp_server();
}

示例3:带控制信息的批量接收 見出しへのリンク

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>

/**
 * 演示recvmmsg接收控制信息
 */
int demo_recvmmsg_control_data() {
    int server_fd;
    struct sockaddr_in server_addr;
    struct mmsghdr msgvec[3];
    struct iovec iov[3][1];
    char buffers[3][256];
    char control_buffers[3][1024];
    struct sockaddr_in client_addrs[3];
    int messages_received;
    
    printf("=== recvmmsg 控制信息接收示例 ===\n");
    
    // 创建UDP服务器套接字
    server_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (server_fd == -1) {
        perror("创建UDP套接字失败");
        return -1;
    }
    
    // 启用时间戳选项
    int timestamp_on = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_TIMESTAMP, 
                   &timestamp_on, sizeof(timestamp_on)) == -1) {
        printf("警告: 无法启用时间戳选项: %s\n", strerror(errno));
    }
    
    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(8082);
    
    // 绑定套接字
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定套接字失败");
        close(server_fd);
        return -1;
    }
    
    printf("带控制信息的UDP服务器监听在端口 8082\n");
    
    // 启动客户端发送消息
    if (fork() == 0) {
        sleep(1);
        int client_sock = socket(AF_INET, SOCK_DGRAM, 0);
        struct sockaddr_in serv_addr;
        
        memset(&serv_addr, 0, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(8082);
        serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
        
        // 发送3个消息
        for (int i = 0; i < 3; i++) {
            char message[100];
            snprintf(message, sizeof(message), "Control Message %d", i + 1);
            sendto(client_sock, message, strlen(message), 0,
                   (struct sockaddr*)&serv_addr, sizeof(serv_addr));
            printf("客户端发送: %s\n", message);
            usleep(200000);  // 200ms间隔
        }
        
        close(client_sock);
        exit(0);
    }
    
    // 准备批量接收结构(带控制信息)
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < 3; i++) {
        // 设置数据缓冲区
        iov[i][0].iov_base = buffers[i];
        iov[i][0].iov_len = sizeof(buffers[i]) - 1;
        
        // 设置消息头
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
        msgvec[i].msg_hdr.msg_name = &client_addrs[i];
        msgvec[i].msg_hdr.msg_namelen = sizeof(client_addrs[i]);
        
        // 设置控制缓冲区
        msgvec[i].msg_hdr.msg_control = control_buffers[i];
        msgvec[i].msg_hdr.msg_controllen = sizeof(control_buffers[i]);
    }
    
    printf("准备接收带控制信息的消息...\n");
    
    // 批量接收消息
    struct timespec timeout = {5, 0};
    messages_received = recvmmsg(server_fd, msgvec, 3, 0, &timeout);
    
    if (messages_received == -1) {
        perror("recvmmsg 失败");
        close(server_fd);
        return -1;
    }
    
    printf("接收到 %d 个消息,处理控制信息:\n", messages_received);
    
    // 处理接收到的消息和控制信息
    for (int i = 0; i < messages_received; i++) {
        buffers[i][msgvec[i].msg_len] = '\0';
        printf("\n消息 %d:\n", i + 1);
        printf("  数据: %s\n", buffers[i]);
        printf("  长度: %u 字节\n", msgvec[i].msg_len);
        
        // 解析控制信息
        struct msghdr *hdr = &msgvec[i].msg_hdr;
        struct cmsghdr *cmsg;
        
        printf("  控制信息:\n");
        int control_found = 0;
        
        for (cmsg = CMSG_FIRSTHDR(hdr); cmsg != NULL; cmsg = CMSG_NXTHDR(hdr, cmsg)) {
            control_found = 1;
            
            if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMP) {
                struct timeval *tv = (struct timeval*)CMSG_DATA(cmsg);
                printf("    时间戳: %ld.%06ld\n", tv->tv_sec, tv->tv_usec);
                
                // 转换为可读格式
                char time_str[64];
                time_t sec = tv->tv_sec;
                struct tm *tm_info = localtime(&sec);
                strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", tm_info);
                printf("    可读时间: %s.%06ld\n", time_str, tv->tv_usec);
            } else {
                printf("    其他控制信息: level=%d, type=%d, len=%zu\n",
                       cmsg->cmsg_level, cmsg->cmsg_type, cmsg->cmsg_len);
            }
        }
        
        if (!control_found) {
            printf("    没有控制信息\n");
        }
    }
    
    close(server_fd);
    
    // 等待客户端结束
    int status;
    wait(&status);
    
    return 0;
}

int main() {
    return demo_recvmmsg_control_data();
}

示例4:实时数据采集应用 見出しへのリンク

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

/**
 * 实时数据采集结构
 */
typedef struct {
    int sockfd;
    int port;
    unsigned long packets_received;
    unsigned long bytes_received;
    unsigned long batches_processed;
    time_t start_time;
    volatile int running;
} data_collector_t;

// 全局变量用于信号处理
static data_collector_t *g_collector = NULL;

/**
 * 信号处理函数
 */
void signal_handler(int sig) {
    if (g_collector) {
        g_collector->running = 0;
        printf("\n收到信号 %d,准备退出...\n", sig);
    }
}

/**
 * 初始化数据采集器
 */
int collector_init(data_collector_t *collector, int port) {
    struct sockaddr_in server_addr;
    
    memset(collector, 0, sizeof(data_collector_t));
    collector->port = port;
    collector->start_time = time(NULL);
    collector->running = 1;
    
    // 设置全局指针用于信号处理
    g_collector = collector;
    
    // 注册信号处理
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    
    // 创建UDP套接字
    collector->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (collector->sockfd == -1) {
        perror("创建UDP套接字失败");
        return -1;
    }
    
    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(port);
    
    // 绑定套接字
    if (bind(collector->sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定套接字失败");
        close(collector->sockfd);
        return -1;
    }
    
    printf("实时数据采集器启动,监听端口 %d\n", port);
    return 0;
}

/**
 * 处理批量数据
 */
int collector_process_batch(data_collector_t *collector) {
    const int BATCH_SIZE = 64;  // 每次处理64个数据包
    struct mmsghdr msgvec[BATCH_SIZE];
    struct iovec iov[BATCH_SIZE][1];
    char buffers[BATCH_SIZE][512];
    struct sockaddr_in client_addrs[BATCH_SIZE];
    int messages_received;
    
    // 准备批量接收结构
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < BATCH_SIZE; i++) {
        iov[i][0].iov_base = buffers[i];
        iov[i][0].iov_len = sizeof(buffers[i]) - 1;
        
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
        msgvec[i].msg_hdr.msg_name = &client_addrs[i];
        msgvec[i].msg_hdr.msg_namelen = sizeof(client_addrs[i]);
    }
    
    // 批量接收消息(非阻塞,50ms超时)
    struct timespec timeout = {0, 50000000};  // 50ms超时
    messages_received = recvmmsg(collector->sockfd, msgvec, BATCH_SIZE, 
                                MSG_DONTWAIT, &timeout);
    
    if (messages_received == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;  // 没有数据
        }
        perror("recvmmsg 失败");
        return -1;
    }
    
    if (messages_received == 0) {
        return 0;  // 超时
    }
    
    // 更新统计信息
    collector->batches_processed++;
    collector->packets_received += messages_received;
    
    // 处理接收到的数据包
    for (int i = 0; i < messages_received; i++) {
        collector->bytes_received += msgvec[i].msg_len;
        
        // 解析数据包(假设是简单的传感器数据格式)
        buffers[i][msgvec[i].msg_len] = '\0';
        
        // 简单的数据包解析示例
        if (strncmp(buffers[i], "SENSOR:", 7) == 0) {
            // 传感器数据格式: SENSOR:id,value,timestamp
            char *token = strtok(buffers[i] + 7, ",");
            if (token) {
                int sensor_id = atoi(token);
                token = strtok(NULL, ",");
                if (token) {
                    double value = atof(token);
                    token = strtok(NULL, ",");
                    if (token) {
                        long timestamp = atol(token);
                        
                        // 实时处理传感器数据
                        if (collector->packets_received % 1000 == 0) {
                            printf("传感器%d: 值=%.2f, 时间戳=%ld\n", 
                                   sensor_id, value, timestamp);
                        }
                    }
                }
            }
        }
    }
    
    return messages_received;
}

/**
 * 显示实时统计信息
 */
void collector_show_stats(data_collector_t *collector) {
    time_t current_time = time(NULL);
    double uptime = difftime(current_time, collector->start_time);
    
    if (uptime > 0) {
        double packets_per_sec = collector->packets_received / uptime;
        double bytes_per_sec = collector->bytes_received / uptime;
        
        printf("\r运行时间: %.0fs | 数据包: %lu | 字节: %lu | "
               "速率: %.0f包/s %.2fMB/s | 批次: %lu",
               uptime, collector->packets_received, collector->bytes_received,
               packets_per_sec, bytes_per_sec / (1024 * 1024),
               collector->batches_processed);
        fflush(stdout);
    }
}

/**
 * 运行数据采集器
 */
int collector_run(data_collector_t *collector) {
    printf("数据采集器开始运行,按 Ctrl+C 停止\n");
    
    time_t last_stats_time = time(NULL);
    
    while (collector->running) {
        int result = collector_process_batch(collector);
        if (result == -1) {
            break;
        }
        
        // 定期显示统计信息
        time_t current_time = time(NULL);
        if (current_time - last_stats_time >= 1) {
            collector_show_stats(collector);
            last_stats_time = current_time;
        }
    }
    
    printf("\n数据采集器停止\n");
    return 0;
}

/**
 * 模拟传感器数据发送器
 */
int sensor_data_sender(int port, int packet_count) {
    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    struct sockaddr_in serv_addr;
    char message[256];
    
    if (sock == -1) {
        perror("创建发送套接字失败");
        return -1;
    }
    
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    
    printf("传感器数据发送器开始发送 %d 个数据包...\n", packet_count);
    
    srand(time(NULL));
    
    for (int i = 0; i < packet_count; i++) {
        // 生成模拟传感器数据
        int sensor_id = rand() % 10;
        double value = (rand() % 1000) / 10.0;
        long timestamp = time(NULL);
        
        snprintf(message, sizeof(message), "SENSOR:%d,%.2f,%ld", 
                 sensor_id, value, timestamp);
        
        sendto(sock, message, strlen(message), 0,
               (struct sockaddr*)&serv_addr, sizeof(serv_addr));
        
        // 控制发送速率
        usleep(1000);  // 1ms间隔
    }
    
    printf("传感器数据发送完成\n");
    close(sock);
    return 0;
}

/**
 * 演示实时数据采集应用
 */
int demo_real_time_data_collection() {
    data_collector_t collector;
    const int SERVER_PORT = 8083;
    
    printf("=== 实时数据采集应用示例 ===\n");
    
    // 初始化采集器
    if (collector_init(&collector, SERVER_PORT) != 0) {
        return -1;
    }
    
    // 启动传感器数据发送器
    if (fork() == 0) {
        sleep(1);  // 等待采集器启动
        sensor_data_sender(SERVER_PORT, 5000);  // 发送5000个数据包
        exit(0);
    }
    
    // 运行采集器30秒
    sleep(1);
    time_t start_time = time(NULL);
    
    while (collector.running && (time(NULL) - start_time < 30)) {
        collector_process_batch(&collector);
        
        // 每秒显示一次统计
        static time_t last_stats = 0;
        time_t now = time(NULL);
        if (now - last_stats >= 1) {
            collector_show_stats(&collector);
            last_stats = now;
        }
        
        usleep(10000);  // 10ms循环间隔
    }
    
    collector.running = 0;
    
    // 显示最终统计
    printf("\n\n最终统计:\n");
    printf("  总数据包: %lu\n", collector.packets_received);
    printf("  总字节数: %lu\n", collector.bytes_received);
    printf("  处理批次: %lu\n", collector.batches_processed);
    printf("  平均处理速率: %.0f 包/秒\n", 
           collector.packets_received / 30.0);
    
    // 清理资源
    close(collector.sockfd);
    
    // 等待发送器结束
    int status;
    wait(&status);
    
    return 0;
}

int main() {
    return demo_real_time_data_collection();
}

示例5:性能对比测试 見出しへのリンク

#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>

/**
 * 性能测试结构
 */
typedef struct {
    const char *name;
    unsigned long messages_sent;
    unsigned long messages_received;
    struct timeval start_time;
    struct timeval end_time;
} perf_test_t;

/**
 * 使用传统recvmsg进行测试
 */
int test_recvmsg_performance(int sockfd, int message_count) {
    struct msghdr msg;
    struct iovec iov[1];
    char buffer[256];
    ssize_t bytes_received;
    int received_count = 0;
    
    // 准备接收结构
    memset(&msg, 0, sizeof(msg));
    iov[0].iov_base = buffer;
    iov[0].iov_len = sizeof(buffer) - 1;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    
    // 接收指定数量的消息
    while (received_count < message_count) {
        bytes_received = recvmsg(sockfd, &msg, MSG_DONTWAIT);
        if (bytes_received > 0) {
            received_count++;
        } else if (bytes_received == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                usleep(1000);  // 短暂等待
                continue;
            } else {
                break;
            }
        } else {
            break;  // 连接关闭
        }
    }
    
    return received_count;
}

/**
 * 使用recvmmsg进行测试
 */
int test_recvmmsg_performance(int sockfd, int message_count) {
    const int BATCH_SIZE = 32;
    struct mmsghdr msgvec[BATCH_SIZE];
    struct iovec iov[BATCH_SIZE][1];
    char buffers[BATCH_SIZE][256];
    int total_received = 0;
    int messages_received;
    
    // 准备批量接收结构
    memset(msgvec, 0, sizeof(msgvec));
    
    for (int i = 0; i < BATCH_SIZE; i++) {
        iov[i][0].iov_base = buffers[i];
        iov[i][0].iov_len = sizeof(buffers[i]) - 1;
        
        msgvec[i].msg_hdr.msg_iov = iov[i];
        msgvec[i].msg_hdr.msg_iovlen = 1;
    }
    
    // 批量接收消息
    while (total_received < message_count) {
        int to_receive = (message_count - total_received < BATCH_SIZE) ? 
                        message_count - total_received : BATCH_SIZE;
        
        struct timespec timeout = {1, 0};  // 1秒超时
        messages_received = recvmmsg(sockfd, msgvec, to_receive, MSG_DONTWAIT, &timeout);
        
        if (messages_received > 0) {
            total_received += messages_received;
        } else if (messages_received == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                usleep(1000);
                continue;
            } else {
                break;
            }
        } else {
            break;  // 超时或无数据
        }
    }
    
    return total_received;
}

/**
 * UDP性能测试服务器
 */
int perf_test_server(int port) {
    int server_fd;
    struct sockaddr_in server_addr;
    
    server_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (server_fd == -1) {
        perror("创建UDP套接字失败");
        return -1;
    }
    
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(port);
    
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定套接字失败");
        close(server_fd);
        return -1;
    }
    
    printf("性能测试服务器启动,监听端口 %d\n", port);
    return server_fd;
}

/**
 * 消息发送器
 */
int message_sender(int port, int message_count, int message_size) {
    int client_sock = socket(AF_INET, SOCK_DGRAM, 0);
    struct sockaddr_in serv_addr;
    char *message;
    ssize_t bytes_sent;
    
    if (client_sock == -1) {
        perror("创建客户端套接字失败");
        return -1;
    }
    
    // 分配消息缓冲区
    message = malloc(message_size + 1);
    if (!message) {
        perror("分配消息缓冲区失败");
        close(client_sock);
        return -1;
    }
    
    // 填充消息内容
    for (int i = 0; i < message_size; i++) {
        message[i] = 'A' + (i % 26);
    }
    message[message_size] = '\0';
    
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    
    printf("发送 %d 个大小为 %d 字节的消息...\n", message_count, message_size);
    
    for (int i = 0; i < message_count; i++) {
        bytes_sent = sendto(client_sock, message, message_size, 0,
                           (struct sockaddr*)&serv_addr, sizeof(serv_addr));
        if (bytes_sent == -1) {
            perror("发送消息失败");
            break;
        }
        
        if (i % 1000 == 0) {
            printf("已发送 %d 个消息\n", i);
        }
    }
    
    printf("消息发送完成\n");
    
    free(message);
    close(client_sock);
    return 0;
}

/**
 * 演示性能对比测试
 */
int demo_performance_comparison() {
    int server_fd;
    const int SERVER_PORT = 8084;
    const int MESSAGE_COUNT = 10000;
    const int MESSAGE_SIZE = 128;
    perf_test_t tests[2];
    
    printf("=== recvmmsg vs recvmsg 性能对比测试 ===\n");
    
    // 初始化测试结构
    tests[0].name = "recvmsg";
    tests[0].messages_sent = MESSAGE_COUNT;
    tests[0].messages_received = 0;
    
    tests[1].name = "recvmmsg";
    tests[1].messages_sent = MESSAGE_COUNT;
    tests[1].messages_received = 0;
    
    // 启动服务器
    server_fd = perf_test_server(SERVER_PORT);
    if (server_fd == -1) {
        return -1;
    }
    
    // 启动消息发送器
    if (fork() == 0) {
        sleep(1);  // 等待服务器启动
        message_sender(SERVER_PORT, MESSAGE_COUNT, MESSAGE_SIZE);
        exit(0);
    }
    
    // 等待发送器启动并发送一些消息
    sleep(2);
    
    // 测试1: 使用recvmsg
    printf("\n测试1: 使用传统recvmsg...\n");
    gettimeofday(&tests[0].start_time, NULL);
    
    tests[0].messages_received = test_recvmsg_performance(server_fd, MESSAGE_COUNT);
    
    gettimeofday(&tests[0].end_time, NULL);
    
    printf("recvmsg测试完成: 接收 %lu 个消息\n", tests[0].messages_received);
    
    // 重新启动发送器进行第二个测试
    if (fork() == 0) {
        sleep(1);
        message_sender(SERVER_PORT, MESSAGE_COUNT, MESSAGE_SIZE);
        exit(0);
    }
    
    sleep(2);
    
    // 测试2: 使用recvmmsg
    printf("\n测试2: 使用recvmmsg...\n");
    gettimeofday(&tests[1].start_time, NULL);
    
    tests[1].messages_received = test_recvmmsg_performance(server_fd, MESSAGE_COUNT);
    
    gettimeofday(&tests[1].end_time, NULL);
    
    printf("recvmmsg测试完成: 接收 %lu 个消息\n", tests[1].messages_received);
    
    // 计算并显示结果
    printf("\n=== 性能测试结果 ===\n");
    
    for (int i = 0; i < 2; i++) {
        double elapsed_time = (tests[i].end_time.tv_sec - tests[i].start_time.tv_sec) +
                             (tests[i].end_time.tv_usec - tests[i].start_time.tv_usec) / 1000000.0;
        
        double messages_per_sec = tests[i].messages_received / elapsed_time;
        
        printf("%s 测试:\n", tests[i].name);
        printf("  接收消息数: %lu\n", tests[i].messages_received);
        printf("  耗时: %.3f 秒\n", elapsed_time);
        printf("  吞吐量: %.0f 消息/秒\n", messages_per_sec);
        printf("  平均延迟: %.3f 微秒/消息\n", 
               (elapsed_time * 1000000) / tests[i].messages_received);
        printf("\n");
    }
    
    // 计算性能提升
    double recvmsg_time = (tests[0].end_time.tv_sec - tests[0].start_time.tv_sec) +
                         (tests[0].end_time.tv_usec - tests[0].start_time.tv_usec) / 1000000.0;
    double recvmmsg_time = (tests[1].end_time.tv_sec - tests[1].start_time.tv_sec) +
                          (tests[1].end_time.tv_usec - tests[1].start_time.tv_usec) / 1000000.0;
    
    if (recvmsg_time > 0 && recvmmsg_time > 0) {
        double improvement = (recvmsg_time - recvmmsg_time) / recvmsg_time * 100;
        printf("性能提升: %.1f%%\n", improvement);
    }
    
    close(server_fd);
    
    // 等待发送器结束
    int status;
    wait(&status);
    wait(&status);
    
    return 0;
}

int main() {
    return demo_performance_comparison();
}

recvmmsg 标志参数详解 見出しへのリンク

常用标志: 見出しへのリンク

  • MSG_OOB: 接收带外数据
  • MSG_PEEK: 查看数据但不从队列中移除
  • MSG_WAITALL: 等待接收完整的消息
  • MSG_DONTWAIT: 非阻塞操作
  • MSG_TRUNC: 返回数据包的实际长度(UDP)

高级标志: 見出しへのリンク

  • MSG_ERRQUEUE: 接收错误队列中的数据
  • MSG_NOSIGNAL: 接收时不产生SIGPIPE信号

使用注意事项 見出しへのリンク

性能优化: 見出しへのリンク

  1. 批量大小: 根据应用需求选择合适的批量大小
  2. 缓冲区管理: 合理分配缓冲区避免内存浪费
  3. 超时设置: 设置合适的超时时间平衡响应性和效率

错误处理: 見出しへのリンク

  1. 部分接收: 处理实际接收消息数少于请求的情况
  2. 超时处理: 正确处理超时返回值
  3. 资源清理: 及时关闭套接字和释放内存

系统要求: 見出しへのリンク

  1. 内核版本: 需要Linux 2.6.39或更高版本
  2. glibc版本: 需要支持recvmmsg的glibc版本
  3. 编译选项: 需要定义_GNU_SOURCE

总结 見出しへのリンク

recvmmsg 是构建高性能网络应用的重要工具,它提供了:

  • 批量消息接收能力,显著减少系统调用开销
  • 与recvmsg相同的完整功能集
  • 更好的性能和可扩展性
  • 适用于高吞吐量的实时应用

通过合理使用 recvmmsg,可以大幅提升网络应用的性能,特别是在需要处理大量小消息的场景中效果显著。