fix(bthread): refactor sharded priority queue with lock-free MPSC inbound #3270
yannan-wyn wants to merge 3 commits into apache:master
Conversation
Replace the single WorkStealingQueue priority path with a per-tag sharded PriorityShard (MPSCQueue inbound + WSQ + atomic owner lifecycle). steal_task includes salvage logic for ownerless shards to prevent task starvation. Gated by FLAGS_enable_bthread_priority_queue.
Pull request overview
Refactors the bthread “global priority” scheduling path to avoid unsafe multi-producer use of WorkStealingQueue by introducing a sharded design with an inbound lock-free queue per shard, plus adds new unit tests and a microbenchmark.
Changes:
- Introduces `PriorityShard` per tag and routes priority submissions via shard inbound queues with an owner bind/unbind lifecycle.
- Updates scheduling/stealing logic to prefer owner shards, steal across shards, and provide fallback behavior during teardown.
- Adds unit tests for correctness/concurrency/owner changes and a microbenchmark suite for the new primitives.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| test/bthread_priority_queue_unittest.cpp | Adds correctness + concurrent-producer tests for priority queue behavior. |
| test/bthread_priority_queue_owner_unittest.cpp | Adds tests around dynamic worker/owner binding behavior under load. |
| test/bthread_priority_queue_benchmark.cpp | Adds microbenchmarks for inbound enqueue/dequeue, flush, steal, and baseline comparisons. |
| src/bthread/task_group.h | Adds _priority_shard_index for O(1) lookup of a TaskGroup’s owned shard. |
| src/bthread/task_control.h | Adds PriorityShard definition and TaskControl helper APIs for shard management. |
| src/bthread/task_control.cpp | Implements sharded priority queue logic, owner lifecycle, flushing, stealing, and fallback enqueue behavior. |
```cpp
// Test 4: Priority tasks submitted with only 1 shard (degenerate case).
// Verifies correctness when nshard=1.
TEST_F(PriorityQueueTest, single_shard_correctness) {
    // This test relies on FLAGS_priority_queue_shards being set before
    // TaskControl init. Since TaskControl is already initialized by the
    // time we run, we test with whatever shard count is configured.
```
This test is named/annotated as a single-shard correctness test, but it never sets FLAGS_priority_queue_shards to 1 (the main sets it to 4), so it doesn’t actually exercise the degenerate nshard=1 case. Either set priority_queue_shards=1 before any bthread/TaskControl initialization (e.g., in main) or rename/update the test to reflect what it truly verifies.
Suggested change:

```cpp
// Test 4: Priority tasks submitted under the configured shard count.
// Verifies correctness and that no priority tasks are lost.
TEST_F(PriorityQueueTest, configured_shards_correctness) {
    // FLAGS_priority_queue_shards must be set before TaskControl init.
    // This test therefore validates behavior with whatever shard count
    // was configured in main() for the test binary.
```
```cpp
#include <vector>
#include <set>
#include <mutex>
#include <thread>
```
This file uses usleep() and sched_yield() but doesn’t include the headers that declare them. Please include <unistd.h> (for usleep) and <sched.h> (for sched_yield) to avoid relying on transitive includes and to prevent build failures on stricter toolchains.
Suggested change:

```cpp
#include <thread>
#include <unistd.h>
#include <sched.h>
```
```cpp
for (size_t i = 0; i < nshard; ++i) {
    size_t idx = (start + i) % nshard;
    PriorityShard* shard = shards[idx].get();
    if (shard->owner.load(butil::memory_order_relaxed) == NULL &&
        !shard->draining.load(butil::memory_order_relaxed)) {
        bthread_t salvaged;
        if (shard->inbound.Dequeue(salvaged)) {
            fallback_enqueue(tag, salvaged);
```
In the salvage path you call MPSCQueue::Dequeue from steal_task(). butil::MPSCQueue is explicitly single-consumer, but multiple workers can execute steal_task() concurrently, which can lead to data races/corruption when more than one thread dequeues from the same inbound queue. Consider removing this salvage dequeue, switching inbound to an MPMC structure, or guarding salvage draining with a per-shard CAS/lock so only one thread ever dequeues at a time (and keep dequeue ownership consistent with the queue’s single-consumer contract).
Suggested change:

```cpp
// `inbound` is a single-consumer queue, so salvage must first claim
// exclusive drain ownership. Multiple workers can execute steal_task()
// concurrently, and relaxed loads alone are not sufficient to prevent
// two workers from dequeuing from the same inbound queue at once.
for (size_t i = 0; i < nshard; ++i) {
    size_t idx = (start + i) % nshard;
    PriorityShard* shard = shards[idx].get();
    if (shard->owner.load(butil::memory_order_relaxed) == NULL) {
        bool expected = false;
        if (shard->draining.compare_exchange_strong(
                expected, true,
                butil::memory_order_acquire,
                butil::memory_order_relaxed)) {
            bthread_t salvaged;
            if (shard->owner.load(butil::memory_order_relaxed) == NULL &&
                shard->inbound.Dequeue(salvaged)) {
                fallback_enqueue(tag, salvaged);
            }
            shard->draining.store(false, butil::memory_order_release);
```
Enhance the MPSC with a CAS guard. And by the way, MPMC may be too heavy.
```cpp
// All shards ownerless, fallback to round-robin pick
shards[start]->inbound.Enqueue(tid);
```
When no shard has an active owner, this code still enqueues into shards[start]->inbound. With an ownerless shard there may be no single designated consumer to flush inbound, and the current workaround relies on salvage dequeues in steal_task() (which breaks MPSC’s single-consumer rule). Safer options: (1) if no owned shard is available, immediately fallback_enqueue(tag, tid); or (2) introduce a single-consumer handoff mechanism (e.g., temporarily claim a shard for draining) before allowing enqueues to ownerless shards.
Suggested change:

```cpp
// No shard has an active owner, so avoid enqueueing into an ownerless
// shard-local inbound queue with no designated single consumer.
fallback_enqueue(tag, tid);
```
If there is no active owner, just fall back to the regular queue.
@wwbmmm

LGTM
What problem does this PR solve?
Problem Summary:
The original priority queue implementation misuses `WorkStealingQueue`: multiple producers call `push()` concurrently, but `push()` is designed for single-owner use only. This leads to potential data races under contention.

See also: #2819, #3055, #3078, #3096
What is changed and the side effects?
Changed:
- Replaces the single `WorkStealingQueue` per tag with a sharded design (`PriorityShard`), each shard containing:
  - a lock-free `butil::MPSCQueue` inbound for external producers
  - a `WorkStealingQueue` for owner flush/pop and non-owner steal
- Fallback behavior: priority tasks (`BTHREAD_GLOBAL_PRIORITY`) re-enqueue to the normal `_remote_rq` during shard teardown
- Adds a `_priority_shard_index` field to `TaskGroup` for O(1) owner shard lookup

Side effects:

Performance effects:
- vs `_remote_rq`: ~1050ns (3-4x faster)

Breaking backward compatibility:
- No: the new path is gated by `FLAGS_enable_bthread_priority_queue` (default false).