// cc -g -O2 -std=c17 -luring writeorder-pipe.c -o writeorder-pipe #define _GNU_SOURCE #undef NDEBUG #include #include #include #include #include #include #include #include #define BATCH_SIZE 10 #define WAIT_COUNT 1 int writer(int fd) { struct io_uring ring = {}; struct io_uring_params params = { .sq_entries = BATCH_SIZE, .flags = IORING_SETUP_SINGLE_ISSUER, }; int r = io_uring_queue_init_params(BATCH_SIZE, &ring, ¶ms); assert(r >= 0); uint64_t completion_seq = 0; uint64_t submission_seq = 0; uint64_t data[BATCH_SIZE]; uint32_t ncqe = BATCH_SIZE; uint64_t print = 0; while (true) { if (print++ % 1000 == 999) { printf("%12lu submitted | %12lu completed %.2f MiB\n", submission_seq, completion_seq, (double)(completion_seq*sizeof(submission_seq)) / 1048576.0); } uint64_t batch_submission_head = submission_seq; uint64_t batch_submission_tail = submission_seq + (uint64_t)ncqe; for (uint32_t i = 0; i < ncqe; i++) { struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); assert(sqe != NULL); sqe->flags |= IOSQE_IO_LINK; sqe->user_data = submission_seq; data[i] = submission_seq; io_uring_prep_write(sqe, fd, &data[i], sizeof(*data), 0); submission_seq++; } int submitted = io_uring_submit_and_wait(&ring, WAIT_COUNT); assert(submitted == ncqe); uint32_t head = 0; ncqe = 0; struct io_uring_cqe* cqe; io_uring_for_each_cqe(&ring, head, cqe) { if (cqe->res < 0) errx(1, "write: %s", strerror(-cqe->res)); uint64_t actual_seq = cqe->user_data; if (actual_seq != completion_seq) { printf("Expected %lu but got %lu (%s)\n", completion_seq, actual_seq, actual_seq == batch_submission_head ? "batch head" : actual_seq == batch_submission_tail ? "batch tail" : "middle of batch"); return 1; } ncqe++; completion_seq++; } assert(ncqe <= BATCH_SIZE); io_uring_cq_advance(&ring, ncqe); } return 0; } int reader(int fd) { uint64_t actual_seq, expected_seq = 0; for (;;) { ssize_t n = read(fd, &actual_seq, sizeof(uint64_t)); if (n < 0) err(1, "read"); assert(n == sizeof(uint64_t)); if (actual_seq != expected_seq) { printf("Reader got out of order seq: %lu, expected %lu\n", actual_seq, expected_seq); return 1; } expected_seq++; } return 0; } int main(int argc, char* argv[]) { int pipe_fds[2]; assert(pipe(pipe_fds) == 0); pid_t pid = fork(); assert(pid != -1); if (pid == 0) { close(pipe_fds[1]); return reader(pipe_fds[0]); } else { close(pipe_fds[0]); return writer(pipe_fds[1]); } }