io_uring实现高效大文件多线程写入

io_uring实现高效大文件多线程写入

​多线程 + io_uring 实现高效大文件写入(64MB数据块,2GB文件分割)​​​

以下是完整的代码实现,使用 ​​两个线程​​:

data-ad-format="fluid" data-ad-layout-key="-7k+ex-4a-9w+4a">

​​生产者线程​​:生成 ​​64MB 数据块​​,放入队列。

​​消费者线程​​:从队列取出数据,通过 io_uring ​​异步写入文件​​,并在文件超过 ​​2GB 时自动切分​​。

​​1. 完整代码​​

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <liburing.h>
#include <sys/stat.h>
#include <stdatomic.h>

#define BLOCK_SIZE (64 * 1024 * 1024) // 64MB 数据块
#define MAX_FILE_SIZE (2ULL * 1024 * 1024 * 1024) // 2GB 文件分割阈值
#define QUEUE_SIZE 8 // 队列容量(防止内存爆炸)

// 数据块结构
typedef struct {
char *data;
size_t size;
} DataBlock;

// 线程安全队列
typedef struct {
DataBlock blocks&#91;QUEUE_SIZE];
atomic_int head, tail;
pthread_mutex_t mutex;
pthread_cond_t not_empty, not_full;
} BlockQueue;

// 全局队列
BlockQueue block_queue;
atomic_int file_counter = 0; // 文件计数器(用于切分)
atomic_ullong current_file_size = 0; // 当前文件大小

// 初始化队列
void init_queue(BlockQueue *q) {
q->head = q->tail = 0;
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->not_empty, NULL);
pthread_cond_init(&q->not_full, NULL);
}

// 生产者:生成随机数据并放入队列
void *producer_thread(void *arg) {
while (1) {
DataBlock block;
block.data = malloc(BLOCK_SIZE);
if (!block.data) {
perror("malloc");
exit(EXIT_FAILURE);
}
block.size = BLOCK_SIZE;

// 填充随机数据
for (size_t i = 0; i < BLOCK_SIZE; i++) {
block.data&#91;i] = rand() % 256;
}

// 放入队列
pthread_mutex_lock(&block_queue.mutex);
while ((block_queue.tail + 1) % QUEUE_SIZE == block_queue.head) {
pthread_cond_wait(&block_queue.not_full, &block_queue.mutex);
}
block_queue.blocks&#91;block_queue.tail] = block;
block_queue.tail = (block_queue.tail + 1) % QUEUE_SIZE;
pthread_cond_signal(&block_queue.not_empty);
pthread_mutex_unlock(&block_queue.mutex);
}
return NULL;
}

// 消费者:从队列取出数据,用 io_uring 写入文件
void *consumer_thread(void *arg) {
struct io_uring ring;
int fd = -1;
char filename&#91;256];

// 初始化 io_uring
if (io_uring_queue_init(8, &ring, 0) < 0) {
perror("io_uring_queue_init");
exit(EXIT_FAILURE);
}

while (1) {
DataBlock block;

// 从队列取出数据
pthread_mutex_lock(&block_queue.mutex);
while (block_queue.head == block_queue.tail) {
pthread_cond_wait(&block_queue.not_empty, &block_queue.mutex);
}
block = block_queue.blocks&#91;block_queue.head];
block_queue.head = (block_queue.head + 1) % QUEUE_SIZE;
pthread_cond_signal(&block_queue.not_full);
pthread_mutex_unlock(&block_queue.mutex);

// 检查是否需要切分文件
if (fd == -1 || current_file_size + block.size > MAX_FILE_SIZE) {
if (fd != -1) close(fd);
snprintf(filename, sizeof(filename), "large_file_%d.bin", file_counter++);
fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0) {
perror("open");
exit(EXIT_FAILURE);
}
current_file_size = 0;
printf("Created new file: %s\n", filename);
}

// 提交异步写入请求
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
io_uring_prep_write(sqe, fd, block.data, block.size, current_file_size);
io_uring_sqe_set_data(sqe, block.data); // 关联数据块(用于释放)
io_uring_submit(&ring);

// 等待写入完成
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
perror("io_uring_wait_cqe");
exit(EXIT_FAILURE);
}
if (cqe->res < 0) {
fprintf(stderr, "Write error: %s\n", strerror(-cqe->res));
exit(EXIT_FAILURE);
}

// 更新文件大小并释放内存
current_file_size += cqe->res;
io_uring_cqe_seen(&ring, cqe);
free(block.data);
}

io_uring_queue_exit(&ring);
return NULL;
}

int main() {
pthread_t producer, consumer;

// 初始化队列
init_queue(&block_queue);

// 启动生产者线程
if (pthread_create(&producer, NULL, producer_thread, NULL) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}

// 启动消费者线程
if (pthread_create(&consumer, NULL, consumer_thread, NULL) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}

// 等待线程结束(实际上不会结束)
pthread_join(producer, NULL);
pthread_join(consumer, NULL);

return 0;
}

​​2. 关键设计​​

​​(1)线程分工​​

线程任务​​生产者线程​​生成 ​​64MB 随机数据​​,放入队列​​消费者线程​​从队列取出数据,用 io_uring ​​异步写入文件​​

​​(2)线程安全队列​​

  • ​​环形缓冲区​​(BlockQueue)避免频繁 malloc/free。

​​互斥锁(pthread_mutex_t)​​ + ​​条件变量(pthread_cond_t)​​ 保证线程安全:

  • not_empty:队列非空时唤醒消费者。

  • not_full:队列未满时唤醒生产者。

​​(3)文件切分逻辑​​

  • ​​current_file_size​​ 记录当前文件大小。

​​超过 2GB 时​​:

  • 关闭当前文件。

  • 创建新文件(large_file_0.bin, large_file_1.bin, …)。

​​(4)io_uring 优化​​

  • ​​批量化提交​​:可调整 QUEUE_SIZE 提高并发。

  • ​​内存对齐​​:建议 posix_memalign 分配内存(减少内核拷贝)。

  • ​​轮询模式​​:可启用 IORING_SETUP_SQPOLL 减少系统调用(需root)。

​​3. 编译与运行​​

1
2
gcc -o io_uring_multi_thread io_uring_multi_thread.c -luring -lpthread
./io_uring_multi_thread

​​输出示例​​:

1
2
3
Created new file: large_file_0.bin
Created new file: large_file_1.bin
...

​​4. 验证结果​​

1
2
ls -lh large_file_*.bin  # 检查文件大小
md5sum large_file_*.bin # 验证数据完整性

​​5. 性能优化建议​​

优化点说明​​内存池​​预分配多个 64MB 块,减少 malloc/free 开销​​批量提交​​一次提交多个 io_uring 请求(提高吞吐)​​O_DIRECT​​直接 I/O 绕过页缓存(需内存对齐)​​多消费者线程​​多个消费者线程并行处理队列

​​6. 总结​​

  • ​​生产者-消费者模型​​:解耦数据生成和写入,提高并行度。

  • ​​io_uring 异步 I/O​​:最大化磁盘写入性能。

  • ​​自动文件切分​​:避免单个文件过大(2GB 限制)。

适用于 ​​日志系统、数据库、大数据存储​​ 等场景。 🚀

data-ad-format="auto" data-full-width-responsive="true">