Skip to content

Commit 786de9e

Browse files
committed
Batch enqueue/dequeue for bqueue
The Blocking Queue (bqueue) code is used by zfs send/receive to send messages between the various threads. It uses a shared linked list, which is locked whenever we enqueue or dequeue. For workloads which process many blocks per second, the locking on the shared list can be quite expensive. This commit changes the bqueue logic to have 3 linked lists: 1. An enqueuing list, which is used only by the (single) enqueuing thread, and thus needs no locks. 2. A shared list, with an associated lock. 3. A dequeuing list, which is used only by the (single) dequeuing thread, and thus needs no locks. The entire enqueuing list can be moved to the shared list in constant time, and the entire shared list can be moved to the dequeuing list in constant time. These operations only happen when the `fill_fraction` is reached, or on an explicit flush request. Therefore, the lock only needs to be acquired infrequently. The API already allows for dequeuing to block until an explicit flush, so callers don't need to be changed. Signed-off-by: Matthew Ahrens <[email protected]>
1 parent c935fe2 commit 786de9e

File tree

2 files changed

+72
-52
lines changed

2 files changed

+72
-52
lines changed

include/sys/bqueue.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,14 @@ extern "C" {
2727

2828
typedef struct bqueue {
2929
list_t bq_list;
30+
size_t bq_size;
31+
list_t bq_deqing_list;
32+
size_t bq_deqing_size;
33+
list_t bq_enqing_list;
34+
size_t bq_enqing_size;
3035
kmutex_t bq_lock;
3136
kcondvar_t bq_add_cv;
3237
kcondvar_t bq_pop_cv;
33-
size_t bq_size;
3438
size_t bq_maxsize;
3539
uint_t bq_fill_fraction;
3640
size_t bq_node_offset;
@@ -47,7 +51,6 @@ void bqueue_destroy(bqueue_t *);
4751
void bqueue_enqueue(bqueue_t *, void *, size_t);
4852
void bqueue_enqueue_flush(bqueue_t *, void *, size_t);
4953
void *bqueue_dequeue(bqueue_t *);
50-
boolean_t bqueue_empty(bqueue_t *);
5154

5255
#ifdef __cplusplus
5356
}

module/zfs/bqueue.c

+67-50
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,26 @@ obj2node(bqueue_t *q, void *data)
2727

2828
/*
2929
* Initialize a blocking queue. The maximum capacity of the queue is set to
30-
* size. Types that are stored in a bqueue must contain a bqueue_node_t,
31-
* and node_offset must be its offset from the start of the struct.
32-
* fill_fraction is a performance tuning value; when the queue is full, any
33-
* threads attempting to enqueue records will block. They will block until
34-
* they're signaled, which will occur when the queue is at least 1/fill_fraction
30+
* size. Types that are stored in a bqueue must contain a bqueue_node_t, and
31+
* node_offset must be its offset from the start of the struct. fill_fraction
32+
* is a performance tuning value; when the queue is full, any threads
33+
* attempting to enqueue records will block. They will block until they're
34+
* signaled, which will occur when the queue is at least 1/fill_fraction
3535
* empty. Similar behavior occurs on dequeue; if the queue is empty, threads
36-
* block. They will be signalled when the queue has 1/fill_fraction full, or
37-
* when bqueue_flush is called. As a result, you must call bqueue_flush when
38-
* you enqueue your final record on a thread, in case the dequeueing threads are
39-
* currently blocked and that enqueue does not cause them to be awoken.
40-
* Alternatively, this behavior can be disabled (causing signaling to happen
41-
* immediately) by setting fill_fraction to any value larger than size.
42-
* Return 0 on success, or -1 on failure.
36+
* block. They will be signalled when the queue has 1/fill_fraction full.
37+
* As a result, you must call bqueue_enqueue_flush() when you enqueue your
38+
* final record on a thread, in case the dequeuing threads are currently
39+
* blocked and that enqueue does not cause them to be woken. Alternatively,
40+
* this behavior can be disabled (causing signaling to happen immediately) by
41+
* setting fill_fraction to any value larger than size. Return 0 on success,
42+
* or -1 on failure.
43+
*
44+
* Note: The caller must ensure that for a given bqueue_t, there's only a
45+
* single call to bqueue_enqueue() running at a time (e.g. by calling only
46+
* from a single thread, or with locking around the call). Similarly, the
47+
* caller must ensure that there's only a single call to bqueue_dequeue()
48+
* running at a time. However, the one call to bqueue_enqueue() may be
49+
* invoked concurrently with the one call to bqueue_dequeue().
4350
*/
4451
int
4552
bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
@@ -49,11 +56,17 @@ bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
4956
}
5057
list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
5158
node_offset + offsetof(bqueue_node_t, bqn_node));
59+
list_create(&q->bq_deqing_list, node_offset + sizeof (bqueue_node_t),
60+
node_offset + offsetof(bqueue_node_t, bqn_node));
61+
list_create(&q->bq_enqing_list, node_offset + sizeof (bqueue_node_t),
62+
node_offset + offsetof(bqueue_node_t, bqn_node));
5263
cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
5364
cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
5465
mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
5566
q->bq_node_offset = node_offset;
5667
q->bq_size = 0;
68+
q->bq_deqing_size = 0;
69+
q->bq_enqing_size = 0;
5770
q->bq_maxsize = size;
5871
q->bq_fill_fraction = fill_fraction;
5972
return (0);
@@ -69,9 +82,13 @@ bqueue_destroy(bqueue_t *q)
6982
{
7083
mutex_enter(&q->bq_lock);
7184
ASSERT0(q->bq_size);
85+
ASSERT0(q->bq_deqing_size);
86+
ASSERT0(q->bq_enqing_size);
7287
cv_destroy(&q->bq_add_cv);
7388
cv_destroy(&q->bq_pop_cv);
7489
list_destroy(&q->bq_list);
90+
list_destroy(&q->bq_deqing_list);
91+
list_destroy(&q->bq_enqing_list);
7592
mutex_exit(&q->bq_lock);
7693
mutex_destroy(&q->bq_lock);
7794
}
@@ -81,23 +98,24 @@ bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
8198
{
8299
ASSERT3U(item_size, >, 0);
83100
ASSERT3U(item_size, <=, q->bq_maxsize);
84-
mutex_enter(&q->bq_lock);
101+
85102
obj2node(q, data)->bqn_size = item_size;
86-
while (q->bq_size && q->bq_size + item_size > q->bq_maxsize) {
87-
/*
88-
* Wake up bqueue_dequeue() thread if already sleeping in order
89-
* to prevent the deadlock condition
90-
*/
91-
cv_signal(&q->bq_pop_cv);
92-
cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
93-
}
94-
q->bq_size += item_size;
95-
list_insert_tail(&q->bq_list, data);
96-
if (flush)
103+
q->bq_enqing_size += item_size;
104+
list_insert_tail(&q->bq_enqing_list, data);
105+
106+
if (flush ||
107+
q->bq_enqing_size >= q->bq_maxsize / q->bq_fill_fraction) {
108+
/* Append the enquing list to the shared list. */
109+
mutex_enter(&q->bq_lock);
110+
while (q->bq_size > q->bq_maxsize) {
111+
cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
112+
}
113+
q->bq_size += q->bq_enqing_size;
114+
list_move_tail(&q->bq_list, &q->bq_enqing_list);
115+
q->bq_enqing_size = 0;
97116
cv_broadcast(&q->bq_pop_cv);
98-
else if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
99-
cv_signal(&q->bq_pop_cv);
100-
mutex_exit(&q->bq_lock);
117+
mutex_exit(&q->bq_lock);
118+
}
101119
}
102120

103121
/*
@@ -115,8 +133,8 @@ bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
115133
* Enqueue an entry, and then flush the queue. This forces the popping threads
116134
* to wake up, even if we're below the fill fraction. We have this in a single
117135
* function, rather than having a separate call, because it prevents race
118-
* conditions between the enqueuing thread and the dequeueing thread, where the
119-
* enqueueing thread will wake up the dequeueing thread, that thread will
136+
* conditions between the enqueuing thread and the dequeuing thread, where the
137+
* enqueueing thread will wake up the dequeuing thread, that thread will
120138
* destroy the condvar before the enqueuing thread is done.
121139
*/
122140
void
@@ -132,27 +150,26 @@ bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
132150
void *
133151
bqueue_dequeue(bqueue_t *q)
134152
{
135-
void *ret = NULL;
136-
size_t item_size;
137-
mutex_enter(&q->bq_lock);
138-
while (q->bq_size == 0) {
139-
cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
153+
void *ret = list_remove_head(&q->bq_deqing_list);
154+
if (ret == NULL) {
155+
/*
156+
* Dequeuing list is empty. Wait for there to be something on
157+
* the shared list, then move the entire shared list to the
158+
* dequeuing list.
159+
*/
160+
mutex_enter(&q->bq_lock);
161+
while (q->bq_size == 0) {
162+
cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
163+
}
164+
ASSERT0(q->bq_deqing_size);
165+
ASSERT(list_is_empty(&q->bq_deqing_list));
166+
list_move_tail(&q->bq_deqing_list, &q->bq_list);
167+
q->bq_deqing_size = q->bq_size;
168+
q->bq_size = 0;
169+
cv_broadcast(&q->bq_add_cv);
170+
mutex_exit(&q->bq_lock);
171+
ret = list_remove_head(&q->bq_deqing_list);
140172
}
141-
ret = list_remove_head(&q->bq_list);
142-
ASSERT3P(ret, !=, NULL);
143-
item_size = obj2node(q, ret)->bqn_size;
144-
q->bq_size -= item_size;
145-
if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction))
146-
cv_signal(&q->bq_add_cv);
147-
mutex_exit(&q->bq_lock);
173+
q->bq_deqing_size -= obj2node(q, ret)->bqn_size;
148174
return (ret);
149175
}
150-
151-
/*
152-
* Returns true if the space used is 0.
153-
*/
154-
boolean_t
155-
bqueue_empty(bqueue_t *q)
156-
{
157-
return (q->bq_size == 0);
158-
}

0 commit comments

Comments
 (0)