recvmmsg 函数详解 链接到标题
1. 函数介绍 链接到标题
recvmmsg
是Linux 2.6.39引入的高效批量接收消息系统调用。它是 recvmsg
的批量版本,允许应用程序在单次系统调用中接收多个消息,显著减少了系统调用开销,特别适用于高吞吐量的网络服务器和实时应用。
2. 函数原型 链接到标题
#define _GNU_SOURCE
#include <sys/socket.h>
int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
int flags, struct timespec *timeout);
3. 功能 链接到标题
recvmmsg
允许从套接字批量接收多个消息,每个消息可以包含数据和控制信息。它支持分散缓冲区接收、地址信息获取、控制数据接收等功能,是构建高性能网络应用的关键工具。
4. 参数 链接到标题
- int sockfd: 套接字文件描述符
- *struct mmsghdr msgvec: 消息向量数组,描述多个接收消息
- unsigned int vlen: 消息向量数组的长度(最大可接收的消息数)
- int flags: 接收标志,与recvmsg相同
- *struct timespec timeout: 超时时间(NULL表示无限等待)
5. 返回值 链接到标题
- 成功: 返回实际接收到的消息数量
- 超时: 返回0
- 失败: 返回-1,并设置errno
6. 相似函数,或关联函数 链接到标题
- recvmsg: 单消息接收函数
- recv: 基本接收函数
- sendmmsg: 对应的批量发送函数
- poll/epoll: I/O多路复用函数
7. 示例代码 链接到标题
示例1:基础recvmmsg使用 链接到标题
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
/**
* 演示recvmmsg的基本使用方法
*/
int demo_recvmmsg_basic() {
int server_fd, client_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
struct mmsghdr msgvec[5]; // 批量接收5个消息
struct iovec iov[5][1]; // 每个消息1个缓冲区
char buffers[5][256]; // 5个缓冲区
ssize_t bytes_sent;
int messages_received;
printf("=== recvmmsg 基本使用示例 ===\n");
// 创建UDP服务器套接字
server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建UDP套接字失败");
return -1;
}
// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8080);
// 绑定套接字
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(server_fd);
return -1;
}
printf("UDP服务器监听在端口 8080\n");
// 启动UDP客户端发送多个消息
if (fork() == 0) {
// 客户端代码
sleep(1); // 等待服务器启动
int client_sock = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(8080);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
// 发送5个消息
for (int i = 0; i < 5; i++) {
char message[100];
snprintf(message, sizeof(message), "Message %d from client", i + 1);
bytes_sent = sendto(client_sock, message, strlen(message), 0,
(struct sockaddr*)&serv_addr, sizeof(serv_addr));
if (bytes_sent != -1) {
printf("客户端发送消息 %d: %s\n", i + 1, message);
}
usleep(100000); // 100ms间隔
}
close(client_sock);
exit(0);
}
// 准备批量接收消息结构
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < 5; i++) {
// 设置每个消息的缓冲区
iov[i][0].iov_base = buffers[i];
iov[i][0].iov_len = sizeof(buffers[i]) - 1;
// 设置消息头
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 1;
msgvec[i].msg_hdr.msg_name = &client_addr;
msgvec[i].msg_hdr.msg_namelen = client_len;
}
printf("准备批量接收最多5个消息...\n");
// 批量接收消息(设置5秒超时)
struct timespec timeout = {5, 0}; // 5秒超时
messages_received = recvmmsg(server_fd, msgvec, 5, 0, &timeout);
if (messages_received == -1) {
perror("recvmmsg 失败");
close(server_fd);
return -1;
}
printf("成功接收 %d 个消息:\n", messages_received);
// 处理接收到的消息
for (int i = 0; i < messages_received; i++) {
buffers[i][msgvec[i].msg_len] = '\0'; // 添加字符串终止符
// 获取发送者地址信息
struct sockaddr_in *sender_addr = (struct sockaddr_in*)msgvec[i].msg_hdr.msg_name;
printf(" 消息 %d: %s (来自 %s:%d, 长度: %u 字节)\n",
i + 1, buffers[i],
inet_ntoa(sender_addr->sin_addr), ntohs(sender_addr->sin_port),
msgvec[i].msg_len);
}
close(server_fd);
// 等待客户端结束
int status;
wait(&status);
return 0;
}
int main() {
return demo_recvmmsg_basic();
}
示例2:高性能UDP服务器 链接到标题
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <poll.h>
/**
* 高性能UDP服务器结构
*/
typedef struct {
int sockfd;
int port;
unsigned long total_messages;
unsigned long total_bytes;
time_t start_time;
} high_perf_udp_server_t;
/**
* 初始化高性能UDP服务器
*/
int server_init(high_perf_udp_server_t *server, int port) {
struct sockaddr_in server_addr;
memset(server, 0, sizeof(high_perf_udp_server_t));
server->port = port;
server->start_time = time(NULL);
// 创建UDP套接字
server->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (server->sockfd == -1) {
perror("创建UDP套接字失败");
return -1;
}
// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
// 绑定套接字
if (bind(server->sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(server->sockfd);
return -1;
}
printf("高性能UDP服务器启动,监听端口 %d\n", port);
return 0;
}
/**
* 使用recvmmsg处理批量消息
*/
int server_process_batch(high_perf_udp_server_t *server) {
const int BATCH_SIZE = 32; // 每次批量处理32个消息
struct mmsghdr msgvec[BATCH_SIZE];
struct iovec iov[BATCH_SIZE][2]; // 每个消息2个缓冲区
char buffers[BATCH_SIZE][2][256]; // 双缓冲区
struct sockaddr_in client_addrs[BATCH_SIZE];
int messages_received;
// 准备批量接收结构
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < BATCH_SIZE; i++) {
// 设置分散缓冲区
iov[i][0].iov_base = buffers[i][0];
iov[i][0].iov_len = sizeof(buffers[i][0]) - 1;
iov[i][1].iov_base = buffers[i][1];
iov[i][1].iov_len = sizeof(buffers[i][1]) - 1;
// 设置消息头
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 2;
msgvec[i].msg_hdr.msg_name = &client_addrs[i];
msgvec[i].msg_hdr.msg_namelen = sizeof(client_addrs[i]);
}
// 批量接收消息(非阻塞,100ms超时)
struct timespec timeout = {0, 100000000}; // 100ms超时
messages_received = recvmmsg(server->sockfd, msgvec, BATCH_SIZE, MSG_DONTWAIT, &timeout);
if (messages_received == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0; // 没有数据可读
}
perror("recvmmsg 失败");
return -1;
}
if (messages_received == 0) {
return 0; // 超时
}
// 处理接收到的消息
for (int i = 0; i < messages_received; i++) {
// 合并分散缓冲区的内容
size_t total_len = msgvec[i].msg_len;
size_t buf1_len = (total_len > sizeof(buffers[i][0]) - 1) ?
sizeof(buffers[i][0]) - 1 : total_len;
size_t buf2_len = (total_len > buf1_len) ? total_len - buf1_len : 0;
buffers[i][0][buf1_len] = '\0';
if (buf2_len > 0) {
buffers[i][1][buf2_len] = '\0';
}
// 统计信息
server->total_messages++;
server->total_bytes += msgvec[i].msg_len;
// 简单回显处理
if (strncmp(buffers[i][0], "STATS", 5) == 0) {
char response[256];
snprintf(response, sizeof(response),
"Messages: %lu, Bytes: %lu, Uptime: %lds",
server->total_messages, server->total_bytes,
time(NULL) - server->start_time);
sendto(server->sockfd, response, strlen(response), 0,
(struct sockaddr*)&client_addrs[i], sizeof(client_addrs[i]));
} else {
// 回显原始消息
sendto(server->sockfd, buffers[i][0], buf1_len, 0,
(struct sockaddr*)&client_addrs[i], sizeof(client_addrs[i]));
if (buf2_len > 0) {
sendto(server->sockfd, buffers[i][1], buf2_len, 0,
(struct sockaddr*)&client_addrs[i], sizeof(client_addrs[i]));
}
}
if (server->total_messages % 1000 == 0) {
printf("已处理 %lu 条消息\n", server->total_messages);
}
}
return messages_received;
}
/**
* 运行高性能服务器
*/
int server_run(high_perf_udp_server_t *server, int duration_seconds) {
time_t start_time = time(NULL);
time_t current_time;
int total_batches = 0;
printf("服务器开始运行 %d 秒...\n", duration_seconds);
while ((current_time = time(NULL)) - start_time < duration_seconds) {
int batch_result = server_process_batch(server);
if (batch_result > 0) {
total_batches++;
} else if (batch_result == -1) {
break;
}
// 短暂休眠以避免过度占用CPU
usleep(1000); // 1ms
}
printf("服务器运行结束\n");
printf("统计信息:\n");
printf(" 总消息数: %lu\n", server->total_messages);
printf(" 总字节数: %lu\n", server->total_bytes);
printf(" 平均吞吐量: %.2f 消息/秒\n",
server->total_messages / (double)duration_seconds);
printf(" 处理批次: %d\n", total_batches);
return 0;
}
/**
* 演示高性能UDP服务器
*/
int demo_high_performance_udp_server() {
high_perf_udp_server_t server;
const int SERVER_PORT = 8081;
const int TEST_DURATION = 30; // 30秒测试
printf("=== 高性能UDP服务器示例 ===\n");
// 初始化服务器
if (server_init(&server, SERVER_PORT) != 0) {
return -1;
}
// 启动压力测试客户端
if (fork() == 0) {
sleep(1); // 等待服务器启动
// 创建多个并发客户端
for (int client_id = 0; client_id < 3; client_id++) {
if (fork() == 0) {
int client_sock = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(SERVER_PORT);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
printf("客户端 %d 开始发送测试消息...\n", client_id + 1);
// 发送测试消息
for (int i = 0; i < 1000; i++) {
char message[128];
snprintf(message, sizeof(message),
"Client %d Test Message %d", client_id + 1, i + 1);
sendto(client_sock, message, strlen(message), 0,
(struct sockaddr*)&serv_addr, sizeof(serv_addr));
// 随机间隔发送
usleep(1000 + (rand() % 5000)); // 1-6ms间隔
}
// 请求统计信息
const char *stats_cmd = "STATS";
sendto(client_sock, stats_cmd, strlen(stats_cmd), 0,
(struct sockaddr*)&serv_addr, sizeof(serv_addr));
close(client_sock);
exit(0);
}
}
// 等待所有客户端完成
for (int i = 0; i < 3; i++) {
int status;
wait(&status);
}
exit(0);
}
// 运行服务器
server_run(&server, TEST_DURATION);
// 清理资源
close(server.sockfd);
// 等待测试客户端结束
int status;
wait(&status);
return 0;
}
int main() {
return demo_high_performance_udp_server();
}
示例3:带控制信息的批量接收 链接到标题
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
/**
* 演示recvmmsg接收控制信息
*/
int demo_recvmmsg_control_data() {
int server_fd;
struct sockaddr_in server_addr;
struct mmsghdr msgvec[3];
struct iovec iov[3][1];
char buffers[3][256];
char control_buffers[3][1024];
struct sockaddr_in client_addrs[3];
int messages_received;
printf("=== recvmmsg 控制信息接收示例 ===\n");
// 创建UDP服务器套接字
server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建UDP套接字失败");
return -1;
}
// 启用时间戳选项
int timestamp_on = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_TIMESTAMP,
×tamp_on, sizeof(timestamp_on)) == -1) {
printf("警告: 无法启用时间戳选项: %s\n", strerror(errno));
}
// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8082);
// 绑定套接字
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(server_fd);
return -1;
}
printf("带控制信息的UDP服务器监听在端口 8082\n");
// 启动客户端发送消息
if (fork() == 0) {
sleep(1);
int client_sock = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(8082);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
// 发送3个消息
for (int i = 0; i < 3; i++) {
char message[100];
snprintf(message, sizeof(message), "Control Message %d", i + 1);
sendto(client_sock, message, strlen(message), 0,
(struct sockaddr*)&serv_addr, sizeof(serv_addr));
printf("客户端发送: %s\n", message);
usleep(200000); // 200ms间隔
}
close(client_sock);
exit(0);
}
// 准备批量接收结构(带控制信息)
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < 3; i++) {
// 设置数据缓冲区
iov[i][0].iov_base = buffers[i];
iov[i][0].iov_len = sizeof(buffers[i]) - 1;
// 设置消息头
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 1;
msgvec[i].msg_hdr.msg_name = &client_addrs[i];
msgvec[i].msg_hdr.msg_namelen = sizeof(client_addrs[i]);
// 设置控制缓冲区
msgvec[i].msg_hdr.msg_control = control_buffers[i];
msgvec[i].msg_hdr.msg_controllen = sizeof(control_buffers[i]);
}
printf("准备接收带控制信息的消息...\n");
// 批量接收消息
struct timespec timeout = {5, 0};
messages_received = recvmmsg(server_fd, msgvec, 3, 0, &timeout);
if (messages_received == -1) {
perror("recvmmsg 失败");
close(server_fd);
return -1;
}
printf("接收到 %d 个消息,处理控制信息:\n", messages_received);
// 处理接收到的消息和控制信息
for (int i = 0; i < messages_received; i++) {
buffers[i][msgvec[i].msg_len] = '\0';
printf("\n消息 %d:\n", i + 1);
printf(" 数据: %s\n", buffers[i]);
printf(" 长度: %u 字节\n", msgvec[i].msg_len);
// 解析控制信息
struct msghdr *hdr = &msgvec[i].msg_hdr;
struct cmsghdr *cmsg;
printf(" 控制信息:\n");
int control_found = 0;
for (cmsg = CMSG_FIRSTHDR(hdr); cmsg != NULL; cmsg = CMSG_NXTHDR(hdr, cmsg)) {
control_found = 1;
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMP) {
struct timeval *tv = (struct timeval*)CMSG_DATA(cmsg);
printf(" 时间戳: %ld.%06ld\n", tv->tv_sec, tv->tv_usec);
// 转换为可读格式
char time_str[64];
time_t sec = tv->tv_sec;
struct tm *tm_info = localtime(&sec);
strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", tm_info);
printf(" 可读时间: %s.%06ld\n", time_str, tv->tv_usec);
} else {
printf(" 其他控制信息: level=%d, type=%d, len=%zu\n",
cmsg->cmsg_level, cmsg->cmsg_type, cmsg->cmsg_len);
}
}
if (!control_found) {
printf(" 没有控制信息\n");
}
}
close(server_fd);
// 等待客户端结束
int status;
wait(&status);
return 0;
}
int main() {
return demo_recvmmsg_control_data();
}
示例4:实时数据采集应用 链接到标题
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
/**
* 实时数据采集结构
*/
typedef struct {
int sockfd;
int port;
unsigned long packets_received;
unsigned long bytes_received;
unsigned long batches_processed;
time_t start_time;
volatile int running;
} data_collector_t;
// 全局变量用于信号处理
static data_collector_t *g_collector = NULL;
/**
* 信号处理函数
*/
void signal_handler(int sig) {
if (g_collector) {
g_collector->running = 0;
printf("\n收到信号 %d,准备退出...\n", sig);
}
}
/**
* 初始化数据采集器
*/
int collector_init(data_collector_t *collector, int port) {
struct sockaddr_in server_addr;
memset(collector, 0, sizeof(data_collector_t));
collector->port = port;
collector->start_time = time(NULL);
collector->running = 1;
// 设置全局指针用于信号处理
g_collector = collector;
// 注册信号处理
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// 创建UDP套接字
collector->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (collector->sockfd == -1) {
perror("创建UDP套接字失败");
return -1;
}
// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
// 绑定套接字
if (bind(collector->sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(collector->sockfd);
return -1;
}
printf("实时数据采集器启动,监听端口 %d\n", port);
return 0;
}
/**
* 处理批量数据
*/
int collector_process_batch(data_collector_t *collector) {
const int BATCH_SIZE = 64; // 每次处理64个数据包
struct mmsghdr msgvec[BATCH_SIZE];
struct iovec iov[BATCH_SIZE][1];
char buffers[BATCH_SIZE][512];
struct sockaddr_in client_addrs[BATCH_SIZE];
int messages_received;
// 准备批量接收结构
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < BATCH_SIZE; i++) {
iov[i][0].iov_base = buffers[i];
iov[i][0].iov_len = sizeof(buffers[i]) - 1;
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 1;
msgvec[i].msg_hdr.msg_name = &client_addrs[i];
msgvec[i].msg_hdr.msg_namelen = sizeof(client_addrs[i]);
}
// 批量接收消息(非阻塞,50ms超时)
struct timespec timeout = {0, 50000000}; // 50ms超时
messages_received = recvmmsg(collector->sockfd, msgvec, BATCH_SIZE,
MSG_DONTWAIT, &timeout);
if (messages_received == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0; // 没有数据
}
perror("recvmmsg 失败");
return -1;
}
if (messages_received == 0) {
return 0; // 超时
}
// 更新统计信息
collector->batches_processed++;
collector->packets_received += messages_received;
// 处理接收到的数据包
for (int i = 0; i < messages_received; i++) {
collector->bytes_received += msgvec[i].msg_len;
// 解析数据包(假设是简单的传感器数据格式)
buffers[i][msgvec[i].msg_len] = '\0';
// 简单的数据包解析示例
if (strncmp(buffers[i], "SENSOR:", 7) == 0) {
// 传感器数据格式: SENSOR:id,value,timestamp
char *token = strtok(buffers[i] + 7, ",");
if (token) {
int sensor_id = atoi(token);
token = strtok(NULL, ",");
if (token) {
double value = atof(token);
token = strtok(NULL, ",");
if (token) {
long timestamp = atol(token);
// 实时处理传感器数据
if (collector->packets_received % 1000 == 0) {
printf("传感器%d: 值=%.2f, 时间戳=%ld\n",
sensor_id, value, timestamp);
}
}
}
}
}
}
return messages_received;
}
/**
* 显示实时统计信息
*/
void collector_show_stats(data_collector_t *collector) {
time_t current_time = time(NULL);
double uptime = difftime(current_time, collector->start_time);
if (uptime > 0) {
double packets_per_sec = collector->packets_received / uptime;
double bytes_per_sec = collector->bytes_received / uptime;
printf("\r运行时间: %.0fs | 数据包: %lu | 字节: %lu | "
"速率: %.0f包/s %.2fMB/s | 批次: %lu",
uptime, collector->packets_received, collector->bytes_received,
packets_per_sec, bytes_per_sec / (1024 * 1024),
collector->batches_processed);
fflush(stdout);
}
}
/**
* 运行数据采集器
*/
int collector_run(data_collector_t *collector) {
printf("数据采集器开始运行,按 Ctrl+C 停止\n");
time_t last_stats_time = time(NULL);
while (collector->running) {
int result = collector_process_batch(collector);
if (result == -1) {
break;
}
// 定期显示统计信息
time_t current_time = time(NULL);
if (current_time - last_stats_time >= 1) {
collector_show_stats(collector);
last_stats_time = current_time;
}
}
printf("\n数据采集器停止\n");
return 0;
}
/**
* 模拟传感器数据发送器
*/
int sensor_data_sender(int port, int packet_count) {
int sock = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in serv_addr;
char message[256];
if (sock == -1) {
perror("创建发送套接字失败");
return -1;
}
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
printf("传感器数据发送器开始发送 %d 个数据包...\n", packet_count);
srand(time(NULL));
for (int i = 0; i < packet_count; i++) {
// 生成模拟传感器数据
int sensor_id = rand() % 10;
double value = (rand() % 1000) / 10.0;
long timestamp = time(NULL);
snprintf(message, sizeof(message), "SENSOR:%d,%.2f,%ld",
sensor_id, value, timestamp);
sendto(sock, message, strlen(message), 0,
(struct sockaddr*)&serv_addr, sizeof(serv_addr));
// 控制发送速率
usleep(1000); // 1ms间隔
}
printf("传感器数据发送完成\n");
close(sock);
return 0;
}
/**
* 演示实时数据采集应用
*/
int demo_real_time_data_collection() {
data_collector_t collector;
const int SERVER_PORT = 8083;
printf("=== 实时数据采集应用示例 ===\n");
// 初始化采集器
if (collector_init(&collector, SERVER_PORT) != 0) {
return -1;
}
// 启动传感器数据发送器
if (fork() == 0) {
sleep(1); // 等待采集器启动
sensor_data_sender(SERVER_PORT, 5000); // 发送5000个数据包
exit(0);
}
// 运行采集器30秒
sleep(1);
time_t start_time = time(NULL);
while (collector.running && (time(NULL) - start_time < 30)) {
collector_process_batch(&collector);
// 每秒显示一次统计
static time_t last_stats = 0;
time_t now = time(NULL);
if (now - last_stats >= 1) {
collector_show_stats(&collector);
last_stats = now;
}
usleep(10000); // 10ms循环间隔
}
collector.running = 0;
// 显示最终统计
printf("\n\n最终统计:\n");
printf(" 总数据包: %lu\n", collector.packets_received);
printf(" 总字节数: %lu\n", collector.bytes_received);
printf(" 处理批次: %lu\n", collector.batches_processed);
printf(" 平均处理速率: %.0f 包/秒\n",
collector.packets_received / 30.0);
// 清理资源
close(collector.sockfd);
// 等待发送器结束
int status;
wait(&status);
return 0;
}
int main() {
return demo_real_time_data_collection();
}
示例5:性能对比测试 链接到标题
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
/**
* 性能测试结构
*/
typedef struct {
const char *name;
unsigned long messages_sent;
unsigned long messages_received;
struct timeval start_time;
struct timeval end_time;
} perf_test_t;
/**
* 使用传统recvmsg进行测试
*/
int test_recvmsg_performance(int sockfd, int message_count) {
struct msghdr msg;
struct iovec iov[1];
char buffer[256];
ssize_t bytes_received;
int received_count = 0;
// 准备接收结构
memset(&msg, 0, sizeof(msg));
iov[0].iov_base = buffer;
iov[0].iov_len = sizeof(buffer) - 1;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
// 接收指定数量的消息
while (received_count < message_count) {
bytes_received = recvmsg(sockfd, &msg, MSG_DONTWAIT);
if (bytes_received > 0) {
received_count++;
} else if (bytes_received == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
usleep(1000); // 短暂等待
continue;
} else {
break;
}
} else {
break; // 连接关闭
}
}
return received_count;
}
/**
* 使用recvmmsg进行测试
*/
int test_recvmmsg_performance(int sockfd, int message_count) {
const int BATCH_SIZE = 32;
struct mmsghdr msgvec[BATCH_SIZE];
struct iovec iov[BATCH_SIZE][1];
char buffers[BATCH_SIZE][256];
int total_received = 0;
int messages_received;
// 准备批量接收结构
memset(msgvec, 0, sizeof(msgvec));
for (int i = 0; i < BATCH_SIZE; i++) {
iov[i][0].iov_base = buffers[i];
iov[i][0].iov_len = sizeof(buffers[i]) - 1;
msgvec[i].msg_hdr.msg_iov = iov[i];
msgvec[i].msg_hdr.msg_iovlen = 1;
}
// 批量接收消息
while (total_received < message_count) {
int to_receive = (message_count - total_received < BATCH_SIZE) ?
message_count - total_received : BATCH_SIZE;
struct timespec timeout = {1, 0}; // 1秒超时
messages_received = recvmmsg(sockfd, msgvec, to_receive, MSG_DONTWAIT, &timeout);
if (messages_received > 0) {
total_received += messages_received;
} else if (messages_received == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
usleep(1000);
continue;
} else {
break;
}
} else {
break; // 超时或无数据
}
}
return total_received;
}
/**
* UDP性能测试服务器
*/
int perf_test_server(int port) {
int server_fd;
struct sockaddr_in server_addr;
server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建UDP套接字失败");
return -1;
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(server_fd);
return -1;
}
printf("性能测试服务器启动,监听端口 %d\n", port);
return server_fd;
}
/**
* 消息发送器
*/
int message_sender(int port, int message_count, int message_size) {
int client_sock = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in serv_addr;
char *message;
ssize_t bytes_sent;
if (client_sock == -1) {
perror("创建客户端套接字失败");
return -1;
}
// 分配消息缓冲区
message = malloc(message_size + 1);
if (!message) {
perror("分配消息缓冲区失败");
close(client_sock);
return -1;
}
// 填充消息内容
for (int i = 0; i < message_size; i++) {
message[i] = 'A' + (i % 26);
}
message[message_size] = '\0';
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
printf("发送 %d 个大小为 %d 字节的消息...\n", message_count, message_size);
for (int i = 0; i < message_count; i++) {
bytes_sent = sendto(client_sock, message, message_size, 0,
(struct sockaddr*)&serv_addr, sizeof(serv_addr));
if (bytes_sent == -1) {
perror("发送消息失败");
break;
}
if (i % 1000 == 0) {
printf("已发送 %d 个消息\n", i);
}
}
printf("消息发送完成\n");
free(message);
close(client_sock);
return 0;
}
/**
* 演示性能对比测试
*/
int demo_performance_comparison() {
int server_fd;
const int SERVER_PORT = 8084;
const int MESSAGE_COUNT = 10000;
const int MESSAGE_SIZE = 128;
perf_test_t tests[2];
printf("=== recvmmsg vs recvmsg 性能对比测试 ===\n");
// 初始化测试结构
tests[0].name = "recvmsg";
tests[0].messages_sent = MESSAGE_COUNT;
tests[0].messages_received = 0;
tests[1].name = "recvmmsg";
tests[1].messages_sent = MESSAGE_COUNT;
tests[1].messages_received = 0;
// 启动服务器
server_fd = perf_test_server(SERVER_PORT);
if (server_fd == -1) {
return -1;
}
// 启动消息发送器
if (fork() == 0) {
sleep(1); // 等待服务器启动
message_sender(SERVER_PORT, MESSAGE_COUNT, MESSAGE_SIZE);
exit(0);
}
// 等待发送器启动并发送一些消息
sleep(2);
// 测试1: 使用recvmsg
printf("\n测试1: 使用传统recvmsg...\n");
gettimeofday(&tests[0].start_time, NULL);
tests[0].messages_received = test_recvmsg_performance(server_fd, MESSAGE_COUNT);
gettimeofday(&tests[0].end_time, NULL);
printf("recvmsg测试完成: 接收 %lu 个消息\n", tests[0].messages_received);
// 重新启动发送器进行第二个测试
if (fork() == 0) {
sleep(1);
message_sender(SERVER_PORT, MESSAGE_COUNT, MESSAGE_SIZE);
exit(0);
}
sleep(2);
// 测试2: 使用recvmmsg
printf("\n测试2: 使用recvmmsg...\n");
gettimeofday(&tests[1].start_time, NULL);
tests[1].messages_received = test_recvmmsg_performance(server_fd, MESSAGE_COUNT);
gettimeofday(&tests[1].end_time, NULL);
printf("recvmmsg测试完成: 接收 %lu 个消息\n", tests[1].messages_received);
// 计算并显示结果
printf("\n=== 性能测试结果 ===\n");
for (int i = 0; i < 2; i++) {
double elapsed_time = (tests[i].end_time.tv_sec - tests[i].start_time.tv_sec) +
(tests[i].end_time.tv_usec - tests[i].start_time.tv_usec) / 1000000.0;
double messages_per_sec = tests[i].messages_received / elapsed_time;
printf("%s 测试:\n", tests[i].name);
printf(" 接收消息数: %lu\n", tests[i].messages_received);
printf(" 耗时: %.3f 秒\n", elapsed_time);
printf(" 吞吐量: %.0f 消息/秒\n", messages_per_sec);
printf(" 平均延迟: %.3f 微秒/消息\n",
(elapsed_time * 1000000) / tests[i].messages_received);
printf("\n");
}
// 计算性能提升
double recvmsg_time = (tests[0].end_time.tv_sec - tests[0].start_time.tv_sec) +
(tests[0].end_time.tv_usec - tests[0].start_time.tv_usec) / 1000000.0;
double recvmmsg_time = (tests[1].end_time.tv_sec - tests[1].start_time.tv_sec) +
(tests[1].end_time.tv_usec - tests[1].start_time.tv_usec) / 1000000.0;
if (recvmsg_time > 0 && recvmmsg_time > 0) {
double improvement = (recvmsg_time - recvmmsg_time) / recvmsg_time * 100;
printf("性能提升: %.1f%%\n", improvement);
}
close(server_fd);
// 等待发送器结束
int status;
wait(&status);
wait(&status);
return 0;
}
int main() {
return demo_performance_comparison();
}
recvmmsg 标志参数详解 链接到标题
常用标志: 链接到标题
- MSG_OOB: 接收带外数据
- MSG_PEEK: 查看数据但不从队列中移除
- MSG_WAITALL: 等待接收完整的消息
- MSG_DONTWAIT: 非阻塞操作
- MSG_TRUNC: 返回数据包的实际长度(UDP)
高级标志: 链接到标题
- MSG_ERRQUEUE: 接收错误队列中的数据
- MSG_NOSIGNAL: 接收时不产生SIGPIPE信号
使用注意事项 链接到标题
性能优化: 链接到标题
- 批量大小: 根据应用需求选择合适的批量大小
- 缓冲区管理: 合理分配缓冲区避免内存浪费
- 超时设置: 设置合适的超时时间平衡响应性和效率
错误处理: 链接到标题
- 部分接收: 处理实际接收消息数少于请求的情况
- 超时处理: 正确处理超时返回值
- 资源清理: 及时关闭套接字和释放内存
系统要求: 链接到标题
- 内核版本: 需要Linux 2.6.39或更高版本
- glibc版本: 需要支持recvmmsg的glibc版本
- 编译选项: 需要定义_GNU_SOURCE
总结 链接到标题
recvmmsg
是构建高性能网络应用的重要工具,它提供了:
- 批量消息接收能力,显著减少系统调用开销
- 与recvmsg相同的完整功能集
- 更好的性能和可扩展性
- 适用于高吞吐量的实时应用
通过合理使用 recvmmsg
,可以大幅提升网络应用的性能,特别是在需要处理大量小消息的场景中效果显著。