getpmsg 函数详解 見出しへのリンク
1. 函数介绍 見出しへのリンク
getpmsg
是 System V STREAMS 接口中的一个函数,用于从 STREAMS 设备或管道中接收带优先级的消息。可以把 STREAMS 想象成一个"智能分拣系统"——消息根据优先级被分类处理,getpmsg
就是从这个系统中按指定优先级取下消息的工具。
与 getmsg
不同,getpmsg
提供了更精细的优先级控制,允许你指定要接收的消息类型(普通优先级、高优先级等),就像邮件分拣系统可以按紧急程度分拣邮件一样。
2. 函数原型 見出しへのリンク
#include <stropts.h>
int getpmsg(int fildes, struct strbuf *ctlptr, struct strbuf *dataptr,
int *bandp, int *flagsp);
3. 功能 見出しへのリンク
getpmsg
函数用于从 STREAMS 文件描述符中接收带优先级的消息。它可以接收指定优先级(band)的消息,并且可以控制接收行为。
4. 参数 見出しへのリンク
- fildes: STREAMS 设备或管道的文件描述符
- ctlptr: 指向
strbuf
结构体的指针,用于接收控制信息 - dataptr: 指向
strbuf
结构体的指针,用于接收数据信息 - bandp: 指向优先级(band)的指针
- flagsp: 指向标志的指针,用于指定接收模式和返回消息类型
5. strbuf 结构体 見出しへのリンク
struct strbuf {
int maxlen; /* 缓冲区最大长度 */
int len; /* 实际数据长度 */
char *buf; /* 指向缓冲区的指针 */
};
6. band 和 flags 参数说明 見出しへのリンク
band 参数(优先级) 見出しへのリンク
- 0-255: 消息优先级级别,数值越高优先级越高
- 用于区分不同类型的消息
flags 参数(输入标志) 見出しへのリンク
- MSG_HIPRI: 接收高优先级消息
- MSG_ANY: 接收任何优先级的消息
- MSG_BAND: 接收指定优先级的消息
flags 参数(输出标志) 見出しへのリンク
- MSG_HIPRI: 接收到高优先级消息
- MSG_BAND: 接收到指定优先级消息
- MSG_MORECTL: 控制部分还有更多数据
- MSG_MOREDATA: 数据部分还有更多数据
7. 返回值 見出しへのリンク
- 成功: 返回 0
- 失败: 返回 -1,并设置相应的 errno 错误码
常见错误码:
EBADF
: fildes 不是有效的文件描述符EINVAL
: 参数无效EIO
: I/O 错误ENOSTR
: fildes 不是 STREAMS 设备ENOSR
: 没有足够的 STREAMS 资源EAGAIN
: 非阻塞模式下无数据可读
8. 相似函数或关联函数 見出しへのリンク
- putpmsg: 发送带优先级的消息
- getmsg: 获取普通消息(不区分优先级)
- putmsg: 发送普通消息
- ioctl: 控制 STREAMS 设备
- poll/select: 检查 STREAMS 文件描述符状态
9. 示例代码 見出しへのリンク
示例1:基础用法 - 简单的优先级消息接收 見出しへのリンク
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stropts.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
// 注意:这个示例在大多数 Linux 系统上可能无法运行
// 因为 Linux 不完全支持 STREAMS
int main() {
int fd;
struct strbuf ctlbuf, databuf;
char ctl_data[256], data_buf[1024];
int band, flags;
printf("=== getpmsg 基础示例 ===\n\n");
// 初始化缓冲区结构
ctlbuf.maxlen = sizeof(ctl_data);
ctlbuf.buf = ctl_data;
ctlbuf.len = 0;
databuf.maxlen = sizeof(data_buf);
databuf.buf = data_buf;
databuf.len = 0;
band = 0; // 优先级
flags = 0; // 接收标志
printf("注意: getpmsg 主要用于 STREAMS 系统\n");
printf("在大多数 Linux 系统上可能不可用\n\n");
printf("参数设置:\n");
printf(" 控制缓冲区最大长度: %d\n", ctlbuf.maxlen);
printf(" 数据缓冲区最大长度: %d\n", databuf.maxlen);
printf(" 优先级 (band): %d\n", band);
printf(" 标志: %d\n", flags);
printf("\n如果在支持 STREAMS 的系统上,可以这样调用:\n");
printf("result = getpmsg(fd, &ctlbuf, &databuf, &band, &flags);\n");
return 0;
}
示例2:模拟 STREAMS 优先级消息处理 見出しへのリンク
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
// 模拟的 STREAMS 消息结构
struct simulated_pmsg {
int band; // 消息优先级 (0-255)
int control_len; // 控制数据长度
char control_data[256]; // 控制数据
int data_len; // 数据长度
char data[1024]; // 实际数据
time_t timestamp; // 时间戳
};
// 模拟的消息队列
struct priority_message_queue {
int count;
struct simulated_pmsg messages[20];
};
// 全局消息队列
struct priority_message_queue msg_queue = {0};
// 模拟的 strbuf 结构
struct simulated_strbuf {
int maxlen;
int len;
char *buf;
};
// 标志定义(模拟)
#define MSG_HIPRI 0x01
#define MSG_ANY 0x02
#define MSG_BAND 0x04
#define MSG_MORECTL 0x08
#define MSG_MOREDATA 0x10
// 向队列添加优先级消息
int add_priority_message(int band, const char *control, const char *data) {
if (msg_queue.count >= 20) {
printf("消息队列已满\n");
return -1;
}
int index = msg_queue.count++;
msg_queue.messages[index].band = band;
msg_queue.messages[index].timestamp = time(NULL);
if (control) {
msg_queue.messages[index].control_len = strlen(control) + 1;
strncpy(msg_queue.messages[index].control_data, control, 255);
msg_queue.messages[index].control_data[255] = '\0';
} else {
msg_queue.messages[index].control_len = 0;
msg_queue.messages[index].control_data[0] = '\0';
}
if (data) {
msg_queue.messages[index].data_len = strlen(data) + 1;
strncpy(msg_queue.messages[index].data, data, 1023);
msg_queue.messages[index].data[1023] = '\0';
} else {
msg_queue.messages[index].data_len = 0;
msg_queue.messages[index].data[0] = '\0';
}
printf("添加优先级消息: band=%d, 控制='%s', 数据='%s'\n",
band, control ? control : "无", data ? data : "无");
return 0;
}
// 模拟的 getpmsg 实现
int simulated_getpmsg(struct simulated_strbuf *ctlptr,
struct simulated_strbuf *dataptr,
int *bandp, int *flagsp) {
if (msg_queue.count == 0) {
printf("消息队列为空\n");
return -1;
}
int msg_index = -1;
int target_band = *bandp;
int flags = *flagsp;
printf("查找消息: band=%d, flags=0x%x\n", target_band, flags);
// 根据标志查找消息
if (flags & MSG_HIPRI) {
// 查找最高优先级消息
int max_band = -1;
for (int i = 0; i < msg_queue.count; i++) {
if (msg_queue.messages[i].band > max_band) {
max_band = msg_queue.messages[i].band;
msg_index = i;
}
}
printf("查找最高优先级消息: band=%d\n", max_band);
}
else if (flags & MSG_BAND) {
// 查找指定优先级消息
for (int i = 0; i < msg_queue.count; i++) {
if (msg_queue.messages[i].band == target_band) {
msg_index = i;
break;
}
}
printf("查找指定优先级消息: band=%d\n", target_band);
}
else if (flags & MSG_ANY) {
// 查找任何消息(通常按顺序)
msg_index = 0;
printf("查找任意消息\n");
}
else {
// 默认行为:查找最高优先级
int max_band = -1;
for (int i = 0; i < msg_queue.count; i++) {
if (msg_queue.messages[i].band > max_band) {
max_band = msg_queue.messages[i].band;
msg_index = i;
}
}
printf("默认查找最高优先级消息: band=%d\n", max_band);
}
if (msg_index == -1) {
printf("未找到符合条件的消息\n");
return -1;
}
// 获取消息
struct simulated_pmsg *msg = &msg_queue.messages[msg_index];
// 复制控制数据
if (ctlptr && ctlptr->buf) {
int copy_len = (msg->control_len < ctlptr->maxlen) ?
msg->control_len : ctlptr->maxlen;
memcpy(ctlptr->buf, msg->control_data, copy_len);
ctlptr->len = copy_len;
printf("复制控制数据: %d 字节\n", copy_len);
}
// 复制数据
if (dataptr && dataptr->buf) {
int copy_len = (msg->data_len < dataptr->maxlen) ?
msg->data_len : dataptr->maxlen;
memcpy(dataptr->buf, msg->data, copy_len);
dataptr->len = copy_len;
printf("复制数据: %d 字节\n", copy_len);
}
// 更新返回参数
*bandp = msg->band;
*flagsp = (msg->band > 100) ? MSG_HIPRI : 0; // 模拟高优先级
// 从队列中移除消息
for (int i = msg_index; i < msg_queue.count - 1; i++) {
msg_queue.messages[i] = msg_queue.messages[i + 1];
}
msg_queue.count--;
printf("成功接收消息: band=%d\n", *bandp);
return 0;
}
// 显示队列状态
void show_priority_queue() {
printf("\n=== 优先级消息队列状态 ===\n");
printf("消息数量: %d\n", msg_queue.count);
for (int i = 0; i < msg_queue.count; i++) {
printf("消息 %d: band=%d, 时间=%s",
i, msg_queue.messages[i].band,
ctime(&msg_queue.messages[i].timestamp));
printf(" 控制: %s\n", msg_queue.messages[i].control_data);
printf(" 数据: %s\n", msg_queue.messages[i].data);
}
}
int main() {
struct simulated_strbuf ctlbuf, databuf;
char ctl_buffer[256], data_buffer[1024];
int band, flags;
int result;
printf("=== STREAMS 优先级消息处理模拟 ===\n\n");
// 初始化缓冲区
ctlbuf.maxlen = sizeof(ctl_buffer);
ctlbuf.buf = ctl_buffer;
ctlbuf.len = 0;
databuf.maxlen = sizeof(data_buffer);
databuf.buf = data_buffer;
databuf.len = 0;
// 添加测试消息(不同优先级)
printf("添加测试消息...\n");
add_priority_message(50, "NORMAL_CTL", "普通优先级消息");
add_priority_message(150, "HIGH_CTL", "高优先级消息");
add_priority_message(25, "LOW_CTL", "低优先级消息");
add_priority_message(200, "CRITICAL_CTL", "关键优先级消息");
add_priority_message(75, "MEDIUM_CTL", "中等优先级消息");
show_priority_queue();
// 测试1: 接收最高优先级消息
printf("\n--- 测试1: 接收最高优先级消息 ---\n");
band = 0;
flags = MSG_HIPRI;
result = simulated_getpmsg(&ctlbuf, &databuf, &band, &flags);
if (result == 0) {
printf("成功接收最高优先级消息:\n");
printf(" 优先级: %d\n", band);
printf(" 控制数据: %.*s\n", ctlbuf.len, ctlbuf.buf);
printf(" 数据: %.*s\n", databuf.len, databuf.buf);
printf(" 消息标志: 0x%x\n", flags);
}
// 测试2: 接收指定优先级消息
printf("\n--- 测试2: 接收指定优先级消息 (band=75) ---\n");
band = 75;
flags = MSG_BAND;
result = simulated_getpmsg(&ctlbuf, &databuf, &band, &flags);
if (result == 0) {
printf("成功接收指定优先级消息:\n");
printf(" 优先级: %d\n", band);
printf(" 控制数据: %.*s\n", ctlbuf.len, ctlbuf.buf);
printf(" 数据: %.*s\n", databuf.len, databuf.buf);
printf(" 消息标志: 0x%x\n", flags);
}
// 测试3: 接收任意消息
printf("\n--- 测试3: 接收任意消息 ---\n");
band = 0;
flags = MSG_ANY;
result = simulated_getpmsg(&ctlbuf, &databuf, &band, &flags);
if (result == 0) {
printf("成功接收任意消息:\n");
printf(" 优先级: %d\n", band);
printf(" 控制数据: %.*s\n", ctlbuf.len, ctlbuf.buf);
printf(" 数据: %.*s\n", databuf.len, databuf.buf);
printf(" 消息标志: 0x%x\n", flags);
}
show_priority_queue();
printf("\n=== 优先级消息概念说明 ===\n");
printf("STREAMS 优先级消息系统:\n");
printf("1. 消息按优先级 (band) 分类\n");
printf("2. 优先级范围: 0-255 (数值越高优先级越高)\n");
printf("3. 可以按优先级选择性接收消息\n");
printf("4. 支持高优先级消息抢占\n");
return 0;
}
示例3:完整的优先级消息管理系统 見出しへのリンク
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
// 消息类型定义
#define BAND_LOW 25 // 低优先级
#define BAND_NORMAL 50 // 普通优先级
#define BAND_MEDIUM 100 // 中等优先级
#define BAND_HIGH 150 // 高优先级
#define BAND_CRITICAL 200 // 关键优先级
// 消息队列管理器
struct message_manager {
int total_messages;
int processed_messages;
int dropped_messages;
struct {
int band;
int msg_id;
char type[32];
char content[256];
time_t created_time;
time_t processed_time;
} queue[50];
};
// 全局消息管理器
struct message_manager msg_mgr = {0};
// 标志定义
#define MSG_HIPRI 0x01
#define MSG_ANY 0x02
#define MSG_BAND 0x04
// 模拟的 strbuf 结构
struct stream_buffer {
int maxlen;
int len;
char *buf;
};
// 添加消息到队列
int queue_message(int band, const char *type, const char *content) {
if (msg_mgr.total_messages >= 50) {
msg_mgr.dropped_messages++;
printf("警告: 消息队列已满,丢弃消息\n");
return -1;
}
int index = msg_mgr.total_messages++;
msg_mgr.queue[index].band = band;
msg_mgr.queue[index].msg_id = msg_mgr.total_messages;
strncpy(msg_mgr.queue[index].type, type, 31);
msg_mgr.queue[index].type[31] = '\0';
strncpy(msg_mgr.queue[index].content, content, 255);
msg_mgr.queue[index].content[255] = '\0';
msg_mgr.queue[index].created_time = time(NULL);
msg_mgr.queue[index].processed_time = 0;
printf("入队消息 #%d: band=%d, 类型=%s\n",
msg_mgr.queue[index].msg_id, band, type);
return 0;
}
// 模拟的 getpmsg 实现
int advanced_getpmsg(struct stream_buffer *ctlptr,
struct stream_buffer *dataptr,
int *bandp, int *flagsp) {
if (msg_mgr.total_messages == 0) {
return -1; // 队列为空
}
int msg_index = -1;
int target_band = *bandp;
int flags = *flagsp;
// 根据标志选择消息
if (flags & MSG_HIPRI) {
// 查找最高优先级消息
int max_band = -1;
for (int i = 0; i < msg_mgr.total_messages; i++) {
if (msg_mgr.queue[i].band > max_band) {
max_band = msg_mgr.queue[i].band;
msg_index = i;
}
}
}
else if (flags & MSG_BAND) {
// 查找指定优先级消息
for (int i = 0; i < msg_mgr.total_messages; i++) {
if (msg_mgr.queue[i].band == target_band) {
msg_index = i;
break;
}
}
}
else {
// 默认:按顺序处理
msg_index = 0;
}
if (msg_index == -1) {
return -1; // 未找到消息
}
// 处理消息
struct {
int band;
int msg_id;
char type[32];
char content[256];
time_t created_time;
time_t processed_time;
} *msg = &msg_mgr.queue[msg_index];
msg->processed_time = time(NULL);
msg_mgr.processed_messages++;
// 准备返回数据
if (ctlptr && ctlptr->buf) {
char ctl_info[256];
snprintf(ctl_info, sizeof(ctl_info),
"MSG_ID=%d,BAND=%d,TYPE=%s",
msg->msg_id, msg->band, msg->type);
int copy_len = (strlen(ctl_info) + 1 < ctlptr->maxlen) ?
strlen(ctl_info) + 1 : ctlptr->maxlen;
memcpy(ctlptr->buf, ctl_info, copy_len - 1);
ctlptr->buf[copy_len - 1] = '\0';
ctlptr->len = copy_len - 1;
}
if (dataptr && dataptr->buf) {
int copy_len = (strlen(msg->content) + 1 < dataptr->maxlen) ?
strlen(msg->content) + 1 : dataptr->maxlen;
memcpy(dataptr->buf, msg->content, copy_len - 1);
dataptr->buf[copy_len - 1] = '\0';
dataptr->len = copy_len - 1;
}
// 更新返回参数
*bandp = msg->band;
*flagsp = (msg->band >= BAND_HIGH) ? MSG_HIPRI : 0;
// 从队列中移除消息
for (int i = msg_index; i < msg_mgr.total_messages - 1; i++) {
msg_mgr.queue[i] = msg_mgr.queue[i + 1];
}
msg_mgr.total_messages--;
return 0;
}
// 显示系统统计
void show_system_stats() {
printf("\n=== 消息系统统计 ===\n");
printf("总消息数: %d\n", msg_mgr.total_messages);
printf("已处理消息: %d\n", msg_mgr.processed_messages);
printf("丢弃消息: %d\n", msg_mgr.dropped_messages);
printf("队列中消息: %d\n", msg_mgr.total_messages);
}
// 显示队列内容
void show_queue_contents() {
printf("\n=== 队列内容 ===\n");
if (msg_mgr.total_messages == 0) {
printf("队列为空\n");
return;
}
printf("优先级 ID 类型 内容\n");
printf("-------- ---- -------------- ------------------------\n");
for (int i = 0; i < msg_mgr.total_messages; i++) {
printf("%-8d %-4d %-14s %s\n",
msg_mgr.queue[i].band,
msg_mgr.queue[i].msg_id,
msg_mgr.queue[i].type,
msg_mgr.queue[i].content);
}
}
// 按优先级分类统计
void show_priority_statistics() {
int band_counts[5] = {0}; // 低、普通、中等、高、关键
for (int i = 0; i < msg_mgr.total_messages; i++) {
int band = msg_mgr.queue[i].band;
if (band <= BAND_LOW) band_counts[0]++;
else if (band <= BAND_NORMAL) band_counts[1]++;
else if (band <= BAND_MEDIUM) band_counts[2]++;
else if (band <= BAND_HIGH) band_counts[3]++;
else band_counts[4]++;
}
printf("\n=== 优先级统计 ===\n");
printf("低优先级 (0-25): %d 条消息\n", band_counts[0]);
printf("普通优先级 (26-50): %d 条消息\n", band_counts[1]);
printf("中等优先级 (51-100): %d 条消息\n", band_counts[2]);
printf("高优先级 (101-150): %d 条消息\n", band_counts[3]);
printf("关键优先级 (151-255): %d 条消息\n", band_counts[4]);
}
int main() {
struct stream_buffer ctlbuf, databuf;
char ctl_buffer[256], data_buffer[1024];
int band, flags;
int result;
printf("=== 高级优先级消息管理系统 ===\n\n");
// 初始化缓冲区
ctlbuf.maxlen = sizeof(ctl_buffer);
ctlbuf.buf = ctl_buffer;
ctlbuf.len = 0;
databuf.maxlen = sizeof(data_buffer);
databuf.buf = data_buffer;
databuf.len = 0;
// 添加各种优先级的消息
printf("初始化消息队列...\n");
queue_message(BAND_CRITICAL, "ALERT", "系统紧急告警:磁盘空间不足");
queue_message(BAND_HIGH, "ERROR", "应用程序错误:数据库连接失败");
queue_message(BAND_MEDIUM, "WARNING", "系统警告:CPU使用率过高");
queue_message(BAND_NORMAL, "INFO", "用户登录成功");
queue_message(BAND_LOW, "DEBUG", "调试信息:函数调用跟踪");
queue_message(BAND_CRITICAL, "ALERT", "安全告警:多次登录失败");
queue_message(BAND_HIGH, "ERROR", "网络错误:连接超时");
queue_message(BAND_MEDIUM, "NOTICE", "系统通知:配置文件已更新");
show_system_stats();
show_queue_contents();
show_priority_statistics();
// 处理消息的示例
printf("\n=== 消息处理演示 ===\n");
// 1. 处理最高优先级消息
printf("\n--- 处理最高优先级消息 ---\n");
band = 0;
flags = MSG_HIPRI;
result = advanced_getpmsg(&ctlbuf, &databuf, &band, &flags);
if (result == 0) {
printf("处理成功:\n");
printf(" 优先级: %d\n", band);
printf(" 控制信息: %s\n", ctlbuf.buf);
printf(" 消息内容: %s\n", databuf.buf);
printf(" 消息标志: %s\n", (flags & MSG_HIPRI) ? "高优先级" : "普通优先级");
}
// 2. 处理指定优先级消息
printf("\n--- 处理中等优先级消息 ---\n");
band = BAND_MEDIUM;
flags = MSG_BAND;
result = advanced_getpmsg(&ctlbuf, &databuf, &band, &flags);
if (result == 0) {
printf("处理成功:\n");
printf(" 优先级: %d\n", band);
printf(" 控制信息: %s\n", ctlbuf.buf);
printf(" 消息内容: %s\n", databuf.buf);
}
// 3. 处理任意消息
printf("\n--- 处理任意消息 ---\n");
band = 0;
flags = MSG_ANY;
result = advanced_getpmsg(&ctlbuf, &databuf, &band, &flags);
if (result == 0) {
printf("处理成功:\n");
printf(" 优先级: %d\n", band);
printf(" 控制信息: %s\n", ctlbuf.buf);
printf(" 消息内容: %s\n", databuf.buf);
}
// 显示处理后的状态
show_system_stats();
show_queue_contents();
printf("\n=== STREAMS 优先级消息系统特点 ===\n");
printf("1. 支持 0-255 级优先级\n");
printf("2. 可以按优先级选择性接收消息\n");
printf("3. 高优先级消息可以抢占处理\n");
printf("4. 适用于实时系统和关键任务应用\n");
printf("\nLinux 替代方案:\n");
printf("- 实时信号 (RT signals)\n");
printf("- D-Bus 消息系统\n");
printf("- systemd journal\n");
printf("- 自定义优先级队列\n");
return 0;
}
编译和运行说明 見出しへのリンク
# 编译示例程序
gcc -o getpmsg_example1 example1.c
gcc -o getpmsg_example2 example2.c
gcc -o getpmsg_example3 example3.c
# 运行示例
./getpmsg_example1
./getpmsg_example2
./getpmsg_example3
STREAMS 系统检查 見出しへのリンク
# 检查系统是否支持 STREAMS
ls /usr/include/stropts.h
# 在 Solaris 等系统上编译
gcc -D_SOLARIS -o getpmsg_real example_real.c -lstrmi
# 查看 STREAMS 相关信息
modinfo | grep stream
重要注意事项 見出しへのリンク
- 系统支持:
getpmsg
主要在 System V Unix 系统中可用 - Linux 限制: 大多数 Linux 系统不完全支持 STREAMS
- 移植性: 代码可移植性较差
- 优先级范围: band 值范围为 0-255
- 错误处理: 始终检查返回值和 errno
现代 Linux 替代方案 見出しへのリンク
使用实时信号 見出しへのリンク
#include <signal.h>
#include <sys/types.h>
// 发送带优先级的实时信号
int send_priority_signal(pid_t pid, int sig, int priority) {
union sigval value;
value.sival_int = priority;
return sigqueue(pid, sig, value);
}
使用自定义优先级队列 見出しへのリンク
#include <pthread.h>
#include <sys/queue.h>
// 自定义优先级消息队列
struct priority_msg {
int priority;
void *data;
SLIST_ENTRY(priority_msg) entries;
};
SLIST_HEAD(msg_head, priority_msg) msg_queue = SLIST_HEAD_INITIALIZER(msg_queue);
实际应用场景 見出しへのリンク
- 实时系统: 需要按优先级处理消息的实时应用
- 网络协议: 实现复杂的网络协议栈
- 设备驱动: 设备驱动中的消息处理
- 系统管理: 系统管理工具的优先级消息
- 多媒体应用: 音视频处理中的实时消息
优先级消息处理流程 見出しへのリンク
消息生产者
↓ (putpmsg)
STREAMS 系统
↓ (按优先级排队)
消息消费者
↓ (getpmsg)
应用层处理
虽然 getpmsg
在现代 Linux 系统中使用较少,但它体现了优先级消息处理的重要概念。在实际开发中,可以根据需求选择合适的现代替代方案来实现类似的功能。