mq_unlink系统调用及示例

mq_unlink函数详解

  1. 函数介绍

mq_unlink函数是Linux系统中用于删除POSIX消息队列的函数。可以把mq_unlink想象成一个”消息队列删除器”,它能够从系统中移除指定名称的消息队列。

data-ad-format="fluid" data-ad-layout-key="-7k+ex-4a-9w+4a">

POSIX消息队列是进程间通信(IPC)的一种机制,允许不同进程通过队列发送和接收消息。mq_unlink的作用类似于文件系统的unlink函数,它删除消息队列的名称,但不会立即销毁队列本身。只有当所有打开该队列的进程都关闭了队列描述符后,队列才会被真正销毁。

使用场景:

  • 进程间通信系统的清理

  • 服务器程序的资源管理

  • 系统维护和清理脚本

  • 消息队列生命周期管理

  1. 函数原型
1
2
3
4
#include <mqueue.h>

int mq_unlink(const char *name);

  1. 功能

mq_unlink函数的主要功能是删除指定名称的POSIX消息队列。它从系统中移除队列的名称,使得后续无法通过该名称打开队列,但已打开的队列描述符仍然有效。

  1. 参数

name: 消息队列名称

  • 类型:const char*

  • 含义:要删除的消息队列名称

  • 名称必须以’/‘开头,如”/my_queue”

  1. 返回值
  • 成功: 返回0

失败: 返回-1,并设置errno错误码

  • EACCES:权限不足

  • ENOENT:指定名称的消息队列不存在

  • EINVAL:名称无效

  1. 相似函数或关联函数
  • mq_open(): 打开或创建消息队列

  • mq_close(): 关闭消息队列描述符

  • mq_send(): 发送消息

  • mq_receive(): 接收消息

  • mq_getattr(): 获取队列属性

  • mq_setattr(): 设置队列属性

  • unlink(): 删除文件

  1. 示例代码

示例1:基础mq_unlink使用 - 简单队列删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>

// 创建消息队列
mqd_t create_message_queue(const char* name) {
struct mq_attr attr = {
.mq_flags = 0,
.mq_maxmsg = 10,
.mq_msgsize = 256,
.mq_curmsgs = 0
};

mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("创建消息队列失败");
return -1;
}

printf("创建消息队列: %s (描述符: %d)\n", name, (int)mq);
return mq;
}

// 显示消息队列属性
void show_queue_attributes(mqd_t mq, const char* name) {
struct mq_attr attr;
if (mq_getattr(mq, &attr) == -1) {
perror("获取队列属性失败");
return;
}

printf("队列 %s 属性:\n", name);
printf(" 最大消息数: %ld\n", attr.mq_maxmsg);
printf(" 最大消息大小: %ld\n", attr.mq_msgsize);
printf(" 当前消息数: %ld\n", attr.mq_curmsgs);
printf(" 标志: %ld\n", attr.mq_flags);
}

int main() {
printf("=== 基础mq_unlink使用示例 ===\n");

const char* queue_name = "/test_queue";

// 创建消息队列
printf("1. 创建消息队列:\n");
mqd_t mq = create_message_queue(queue_name);
if (mq == -1) {
exit(EXIT_FAILURE);
}

show_queue_attributes(mq, queue_name);

// 发送一些测试消息
printf("\n2. 发送测试消息:\n");
const char* messages&#91;] = {
"第一条测试消息",
"第二条测试消息",
"第三条测试消息"
};

for (int i = 0; i < 3; i++) {
if (mq_send(mq, messages&#91;i], strlen(messages&#91;i]), 0) == -1) {
perror("发送消息失败");
} else {
printf("发送消息: %s\n", messages&#91;i]);
}
}

show_queue_attributes(mq, queue_name);

// 使用mq_unlink删除队列名称
printf("\n3. 使用mq_unlink删除队列名称:\n");
if (mq_unlink(queue_name) == 0) {
printf("✓ 成功删除队列名称: %s\n", queue_name);
printf("注意: 队列本身仍然存在,因为还有打开的描述符\n");
} else {
printf("✗ 删除队列名称失败: %s\n", strerror(errno));
}

// 验证队列名称已被删除
printf("\n4. 验证队列名称删除效果:\n");
mqd_t mq2 = mq_open(queue_name, O_RDONLY);
if (mq2 == -1) {
printf("✓ 无法通过名称重新打开队列 (预期行为): %s\n", strerror(errno));
} else {
printf("✗ 仍然可以通过名称打开队列\n");
mq_close(mq2);
}

// 原有描述符仍然可以使用
printf("\n5. 原有描述符仍然有效:\n");
char buffer&#91;256];
ssize_t bytes_received;
unsigned int priority;

while ((bytes_received = mq_receive(mq, buffer, sizeof(buffer), &priority)) > 0) {
buffer&#91;bytes_received] = '\0';
printf("接收到消息: %s (优先级: %u)\n", buffer, priority);
}

// 关闭队列描述符(此时队列才会被真正销毁)
printf("\n6. 关闭队列描述符:\n");
if (mq_close(mq) == 0) {
printf("✓ 队列描述符已关闭,队列被真正销毁\n");
} else {
perror("关闭队列描述符失败");
}

printf("\n=== 基础mq_unlink演示完成 ===\n");

return 0;
}

示例2:多个进程共享队列的删除管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <string.h>
#include <errno.h>
#include <time.h>

#define MAX_MESSAGES 5
#define MESSAGE_SIZE 256

// 生产者进程
void producer_process(const char* queue_name, int producer_id) {
printf("生产者 %d 启动\n", producer_id);

// 打开已存在的队列
mqd_t mq = mq_open(queue_name, O_WRONLY);
if (mq == (mqd_t)-1) {
perror("生产者打开队列失败");
exit(EXIT_FAILURE);
}

srand(time(NULL) + producer_id);

// 发送消息
for (int i = 0; i < MAX_MESSAGES; i++) {
char message&#91;MESSAGE_SIZE];
snprintf(message, sizeof(message), "生产者%d的消息%d", producer_id, i + 1);

// 随机优先级
unsigned int priority = rand() % 10;

if (mq_send(mq, message, strlen(message), priority) == -1) {
perror("发送消息失败");
} else {
printf("生产者 %d 发送: %s (优先级: %u)\n", producer_id, message, priority);
}

sleep(1); // 模拟处理时间
}

printf("生产者 %d 完成\n", producer_id);
mq_close(mq);
}

// 消费者进程
void consumer_process(const char* queue_name, int consumer_id) {
printf("消费者 %d 启动\n", consumer_id);

// 打开已存在的队列
mqd_t mq = mq_open(queue_name, O_RDONLY);
if (mq == (mqd_t)-1) {
perror("消费者打开队列失败");
exit(EXIT_FAILURE);
}

// 接收消息
char buffer&#91;MESSAGE_SIZE];
ssize_t bytes_received;
unsigned int priority;
int message_count = 0;

while (message_count < MAX_MESSAGES * 2) { // 期望接收所有生产者的消息
bytes_received = mq_receive(mq, buffer, sizeof(buffer), &priority);
if (bytes_received > 0) {
buffer&#91;bytes_received] = '\0';
printf("消费者 %d 接收: %s (优先级: %u)\n", consumer_id, buffer, priority);
message_count++;
} else if (errno == EAGAIN) {
// 非阻塞模式下没有消息
printf("消费者 %d: 暂无消息\n", consumer_id);
sleep(1);
} else {
perror("接收消息失败");
break;
}
}

printf("消费者 %d 完成,接收 %d 条消息\n", consumer_id, message_count);
mq_close(mq);
}

// 管理进程
void manager_process(const char* queue_name) {
printf("管理进程启动\n");

// 创建消息队列
struct mq_attr attr = {
.mq_flags = 0,
.mq_maxmsg = 20,
.mq_msgsize = MESSAGE_SIZE,
.mq_curmsgs = 0
};

mqd_t mq = mq_open(queue_name, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("管理进程创建队列失败");
exit(EXIT_FAILURE);
}

printf("管理进程创建队列: %s\n", queue_name);

// 启动生产者和消费者进程
pid_t producers&#91;2], consumers&#91;2];

// 启动生产者
for (int i = 0; i < 2; i++) {
producers&#91;i] = fork();
if (producers&#91;i] == 0) {
producer_process(queue_name, i + 1);
exit(EXIT_SUCCESS);
}
}

// 启动消费者
for (int i = 0; i < 2; i++) {
consumers&#91;i] = fork();
if (consumers&#91;i] == 0) {
consumer_process(queue_name, i + 1);
exit(EXIT_SUCCESS);
}
}

// 等待生产者完成
printf("管理进程等待生产者完成...\n");
for (int i = 0; i < 2; i++) {
waitpid(producers&#91;i], NULL, 0);
}

printf("所有生产者已完成\n");

// 模拟一段时间让消费者处理完消息
sleep(3);

// 删除队列名称(但队列仍存在,因为消费者还在使用)
printf("管理进程删除队列名称...\n");
if (mq_unlink(queue_name) == 0) {
printf("✓ 队列名称已删除,但队列仍存在(消费者仍在使用)\n");
} else {
printf("✗ 删除队列名称失败: %s\n", strerror(errno));
}

// 等待消费者完成
printf("管理进程等待消费者完成...\n");
for (int i = 0; i < 2; i++) {
waitpid(consumers&#91;i], NULL, 0);
}

printf("所有消费者已完成\n");

// 现在队列才会被真正销毁(所有描述符都已关闭)
printf("队列已被真正销毁\n");
mq_close(mq);

printf("管理进程完成\n");
}

int main() {
printf("=== 多进程共享队列删除管理示例 ===\n");

const char* queue_name = "/shared_queue";

// 启动管理进程
pid_t manager = fork();
if (manager == 0) {
manager_process(queue_name);
exit(EXIT_SUCCESS);
}

// 父进程等待管理进程完成
waitpid(manager, NULL, 0);

// 验证队列是否已被删除
printf("\n验证队列删除效果:\n");
mqd_t mq = mq_open(queue_name, O_RDONLY);
if (mq == -1) {
printf("✓ 队列已成功删除: %s\n", strerror(errno));
} else {
printf("✗ 队列仍然存在\n");
mq_close(mq);
}

printf("\n=== 多进程队列管理演示完成 ===\n");

return 0;
}
data-ad-format="auto" data-full-width-responsive="true">