1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h>
// 消息类型定义 #define MSG_NORMAL 0 #define MSG_HIGH_PRIORITY 1 #define MSG_CONTROL 2 #define MSG_DATA 3
// 模拟的 STREAMS 消息队列 struct message_queue { int count; struct { int priority; int type; time_t timestamp; int control_len; char control_data[128]; int data_len; char data[512]; } messages[10]; };
// 模拟的 strbuf 结构 struct my_strbuf { int maxlen; int len; char *buf; };
// 全局消息队列 struct message_queue global_queue = {0};
// 向队列添加消息 int add_message(int priority, int type, const char *control, const char *data) { if (global_queue.count >= 10) { printf("消息队列已满\n"); return -1; } int index = global_queue.count++; global_queue.messages[index].priority = priority; global_queue.messages[index].type = type; global_queue.messages[index].timestamp = time(NULL); if (control) { global_queue.messages[index].control_len = strlen(control) + 1; strncpy(global_queue.messages[index].control_data, control, 127); global_queue.messages[index].control_data[127] = '\0'; } else { global_queue.messages[index].control_len = 0; global_queue.messages[index].control_data[0] = '\0'; } if (data) { global_queue.messages[index].data_len = strlen(data) + 1; strncpy(global_queue.messages[index].data, data, 511); global_queue.messages[index].data[511] = '\0'; } else { global_queue.messages[index].data_len = 0; global_queue.messages[index].data[0] = '\0'; } printf("添加消息: 优先级=%d, 类型=%d, 控制=%s, 数据=%s\n", priority, type, control ? control : "无", data ? data : "无"); return 0; }
// 模拟的 getmsg 实现 int my_getmsg(struct my_strbuf *ctlptr, struct my_strbuf *dataptr, int *flagsp) { if (global_queue.count == 0) { printf("消息队列为空\n"); return -1; } // 查找高优先级消息(如果请求) int msg_index = 0; if (flagsp && (*flagsp & 1)) { // 模拟 RS_HIPRI for (int i = 0; i < global_queue.count; i++) { if (global_queue.messages[i].priority > 0) { msg_index = i; break; } } } // 获取消息 struct message_queue *msg = &global_queue.messages[msg_index]; // 复制控制数据 if (ctlptr && ctlptr->buf) { int copy_len = (msg->control_len < ctlptr->maxlen) ? msg->control_len : ctlptr->maxlen; memcpy(ctlptr->buf, msg->control_data, copy_len); ctlptr->len = copy_len; } // 复制数据 if (dataptr && dataptr->buf) { int copy_len = (msg->data_len < dataptr->maxlen) ? msg->data_len : dataptr->maxlen; memcpy(dataptr->buf, msg->data, copy_len); dataptr->len = copy_len; } // 设置返回标志 if (flagsp) { *flagsp = (msg->priority > 0) ? 1 : 0; // 高优先级标志 } // 从队列中移除消息 for (int i = msg_index; i < global_queue.count - 1; i++) { global_queue.messages[i] = global_queue.messages[i + 1]; } global_queue.count--; return 0; }
// 显示队列状态 void show_queue_status() { printf("\n=== 消息队列状态 ===\n"); printf("队列中消息数量: %d\n", global_queue.count); for (int i = 0; i < global_queue.count; i++) { printf("消息 %d: 优先级=%d, 类型=%d, 时间=%s", i, global_queue.messages[i].priority, global_queue.messages[i].type, ctime(&global_queue.messages[i].timestamp)); printf(" 控制: %s\n", global_queue.messages[i].control_data); printf(" 数据: %s\n", global_queue.messages[i].data); } }
int main() { struct my_strbuf ctlbuf, databuf; char ctl_buffer[256], data_buffer[1024]; int flags; int result; printf("=== 完整的 STREAMS 消息系统模拟 ===\n\n"); // 初始化缓冲区 ctlbuf.maxlen = sizeof(ctl_buffer); ctlbuf.buf = ctl_buffer; ctlbuf.len = 0; databuf.maxlen = sizeof(data_buffer); databuf.buf = data_buffer; databuf.len = 0; // 添加测试消息 printf("添加测试消息...\n"); add_message(0, MSG_NORMAL, "NORMAL_CTL", "普通消息数据"); add_message(1, MSG_HIGH_PRIORITY, "HIGH_CTL", "高优先级消息"); add_message(0, MSG_DATA, "DATA_CTL", "另一个普通消息"); add_message(1, MSG_CONTROL, "CTRL_CTL", "高优先级控制消息"); show_queue_status(); // 测试接收普通消息 printf("\n--- 测试1: 接收普通消息 ---\n"); flags = 0; // 普通消息 result = my_getmsg(&ctlbuf, &databuf, &flags); if (result == 0) { printf("成功接收消息:\n"); printf(" 控制数据: %.*s\n", ctlbuf.len, ctlbuf.buf); printf(" 数据: %.*s\n", databuf.len, databuf.buf); printf(" 优先级: %s\n", (flags > 0) ? "高" : "普通"); } // 测试接收高优先级消息 printf("\n--- 测试2: 接收高优先级消息 ---\n"); flags = 1; // 高优先级消息 result = my_getmsg(&ctlbuf, &databuf, &flags); if (result == 0) { printf("成功接收高优先级消息:\n"); printf(" 控制数据: %.*s\n", ctlbuf.len, ctlbuf.buf); printf(" 数据: %.*s\n", databuf.len, databuf.buf); printf(" 优先级: %s\n", (flags > 0) ? "高" : "普通"); } show_queue_status(); printf("\n=== STREAMS 概念说明 ===\n"); printf("STREAMS 是 System V 中的消息传递机制\n"); printf("特点:\n"); printf("1. 消息包含控制部分和数据部分\n"); printf("2. 支持消息优先级\n"); printf("3. 模块化处理架构\n"); printf("4. 主要在 Solaris 等系统中使用\n"); printf("\n在 Linux 中,类似功能可通过以下方式实现:\n"); printf("- Unix 域套接字\n"); printf("- 管道和 FIFO\n"); printf("- netlink 套接字\n"); printf("- D-Bus 消息系统\n"); return 0; }
|