forked from runshenzhu/6.828-MIT-OS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipe.c
191 lines (163 loc) · 4 KB
/
pipe.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#include <inc/lib.h>
// Compile-time tracing switch: the `if (debug)` cprintf calls in this file
// only run when this is nonzero.
#define debug 0
// Forward declarations of the pipe device operations, so that the devpipe
// table can reference them before their definitions later in this file.
static ssize_t devpipe_read(struct Fd *fd, void *buf, size_t n);
static ssize_t devpipe_write(struct Fd *fd, const void *buf, size_t n);
static int devpipe_stat(struct Fd *fd, struct Stat *stat);
static int devpipe_close(struct Fd *fd);
// Device descriptor for pipes.  pipe() stamps devpipe.dev_id into both
// descriptors it creates; presumably the generic file-descriptor layer uses
// that id to find this table and call the hooks below (confirm in fd.c).
struct Dev devpipe =
{
.dev_id = 'p',
.dev_name = "pipe",
.dev_read = devpipe_read,
.dev_write = devpipe_write,
.dev_close = devpipe_close,
.dev_stat = devpipe_stat,
};
// Capacity of the pipe's data buffer, in bytes.
#define PIPEBUFSIZ 32 // small to provoke races
// Pipe state.  pipe() places one of these in a page that is mapped at the
// data address of both the read-end and the write-end descriptor.
//
// p_rpos and p_wpos are only ever incremented in this file; the buffer slot
// for a position is `pos % PIPEBUFSIZ`.  The pipe is treated as empty when
// p_rpos == p_wpos and as full when p_wpos >= p_rpos + sizeof(p_buf).
struct Pipe {
off_t p_rpos; // read position: advanced by one for each byte devpipe_read takes
off_t p_wpos; // write position: advanced by one for each byte devpipe_write stores
uint8_t p_buf[PIPEBUFSIZ]; // data buffer, indexed by position modulo PIPEBUFSIZ
};
// Create a pipe and hand back its two ends as file descriptor numbers.
//
// pfd: on success, pfd[0] receives the read end (opened O_RDONLY) and
//      pfd[1] receives the write end (opened O_WRONLY).
//
// Returns 0 on success.  On failure returns the negative value produced by
// the fd_alloc / sys_page_alloc / sys_page_map call that failed, after
// unmapping every page this call had already mapped; pfd is not written.
int
pipe(int pfd[2])
{
	// All three pages (two descriptors, one data page) use the same mapping.
	const int perm = PTE_P|PTE_W|PTE_U|PTE_SHARE;
	struct Fd *rd_fd, *wr_fd;
	void *data_va;
	int r;

	// Read end: pick a descriptor slot and back it with a page.
	r = fd_alloc(&rd_fd);
	if (r < 0)
		goto out;
	r = sys_page_alloc(0, rd_fd, perm);
	if (r < 0)
		goto out;

	// Write end: same procedure.
	r = fd_alloc(&wr_fd);
	if (r < 0)
		goto unmap_rd_fd;
	r = sys_page_alloc(0, wr_fd, perm);
	if (r < 0)
		goto unmap_rd_fd;

	// A single page holds the struct Pipe.  Allocate it at the read end's
	// data address, then map that same page at the write end's data address.
	data_va = fd2data(rd_fd);
	r = sys_page_alloc(0, data_va, perm);
	if (r < 0)
		goto unmap_wr_fd;
	r = sys_page_map(0, data_va, 0, fd2data(wr_fd), perm);
	if (r < 0)
		goto unmap_data;

	// Mark both descriptors as pipe-device descriptors.
	rd_fd->fd_dev_id = devpipe.dev_id;
	rd_fd->fd_omode = O_RDONLY;
	wr_fd->fd_dev_id = devpipe.dev_id;
	wr_fd->fd_omode = O_WRONLY;

	if (debug)
		cprintf("[%08x] pipecreate %08x\n", thisenv->env_id, uvpt[PGNUM(data_va)]);

	pfd[0] = fd2num(rd_fd);
	pfd[1] = fd2num(wr_fd);
	return 0;

	// Failure: undo the mappings in reverse order of creation.  Each label
	// falls through into the ones below it.
unmap_data:
	sys_page_unmap(0, data_va);
unmap_wr_fd:
	sys_page_unmap(0, wr_fd);
unmap_rd_fd:
	sys_page_unmap(0, rd_fd);
out:
	return r;
}
// Report whether the other end of the pipe behind `fd` appears to be closed.
//
// fd: descriptor page for our end of the pipe.
// p:  the struct Pipe page for that pipe (pipe() maps the same page at the
//     data address of both ends).
//
// Returns 1 when pageref(fd) == pageref(p), 0 otherwise.  Callers in this
// file treat 1 as "everyone on the other side is gone".
static int
_pipeisclosed(struct Fd *fd, struct Pipe *p)
{
	int runs_before, runs_after, closed;

	for (;;) {
		// The two pageref() calls are separate steps, so the counts could
		// change between them.  Sample thisenv->env_runs on both sides of
		// the comparison and only trust the result if it did not move.
		// NOTE(review): presumably env_runs advances each time this
		// environment is scheduled -- confirm against the kernel.
		runs_before = thisenv->env_runs;
		closed = pageref(fd) == pageref(p);
		runs_after = thisenv->env_runs;

		if (runs_before == runs_after)
			return closed;

		// The sample was interrupted; retry.  If the discarded result
		// claimed "closed", note that a possibly wrong answer was avoided.
		if (closed)
			cprintf("pipe race avoided\n");
	}
}
// Public wrapper: report whether the other end of the pipe open on
// descriptor number `fdnum` is closed.
//
// Returns the negative value from fd_lookup() if the lookup fails;
// otherwise returns the result of _pipeisclosed() for that descriptor and
// its data page.
int
pipeisclosed(int fdnum)
{
	struct Fd *fd;
	int err;

	err = fd_lookup(fdnum, &fd);
	if (err < 0)
		return err;

	return _pipeisclosed(fd, (struct Pipe *) fd2data(fd));
}
// Read up to `n` bytes from the pipe behind `fd` into `vbuf`.
//
// Blocks (by yielding) only while no byte at all has been read and the pipe
// is empty but not closed.  As soon as at least one byte has been copied, an
// empty pipe ends the read early.
//
// Returns the number of bytes copied, or 0 when the pipe is empty and
// _pipeisclosed() reports the other end closed (end of file).
static ssize_t
devpipe_read(struct Fd *fd, void *vbuf, size_t n)
{
	struct Pipe *pp = (struct Pipe *) fd2data(fd);
	uint8_t *dst;
	size_t got;

	if (debug)
		cprintf("[%08x] devpipe_read %08x %d rpos %d wpos %d\n",
			thisenv->env_id, uvpt[PGNUM(pp)], n, pp->p_rpos, pp->p_wpos);

	dst = vbuf;
	got = 0;
	while (got < n) {
		if (pp->p_rpos == pp->p_wpos) {
			// Pipe is empty.  Hand back whatever we already have.
			if (got > 0)
				return got;
			// Nothing read yet: end of file if the writers are gone.
			if (_pipeisclosed(fd, pp))
				return 0;
			// Otherwise let someone else run, then look again.
			if (debug)
				cprintf("devpipe_read yield\n");
			sys_yield();
			continue;
		}

		// A byte is available.  Copy it out first and only then advance
		// p_rpos, so the slot is not released before it has been read.
		dst[got] = pp->p_buf[pp->p_rpos % PIPEBUFSIZ];
		pp->p_rpos++;
		got++;
	}
	return got;
}
// Write `n` bytes from `vbuf` into the pipe behind `fd`.
//
// While the buffer is full the call yields and retries until space appears,
// unless _pipeisclosed() reports the other end closed.
//
// Returns `n` once every byte has been stored.  Returns 0 if the pipe is
// full and the other end is closed; note that this is 0 even when some
// bytes were already stored earlier in the same call.
static ssize_t
devpipe_write(struct Fd *fd, const void *vbuf, size_t n)
{
	struct Pipe *pp = (struct Pipe *) fd2data(fd);
	const uint8_t *src;
	size_t sent;

	if (debug)
		cprintf("[%08x] devpipe_write %08x %d rpos %d wpos %d\n",
			thisenv->env_id, uvpt[PGNUM(pp)], n, pp->p_rpos, pp->p_wpos);

	src = vbuf;
	sent = 0;
	while (sent < n) {
		if (pp->p_wpos >= pp->p_rpos + sizeof(pp->p_buf)) {
			// Pipe is full.  If only writers like us remain, give up.
			if (_pipeisclosed(fd, pp))
				return 0;
			// Otherwise let a reader run, then look again.
			if (debug)
				cprintf("devpipe_write yield\n");
			sys_yield();
			continue;
		}

		// There is room.  Store the byte first and only then advance
		// p_wpos, so the slot is not published before it holds the data.
		pp->p_buf[pp->p_wpos % PIPEBUFSIZ] = src[sent];
		pp->p_wpos++;
		sent++;
	}
	return sent;
}
static int
devpipe_stat(struct Fd *fd, struct Stat *stat)
{
struct Pipe *p = (struct Pipe*) fd2data(fd);
strcpy(stat->st_name, "<pipe>");
stat->st_size = p->p_wpos - p->p_rpos;
stat->st_isdir = 0;
stat->st_dev = &devpipe;
return 0;
}
// Close one end of a pipe by unmapping its two pages from this environment:
// the descriptor page `fd` and the pipe data page at fd2data(fd).
//
// The descriptor page is unmapped first.  NOTE(review): the order looks
// deliberate -- _pipeisclosed() compares pageref(fd) with pageref(p), so
// dropping the data page first would lower pageref(p) while pageref(fd) is
// still unchanged, and another environment sampling in that window could
// see the two counts match and conclude the pipe is closed.  Confirm before
// reordering.
//
// Returns the result of unmapping the data page.
static int
devpipe_close(struct Fd *fd)
{
// Result deliberately discarded; only the second unmap's status is reported.
(void) sys_page_unmap(0, fd);
return sys_page_unmap(0, fd2data(fd));
}