Optimize ListRedisScheduleSource: replace SCAN with sorted set time index#121
Optimize ListRedisScheduleSource: replace SCAN with sorted set time index#121tmkarthi wants to merge 5 commits intotaskiq-python:mainfrom
Conversation
- Introduced `populate_time_index` parameter to backfill the time index from existing keys. - Updated `startup` method to populate the time index if `populate_time_index` is set to True. - Modified schedule addition and deletion to manage the time index sorted set. - Added tests to verify time index population and cleanup behavior.
- Added `_maybe_cleanup_time_index` method to manage time index cleanup at most once per minute. - Introduced `_cleanup_time_index` method to remove stale entries older than one hour with empty time key lists. - Updated `delete_schedule` to call `_maybe_cleanup_time_index` for efficient cleanup. - Enhanced tests to verify the behavior of the new cleanup methods, ensuring proper handling of stale and recent entries.
…ameter - Updated the _get_previous_time_schedules method to take current_time as an argument, allowing for more precise cutoff calculations. - Adjusted the logic to use the provided current_time for determining previous schedules, ensuring no overlap with the current window. - Modified the call to _get_previous_time_schedules in the first run logic to pass the current_time parameter.
- Removed the `_is_first_run` flag to simplify the schedule fetching logic. - Updated `get_schedules` to fetch past time schedules on every call, ensuring no schedules are missed within the current minute and previous minute. - Enhanced documentation to reflect the new behavior of schedule retrieval.
- Expanded README to include details on interval tasks and the new `{prefix}:time_index` sorted set for tracking schedules.
- Updated cleanup logic in `ListRedisScheduleSource` to remove stale entries older than 5 minutes instead of 1 hour.
- Modified tests to reflect the new 5-minute threshold for cleanup, ensuring accurate verification of the time index behavior.
|
@s3rius Any update on this? |
There was a problem hiding this comment.
First of all. Thanks for you patience. It took me a while to get to your PR.
I really like the solution. It's elegant, simple and yet robust.
I only have a single comment about simplifying migration for people after upgrading their taskiq-redis. However, it's a complete minor thing.
Also, I noticed that you use floats for scores. I'm not sure if turning scores into ints would make a big of a change. But could be, since floats are known to worse at comparison and take a bit more memory than ints. Of course it all depends on how redis implements score type.
| await self._previous_schedule_source.shutdown() | ||
| logger.info("Migration complete") | ||
|
|
||
| if self._populate_time_index: |
There was a problem hiding this comment.
I think that this can be calculated automatically.
if you were using previous version of this schedule source, you are 100% don't have this key set.
We can just check if the key for time_index is present and in case if it's empty, build the initial index. This will allow us to remove the new argument and make a seamless transition.
There was a problem hiding this comment.
Pull request overview
This PR optimizes ListRedisScheduleSource’s handling of time-based schedules by introducing a Redis sorted-set “time index” to avoid keyspace-wide SCANs when discovering past-due time buckets, and adds a migration/backfill option plus documentation and tests.
Changes:
- Add
{prefix}:time_indexsorted set and switch past schedule discovery fromSCANtoZRANGEBYSCORE. - Maintain the time index on
add_schedule, and lazily clean stale empty entries fromdelete_schedulewith rate limiting. - Add
populate_time_indexto backfill the index on startup for upgrades; update README and expand test coverage.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
taskiq_redis/list_schedule_source.py |
Implements the time index, backfill-on-startup option, and lazy cleanup; adjusts past schedule fetching behavior. |
tests/test_list_schedule_source.py |
Adds tests for time index maintenance, lazy cleanup semantics, backfill behavior, and rate limiting. |
README.md |
Documents the new Redis keys and provides a populate_time_index migration guide. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._last_cleanup_time = now | ||
| await self._cleanup_time_index(redis) |
There was a problem hiding this comment.
_maybe_cleanup_time_index updates _last_cleanup_time before running _cleanup_time_index. If cleanup raises (network hiccup, Redis error), the rate limiter will suppress cleanup retries for 60s even though no cleanup actually happened. Consider only updating _last_cleanup_time after a successful cleanup (or resetting it on exception).
| self._last_cleanup_time = now | |
| await self._cleanup_time_index(redis) | |
| await self._cleanup_time_index(redis) | |
| self._last_cleanup_time = now |
Summary
Test plan