@@ -27,19 +27,26 @@ obj2node(bqueue_t *q, void *data)
 
 /*
  * Initialize a blocking queue. The maximum capacity of the queue is set to
- * size. Types that are stored in a bqueue must contain a bqueue_node_t,
- * and node_offset must be its offset from the start of the struct.
- * fill_fraction is a performance tuning value; when the queue is full, any
- * threads attempting to enqueue records will block. They will block until
- * they're signaled, which will occur when the queue is at least 1/fill_fraction
+ * size. Types that are stored in a bqueue must contain a bqueue_node_t, and
+ * node_offset must be its offset from the start of the struct. fill_fraction
+ * is a performance tuning value; when the queue is full, any threads
+ * attempting to enqueue records will block. They will block until they're
+ * signaled, which will occur when the queue is at least 1/fill_fraction
  * empty. Similar behavior occurs on dequeue; if the queue is empty, threads
- * block. They will be signalled when the queue has 1/fill_fraction full, or
- * when bqueue_flush is called. As a result, you must call bqueue_flush when
- * you enqueue your final record on a thread, in case the dequeueing threads are
- * currently blocked and that enqueue does not cause them to be awoken.
- * Alternatively, this behavior can be disabled (causing signaling to happen
- * immediately) by setting fill_fraction to any value larger than size.
- * Return 0 on success, or -1 on failure.
+ * block. They will be signalled when the queue is 1/fill_fraction full.
+ * As a result, you must call bqueue_enqueue_flush() when you enqueue your
+ * final record on a thread, in case the dequeuing threads are currently
+ * blocked and that enqueue does not cause them to be woken. Alternatively,
+ * this behavior can be disabled (causing signaling to happen immediately) by
+ * setting fill_fraction to any value larger than size. Return 0 on success,
+ * or -1 on failure.
+ *
+ * Note: The caller must ensure that for a given bqueue_t, there's only a
+ * single call to bqueue_enqueue() running at a time (e.g. by calling only
+ * from a single thread, or with locking around the call). Similarly, the
+ * caller must ensure that there's only a single call to bqueue_dequeue()
+ * running at a time. However, the one call to bqueue_enqueue() may be
+ * invoked concurrently with the one call to bqueue_dequeue().
  */
 int
 bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
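For orientation, a minimal sketch of the calling convention the header comment describes. Only bqueue_t, bqueue_node_t, and bqueue_init() come from this file; my_record_t and its fields are hypothetical names invented for illustration:

```c
/* Hypothetical record type; the queue's linkage is embedded in it. */
typedef struct my_record {
	bqueue_node_t	mr_node;	/* used internally by the bqueue */
	boolean_t	mr_eos;		/* end-of-stream flag, a caller convention */
	uint64_t	mr_payload;
} my_record_t;

bqueue_t q;

/* 1 MB capacity, fill_fraction of 4: wakeups at the 1/4 thresholds above. */
VERIFY0(bqueue_init(&q, 4, 1024 * 1024,
    offsetof(my_record_t, mr_node)));
```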
@@ -49,11 +56,17 @@ bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
 	}
 	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
 	    node_offset + offsetof(bqueue_node_t, bqn_node));
+	list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
+	    node_offset + offsetof(bqueue_node_t, bqn_node));
+	list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
+	    node_offset + offsetof(bqueue_node_t, bqn_node));
 	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
 	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
 	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
 	q->bq_node_offset = node_offset;
 	q->bq_size = 0;
+	q->bq_dequeuing_size = 0;
+	q->bq_enqueuing_size = 0;
 	q->bq_maxsize = size;
 	q->bq_fill_fraction = fill_fraction;
 	return (0);
@@ -69,9 +82,13 @@ bqueue_destroy(bqueue_t *q)
 {
 	mutex_enter(&q->bq_lock);
 	ASSERT0(q->bq_size);
+	ASSERT0(q->bq_dequeuing_size);
+	ASSERT0(q->bq_enqueuing_size);
 	cv_destroy(&q->bq_add_cv);
 	cv_destroy(&q->bq_pop_cv);
 	list_destroy(&q->bq_list);
+	list_destroy(&q->bq_dequeuing_list);
+	list_destroy(&q->bq_enqueuing_list);
 	mutex_exit(&q->bq_lock);
 	mutex_destroy(&q->bq_lock);
 }
@@ -81,23 +98,24 @@ bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
 {
 	ASSERT3U(item_size, >, 0);
 	ASSERT3U(item_size, <=, q->bq_maxsize);
-	mutex_enter(&q->bq_lock);
+
 	obj2node(q, data)->bqn_size = item_size;
-	while (q->bq_size && q->bq_size + item_size > q->bq_maxsize) {
-		/*
-		 * Wake up bqueue_dequeue() thread if already sleeping in order
-		 * to prevent the deadlock condition
-		 */
-		cv_signal(&q->bq_pop_cv);
-		cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
-	}
-	q->bq_size += item_size;
-	list_insert_tail(&q->bq_list, data);
-	if (flush)
+	q->bq_enqueuing_size += item_size;
+	list_insert_tail(&q->bq_enqueuing_list, data);
+
+	if (flush ||
+	    q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
+		/* Append the enqueuing list to the shared list. */
+		mutex_enter(&q->bq_lock);
+		while (q->bq_size > q->bq_maxsize) {
+			cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
+		}
+		q->bq_size += q->bq_enqueuing_size;
+		list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
+		q->bq_enqueuing_size = 0;
 		cv_broadcast(&q->bq_pop_cv);
-	else if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
-		cv_signal(&q->bq_pop_cv);
-	mutex_exit(&q->bq_lock);
+		mutex_exit(&q->bq_lock);
+	}
 }
 
 /*
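As a rough worked example of the amortization this rewrite buys (the numbers are illustrative, not taken from this change): with size = 1 MB and fill_fraction = 4, the splice threshold bq_maxsize / bq_fill_fraction is 256 KB, so a producer enqueuing 64-byte records takes bq_lock about once every 4096 calls. The old code entered the mutex on every single enqueue.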
@@ -115,8 +133,8 @@ bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
  * Enqueue an entry, and then flush the queue. This forces the popping threads
  * to wake up, even if we're below the fill fraction. We have this in a single
  * function, rather than having a separate call, because it prevents race
- * conditions between the enqueuing thread and the dequeueing thread, where the
- * enqueueing thread will wake up the dequeueing thread, that thread will
+ * conditions between the enqueuing thread and the dequeuing thread, where the
+ * enqueuing thread will wake up the dequeuing thread, and that thread will
  * destroy the condvar before the enqueuing thread is done.
  */
 void
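A hypothetical producer loop tying the above together: ordinary bqueue_enqueue() calls, with the final record routed through bqueue_enqueue_flush() so a blocked consumer is guaranteed to wake. my_record_t and mr_eos are the invented names from the earlier sketch:

```c
for (uint64_t i = 0; i < n; i++) {
	my_record_t *rec = kmem_zalloc(sizeof (*rec), KM_SLEEP);
	rec->mr_payload = i;
	rec->mr_eos = (i == n - 1);
	if (rec->mr_eos)	/* final record: force the batch out */
		bqueue_enqueue_flush(&q, rec, sizeof (*rec));
	else
		bqueue_enqueue(&q, rec, sizeof (*rec));
}
```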
@@ -132,27 +150,26 @@ bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
 void *
 bqueue_dequeue(bqueue_t *q)
 {
-	void *ret = NULL;
-	size_t item_size;
-	mutex_enter(&q->bq_lock);
-	while (q->bq_size == 0) {
-		cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
+	void *ret = list_remove_head(&q->bq_dequeuing_list);
+	if (ret == NULL) {
+		/*
+		 * Dequeuing list is empty. Wait for there to be something on
+		 * the shared list, then move the entire shared list to the
+		 * dequeuing list.
+		 */
+		mutex_enter(&q->bq_lock);
+		while (q->bq_size == 0) {
+			cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
+		}
+		ASSERT0(q->bq_dequeuing_size);
+		ASSERT(list_is_empty(&q->bq_dequeuing_list));
+		list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
+		q->bq_dequeuing_size = q->bq_size;
+		q->bq_size = 0;
+		cv_broadcast(&q->bq_add_cv);
+		mutex_exit(&q->bq_lock);
+		ret = list_remove_head(&q->bq_dequeuing_list);
 	}
-	ret = list_remove_head(&q->bq_list);
-	ASSERT3P(ret, !=, NULL);
-	item_size = obj2node(q, ret)->bqn_size;
-	q->bq_size -= item_size;
-	if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction))
-		cv_signal(&q->bq_add_cv);
-	mutex_exit(&q->bq_lock);
+	q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
 	return (ret);
 }
-
-/*
- * Returns true if the space used is 0.
- */
-boolean_t
-bqueue_empty(bqueue_t *q)
-{
-	return (q->bq_size == 0);
-}
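And a matching hypothetical consumer, a single dequeuing thread per the concurrency note in the header comment; consume() and the mr_eos convention are invented for illustration:

```c
for (;;) {
	my_record_t *rec = bqueue_dequeue(&q);	/* blocks until a record arrives */
	boolean_t eos = rec->mr_eos;
	consume(rec->mr_payload);	/* hypothetical per-record work */
	kmem_free(rec, sizeof (*rec));
	if (eos)
		break;
}
bqueue_destroy(&q);
```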