forked from mlkazar/lwt
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreadpipe.cc
More file actions
159 lines (133 loc) · 4.04 KB
/
threadpipe.cc
File metadata and controls
159 lines (133 loc) · 4.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#include <string.h>
#include <stdio.h>
#include "threadpipe.h"
/* Copy count bytes from bufferp into the pipe's circular buffer, blocking
 * on the condition variable whenever the buffer is full.
 *
 * Returns the number of bytes copied (equal to count on success; 0 if count
 * is not positive).  Returns -1 as soon as the EOF flag is observed, even if
 * some bytes from this call were already copied into the buffer.
 */
int32_t
ThreadPipe::write(const char *bufferp, int32_t count)
{
    int32_t endPos;
    int32_t tcount;
    int32_t bytesThisTime;
    int32_t bytesCopied;

    bytesCopied = 0;
    _lock.take();
    while(count > 0) {
        /* if other side indicated EOF, treat writes as if they're unable to do anything */
        if (_eof) {
            _lock.release();
            return -1;
        }

        /* see where the first byte for writes lives: just past the buffered
         * data, wrapped back into the buffer if necessary.
         */
        endPos = _pos + _count;
        if (endPos >= (signed) _maxBytes)
            endPos -= _maxBytes;

        /* don't copy more than requested */
        bytesThisTime = count;

        /* don't copy more than fit in the buffer */
        tcount = _maxBytes - _count;
        if (bytesThisTime > tcount)
            bytesThisTime = tcount;

        /* don't copy beyond the wrap.  The copy starts at endPos, so the
         * contiguous room before the physical end of the buffer must be
         * measured from endPos (the write offset), not from _pos (the read
         * offset); measuring from _pos would let the memcpy below run past
         * the end of _data whenever _pos > 0 and the data is not wrapped.
         */
        tcount = _maxBytes - endPos;
        if (bytesThisTime > tcount)
            bytesThisTime = tcount;

        if (bytesThisTime <= 0) {
            /* buffer is full; wait for a reader to make room, then recompute */
            _cv.wait();
            continue;
        }

        memcpy(_data+endPos, bufferp, bytesThisTime);
        bufferp += bytesThisTime;
        bytesCopied += bytesThisTime;
        count -= bytesThisTime;         /* bytes left to copy */
        _count += bytesThisTime;        /* bytes in the buffer */

        /* wakeup any readers, since we've added some data */
        _cv.broadcast();
    }
    _lock.release();
    return bytesCopied;
}
/* Copy up to count bytes from the pipe's circular buffer into bufferp.
 *
 * Blocks on the condition variable until either count bytes have been
 * delivered, or the buffer is empty and the EOF flag is set.  Returns the
 * number of bytes copied, which is less than count only when EOF was
 * reached first.  A non-positive count returns 0 without waiting, matching
 * how write() treats a non-positive count.
 */
int32_t
ThreadPipe::read(char *bufferp, int32_t count)
{
    int32_t tcount;
    int32_t bytesThisTime;
    int32_t bytesCopied;

    bytesCopied = 0;
    _lock.take();
    while (1) {
        /* no more room left in the receiver.  Checking <= 0 rather than == 0
         * keeps a negative count from reaching memcpy as a huge length.
         */
        if (count <= 0)
            break;

        if (_count == 0) {
            /* we've run out of data; if EOF is set, we're done */
            if (_eof)
                break;

            /* here, we've run out of data, but EOF isn't set yet, and we do
             * have more room in the incoming buffer.  Wait for more data.
             */
            _cv.wait();
            continue;
        }

        /* if _pos was pointing to the end, move it back to the start
         * of the buffer.
         */
        if (_pos >= _maxBytes)
            _pos -= _maxBytes;

        /* don't read more than requested */
        bytesThisTime = count;

        /* don't read more bytes than present in the buffer */
        if (bytesThisTime > (signed) _count)
            bytesThisTime = _count;

        /* don't go past buffer wrap in one memcpy */
        tcount = _maxBytes - _pos;
        if (tcount < bytesThisTime)
            bytesThisTime = tcount;

        memcpy(bufferp, _data + _pos, bytesThisTime);

        _pos += bytesThisTime;
        if (_pos >= _maxBytes) {
            /* we try not to wrap, so at most this may be equal to the end */
            thread_assert(_pos == _maxBytes);
            _pos -= _maxBytes;
        }
        _count -= bytesThisTime;        /* bytes left in the buffer */
        count -= bytesThisTime;         /* room left in the receiver */
        bufferp += bytesThisTime;
        bytesCopied += bytesThisTime;

        /* if someone may have been waiting for space into which to write,
         * let them know there's space available now.
         */
        _cv.broadcast();
    }
    _lock.release();
    return bytesCopied;
}
/* Block until the EOF flag is set, throwing away any data that shows up in
 * the buffer in the meantime so that writers waiting for space can proceed.
 */
void
ThreadPipe::waitForEof()
{
    _lock.take();
    while (!_eof) {
        if (_count > 0) {
            /* drop everything buffered: advance the read offset past it,
             * wrapping back into the buffer if we ran off the end.
             */
            _pos += _count;
            if (_pos >= _maxBytes)
                _pos -= _maxBytes;
            _count = 0;

            /* the buffer is empty now; tell anyone waiting for room */
            _cv.broadcast();
        }

        /* sleep until something changes, then re-check the EOF flag */
        _cv.wait();
    }
    _lock.release();
}
/* Mark the pipe as having reached EOF and wake every thread waiting on the
 * condition variable so it can re-check the flag: write() then returns -1,
 * read() returns once the buffer is drained, and waitForEof() returns.
 */
void
ThreadPipe::eof()
{
    _lock.take();
    _eof = 1;
    /* broadcast while still holding the lock, as read(), write() and
     * waitForEof() all do; this is correct whether or not the condition
     * variable requires its lock to be held when signalling.
     */
    _cv.broadcast();
    _lock.release();
}