1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
| #define _GNU_SOURCE #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <sys/uio.h> #include <sys/socket.h> #include <netinet/in.h> #include <string.h> #include <errno.h> #include <time.h> #include <pthread.h>
#define BUFFER_SIZE 8192 #define MAX_PACKETS 1000
// 数据包结构 typedef struct { char data[BUFFER_SIZE]; size_t length; int packet_id; time_t timestamp; } data_packet_t;
// 转发器统计信息 typedef struct { volatile long long packets_forwarded; volatile long long bytes_forwarded; volatile long long packets_dropped; time_t start_time; } forwarder_stats_t;
forwarder_stats_t stats = {0};
// 模拟网络接收缓冲区 typedef struct { char* buffer; size_t size; size_t offset; } network_buffer_t;
// 创建模拟网络缓冲区 network_buffer_t* create_network_buffer(size_t size) { network_buffer_t* nb = malloc(sizeof(network_buffer_t)); if (!nb) return NULL; nb->buffer = malloc(size); if (!nb->buffer) { free(nb); return NULL; } nb->size = size; nb->offset = 0; // 填充模拟数据 for (size_t i = 0; i < size; i++) { nb->buffer[i] = 'A' + (i % 26); } return nb; }
// 从网络缓冲区读取数据包 int read_packet_from_buffer(network_buffer_t* nb, char* packet_buffer, size_t max_size) { if (nb->offset >= nb->size) { return 0; // 没有更多数据 } // 模拟不同大小的数据包 size_t packet_size = 100 + (rand() % 1000); if (packet_size > max_size) { packet_size = max_size; } if (nb->offset + packet_size > nb->size) { packet_size = nb->size - nb->offset; } if (packet_size > 0) { memcpy(packet_buffer, nb->buffer + nb->offset, packet_size); nb->offset += packet_size; return packet_size; } return 0; }
// 使用vmsplice转发数据包 int forward_packet_with_vmsplice(int pipe_fd, const char* packet_data, size_t packet_size) { struct iovec iov; iov.iov_base = (void*)packet_data; iov.iov_len = packet_size; ssize_t result = vmsplice(pipe_fd, &iov, 1, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); if (result > 0) { __atomic_fetch_add(&stats.packets_forwarded, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&stats.bytes_forwarded, result, __ATOMIC_RELAXED); return 0; // 成功 } else if (result == -1) { if (errno == EAGAIN) { // 管道满,数据包被丢弃 __atomic_fetch_add(&stats.packets_dropped, 1, __ATOMIC_RELAXED); return 1; // 丢弃 } else { perror("vmsplice转发失败"); return -1; // 错误 } } return -1; }
// 数据包生成器线程 void* packet_generator_thread(void* arg) { int pipe_write_fd = *(int*)arg; network_buffer_t* nb = create_network_buffer(1024 * 1024); // 1MB缓冲区 if (!nb) { printf("生成器: 创建网络缓冲区失败\n"); return NULL; } printf("生成器线程启动\n"); char packet_buffer[BUFFER_SIZE]; int packet_count = 0; // 生成数据包 while (packet_count < MAX_PACKETS) { int packet_size = read_packet_from_buffer(nb, packet_buffer, sizeof(packet_buffer)); if (packet_size > 0) { // 使用vmsplice转发数据包 int result = forward_packet_with_vmsplice(pipe_write_fd, packet_buffer, packet_size); if (result == 0) { packet_count++; if (packet_count % 100 == 0) { printf("生成器: 已生成 %d 个数据包\n", packet_count); } } else if (result == 1) { printf("生成器: 数据包被丢弃 (管道满)\n"); } else { printf("生成器: 转发错误\n"); break; } // 模拟网络延迟 usleep(1000); // 1毫秒 } else { break; // 没有更多数据 } } free(nb->buffer); free(nb); printf("生成器线程完成,共生成 %d 个数据包\n", packet_count); return NULL; }
// 数据包处理器线程 void* packet_processor_thread(void* arg) { int pipe_read_fd = *(int*)arg; printf("处理器线程启动\n"); char buffer[BUFFER_SIZE]; int processed_packets = 0; // 处理数据包 while (processed_packets < MAX_PACKETS) { ssize_t bytes_received = read(pipe_read_fd, buffer, sizeof(buffer)); if (bytes_received > 0) { // 模拟数据包处理 processed_packets++; if (processed_packets % 100 == 0) { printf("处理器: 已处理 %d 个数据包\n", processed_packets); } // 模拟处理时间 usleep(500); // 0.5毫秒 } else if (bytes_received == 0) { printf("处理器: 管道已关闭\n"); break; } else { if (errno == EAGAIN) { usleep(1000); // 等待1毫秒后重试 continue; } else { perror("处理器: 读取数据失败"); break; } } } printf("处理器线程完成,共处理 %d 个数据包\n", processed_packets); return NULL; }
// 显示转发统计 void show_forwarding_statistics() { time_t current_time = time(NULL); double elapsed_time = difftime(current_time, stats.start_time); long long packets_forwarded = __atomic_load_n(&stats.packets_forwarded, __ATOMIC_RELAXED); long long bytes_forwarded = __atomic_load_n(&stats.bytes_forwarded, __ATOMIC_RELAXED); long long packets_dropped = __atomic_load_n(&stats.packets_dropped, __ATOMIC_RELAXED); printf("\n=== 转发统计 ===\n"); printf("转发数据包: %lld\n", packets_forwarded); printf("转发字节数: %lld (%.2f MB)\n", bytes_forwarded, bytes_forwarded / (1024.0 * 1024.0)); printf("丢弃数据包: %lld\n", packets_dropped); printf("运行时间: %.2f 秒\n", elapsed_time); if (elapsed_time > 0) { printf("平均转发速率: %.2f 包/秒\n", packets_forwarded / elapsed_time); printf("平均吞吐量: %.2f MB/s\n", (bytes_forwarded / (1024.0 * 1024.0)) / elapsed_time); } if (packets_forwarded + packets_dropped > 0) { double drop_rate = (double)packets_dropped / (packets_forwarded + packets_dropped) * 100; printf("丢包率: %.2f%%\n", drop_rate); } printf("================\n\n"); }
// 性能基准测试 void performance_benchmark() { printf("=== vmsplice性能基准测试 ===\n"); int pipefd[2]; if (pipe(pipefd) == -1) { perror("创建管道失败"); return; } // 设置非阻塞模式 int flags = fcntl(pipefd[1], F_GETFL); fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK); flags = fcntl(pipefd[0], F_GETFL); fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK); // 准备测试数据 size_t test_sizes[] = {1024, 4096, 16384, 65536, 262144}; // 1KB到256KB int num_sizes = sizeof(test_sizes) / sizeof(test_sizes[0]); printf("%-10s %-15s %-15s %-15s\n", "大小", "vmsplice", "write", "性能提升"); printf("%-10s %-15s %-15s %-15s\n", "----", "--------", "-----", "--------"); for (int i = 0; i < num_sizes; i++) { size_t size = test_sizes[i]; char* test_data = malloc(size); if (!test_data) continue; // 填充测试数据 memset(test_data, 'X', size); struct iovec iov; iov.iov_base = test_data; iov.iov_len = size; // 测试vmsplice clock_t start = clock(); ssize_t vmsplice_result = vmsplice(pipefd[1], &iov, 1, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); clock_t end = clock(); double vmsplice_time = ((double)(end - start)) / CLOCKS_PER_SEC; // 清空管道 char dummy_buffer[1024 * 1024]; while (read(pipefd[0], dummy_buffer, sizeof(dummy_buffer)) > 0); // 测试传统write start = clock(); ssize_t write_result = write(pipefd[1], test_data, size); end = clock(); double write_time = ((double)(end - start)) / CLOCKS_PER_SEC; // 清空管道 while (read(pipefd[0], dummy_buffer, sizeof(dummy_buffer)) > 0); double speedup = (write_time > 0) ? (write_time / vmsplice_time) : 0; printf("%-10zu %-15.6f %-15.6f %-15.2fx\n", size, vmsplice_time, write_time, speedup); free(test_data); } close(pipefd[0]); close(pipefd[1]); printf("=== 性能基准测试完成 ===\n\n"); }
int main() { printf("=== 网络数据转发应用示例 ===\n"); // 执行性能基准测试 performance_benchmark(); // 初始化统计 stats.start_time = time(NULL); // 创建管道用于线程间通信 int pipefd[2]; if (pipe(pipefd) == -1) { perror("创建管道失败"); exit(EXIT_FAILURE); } // 设置管道为非阻塞模式 int flags = fcntl(pipefd[1], F_GETFL); fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK); flags = fcntl(pipefd[0], F_GETFL); fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK); printf("创建转发管道: 读端=%d, 写端=%d\n", pipefd[0], pipefd[1]); // 创建线程 pthread_t generator_thread, processor_thread; // 启动数据包生成器线程 if (pthread_create(&generator_thread, NULL, packet_generator_thread, &pipefd[1]) != 0) { perror("创建生成器线程失败"); close(pipefd[0]); close(pipefd[1]); exit(EXIT_FAILURE); } // 启动数据包处理器线程 if (pthread_create(&processor_thread, NULL, packet_processor_thread, &pipefd[0]) != 0) { perror("创建处理器线程失败"); close(pipefd[0]); close(pipefd[1]); exit(EXIT_FAILURE); } // 主线程定期显示统计信息 for (int i = 0; i < 30; i++) { // 运行30秒 sleep(2); show_forwarding_statistics(); } // 等待线程完成 pthread_join(generator_thread, NULL); pthread_join(processor_thread, NULL); // 显示最终统计 show_forwarding_statistics(); // 清理资源 close(pipefd[0]); close(pipefd[1]); printf("=== 网络数据转发应用演示完成 ===\n"); return 0; }
|