fix: prevent deadlock when main thread puts on full queue #977

Open
devin-ai-integration[bot] wants to merge 1 commit into main from
devin/1775123605-deadlock-fix-nonblocking-put

@devin-ai-integration

Summary

Fixes a deadlock in the concurrent source where the main thread (the sole queue consumer) blocks on queue.put() when the shared queue is full. Because nothing else ever takes items off the queue, the main thread can never unblock itself: a classic self-deadlock.

Root cause: Three code paths in _handle_item cause the main thread to produce into its own queue via ConcurrentMessageRepository.emit_message() → queue.put():

  1. PartitionCompleteSentinel → _on_stream_is_done → ensure_at_least_one_state_emitted → queue.put(state)
  2. PartitionGenerationCompletedSentinel → same path
  3. Partition → on_partition → emit_message(slice_log) → queue.put(log)
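
The failure mode behind these three paths can be reproduced in isolation with the standard library. The sketch below is illustrative only: the queue, its size, and the helper name put_as_sole_consumer are assumptions rather than CDK code, and a timeout replaces the indefinite block so that the demo terminates.

```python
import queue

# Hypothetical stand-in for the shared queue; maxsize=1 keeps the demo small.
shared_queue: "queue.Queue[str]" = queue.Queue(maxsize=1)
shared_queue.put("record from a worker")  # the queue is now full


def put_as_sole_consumer(q: "queue.Queue[str]", item: str, timeout: float) -> bool:
    """Return True if the put succeeded, False if it would have blocked."""
    try:
        # A plain q.put(item) would block forever here, because the calling
        # thread is the only one that ever calls q.get(). The timeout exists
        # purely so that this demonstration returns.
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


would_deadlock = not put_as_sole_consumer(shared_queue, "state message", timeout=0.1)
print(would_deadlock)
```

With no other thread draining the queue, the put times out, which is the bounded equivalent of the hang described above.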

Fix: The main thread now uses put(block=False). If that call raises queue.Full, the message is buffered in a deque (_pending) and drained via consume_queue(), which the main thread already calls after processing every queue item. Worker threads are unchanged (blocking put() for backpressure). Thread detection uses threading.get_ident() captured at construction time.
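
A minimal sketch of the buffering approach, assuming a simplified repository. The attribute and method names (_pending, _consumer_thread_id, _put_on_queue, emit_message, consume_queue) follow this description; the class name BufferingRepositorySketch and everything else are hypothetical, and the actual diff may differ.

```python
import queue
import threading
from collections import deque
from typing import Any, Deque, Iterator


class BufferingRepositorySketch:
    """Hypothetical, reduced model of the patched message repository."""

    def __init__(self, q: "queue.Queue[Any]") -> None:
        self._queue = q
        # Assumption: the constructor runs on the consumer (main) thread.
        self._consumer_thread_id = threading.get_ident()
        # Only ever touched by the consumer thread, so no lock is taken.
        self._pending: Deque[Any] = deque()

    def _put_on_queue(self, message: Any) -> None:
        if threading.get_ident() == self._consumer_thread_id:
            try:
                self._queue.put(message, block=False)
            except queue.Full:
                # Blocking here would deadlock, so buffer instead.
                self._pending.append(message)
        else:
            # Worker threads keep the blocking put for backpressure.
            self._queue.put(message)

    def emit_message(self, message: Any) -> None:
        self._put_on_queue(message)

    def consume_queue(self) -> Iterator[Any]:
        # Yield buffered messages in the order they were emitted.
        while self._pending:
            yield self._pending.popleft()


q: "queue.Queue[Any]" = queue.Queue(maxsize=1)
repo = BufferingRepositorySketch(q)
repo.emit_message("first")   # fits in the queue
repo.emit_message("second")  # queue is full, so this is buffered
drained = list(repo.consume_queue())
```

In this sketch the buffered message bypasses the queue entirely and reaches the caller through consume_queue(), which is why the drain cadence of the main loop matters.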

Review & Testing Checklist for Human

  • Verify _pending is main-thread-only: _put_on_queue only appends to _pending when get_ident() == _consumer_thread_id, and consume_queue() is only called from the main thread's processing loop. If this invariant holds, no lock is needed. If a worker thread could ever call consume_queue(), this would be a data race.
  • Verify consume_queue() is called frequently enough: Previously a no-op (yield from []), it now yields buffered messages. Confirm callers in concurrent_read_processor.py (on_record, on_partition_complete_sentinel, _on_stream_is_done) all do yield from self._message_repository.consume_queue() so buffered messages are drained promptly.
  • No new tests for the deadlock scenario: Existing tests pass but none exercise a full-queue + main-thread-emit path. Consider adding a test that fills the queue, calls emit_message from the "main" thread, and verifies the message lands in _pending and is yielded by consume_queue().
  • Validate with a real sync: Run a HubSpot sync (or similar high-volume concurrent source) that previously deadlocked, and confirm it completes. This was the original reproducer (job 77457394).
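
The missing deadlock test could look roughly like the following. It is a sketch written against a reduced stand-in (_RepoStandIn, hypothetical) rather than the real ConcurrentMessageRepository, whose constructor arguments are not shown in this PR; the test names are also assumptions.

```python
import queue
import threading
import unittest
from collections import deque


class _RepoStandIn:
    """Hypothetical stand-in mirroring the behavior described in this PR."""

    def __init__(self, q):
        self._queue = q
        self._pending = deque()
        self._consumer_thread_id = threading.get_ident()

    def emit_message(self, message):
        if threading.get_ident() != self._consumer_thread_id:
            self._queue.put(message)  # worker thread: blocking put
            return
        try:
            self._queue.put(message, block=False)
        except queue.Full:
            self._pending.append(message)

    def consume_queue(self):
        while self._pending:
            yield self._pending.popleft()


class FullQueueEmitTest(unittest.TestCase):
    def test_main_thread_emit_is_buffered_then_drained(self):
        q = queue.Queue(maxsize=1)
        q.put("filler")  # fill the queue
        repo = _RepoStandIn(q)
        repo.emit_message("state")  # must return instead of hanging
        self.assertEqual(list(repo._pending), ["state"])
        self.assertEqual(list(repo.consume_queue()), ["state"])
        self.assertEqual(q.qsize(), 1)

    def test_worker_emit_still_blocks_until_drained(self):
        q = queue.Queue(maxsize=1)
        q.put("filler")
        repo = _RepoStandIn(q)
        worker = threading.Thread(target=repo.emit_message, args=("record",))
        worker.start()
        worker.join(timeout=0.2)
        self.assertTrue(worker.is_alive())  # still blocked on the full queue
        self.assertEqual(q.get(), "filler")  # consumer frees a slot
        worker.join(timeout=2)
        self.assertFalse(worker.is_alive())
        self.assertEqual(q.get(), "record")
        self.assertEqual(len(repo._pending), 0)  # workers never touch _pending
```

The second test also covers the first checklist item indirectly: a worker-thread emit goes through the blocking put and leaves _pending untouched.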

Notes

  • threading.get_ident() is captured once in __init__, which is assumed to always run on the main thread. This is true today because ConcurrentSource.__init__ and the declarative source constructor both run on the main thread.
  • The deque thread-safety comment references CPython's GIL, but in practice _pending is only accessed from a single thread (main), so it's safe regardless of GIL guarantees.
  • consume_queue() was previously a dead no-op. It now has real behavior — any code that calls it will start receiving buffered messages, which is the intended design.
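
The first note can be illustrated with a small sketch of why the construction thread matters. The class IdentCapture and its method name are hypothetical; only threading.get_ident() is taken from this PR.

```python
import threading


class IdentCapture:
    """Hypothetical helper: remembers which thread constructed it."""

    def __init__(self) -> None:
        self._consumer_thread_id = threading.get_ident()

    def called_from_consumer(self) -> bool:
        return threading.get_ident() == self._consumer_thread_id


# Constructed on the main thread, as the PR assumes.
capture = IdentCapture()
results = {"main": capture.called_from_consumer()}


def _worker() -> None:
    results["worker"] = capture.called_from_consumer()


t = threading.Thread(target=_worker)
t.start()
t.join()

# Counter-example: if construction ever moved to another thread, the main
# thread would no longer be recognized as the consumer.
built_on_worker = []
t2 = threading.Thread(target=lambda: built_on_worker.append(IdentCapture()))
t2.start()
t2.join()
misdetected = not built_on_worker[0].called_from_consumer()
```

If the constructor were ever moved off the main thread, the main thread would fall back to the blocking put and the deadlock could return, so the assumption is worth keeping explicit.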

Link to Devin session: https://app.devin.ai/sessions/ad184113df474f0ba37ede09cdac7eaf

The main thread is the sole consumer of the shared queue. In 3 code
paths from _handle_item, it also produces into the queue via
ConcurrentMessageRepository.emit_message() -> queue.put(). When the
queue is full, this blocks forever — the main thread deadlocks on
its own queue.

Fix: detect when the caller is the main thread (via threading.get_ident)
and use non-blocking put(block=False). If the queue is Full, buffer the
message in a deque. Buffered messages are drained via consume_queue(),
which the main thread already calls after processing every queue item.

Worker threads are unchanged — they still use blocking put() for
normal backpressure.

Deadlock paths fixed:
1. PartitionCompleteSentinel -> _on_stream_is_done -> ensure_at_least_one_state_emitted -> emit_message -> queue.put(state)
2. PartitionGenerationCompletedSentinel -> _on_stream_is_done -> same
3. Partition -> on_partition -> emit_message(slice_log) -> queue.put(log)

Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
@github-actions

github-actions bot commented Apr 2, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1775123605-deadlock-fix-nonblocking-put#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1775123605-deadlock-fix-nonblocking-put

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
@github-actions

github-actions bot commented Apr 2, 2026

PyTest Results (Fast)

3 989 tests  ±0   3 978 ✅ ±0   7m 27s ⏱️ +20s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 0344741. ± Comparison against base commit 0b94cbe.

@github-actions

github-actions bot commented Apr 2, 2026

PyTest Results (Full)

3 992 tests  ±0   3 980 ✅ ±0   11m 12s ⏱️ -1s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 0344741. ± Comparison against base commit 0b94cbe.

@tolik0

Anatolii Yatsuk (tolik0) commented Apr 3, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/23950401533
