mq_timedreceive系统调用及示例

mq_timedreceive函数详解

  1. 函数介绍

mq_timedreceive函数是Linux系统中用于在指定时间内从POSIX消息队列接收消息的函数。它是mq_receive函数的增强版本,支持超时控制。可以把mq_timedreceive想象成一个”限时消息接收器”,它能够在指定的时间内尝试接收消息,如果超时则返回错误。

data-ad-format="fluid" data-ad-layout-key="-7k+ex-4a-9w+4a">

这个函数特别适用于需要控制接收等待时间的场景,比如实时系统、服务器应用或需要避免无限期阻塞的程序。

使用场景:

  • 实时系统的消息接收

  • 服务器程序的请求处理

  • 避免无限期阻塞的接收操作

  • 超时控制的网络应用

  • 高可用性系统中的消息处理

  1. 函数原型
1
2
3
4
5
6
#include <mqueue.h>
#include <time.h>

ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio, const struct timespec *abs_timeout);

  1. 功能

mq_timedreceive函数的主要功能是在指定的绝对超时时间内从消息队列接收消息。如果队列为空且在超时时间内没有消息到达,则返回错误。

  1. 参数

mqdes: 消息队列描述符

  • 类型:mqd_t

  • 含义:已打开的消息队列描述符

msg_ptr: 消息缓冲区指针

  • 类型:char*

  • 含义:指向存储接收消息的缓冲区

msg_len: 缓冲区大小

  • 类型:size_t

  • 含义:消息缓冲区的大小(字节数)

msg_prio: 消息优先级指针

  • 类型:unsigned int*

  • 含义:指向存储消息优先级的变量(可为NULL)

abs_timeout: 绝对超时时间

  • 类型:const struct timespec*

  • 含义:绝对超时时间(基于CLOCK_REALTIME)

  1. 返回值
  • 成功: 返回接收到的消息字节数

失败: 返回-1,并设置errno错误码

  • EAGAIN:超时时间内没有消息可接收

  • EBADF:无效的消息队列描述符

  • EINTR:被信号中断

  • EINVAL:参数无效

  • EMSGSIZE:缓冲区太小

  • ETIMEDOUT:超时

  1. 相似函数或关联函数
  • mq_receive(): 接收消息(阻塞)

  • mq_send(): 发送消息

  • mq_timedsend(): 限时发送消息

  • clock_gettime(): 获取当前时间

  • pthread_cond_timedwait(): 限时条件等待

  1. 示例代码

示例1:基础mq_timedreceive使用 - 超时控制接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>

// 创建消息队列
mqd_t create_test_queue(const char* name) {
struct mq_attr attr = {
.mq_flags = 0,
.mq_maxmsg = 10,
.mq_msgsize = 256,
.mq_curmsgs = 0
};

mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("创建消息队列失败");
return -1;
}

printf("创建测试队列: %s\n", name);
return mq;
}

// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {
if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
perror("获取当前时间失败");
return -1;
}

abs_timeout->tv_sec += seconds;
return 0;
}

int main() {
printf("=== 基础mq_timedreceive使用示例 ===\n");

const char* queue_name = "/receive_test_queue";

// 创建测试队列
mqd_t mq = create_test_queue(queue_name);
if (mq == -1) {
exit(EXIT_FAILURE);
}

// 演示mq_timedreceive超时(空队列)
printf("1. 演示空队列超时接收:\n");

struct timespec abs_timeout;
if (calculate_absolute_timeout(&abs_timeout, 3) == -1) { // 3秒超时
mq_close(mq);
mq_unlink(queue_name);
exit(EXIT_FAILURE);
}

char buffer&#91;256];
unsigned int priority;

printf("从空队列接收消息(3秒超时):\n");
clock_t start_time = clock();
ssize_t result = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
clock_t end_time = clock();

double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;

if (result == -1) {
if (errno == ETIMEDOUT) {
printf("✗ 接收超时 (耗时: %.2f 秒)\n", elapsed_time);
} else if (errno == EAGAIN) {
printf("✗ 暂无消息: %s\n", strerror(errno));
} else {
printf("✗ 接收失败: %s\n", strerror(errno));
}
} else {
buffer&#91;result] = '\0';
printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);
}

// 发送一些测试消息
printf("\n2. 发送测试消息:\n");
const char* messages&#91;] = {
"第一条测试消息",
"第二条测试消息",
"第三条测试消息"
};

for (int i = 0; i < 3; i++) {
unsigned int priorities&#91;] = {1, 5, 3};
if (mq_send(mq, messages&#91;i], strlen(messages&#91;i]), priorities&#91;i]) == -1) {
perror("发送消息失败");
} else {
printf("发送: %s (优先级: %u)\n", messages&#91;i], priorities&#91;i]);
}
}

// 演示成功的mq_timedreceive
printf("\n3. 演示成功接收消息:\n");

if (calculate_absolute_timeout(&abs_timeout, 5) == 0) { // 5秒超时
printf("接收消息(队列有消息):\n");

// 接收所有消息
while (1) {
ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
if (bytes_received > 0) {
buffer&#91;bytes_received] = '\0';
printf("✓ 接收到消息: %s (优先级: %u)\n", buffer, priority);
} else {
if (errno == ETIMEDOUT || errno == EAGAIN) {
printf("无更多消息可接收\n");
break;
} else {
printf("接收失败: %s\n", strerror(errno));
break;
}
}
}
}

// 演示缓冲区大小处理
printf("\n4. 缓冲区大小处理演示:\n");

// 发送一条长消息
char long_message&#91;200];
memset(long_message, 'A', sizeof(long_message) - 1);
long_message&#91;sizeof(long_message) - 1] = '\0';

if (mq_send(mq, long_message, strlen(long_message), 0) == 0) {
printf("发送长消息成功\n");

// 使用过小的缓冲区接收
char small_buffer&#91;50];
if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {
ssize_t bytes_received = mq_timedreceive(mq, small_buffer, sizeof(small_buffer), NULL, &abs_timeout);
if (bytes_received == -1) {
if (errno == EMSGSIZE) {
printf("✗ 缓冲区太小 (预期错误)\n");
} else {
printf("✗ 其他错误: %s\n", strerror(errno));
}
} else {
small_buffer&#91;bytes_received] = '\0';
printf("✓ 接收到截断消息: %s (%zd 字节)\n", small_buffer, bytes_received);
}
}

// 使用足够大的缓冲区接收
char large_buffer&#91;256];
if (calculate_absolute_timeout(&abs_timeout, 2) == 0) {
ssize_t bytes_received = mq_timedreceive(mq, large_buffer, sizeof(large_buffer), NULL, &abs_timeout);
if (bytes_received > 0) {
large_buffer&#91;bytes_received] = '\0';
printf("✓ 接收到完整消息 (%zd 字节)\n", bytes_received);
}
}
}

// 演示优先级接收
printf("\n5. 优先级接收演示:\n");

// 发送不同优先级的消息
struct {
const char* message;
unsigned int priority;
} priority_messages&#91;] = {
{"低优先级消息", 1},
{"中优先级消息", 5},
{"高优先级消息", 10},
{"最高优先级消息", 15}
};

for (int i = 0; i < 4; i++) {
if (mq_send(mq, priority_messages&#91;i].message, strlen(priority_messages&#91;i].message),
priority_messages&#91;i].priority) == 0) {
printf("发送: %s (优先级: %u)\n", priority_messages&#91;i].message, priority_messages&#91;i].priority);
}
}

// 接收消息(应该按优先级顺序接收)
printf("按优先级顺序接收消息:\n");
if (calculate_absolute_timeout(&abs_timeout, 3) == 0) {
for (int i = 0; i < 4; i++) {
ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
if (bytes_received > 0) {
buffer&#91;bytes_received] = '\0';
printf("接收: %s (优先级: %u)\n", buffer, priority);
}
}
}

// 清理资源
printf("\n6. 清理资源:\n");
mq_close(mq);
mq_unlink(queue_name);
printf("队列已清理\n");

printf("\n=== 基础mq_timedreceive演示完成 ===\n");

return 0;
}

示例2:服务器应用中的超时消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

#define MAX_REQUESTS 100
#define REQUEST_SIZE 512

volatile sig_atomic_t server_running = 1;

// 服务器请求结构
typedef struct {
char client_id&#91;32];
char request_data&#91;256];
time_t timestamp;
int request_id;
} server_request_t;

// 服务器响应结构
typedef struct {
int request_id;
char response_data&#91;256];
int status;
time_t timestamp;
} server_response_t;

// 信号处理函数
void signal_handler(int sig) {
printf("服务器收到停止信号 %d\n", sig);
server_running = 0;
}

// 创建服务器队列
mqd_t create_server_queues(const char* request_queue, const char* response_queue) {
struct mq_attr request_attr = {
.mq_flags = 0,
.mq_maxmsg = 20,
.mq_msgsize = sizeof(server_request_t),
.mq_curmsgs = 0
};

struct mq_attr response_attr = {
.mq_flags = 0,
.mq_maxmsg = 20,
.mq_msgsize = sizeof(server_response_t),
.mq_curmsgs = 0
};

// 创建请求队列
mqd_t req_mq = mq_open(request_queue, O_CREAT | O_RDONLY, 0644, &request_attr);
if (req_mq == (mqd_t)-1) {
perror("创建请求队列失败");
return -1;
}

// 创建响应队列
mqd_t resp_mq = mq_open(response_queue, O_CREAT | O_WRONLY, 0644, &response_attr);
if (resp_mq == (mqd_t)-1) {
perror("创建响应队列失败");
mq_close(req_mq);
return -1;
}

printf("服务器队列创建成功:\n");
printf(" 请求队列: %s\n", request_queue);
printf(" 响应队列: %s\n", response_queue);

return req_mq; // 返回请求队列描述符
}

// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {
if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
perror("获取当前时间失败");
return -1;
}

long seconds = milliseconds / 1000;
long nanoseconds = (milliseconds % 1000) * 1000000;

abs_timeout->tv_sec += seconds;
abs_timeout->tv_nsec += nanoseconds;

if (abs_timeout->tv_nsec >= 1000000000) {
abs_timeout->tv_sec++;
abs_timeout->tv_nsec -= 1000000000;
}

return 0;
}

// 处理服务器请求
void process_server_requests(mqd_t req_mq, mqd_t resp_mq) {
printf("服务器开始处理请求...\n");

int processed_requests = 0;
time_t last_status_time = time(NULL);

while (server_running) {
struct timespec abs_timeout;
if (calculate_relative_timeout(&abs_timeout, 1000) == -1) { // 1秒超时
continue;
}

server_request_t request;
unsigned int priority;

ssize_t bytes_received = mq_timedreceive(req_mq, (char*)&request, sizeof(request), &priority, &abs_timeout);

if (bytes_received > 0) {
// 处理请求
printf("处理请求 #%d 来自客户端 %s\n", request.request_id, request.client_id);

// 模拟处理时间
usleep(100000); // 0.1秒

// 构造响应
server_response_t response;
response.request_id = request.request_id;
snprintf(response.response_data, sizeof(response.response_data),
"请求 #%d 已处理完成", request.request_id);
response.status = 200;
response.timestamp = time(NULL);

// 发送响应
if (mq_send(resp_mq, (char*)&response, sizeof(response), priority) == 0) {
printf("响应已发送: 请求 #%d\n", request.request_id);
processed_requests++;
} else {
printf("发送响应失败: %s\n", strerror(errno));
}
} else if (errno == ETIMEDOUT || errno == EAGAIN) {
// 超时或无消息,继续循环
} else {
printf("接收请求失败: %s\n", strerror(errno));
if (errno != EINTR) {
break;
}
}

// 定期显示状态
time_t current_time = time(NULL);
if (current_time - last_status_time >= 5) {
printf("服务器状态: 已处理 %d 个请求\n", processed_requests);
last_status_time = current_time;
}
}

printf("服务器停止,总共处理 %d 个请求\n", processed_requests);
}

// 客户端模拟器
void client_simulator(const char* request_queue, const char* response_queue, int client_id) {
printf("客户端 %d 启动\n", client_id);

// 打开队列
mqd_t req_mq = mq_open(request_queue, O_WRONLY);
mqd_t resp_mq = mq_open(response_queue, O_RDONLY);

if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {
perror("客户端打开队列失败");
if (req_mq != (mqd_t)-1) mq_close(req_mq);
if (resp_mq != (mqd_t)-1) mq_close(resp_mq);
exit(EXIT_FAILURE);
}

srand(time(NULL) + client_id);

// 发送请求
for (int i = 0; i < 5; i++) {
server_request_t request;
snprintf(request.client_id, sizeof(request.client_id), "Client_%d", client_id);
snprintf(request.request_data, sizeof(request.request_data), "请求数据_%d", i + 1);
request.timestamp = time(NULL);
request.request_id = client_id * 100 + i + 1;

unsigned int priority = rand() % 10;

if (mq_send(req_mq, (char*)&request, sizeof(request), priority) == 0) {
printf("客户端 %d 发送请求 #%d\n", client_id, request.request_id);
} else {
printf("客户端 %d 发送请求失败: %s\n", client_id, strerror(errno));
continue;
}

// 等待响应
struct timespec abs_timeout;
if (calculate_relative_timeout(&abs_timeout, 3000) == 0) { // 3秒超时
server_response_t response;
ssize_t bytes_received = mq_timedreceive(resp_mq, (char*)&response, sizeof(response), NULL, &abs_timeout);

if (bytes_received > 0) {
printf("客户端 %d 收到响应: %s (状态: %d)\n",
client_id, response.response_data, response.status);
} else if (errno == ETIMEDOUT) {
printf("客户端 %d 等待响应超时\n", client_id);
} else {
printf("客户端 %d 接收响应失败: %s\n", client_id, strerror(errno));
}
}

sleep(1); // 客户端间隔
}

mq_close(req_mq);
mq_close(resp_mq);
printf("客户端 %d 完成\n", client_id);
}

int main() {
printf("=== 服务器应用超时消息处理示例 ===\n");

const char* request_queue = "/server_requests";
const char* response_queue = "/server_responses";

// 设置信号处理
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);

// 启动服务器进程
pid_t server_pid = fork();
if (server_pid == 0) {
// 服务器进程
mqd_t req_mq = mq_open(request_queue, O_RDONLY);
mqd_t resp_mq = mq_open(response_queue, O_WRONLY);

if (req_mq == (mqd_t)-1 || resp_mq == (mqd_t)-1) {
perror("服务器打开队列失败");
exit(EXIT_FAILURE);
}

process_server_requests(req_mq, resp_mq);

mq_close(req_mq);
mq_close(resp_mq);
exit(EXIT_SUCCESS);
}

// 等待服务器启动
sleep(1);

// 启动多个客户端进程
pid_t clients&#91;3];
for (int i = 0; i < 3; i++) {
clients&#91;i] = fork();
if (clients&#91;i] == 0) {
client_simulator(request_queue, response_queue, i + 1);
exit(EXIT_SUCCESS);
}
}

// 等待客户端完成
for (int i = 0; i < 3; i++) {
waitpid(clients&#91;i], NULL, 0);
}

// 停止服务器
server_running = 0;
sleep(2);
waitpid(server_pid, NULL, 0);

// 清理队列
mq_unlink(request_queue);
mq_unlink(response_queue);

printf("\n=== 服务器应用演示完成 ===\n");

return 0;
}

编译和运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 编译示例(需要链接实时库)
gcc -o mq_unlink_example1 mq_unlink_example1.c -lrt
gcc -o mq_timedsend_example1 mq_timedsend_example1.c -lrt
gcc -o mq_timedsend_example2 mq_timedsend_example2.c -lrt
gcc -o mq_timedreceive_example1 mq_timedreceive_example1.c -lrt
gcc -o mq_timedreceive_example2 mq_timedreceive_example2.c -lrt

# 运行示例
./mq_unlink_example1
./mq_timedsend_example1
./mq_timedsend_example2
./mq_timedreceive_example1
./mq_timedreceive_example2

重要注意事项

权限要求: 需要适当的文件系统权限来创建和访问消息队列

名称规范: 消息队列名称必须以’/‘开头

超时时间: 使用绝对时间而非相对时间

资源管理: 必须正确关闭队列描述符和删除队列

错误处理: 必须检查返回值并处理各种错误情况

线程安全: 消息队列操作是线程安全的

系统限制: 受系统消息队列数量和大小限制

最佳实践

资源清理: 及时关闭队列描述符和删除不需要的队列

超时设置: 合理设置超时时间以避免无限期等待

错误处理: 完善的错误处理和恢复机制

优先级使用: 合理使用消息优先级

缓冲区管理: 确保缓冲区大小足够

信号处理: 正确处理信号中断

性能监控: 监控队列性能和系统资源使用

通过这些示例,你可以理解POSIX消息队列相关函数在进程间通信方面的强大功能,它们为Linux系统提供了高效、可靠的IPC机制,特别适用于实时系统、服务器应用和分布式系统。

data-ad-format="auto" data-full-width-responsive="true">