pselect系统调用及示例

Linux 高级 I/O 多路复用系统调用详解

相关文章:select系统调用及示例 Linux I/O 多路复用机制对比分析poll/ppoll/epoll/select pselect系统调用及示例

  1. pselect 函数详解

1. 函数介绍

pselect 是 Linux 系统中用于同时监视多个文件描述符的系统调用。可以把 pselect 想象成”智能的交通指挥官”——它能够同时观察多个”道路”(文件描述符),当某条道路有”车辆”(数据)到达时,立即通知你进行处理。

data-ad-format="fluid" data-ad-layout-key="-7k+ex-4a-9w+4a">

与传统的 select 相比,pselect 提供了更好的信号处理机制,避免了信号处理函数中的竞态条件问题。

2. 函数原型

1
2
3
4
5
6
7
#define _POSIX_C_SOURCE 200112L
#include <sys/select.h>

int pselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask);

3. 功能

pselect 函数用于同时监视多个文件描述符,等待其中任何一个变为就绪状态(可读、可写或有异常)。它能够在等待期间临时设置信号屏蔽集,提供更好的信号处理安全性。

4. 参数

  • nfds: 要监视的最大文件描述符值加1

  • readfds: 指向要监视可读事件的文件描述符集合的指针

  • writefds: 指向要监视可写事件的文件描述符集合的指针

  • exceptfds: 指向要监视异常事件的文件描述符集合的指针

  • timeout: 指向超时时间的指针(NULL 表示无限等待)

  • sigmask: 指向信号屏蔽集的指针(NULL 表示不改变信号屏蔽)

5. timespec 结构体

1
2
3
4
5
struct timespec {
time_t tv_sec; /* 秒数 */
long tv_nsec; /* 纳秒数 */
};

6. 返回值

  • 成功: 返回准备就绪的文件描述符数量(0 表示超时)

  • 失败: 返回 -1,并设置相应的 errno 错误码

7. 常见错误码

  • EBADF: 一个或多个文件描述符无效

  • EINTR: 被未屏蔽的信号中断

  • EINVAL: 参数无效

  • ENOMEM: 内存不足

8. 相似函数或关联函数

  • select: 传统的文件描述符监视函数

  • poll: 更现代的文件描述符监视函数

  • ppoll: 带信号屏蔽的 poll

  • epoll_wait: epoll 接口的等待函数

  • read/write: 文件读写操作

  • signal/sigaction: 信号处理函数

9. 示例代码

示例1:基础用法 - 监视标准输入和定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>

// 信号处理标志
volatile sig_atomic_t signal_received = 0;

// 信号处理函数
void signal_handler(int sig) {
signal_received = sig;
printf("\n收到信号 %d\n", sig);
}

int main() {
fd_set read_fds;
struct timespec timeout;
sigset_t sigmask;
int ready;

printf("=== pselect 基础示例 ===\n\n");

// 设置信号处理
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;

if (sigaction(SIGINT, &sa, NULL) == -1) {
perror("sigaction");
return 1;
}

if (sigaction(SIGTERM, &sa, NULL) == -1) {
perror("sigaction");
return 1;
}

// 初始化文件描述符集合
FD_ZERO(&read_fds);
FD_SET(STDIN_FILENO, &read_fds);

// 设置超时时间 (5 秒)
timeout.tv_sec = 5;
timeout.tv_nsec = 0;

// 设置信号屏蔽集 (不屏蔽任何信号)
sigemptyset(&sigmask);

printf("监视设置:\n");
printf(" 监视文件描述符: %d (标准输入)\n", STDIN_FILENO);
printf(" 超时时间: %ld 秒\n", (long)timeout.tv_sec);
printf(" 按 Enter 键或等待超时...\n");
printf(" 按 Ctrl+C 发送信号\n\n");

// 使用 pselect 监视文件描述符
ready = pselect(STDIN_FILENO + 1, &read_fds, NULL, NULL, &timeout, &sigmask);

if (ready == -1) {
if (errno == EINTR) {
printf("pselect 被信号中断\n");
if (signal_received) {
printf("收到信号: %d\n", signal_received);
}
} else {
perror("pselect 失败");
}
} else if (ready == 0) {
printf("超时: 没有文件描述符准备就绪\n");
} else {
printf("准备就绪的文件描述符数量: %d\n", ready);

if (FD_ISSET(STDIN_FILENO, &read_fds)) {
printf("标准输入有数据可读\n");

// 读取输入
char buffer&#91;256];
ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer&#91;bytes_read] = '\0';
printf("读取到: %s", buffer);
}
}
}

printf("\n=== pselect 特点 ===\n");
printf("1. 原子操作: 等待和信号屏蔽是原子的\n");
printf("2. 精确超时: 支持纳秒级超时\n");
printf("3. 信号安全: 避免信号处理中的竞态条件\n");
printf("4. 灵活屏蔽: 可以精确控制信号屏蔽\n");
printf("5. 高效监视: 同时监视多个文件描述符\n");

return 0;
}

示例2:多文件描述符监视

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <fcntl.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>

// 创建临时文件用于测试
int create_test_file(const char *filename) {
int fd = open(filename, O_CREAT | O_RDWR | O_TRUNC, 0644);
if (fd == -1) {
perror("创建测试文件失败");
return -1;
}

const char *content = "这是测试文件的内容\n用于演示 pselect 功能\n";
write(fd, content, strlen(content));
lseek(fd, 0, SEEK_SET);

printf("创建测试文件: %s\n", filename);
return fd;
}

// 显示文件描述符集合状态
void show_fd_set_status(fd_set *fds, int max_fd, const char *description) {
printf("%s:\n", description);
for (int i = 0; i <= max_fd; i++) {
if (FD_ISSET(i, fds)) {
printf(" fd %d: 就绪\n", i);
}
}
printf("\n");
}

int main() {
fd_set read_fds, master_fds;
struct timespec timeout;
sigset_t sigmask;
int test_fd1, test_fd2;
int max_fd;
int ready;

printf("=== pselect 多文件描述符监视示例 ===\n\n");

// 创建测试文件
test_fd1 = create_test_file("test1.txt");
test_fd2 = create_test_file("test2.txt");

if (test_fd1 == -1 || test_fd2 == -1) {
if (test_fd1 != -1) close(test_fd1);
if (test_fd2 != -1) close(test_fd2);
unlink("test1.txt");
unlink("test2.txt");
return 1;
}

// 初始化文件描述符集合
FD_ZERO(&master_fds);
FD_SET(STDIN_FILENO, &master_fds);
FD_SET(test_fd1, &master_fds);
FD_SET(test_fd2, &master_fds);

// 找到最大的文件描述符
max_fd = STDIN_FILENO;
if (test_fd1 > max_fd) max_fd = test_fd1;
if (test_fd2 > max_fd) max_fd = test_fd2;

// 设置超时时间 (3 秒)
timeout.tv_sec = 3;
timeout.tv_nsec = 0;

// 设置信号屏蔽集
sigemptyset(&sigmask);

printf("监视设置:\n");
printf(" 最大文件描述符: %d\n", max_fd);
printf(" 监视的文件描述符: %d(标准输入), %d(文件1), %d(文件2)\n",
STDIN_FILENO, test_fd1, test_fd2);
printf(" 超时时间: %ld 秒\n", (long)timeout.tv_sec);
printf("\n");

// 主循环
printf("开始监视... (按 Ctrl+C 退出)\n");
while (1) {
// 复制主集合到工作集合
read_fds = master_fds;

printf("等待事件...\n");
ready = pselect(max_fd + 1, &read_fds, NULL, NULL, &timeout, &sigmask);

if (ready == -1) {
if (errno == EINTR) {
printf("pselect 被信号中断\n");
break;
} else {
perror("pselect 失败");
break;
}
} else if (ready == 0) {
printf("超时: 没有文件描述符准备就绪\n");
} else {
printf("准备就绪的文件描述符数量: %d\n", ready);

// 处理标准输入
if (FD_ISSET(STDIN_FILENO, &read_fds)) {
printf("标准输入有数据:\n");
char buffer&#91;256];
ssize_t bytes_read = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer&#91;bytes_read] = '\0';
printf(" 读取到: %s", buffer);

if (strncmp(buffer, "quit", 4) == 0) {
printf("收到退出命令\n");
break;
}
}
}

// 处理测试文件1
if (FD_ISSET(test_fd1, &read_fds)) {
printf("测试文件1有数据可读\n");
lseek(test_fd1, 0, SEEK_SET);
char buffer&#91;128];
ssize_t bytes_read = read(test_fd1, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer&#91;bytes_read] = '\0';
printf(" 文件1内容: %s", buffer);
}
}

// 处理测试文件2
if (FD_ISSET(test_fd2, &read_fds)) {
printf("测试文件2有数据可读\n");
lseek(test_fd2, 0, SEEK_SET);
char buffer&#91;128];
ssize_t bytes_read = read(test_fd2, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer&#91;bytes_read] = '\0';
printf(" 文件2内容: %s", buffer);
}
}

printf("\n");
}
}

// 清理资源
printf("清理资源...\n");
close(test_fd1);
close(test_fd2);
unlink("test1.txt");
unlink("test2.txt");

printf("程序正常退出\n");
return 0;
}

示例3:完整的事件驱动服务器模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>

// 服务器配置结构体
struct server_config {
int max_clients;
int timeout_seconds;
int verbose;
int use_signals;
};

// 客户端信息结构体
struct client_info {
int fd;
int connected;
time_t connect_time;
char buffer&#91;1024];
size_t buffer_pos;
};

// 服务器状态结构体
struct server_state {
int running;
int client_count;
struct client_info *clients;
int max_clients;
};

// 全局变量
volatile sig_atomic_t server_terminate = 0;

// 信号处理函数
void signal_handler(int sig) {
server_terminate = 1;
printf("\n收到终止信号 %d\n", sig);
}

// 初始化服务器状态
int init_server_state(struct server_state *state, int max_clients) {
state->running = 1;
state->client_count = 0;
state->max_clients = max_clients;

state->clients = calloc(max_clients, sizeof(struct client_info));
if (!state->clients) {
perror("分配客户端数组失败");
return -1;
}

return 0;
}

// 添加客户端
int add_client(struct server_state *state, int client_fd) {
if (state->client_count >= state->max_clients) {
fprintf(stderr, "客户端数量已达上限: %d\n", state->max_clients);
return -1;
}

for (int i = 0; i < state->max_clients; i++) {
if (!state->clients&#91;i].connected) {
state->clients&#91;i].fd = client_fd;
state->clients&#91;i].connected = 1;
state->clients&#91;i].connect_time = time(NULL);
state->clients&#91;i].buffer_pos = 0;
state->client_count++;

printf("添加客户端 %d (fd: %d),当前客户端数: %d\n",
i, client_fd, state->client_count);
return i;
}
}

fprintf(stderr, "找不到空闲的客户端槽位\n");
return -1;
}

// 移除客户端
void remove_client(struct server_state *state, int client_index) {
if (client_index >= 0 && client_index < state->max_clients &&
state->clients&#91;client_index].connected) {

close(state->clients&#91;client_index].fd);
memset(&state->clients&#91;client_index], 0, sizeof(struct client_info));
state->client_count--;

printf("移除客户端 %d,剩余客户端数: %d\n",
client_index, state->client_count);
}
}

// 显示服务器状态
void show_server_status(const struct server_state *state) {
printf("=== 服务器状态 ===\n");
printf("运行状态: %s\n", state->running ? "运行中" : "已停止");
printf("客户端数量: %d/%d\n", state->client_count, state->max_clients);
printf("当前时间: %s", ctime(&(time_t){time(NULL)}));

if (state->client_count > 0) {
printf("连接的客户端:\n");
for (int i = 0; i < state->max_clients; i++) {
if (state->clients&#91;i].connected) {
printf(" &#91;%d] fd=%d 连接时间: %s",
i, state->clients&#91;i].fd,
ctime(&state->clients&#91;i].connect_time));
}
}
}
printf("\n");
}

int main(int argc, char *argv&#91;]) {
struct server_config config = {
.max_clients = 10,
.timeout_seconds = 30,
.verbose = 0,
.use_signals = 0
};

struct server_state server_state_struct;
fd_set read_fds;
struct timespec timeout;
sigset_t sigmask;
int max_fd = STDIN_FILENO;
int ready;

printf("=== pselect 事件驱动服务器模拟器 ===\n\n");

// 解析命令行参数
static struct option long_options&#91;] = {
{"clients", required_argument, 0, 'c'},
{"timeout", required_argument, 0, 't'},
{"verbose", no_argument, 0, 'v'},
{"signals", no_argument, 0, 's'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0}
};

int opt;
while ((opt = getopt_long(argc, argv, "c:t:vsh", long_options, NULL)) != -1) {
switch (opt) {
case 'c':
config.max_clients = atoi(optarg);
if (config.max_clients <= 0) config.max_clients = 10;
break;
case 't':
config.timeout_seconds = atoi(optarg);
if (config.timeout_seconds <= 0) config.timeout_seconds = 30;
break;
case 'v':
config.verbose = 1;
break;
case 's':
config.use_signals = 1;
break;
case 'h':
printf("用法: %s &#91;选项]\n", argv&#91;0]);
printf("选项:\n");
printf(" -c, --clients=NUM 最大客户端数 (默认 10)\n");
printf(" -t, --timeout=SECONDS 超时时间 (默认 30 秒)\n");
printf(" -v, --verbose 详细输出\n");
printf(" -s, --signals 启用信号处理\n");
printf(" -h, --help 显示此帮助信息\n");
return 0;
default:
fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv&#91;0]);
return 1;
}
}

// 设置信号处理
if (config.use_signals) {
struct sigaction sa;
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;

sigaction(SIGINT, &sa, NULL);
sigaction(SIGTERM, &sa, NULL);
printf("✓ 信号处理已启用\n");
}

// 初始化服务器状态
if (init_server_state(&server_state_struct, config.max_clients) == -1) {
return 1;
}

printf("服务器配置:\n");
printf(" 最大客户端数: %d\n", config.max_clients);
printf(" 超时时间: %d 秒\n", config.timeout_seconds);
printf(" 详细输出: %s\n", config.verbose ? "是" : "否");
printf(" 信号处理: %s\n", config.use_signals ? "是" : "否");
printf("\n");

// 设置超时时间
timeout.tv_sec = config.timeout_seconds;
timeout.tv_nsec = 0;

// 设置信号屏蔽集
sigemptyset(&sigmask);

printf("启动服务器模拟...\n");
printf("可用命令:\n");
printf(" 输入 'status' 查看服务器状态\n");
printf(" 输入 'connect' 模拟客户端连接\n");
printf(" 输入 'quit' 或 'exit' 退出服务器\n");
printf(" 输入 'help' 显示帮助信息\n");
printf(" 按 Ctrl+C 发送终止信号\n");
printf("\n");

// 主循环
while (server_state_struct.running && !server_terminate) {
// 初始化文件描述符集合
FD_ZERO(&read_fds);
FD_SET(STDIN_FILENO, &read_fds);
max_fd = STDIN_FILENO;

// 添加连接的客户端
for (int i = 0; i < server_state_struct.max_clients; i++) {
if (server_state_struct.clients&#91;i].connected) {
FD_SET(server_state_struct.clients&#91;i].fd, &read_fds);
if (server_state_struct.clients&#91;i].fd > max_fd) {
max_fd = server_state_struct.clients&#91;i].fd;
}
}
}

if (config.verbose) {
printf("监视 %d 个文件描述符 (最大fd: %d)...\n",
server_state_struct.client_count + 1, max_fd);
}

// 使用 pselect 等待事件
ready = pselect(max_fd + 1, &read_fds, NULL, NULL, &timeout, &sigmask);

if (ready == -1) {
if (errno == EINTR) {
if (config.verbose) {
printf("pselect 被信号中断\n");
}
continue;
} else {
perror("pselect 失败");
break;
}
} else if (ready == 0) {
if (config.verbose) {
printf("超时: 没有事件发生\n");
}
} else {
if (config.verbose) {
printf("准备就绪的文件描述符数量: %d\n", ready);
}

// 处理标准输入事件
if (FD_ISSET(STDIN_FILENO, &read_fds)) {
char input_buffer&#91;256];
ssize_t bytes_read = read(STDIN_FILENO, input_buffer, sizeof(input_buffer) - 1);
if (bytes_read > 0) {
input_buffer&#91;bytes_read] = '\0';

// 处理特殊命令
if (strncmp(input_buffer, "quit", 4) == 0 ||
strncmp(input_buffer, "exit", 4) == 0) {
printf("收到退出命令\n");
server_state_struct.running = 0;
server_terminate = 1;
} else if (strncmp(input_buffer, "status", 6) == 0) {
show_server_status(&server_state_struct);
} else if (strncmp(input_buffer, "help", 4) == 0) {
printf("可用命令:\n");
printf(" quit/exit - 退出服务器\n");
printf(" status - 显示服务器状态\n");
printf(" help - 显示帮助信息\n");
printf(" Ctrl+C - 发送终止信号\n");
} else if (strncmp(input_buffer, "connect", 7) == 0) {
if (server_state_struct.client_count < server_state_struct.max_clients) {
// 模拟创建客户端连接
int fake_fd = 1000 + server_state_struct.client_count;
int client_index = add_client(&server_state_struct, fake_fd);
if (client_index != -1) {
printf("模拟客户端连接成功 (fd: %d)\n", fake_fd);
}
} else {
printf("客户端数量已达上限\n");
}
} else {
printf("收到输入: %s", input_buffer);
}
}
}

// 处理客户端事件
for (int i = 0; i < server_state_struct.max_clients; i++) {
if (server_state_struct.clients&#91;i].connected &&
FD_ISSET(server_state_struct.clients&#91;i].fd, &read_fds)) {

char client_buffer&#91;256];
ssize_t bytes_read = read(server_state_struct.clients&#91;i].fd,
client_buffer, sizeof(client_buffer) - 1);

if (bytes_read > 0) {
client_buffer&#91;bytes_read] = '\0';
printf("客户端 %d 发送数据: %s", i, client_buffer);

// 回显数据
write(server_state_struct.clients&#91;i].fd, "Echo: ", 6);
write(server_state_struct.clients&#91;i].fd, client_buffer, bytes_read);

} else if (bytes_read == 0) {
printf("客户端 %d 断开连接\n", i);
remove_client(&server_state_struct, i);
} else {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("读取客户端数据失败");
remove_client(&server_state_struct, i);
}
}
}
}
}
}

// 清理资源
printf("清理资源...\n");
for (int i = 0; i < server_state_struct.max_clients; i++) {
if (server_state_struct.clients&#91;i].connected) {
remove_client(&server_state_struct, i);
}
}
free(server_state_struct.clients);

printf("服务器模拟结束\n");

printf("\n=== pselect 服务器应用说明 ===\n");
printf("核心技术:\n");
printf("1. 多路复用: 同时监视多个文件描述符\n");
printf("2. 事件驱动: 基于事件的通知机制\n");
printf("3. 信号安全: 原子的信号处理\n");
printf("4. 超时控制: 精确的超时管理\n");
printf("5. 资源管理: 动态的客户端管理\n");
printf("\n");
printf("pselect 优势:\n");
printf("1. 原子性: 等待和信号处理是原子操作\n");
printf("2. 灵活性: 可以精确控制信号屏蔽\n");
printf("3. 精确性: 纳秒级超时控制\n");
printf("4. 可扩展: 适用于中小规模并发\n");
printf("5. 安全性: 避免信号处理中的竞态条件\n");

return 0;
}

pselect系统调用及示例-CSDN博客

data-ad-format="auto" data-full-width-responsive="true">