io_pgetevents系统调用及示例

io_pgetevents系统调用及示例 io_setup/io_submit系统调用及示例

data-ad-format="fluid" data-ad-layout-key="-7k+ex-4a-9w+4a">

我们继续按照您的要求学习 Linux 系统编程中的重要函数。这次我们介绍 io_pgetevents。

1. 函数介绍

io_pgetevents 是一个 Linux 系统调用,它是 Linux AIO (Asynchronous I/O) 子系统的一部分。它是 io_getevents 函数的增强版本,主要增加了对信号屏蔽(signal mask)的支持。

简单来说,io_pgetevents 的作用是:

等待并获取之前提交给 Linux AIO 子系统的异步 I/O 操作的完成状态。

想象一下你去邮局寄很多封信(异步 I/O 请求):

  • 你把所有信件交给邮局(调用 io_submit),然后你就可以去做别的事情了,不需要在邮局柜台等着。

  • 过了一段时间,你想知道哪些信已经寄出去了(I/O 操作完成了)。

  • 你就可以使用 io_pgetevents 这个功能去邮局查询(等待)并取回那些已经处理完毕的回执单(I/O 完成事件)。

io_pgetevents 相比 io_getevents 的优势在于,它允许你在等待 I/O 完成的同时,原子性地设置一个临时的信号屏蔽字。这在需要精确控制信号处理的多线程程序中非常有用,可以避免竞态条件。

2. 函数原型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <linux/aio_abi.h> // 包含 io_event 结构体等 AIO 相关定义
#include <signal.h> // sigset_t
#include <sys/syscall.h> // syscall
#include <unistd.h>

// 注意:glibc 通常不直接包装 io_pgetevents,需要使用 syscall
// 系统调用号在不同架构上不同,例如 x86_64 上是 333 (SYS_io_pgetevents)

// 通过 syscall 调用的原型 (概念上)
long io_pgetevents(aio_context_t ctx_id,
long min_nr,
long nr,
struct io_event *events,
struct timespec *timeout,
const struct __aio_sigset *usig);

重要: 与 rseq 类似,io_pgetevents 在标准的 C 库 (glibc) 中通常没有直接的包装函数。你需要使用 syscall() 来调用它。

3. 功能

  • 等待 AIO 事件: 阻塞调用线程,直到至少 min_nr 个异步 I/O 事件完成,或者达到 timeout 指定的时间。

  • 获取完成事件: 将已完成的 I/O 事件信息填充到调用者提供的 events 数组中,最多填充 nr 个。

  • 原子性信号控制: 在等待期间,根据 usig 参数临时设置线程的信号屏蔽字。等待结束后,信号屏蔽字会恢复到调用前的状态。这是 io_getevents 所不具备的功能。

  • 超时控制: 可以指定一个等待超时时间,避免无限期阻塞。

4. 参数

  • aio_context_t ctx_id: 这是通过 io_setup 创建的 AIO 上下文(或称为 AIO 完成端口)的 ID。所有相关的异步 I/O 操作都提交到这个上下文中。

long min_nr: 调用希望获取的最少事件数量。

  • 如果设置为 1,则函数在至少有一个事件完成时返回。

  • 如果设置为 N(N > 1),则函数会等待,直到至少有 N 个事件完成(或超时)。

long nr: events 数组的大小,即调用者希望获取的最大事件数量。

  • 函数返回时,实际返回的事件数会 <= nr。

struct io_event *events: 指向一个 struct io_event 类型数组的指针。这个数组用于接收完成的 I/O 事件信息。struct io_event (定义在 <linux/aio_abi.h>) 通常包含:struct io_event { __u64 data; // 用户在 iocb 中指定的数据 (与请求关联) __u64 obj; // 指向完成的 iocb 的指针 __s64 res; // 操作结果 (例如 read/write 返回的字节数,或负的 errno) __s64 res2; // 额外的结果信息 (通常为 0) };

struct timespec *timeout: 指向一个 struct timespec 结构的指针,用于指定超时时间。

  • 如果为 NULL,则调用会无限期阻塞,直到至少 min_nr 个事件完成。

  • 如果 timeout->tv_sec == 0 && timeout->tv_nsec == 0,则函数变为非阻塞检查,立即返回已有的完成事件。

  • 否则,函数最多阻塞 timeout 指定的时间。

const struct __aio_sigset *usig: 这是 io_pgetevents 相比 io_getevents 新增的关键参数。它指向一个 struct __aio_sigset 结构,用于指定在等待期间要使用的临时信号屏蔽字。struct __aio_sigset { const sigset_t *sigmask; // 指向新的信号屏蔽字 size_t sigsetsize; // sigmask 指向的内存大小 (通常用 sizeof(sigset_t)) };

  • 如果 usig 为 NULL,则不修改信号屏蔽字,行为类似于 io_getevents。

  • 如果 usig 非 NULL,则在进入内核等待状态之前,线程的信号屏蔽字会被原子性地替换为 *usig->sigmask。在等待结束(无论是因事件到达还是超时)后,信号屏蔽字会恢复。

5. 返回值

  • 成功时: 返回实际获取到的事件数量(一个非负整数,且 >= min_nr 除非超时或被信号中断)。

  • 失败时: 返回 -1,并设置全局变量 errno 来指示具体的错误原因(例如 EFAULT events 或 timeout 指针无效,EINVAL ctx_id 无效或 min_nr/nr 无效,EINTR 调用被信号中断等)。

6. 相似函数,或关联函数

  • io_getevents: 功能与 io_pgetevents 相同,但不支持 usig 参数,无法原子性地控制信号屏蔽字。

  • io_setup: 创建 AIO 上下文。

  • io_destroy: 销毁 AIO 上下文。

  • io_submit: 向 AIO 上下文提交异步 I/O 请求。

  • io_cancel: 尝试取消一个已提交但尚未完成的异步 I/O 请求。

  • io_uring: Linux 5.1+ 引入的更新、更高效的异步 I/O 接口,通常比传统的 aio 性能更好且功能更强大。

7. 示例代码

重要提示: AIO 编程本身就比较复杂,涉及多个系统调用。下面的示例将展示 io_pgetevents 的使用,但会简化一些错误处理和资源清理,以突出重点。

示例 1:使用 io_pgetevents 读取文件并原子性地屏蔽信号

这个例子演示了如何设置 AIO 上下文,提交异步读取请求,然后使用 io_pgetevents 等待完成,并在等待期间原子性地屏蔽 SIGUSR1 信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// aio_pgetevents_example.c
// 编译: gcc -o aio_pgetevents_example aio_pgetevents_example.c
#define _GNU_SOURCE // For syscall, SIGUSR1, etc.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/syscall.h>
#include <linux/aio_abi.h>
#include <signal.h>
#include <sys/stat.h>
#include <assert.h>
#include <pthread.h> // For pthread_kill in signal sender

// 定义系统调用号 (x86_64)
#ifndef SYS_io_pgetevents
#define SYS_io_pgetevents 333
#endif
#ifndef SYS_io_setup
#define SYS_io_setup 206
#endif
#ifndef SYS_io_destroy
#define SYS_io_destroy 207
#endif
#ifndef SYS_io_submit
#define SYS_io_submit 209
#endif
#ifndef SYS_io_getevents
#define SYS_io_getevents 208
#endif

// 包装 io_pgetevents 系统调用
static inline int io_pgetevents(aio_context_t ctx, long min_nr, long nr,
struct io_event *events,
struct timespec *timeout,
struct __aio_sigset *usig) {
return syscall(SYS_io_pgetevents, ctx, min_nr, nr, events, timeout, usig);
}

// 包装 io_setup
static inline int io_setup(unsigned nr_events, aio_context_t *ctx_idp) {
return syscall(SYS_io_setup, nr_events, ctx_idp);
}

// 包装 io_destroy
static inline int io_destroy(aio_context_t ctx) {
return syscall(SYS_io_destroy, ctx);
}

// 包装 io_submit
static inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp) {
return syscall(SYS_io_submit, ctx, nr, iocbpp);
}

#define NUM_REQUESTS 2
#define BUFFER_SIZE 1024

// 信号处理函数
void signal_handler(int sig) {
printf("Signal %d received in main thread!\n", sig);
}

// 发送信号的线程函数
void* signal_sender_thread(void *arg) {
pid_t main_tid = *(pid_t*)arg;
sleep(2); // 等待 main 线程进入 io_pgetevents
printf("Signal sender: Sending SIGUSR1 to main thread (TID %d)...\n", main_tid);
// 注意:pthread_kill 发送给线程,kill 发送给进程
// 这里假设 main_tid 是线程 ID (实际获取线程 ID 需要 gettid() 或其他方法)
// 为简化,我们用 kill 发送给整个进程
// pthread_kill 需要更复杂的设置,这里用 kill 演示
if (kill(getpid(), SIGUSR1) != 0) {
perror("kill SIGUSR1");
}
return NULL;
}

int main() {
const char *filename = "test_aio_file.txt";
int fd;
aio_context_t ctx = 0;
struct iocb iocbs&#91;NUM_REQUESTS];
struct iocb *iocb_ptrs&#91;NUM_REQUESTS];
struct io_event events&#91;NUM_REQUESTS];
char buffers&#91;NUM_REQUESTS]&#91;BUFFER_SIZE];
struct sigaction sa;
sigset_t block_sigusr1, oldset;
struct __aio_sigset aio_sigset;
pthread_t sig_thread;
pid_t main_tid = getpid(); // Simplification for example

// 1. 创建测试文件
fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd == -1) {
perror("open test file for writing");
exit(EXIT_FAILURE);
}
const char *test_data = "This is test data for asynchronous I/O operation number one.\n"
"This is test data for asynchronous I/O operation number two.\n";
if (write(fd, test_data, strlen(test_data)) != (ssize_t)strlen(test_data)) {
perror("write test data");
close(fd);
exit(EXIT_FAILURE);
}
close(fd);
printf("Created test file '%s'.\n", filename);

// 2. 设置信号处理
memset(&sa, 0, sizeof(sa));
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0; // No SA_RESTART for demonstration
if (sigaction(SIGUSR1, &sa, NULL) == -1) {
perror("sigaction SIGUSR1");
exit(EXIT_FAILURE);
}
printf("SIGUSR1 handler installed.\n");

// 3. 打开文件进行异步读取
fd = open(filename, O_RDONLY);
if (fd == -1) {
perror("open test file for reading");
exit(EXIT_FAILURE);
}

// 4. 初始化 AIO 上下文
if (io_setup(NUM_REQUESTS, &ctx) < 0) {
perror("io_setup");
close(fd);
exit(EXIT_FAILURE);
}
printf("AIO context created.\n");

// 5. 准备 AIO 读取请求
for (int i = 0; i < NUM_REQUESTS; i++) {
// 初始化 iocb 结构
memset(&iocbs&#91;i], 0, sizeof(struct iocb));
iocbs&#91;i].aio_fildes = fd;
iocbs&#91;i].aio_lio_opcode = IOCB_CMD_PREAD; // 异步预读
iocbs&#91;i].aio_reqprio = 0;
iocbs&#91;i].aio_buf = (uint64_t)(buffers&#91;i]); // 读入缓冲区
iocbs&#91;i].aio_nbytes = BUFFER_SIZE / 2; // 读取一半缓冲区大小
iocbs&#91;i].aio_offset = i * (BUFFER_SIZE / 2); // 从不同偏移量开始读
iocbs&#91;i].aio_data = i + 1; // 用户数据,用于标识请求
iocb_ptrs&#91;i] = &iocbs&#91;i];
}

// 6. 提交 AIO 请求
printf("Submitting %d AIO read requests...\n", NUM_REQUESTS);
int ret = io_submit(ctx, NUM_REQUESTS, iocb_ptrs);
if (ret < 0) {
perror("io_submit");
io_destroy(ctx);
close(fd);
exit(EXIT_FAILURE);
} else if (ret != NUM_REQUESTS) {
fprintf(stderr, "Submitted %d requests, expected %d\n", ret, NUM_REQUESTS);
} else {
printf("Successfully submitted %d AIO requests.\n", ret);
}

// 7. 设置信号屏蔽 (用于 io_pgetevents)
sigemptyset(&block_sigusr1);
sigaddset(&block_sigusr1, SIGUSR1);
aio_sigset.sigmask = &block_sigusr1;
aio_sigset.sigsetsize = sizeof(block_sigusr1);

// 8. 启动信号发送线程
if (pthread_create(&sig_thread, NULL, signal_sender_thread, &main_tid) != 0) {
perror("pthread_create signal sender");
io_destroy(ctx);
close(fd);
exit(EXIT_FAILURE);
}

printf("Main thread: Waiting for AIO events with SIGUSR1 blocked atomically...\n");

// 9. 关键:使用 io_pgetevents 等待,原子性地屏蔽 SIGUSR1
// 这意味着在内核等待期间,SIGUSR1 会被阻塞。
// 如果在此期间有 SIGUSR1 到达,它会被挂起,直到 io_pgetevents 返回。
struct timespec timeout;
timeout.tv_sec = 10; // 10 秒超时
timeout.tv_nsec = 0;

ret = io_pgetevents(ctx, 1, NUM_REQUESTS, events, &timeout, &aio_sigset);

if (ret < 0) {
if (errno == EINTR) {
printf("io_pgetevents was interrupted by a signal (EINTR).\n");
} else {
perror("io_pgetevents");
}
} else {
printf("io_pgetevents returned %d events:\n", ret);
for (int i = 0; i < ret; i++) {
printf(" Event %d: data=%llu, res=%lld\n",
i, (unsigned long long)events&#91;i].data, (long long)events&#91;i].res);
if (events&#91;i].res > 0) {
buffers&#91;events&#91;i].data - 1]&#91;events&#91;i].res] = '\0'; // Null-terminate
printf(" Data: %s", buffers&#91;events&#91;i].data - 1]);
}
}
}

printf("Main thread: io_pgetevents finished.\n");

// 10. 等待信号发送线程结束
pthread_join(sig_thread, NULL);

// 11. 清理资源
io_destroy(ctx);
close(fd);
unlink(filename); // 删除测试文件

printf("Example finished.\n");
return 0;
}

代码解释:

定义系统调用: 由于 glibc 可能没有包装,我们手动定义了 io_pgetevents 及相关 AIO 系统调用的包装函数。

创建测试文件: 程序首先创建一个包含测试数据的文件。

设置信号处理: 为 SIGUSR1 安装一个处理函数,用于演示信号处理。

打开文件: 以只读方式打开测试文件。

初始化 AIO 上下文: 调用 io_setup 创建一个可以处理 NUM_REQUESTS 个并发请求的上下文。

准备 AIO 请求: 初始化两个 struct iocb 结构,设置为从文件不同偏移量异步预读取数据。

提交请求: 调用 io_submit 将这两个读取请求提交给 AIO 引擎。

设置信号屏蔽: 创建一个包含 SIGUSR1 的信号集 block_sigusr1,并填充 struct __aio_sigset 结构 aio_sigset。

启动信号发送线程: 创建一个线程,它会在 2 秒后向主进程发送 SIGUSR1 信号。这用来测试信号屏蔽效果。

关键步骤 - io_pgetevents:

  • 设置 10 秒超时。

  • 调用 io_pgetevents(ctx, 1, NUM_REQUESTS, events, &timeout, &aio_sigset)。

  • min_nr=1: 至少等待 1 个事件完成。

  • &aio_sigset: 传递信号集,告诉内核在等待期间原子性地屏蔽 SIGUSR1。

等待和处理: 主线程在 io_pgetevents 中等待。在此期间,SIGUSR1 被屏蔽。信号发送线程发出的 SIGUSR1 会被挂起。当 io_pgetevents 返回时(因为 I/O 完成或超时),信号屏蔽恢复,被挂起的 SIGUSR1 随即被递达,信号处理函数得以执行。

输出结果: 打印获取到的事件信息和读取到的数据。

清理: 等待信号发送线程结束,销毁 AIO 上下文,关闭文件,删除测试文件。

核心概念:

  • io_pgetevents 的 usig 参数使得信号屏蔽和等待 I/O 成为一个原子操作。这避免了在设置信号屏蔽和调用 io_getevents 之间收到信号的竞态条件。

  • 如果使用 io_getevents,你需要先调用 pthread_sigmask(SIG_SETMASK, …) 设置屏蔽,然后调用 io_getevents,最后再调用 pthread_sigmask(SIG_SETMASK, …) 恢复。在这三步之间,信号可能会到达,导致竞态。

重要提示与注意事项:

内核版本: io_pgetevents 需要 Linux 内核 4.18 或更高版本。

glibc 支持: 标准 C 库可能不提供直接包装,需要使用 syscall。

复杂性: AIO 本身就是一个复杂的子系统,涉及上下文管理、请求提交、事件获取等多个步骤。

性能: 传统的 aio 性能可能不如现代的 io_uring。对于新项目,考虑使用 io_uring。

信号安全: io_pgetevents 本身不是异步信号安全的,不应在信号处理函数中直接调用。

usig 参数: 这是 io_pgetevents 的核心优势。正确使用它可以编写出在信号处理方面更健壮的代码。

错误处理: 始终检查返回值和 errno,尤其是在处理 EINTR(被信号中断)时。

总结:

io_pgetevents 是 Linux AIO 系统调用 io_getevents 的增强版,关键改进是增加了对原子性信号屏蔽的支持。这使得在等待异步 I/O 完成时能够更安全、更精确地控制信号处理,避免了传统方法中的竞态条件。虽然使用起来比较底层和复杂,但对于需要高性能异步 I/O 并且对信号处理有严格要求的系统级编程来说,它是一个非常有价值的工具。

data-ad-format="auto" data-full-width-responsive="true">