POSIX 消息队列 (mq_*) 函数详解

POSIX 消息队列 (mq_*) 函数详解

  1. 函数介绍

POSIX 消息队列是一组用于进程间通信(IPC)的函数,提供了一种可靠的、基于消息的通信机制。可以把消息队列想象成”邮局系统”——发送者将消息放入邮箱(队列),接收者从邮箱中取出消息,就像现实中的邮政服务一样。

data-ad-format="fluid" data-ad-layout-key="-7k+ex-4a-9w+4a">

与传统的 System V 消息队列相比,POSIX 消息队列具有更好的可移植性和更简洁的 API。它们支持优先级消息、持久化、以及通过文件系统路径名进行命名。

  1. 核心函数原型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>

// 核心函数
mqd_t mq_open(const char *name, int oflag, ...);
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
int mq_notify(mqd_t mqdes, const struct sigevent *notification);

  1. 功能

POSIX 消息队列提供以下功能:

  • 创建和打开消息队列

  • 发送和接收消息

  • 设置和获取队列属性

  • 异步通知机制

  • 持久化支持

  • 优先级消息支持

  1. 核心结构体

struct mq_attr

1
2
3
4
5
6
7
struct mq_attr {
long mq_flags; /* 消息队列标志 */
long mq_maxmsg; /* 最大消息数 */
long mq_msgsize; /* 最大消息大小 */
long mq_curmsgs; /* 当前消息数 */
};

struct sigevent (用于通知)

1
2
3
4
5
6
7
8
struct sigevent {
int sigev_notify; /* 通知类型 */
int sigev_signo; /* 信号编号 */
union sigval sigev_value; /* 传递给处理函数的数据 */
void (*sigev_notify_function)(union sigval); /* 线程函数 */
pthread_attr_t *sigev_notify_attributes; /* 线程属性 */
};

  1. 消息队列名称
  • 名称必须以 ‘/’ 开头

  • 长度限制为 NAME_MAX (通常 255 字符)

  • 示例:“/my_queue”, “/app/messages”

  1. 打开标志 (oflag)

标志说明O_RDONLY只读打开O_WRONLY只写打开O_RDWR读写打开O_CREAT不存在时创建O_EXCL与 O_CREAT 一起使用,如果存在则失败O_NONBLOCK非阻塞模式

  1. 返回值
  • mq_open: 成功返回消息队列描述符,失败返回 (mqd_t)-1

  • 其他函数: 成功返回 0,失败返回 -1

  1. 相关函数
  • pthread: 多线程支持

  • signal: 信号处理

  • fcntl: 文件控制

  • unlink: 删除文件

  1. 示例代码

示例1:基础用法 - 简单的消息发送和接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>

#define QUEUE_NAME "/example_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 10

int main() {
mqd_t mq;
struct mq_attr attr;
char send_buffer&#91;MAX_MSG_SIZE];
char recv_buffer&#91;MAX_MSG_SIZE];
ssize_t bytes_read;
unsigned int priority;

printf("=== POSIX 消息队列基础示例 ===\n\n");

// 设置消息队列属性
attr.mq_flags = 0;
attr.mq_maxmsg = MAX_MSGS;
attr.mq_msgsize = MAX_MSG_SIZE;
attr.mq_curmsgs = 0;

// 创建并打开消息队列
printf("创建消息队列: %s\n", QUEUE_NAME);
mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
printf("✓ 消息队列创建成功\n\n");

// 获取并显示队列属性
printf("消息队列属性:\n");
if (mq_getattr(mq, &attr) == 0) {
printf(" 最大消息数: %ld\n", attr.mq_maxmsg);
printf(" 最大消息大小: %ld 字节\n", attr.mq_msgsize);
printf(" 当前消息数: %ld\n", attr.mq_curmsgs);
printf(" 标志: %ld\n", attr.mq_flags);
}
printf("\n");

// 发送消息
printf("发送消息:\n");
const char* messages&#91;] = {
"第一条消息: Hello, World!",
"第二条消息: 欢迎使用 POSIX 消息队列",
"第三条消息: 这是优先级消息",
"第四条消息: 最后一条测试消息"
};
int priorities&#91;] = {0, 0, 10, 0}; // 优先级 (数值越大优先级越高)

for (int i = 0; i < 4; i++) {
if (mq_send(mq, messages&#91;i], strlen(messages&#91;i]) + 1, priorities&#91;i]) == 0) {
printf(" ✓ 发送消息 %d (优先级 %d): %s\n", i + 1, priorities&#91;i], messages&#91;i]);
} else {
perror(" ✗ mq_send 失败");
}
}

// 显示发送后队列状态
if (mq_getattr(mq, &attr) == 0) {
printf("\n发送后队列状态: %ld 条消息\n", attr.mq_curmsgs);
}

// 接收消息
printf("\n接收消息 (按优先级顺序):\n");
for (int i = 0; i < 4; i++) {
bytes_read = mq_receive(mq, recv_buffer, MAX_MSG_SIZE, &priority);
if (bytes_read != -1) {
printf(" ✓ 接收消息 %d (优先级 %d, 长度 %zd): %s\n",
i + 1, priority, bytes_read, recv_buffer);
} else {
if (errno == EAGAIN) {
printf(" ⚠ 队列为空\n");
break;
} else {
perror(" ✗ mq_receive 失败");
break;
}
}
}

// 显示接收后队列状态
if (mq_getattr(mq, &attr) == 0) {
printf("\n接收后队列状态: %ld 条消息\n", attr.mq_curmsgs);
}

// 关闭消息队列
if (mq_close(mq) == 0) {
printf("✓ 消息队列关闭成功\n");
} else {
perror("✗ mq_close 失败");
}

// 删除消息队列
if (mq_unlink(QUEUE_NAME) == 0) {
printf("✓ 消息队列删除成功\n");
} else {
perror("✗ mq_unlink 失败");
}

printf("\n=== 消息队列特点 ===\n");
printf("1. 支持优先级消息 (数值越大优先级越高)\n");
printf("2. 消息大小可配置\n");
printf("3. 消息数量有限制\n");
printf("4. 支持持久化 (直到显式删除)\n");
printf("5. 可通过文件系统路径访问\n");

return 0;
}

示例2:生产者-消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>

#define QUEUE_NAME "/producer_consumer_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 20
#define NUM_MESSAGES 10

// 全局变量
mqd_t mq;
int producer_count = 0;
int consumer_count = 0;
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;

// 生产者线程函数
void* producer_thread(void* arg) {
int producer_id = *(int*)arg;
char message&#91;MAX_MSG_SIZE];
time_t now;

printf("生产者 %d 启动\n", producer_id);

for (int i = 0; i < NUM_MESSAGES; i++) {
// 构造消息
time(&now);
snprintf(message, sizeof(message),
"P%d-MSG%d-TIME:%s", producer_id, i + 1, ctime(&now));

// 发送消息 (交替使用不同优先级)
unsigned int priority = (i % 3 == 0) ? 5 : 1; // 每第3条高优先级

if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {
pthread_mutex_lock(&count_mutex);
producer_count++;
pthread_mutex_unlock(&count_mutex);

printf("生产者 %d 发送消息: %s (优先级 %u)\n",
producer_id, message, priority);
} else {
perror("生产者发送失败");
}

// 随机延迟
usleep((rand() % 100 + 1) * 1000); // 1-100ms
}

printf("生产者 %d 完成\n", producer_id);
return NULL;
}

// 消费者线程函数
void* consumer_thread(void* arg) {
int consumer_id = *(int*)arg;
char message&#91;MAX_MSG_SIZE];
ssize_t bytes_read;
unsigned int priority;

printf("消费者 %d 启动\n", consumer_id);

while (1) {
// 接收消息
bytes_read = mq_receive(mq, message, MAX_MSG_SIZE, &priority);
if (bytes_read != -1) {
pthread_mutex_lock(&count_mutex);
consumer_count++;
int current_count = consumer_count;
pthread_mutex_unlock(&count_mutex);

printf("消费者 %d 接收消息 %d (优先级 %u): %s",
consumer_id, current_count, priority, message);

// 检查是否接收完所有消息
if (current_count >= NUM_MESSAGES * 2) { // 2个生产者
break;
}
} else {
if (errno == EAGAIN) {
// 非阻塞模式下队列为空
usleep(10000); // 10ms
continue;
} else {
perror("消费者接收失败");
break;
}
}

// 随机延迟
usleep((rand() % 50 + 1) * 1000); // 1-50ms
}

printf("消费者 %d 完成\n", consumer_id);
return NULL;
}

int main() {
pthread_t producers&#91;2];
pthread_t consumers&#91;3];
int producer_ids&#91;2] = {1, 2};
int consumer_ids&#91;3] = {1, 2, 3};
struct mq_attr attr;

printf("=== 生产者-消费者消息队列示例 ===\n\n");

// 初始化随机数种子
srand(time(NULL) + getpid());

// 设置消息队列属性
attr.mq_flags = 0; // 阻塞模式
attr.mq_maxmsg = MAX_MSGS;
attr.mq_msgsize = MAX_MSG_SIZE;
attr.mq_curmsgs = 0;

// 创建消息队列
printf("创建消息队列: %s\n", QUEUE_NAME);
mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
printf("✓ 消息队列创建成功\n\n");

// 创建生产者线程
printf("创建生产者线程...\n");
for (int i = 0; i < 2; i++) {
if (pthread_create(&producers&#91;i], NULL, producer_thread, &producer_ids&#91;i]) != 0) {
perror("创建生产者线程失败");
exit(1);
}
}

// 创建消费者线程
printf("创建消费者线程...\n");
for (int i = 0; i < 3; i++) {
if (pthread_create(&consumers&#91;i], NULL, consumer_thread, &consumer_ids&#91;i]) != 0) {
perror("创建消费者线程失败");
exit(1);
}
}

// 等待生产者完成
printf("等待生产者完成...\n");
for (int i = 0; i < 2; i++) {
pthread_join(producers&#91;i], NULL);
}

// 等待消费者完成
printf("等待消费者完成...\n");
for (int i = 0; i < 3; i++) {
pthread_join(consumers&#91;i], NULL);
}

// 显示统计信息
printf("\n=== 统计信息 ===\n");
printf("生产消息数: %d\n", producer_count);
printf("消费消息数: %d\n", consumer_count);

// 显示最终队列状态
if (mq_getattr(mq, &attr) == 0) {
printf("队列中剩余消息: %ld\n", attr.mq_curmsgs);
}

// 清理资源
if (mq_close(mq) == 0) {
printf("✓ 消息队列关闭成功\n");
}
if (mq_unlink(QUEUE_NAME) == 0) {
printf("✓ 消息队列删除成功\n");
}

printf("\n=== 生产者-消费者模型特点 ===\n");
printf("1. 解耦: 生产者和消费者独立运行\n");
printf("2. 异步: 生产和消费可以不同步进行\n");
printf("3. 缓冲: 消息队列提供缓冲作用\n");
printf("4. 负载均衡: 多个消费者可以并行处理\n");
printf("5. 可靠性: 消息持久化存储\n");

return 0;
}

示例3:完整的消息队列管理系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#include <getopt.h>
#include <signal.h>
#include <time.h>

// 配置结构体
struct mq_config {
char *queue_name;
int max_messages;
int max_message_size;
int create_queue;
int delete_queue;
int show_info;
int send_message;
int receive_message;
int list_queues;
int priority;
char *message_content;
int non_blocking;
int verbose;
};

// 全局变量
volatile sig_atomic_t running = 1;

// 信号处理函数
void signal_handler(int sig) {
printf("\n收到信号 %d,准备退出...\n", sig);
running = 0;
}

// 设置信号处理
void setup_signal_handlers() {
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;

sigaction(SIGINT, &sa, NULL); // Ctrl+C
sigaction(SIGTERM, &sa, NULL); // 终止信号
}

// 显示消息队列信息
void show_queue_info(mqd_t mq) {
struct mq_attr attr;

if (mq_getattr(mq, &attr) == 0) {
printf("消息队列属性:\n");
printf(" 最大消息数: %ld\n", attr.mq_maxmsg);
printf(" 最大消息大小: %ld 字节\n", attr.mq_msgsize);
printf(" 当前消息数: %ld\n", attr.mq_curmsgs);
printf(" 标志: %s\n", (attr.mq_flags & O_NONBLOCK) ? "非阻塞" : "阻塞");
} else {
perror("获取队列属性失败");
}
}

// 列出所有消息队列
void list_all_queues() {
printf("=== 系统消息队列列表 ===\n");
printf("注意: POSIX 消息队列通常在 /dev/mqueue/ 目录下\n");

// 尝试列出 /dev/mqueue/ 目录
if (access("/dev/mqueue", F_OK) == 0) {
printf("系统消息队列目录存在\n");
system("ls -la /dev/mqueue/ 2>/dev/null || echo '无法访问 /dev/mqueue/'");
} else {
printf("系统消息队列目录不存在\n");
}
printf("\n");
}

// 发送消息
int send_message_to_queue(mqd_t mq, const char *message, int priority, int non_blocking) {
struct mq_attr attr;

// 检查消息大小
if (mq_getattr(mq, &attr) == 0) {
if (strlen(message) + 1 > (size_t)attr.mq_msgsize) {
fprintf(stderr, "错误: 消息大小 (%zu) 超过队列限制 (%ld)\n",
strlen(message) + 1, attr.mq_msgsize);
return -1;
}
}

// 发送消息
if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {
printf("✓ 消息发送成功 (优先级 %d): %s\n", priority, message);
return 0;
} else {
if (errno == EAGAIN && non_blocking) {
printf("⚠ 队列已满,非阻塞模式下发送失败\n");
} else {
perror("✗ 消息发送失败");
}
return -1;
}
}

// 接收消息
int receive_message_from_queue(mqd_t mq, int non_blocking) {
char *buffer;
struct mq_attr attr;
ssize_t bytes_read;
unsigned int priority;

// 获取队列属性以确定缓冲区大小
if (mq_getattr(mq, &attr) != 0) {
perror("获取队列属性失败");
return -1;
}

buffer = malloc(attr.mq_msgsize);
if (!buffer) {
perror("内存分配失败");
return -1;
}

// 接收消息
bytes_read = mq_receive(mq, buffer, attr.mq_msgsize, &priority);
if (bytes_read != -1) {
printf("✓ 消息接收成功 (优先级 %u, 长度 %zd): %s",
priority, bytes_read, buffer);
free(buffer);
return 0;
} else {
if (errno == EAGAIN && non_blocking) {
printf("⚠ 队列为空,非阻塞模式下接收失败\n");
} else {
perror("✗ 消息接收失败");
}
free(buffer);
return -1;
}
}

// 创建消息队列
mqd_t create_message_queue(const char *name, int max_msgs, int max_size, int non_blocking) {
struct mq_attr attr;
int flags = O_CREAT | O_RDWR;

if (non_blocking) {
flags |= O_NONBLOCK;
}

attr.mq_flags = non_blocking ? O_NONBLOCK : 0;
attr.mq_maxmsg = max_msgs;
attr.mq_msgsize = max_size;
attr.mq_curmsgs = 0;

mqd_t mq = mq_open(name, flags, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("创建消息队列失败");
return (mqd_t)-1;
}

printf("✓ 消息队列创建成功: %s\n", name);
return mq;
}

// 打开现有消息队列
mqd_t open_existing_queue(const char *name, int non_blocking) {
int flags = O_RDWR;

if (non_blocking) {
flags |= O_NONBLOCK;
}

mqd_t mq = mq_open(name, flags);
if (mq == (mqd_t)-1) {
perror("打开消息队列失败");
return (mqd_t)-1;
}

printf("✓ 消息队列打开成功: %s\n", name);
return mq;
}

// 显示帮助信息
void show_help(const char *program_name) {
printf("用法: %s &#91;选项]\n", program_name);
printf("\n选项:\n");
printf(" -n, --name=NAME 消息队列名称 (以 / 开头)\n");
printf(" -c, --create 创建消息队列\n");
printf(" -d, --delete 删除消息队列\n");
printf(" -i, --info 显示队列信息\n");
printf(" -l, --list 列出所有队列\n");
printf(" -s, --send=MESSAGE 发送消息\n");
printf(" -r, --receive 接收消息\n");
printf(" -p, --priority=NUM 消息优先级 (默认 0)\n");
printf(" -m, --max-msgs=NUM 最大消息数 (创建时使用)\n");
printf(" -z, --max-size=NUM 最大消息大小 (创建时使用)\n");
printf(" -b, --non-blocking 非阻塞模式\n");
printf(" -v, --verbose 详细输出\n");
printf(" -h, --help 显示此帮助信息\n");
printf("\n示例:\n");
printf(" %s -n /myqueue -c -m 10 -z 256 # 创建队列\n", program_name);
printf(" %s -n /myqueue -s \"Hello World\" # 发送消息\n", program_name);
printf(" %s -n /myqueue -r # 接收消息\n", program_name);
printf(" %s -n /myqueue -i # 显示队列信息\n", program_name);
printf(" %s -n /myqueue -d # 删除队列\n", program_name);
printf(" %s -l # 列出所有队列\n", program_name);
}

int main(int argc, char *argv&#91;]) {
struct mq_config config = {
.queue_name = NULL,
.max_messages = 10,
.max_message_size = 256,
.create_queue = 0,
.delete_queue = 0,
.show_info = 0,
.send_message = 0,
.receive_message = 0,
.list_queues = 0,
.priority = 0,
.message_content = NULL,
.non_blocking = 0,
.verbose = 0
};

printf("=== POSIX 消息队列管理系统 ===\n\n");

// 解析命令行参数
static struct option long_options&#91;] = {
{"name", required_argument, 0, 'n'},
{"create", no_argument, 0, 'c'},
{"delete", no_argument, 0, 'd'},
{"info", no_argument, 0, 'i'},
{"list", no_argument, 0, 'l'},
{"send", required_argument, 0, 's'},
{"receive", no_argument, 0, 'r'},
{"priority", required_argument, 0, 'p'},
{"max-msgs", required_argument, 0, 'm'},
{"max-size", required_argument, 0, 'z'},
{"non-blocking", no_argument, 0, 'b'},
{"verbose", no_argument, 0, 'v'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0}
};

int opt;
while ((opt = getopt_long(argc, argv, "n:cdils:rp:m:z:bvh", long_options, NULL)) != -1) {
switch (opt) {
case 'n':
config.queue_name = optarg;
break;
case 'c':
config.create_queue = 1;
break;
case 'd':
config.delete_queue = 1;
break;
case 'i':
config.show_info = 1;
break;
case 'l':
config.list_queues = 1;
break;
case 's':
config.send_message = 1;
config.message_content = optarg;
break;
case 'r':
config.receive_message = 1;
break;
case 'p':
config.priority = atoi(optarg);
break;
case 'm':
config.max_messages = atoi(optarg);
break;
case 'z':
config.max_message_size = atoi(optarg);
break;
case 'b':
config.non_blocking = 1;
break;
case 'v':
config.verbose = 1;
break;
case 'h':
show_help(argv&#91;0]);
return 0;
default:
fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv&#91;0]);
return 1;
}
}

// 设置信号处理
setup_signal_handlers();

// 显示系统信息
if (config.verbose) {
printf("系统信息:\n");
printf(" 当前用户 UID: %d\n", getuid());
printf(" 当前进程 PID: %d\n", getpid());
printf(" 消息队列支持: ");
system("ls /dev/mqueue/ >/dev/null 2>&1 && echo '是' || echo '否'");
printf("\n");
}

// 列出所有队列
if (config.list_queues) {
list_all_queues();
if (!config.queue_name && !config.create_queue && !config.delete_queue &&
!config.show_info && !config.send_message && !config.receive_message) {
return 0;
}
}

// 如果没有指定队列名称且需要操作队列
if (!config.queue_name && (config.create_queue || config.delete_queue ||
config.show_info || config.send_message ||
config.receive_message)) {
fprintf(stderr, "错误: 需要指定消息队列名称\n");
fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv&#91;0]);
return 1;
}

// 处理队列操作
mqd_t mq = (mqd_t)-1;

if (config.create_queue) {
mq = create_message_queue(config.queue_name, config.max_messages,
config.max_message_size, config.non_blocking);
if (mq == (mqd_t)-1) {
return 1;
}

if (config.show_info) {
show_queue_info(mq);
}
} else if (config.queue_name) {
// 打开现有队列
mq = open_existing_queue(config.queue_name, config.non_blocking);
if (mq == (mqd_t)-1) {
return 1;
}
}

// 显示队列信息
if (config.show_info && mq != (mqd_t)-1) {
show_queue_info(mq);
}

// 发送消息
if (config.send_message && config.message_content && mq != (mqd_t)-1) {
send_message_to_queue(mq, config.message_content,
config.priority, config.non_blocking);
}

// 接收消息
if (config.receive_message && mq != (mqd_t)-1) {
if (config.non_blocking) {
receive_message_from_queue(mq, config.non_blocking);
} else {
printf("等待接收消息 (按 Ctrl+C 退出)...\n");
while (running) {
if (receive_message_from_queue(mq, config.non_blocking) == -1) {
if (errno != EAGAIN) {
break;
}
}
if (!config.non_blocking) {
sleep(1); // 阻塞模式下定期检查
}
}
}
}

// 删除队列
if (config.delete_queue && config.queue_name) {
if (mq_unlink(config.queue_name) == 0) {
printf("✓ 消息队列删除成功: %s\n", config.queue_name);
} else {
perror("✗ 消息队列删除失败");
}
}

// 关闭队列
if (mq != (mqd_t)-1) {
if (mq_close(mq) == 0) {
if (config.verbose) {
printf("✓ 消息队列关闭成功\n");
}
} else {
perror("✗ 消息队列关闭失败");
}
}

// 显示使用建议
printf("\n=== POSIX 消息队列使用建议 ===\n");
printf("适用场景:\n");
printf("1. 进程间通信 (IPC)\n");
printf("2. 生产者-消费者模式\n");
printf("3. 异步消息处理\n");
printf("4. 系统服务通信\n");
printf("5. 微服务架构\n");
printf("\n");
printf("优势:\n");
printf("1. 可靠性: 消息持久化存储\n");
printf("2. 优先级: 支持消息优先级\n");
printf("3. 可移植: POSIX 标准\n");
printf("4. 灵活性: 支持阻塞和非阻塞模式\n");
printf("5. 安全性: 通过文件系统权限控制\n");
printf("\n");
printf("注意事项:\n");
printf("1. 需要链接实时库: -lrt\n");
printf("2. 队列名称必须以 / 开头\n");
printf("3. 消息大小和数量有限制\n");
printf("4. 需要适当权限才能创建/删除队列\n");
printf("5. 应该及时关闭和清理队列资源\n");

return 0;
}

编译和运行说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 编译示例程序(需要链接实时库)
gcc -o mq_example1 example1.c -lrt
gcc -o mq_example2 example2.c -lrt -lpthread
gcc -o mq_example3 example3.c -lrt -lpthread

# 运行示例
./mq_example1
./mq_example2
./mq_example3 --help

# 基本操作示例
./mq_example3 -n /test_queue -c -m 5 -z 128
./mq_example3 -n /test_queue -s "Hello, Message Queue!"
./mq_example3 -n /test_queue -r
./mq_example3 -n /test_queue -i
./mq_example3 -n /test_queue -d

# 列出所有队列
./mq_example3 -l

系统要求检查

1
2
3
4
5
6
7
8
9
10
11
12
13
# 检查系统支持
ls /dev/mqueue/ 2>/dev/null || echo "消息队列目录不存在"

# 检查内核配置
grep -i mq /boot/config-$(uname -r)

# 检查库支持
ldd --version

# 查看系统限制
ulimit -a | grep -i msg
cat /proc/sys/fs/mqueue/

重要注意事项

编译要求: 需要链接实时库 -lrt

权限要求: 创建/删除队列通常需要适当权限

名称规范: 队列名称必须以 ‘/’ 开头

资源限制: 受系统消息队列限制约束

清理责任: 应该及时关闭和删除队列

线程安全: 消息队列描述符在多线程间共享是安全的

实际应用场景

微服务通信: 服务间异步消息传递

日志系统: 异步日志记录

任务队列: 后台任务处理

事件驱动: 事件通知和处理

数据流: 实时数据处理管道

系统监控: 状态变更通知

最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 安全的消息队列操作函数
mqd_t safe_mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr) {
mqd_t mq = mq_open(name, oflag, mode, attr);
if (mq == (mqd_t)-1) {
switch (errno) {
case EACCES:
fprintf(stderr, "权限不足访问队列: %s\n", name);
break;
case EEXIST:
fprintf(stderr, "队列已存在: %s\n", name);
break;
case ENOENT:
fprintf(stderr, "队列不存在: %s\n", name);
break;
case EINVAL:
fprintf(stderr, "无效的队列名称或参数: %s\n", name);
break;
default:
perror("mq_open 失败");
break;
}
}
return mq;
}

// 可靠的消息发送函数
int reliable_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned msg_prio, int timeout_seconds) {
struct timespec timeout;
int result;

if (timeout_seconds > 0) {
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += timeout_seconds;
result = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &timeout);
} else {
result = mq_send(mqdes, msg_ptr, msg_len, msg_prio);
}

return result;
}

// 带重试的消息接收函数
ssize_t retry_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned *msg_prio, int max_retries) {
ssize_t result;
int retries = 0;

while (retries < max_retries) {
result = mq_receive(mqdes, msg_ptr, msg_len, msg_prio);
if (result != -1) {
return result; // 成功接收
}

if (errno == EAGAIN) {
retries++;
usleep(100000); // 100ms 延迟后重试
} else {
break; // 其他错误,不再重试
}
}

return result;
}

这些示例展示了 POSIX 消息队列的各种使用方法,从基础的消息发送接收到完整的管理系统,帮助你全面掌握 Linux 系统中的消息队列机制。

data-ad-format="auto" data-full-width-responsive="true">