-
Notifications
You must be signed in to change notification settings - Fork 32
Optimize ListRedisScheduleSource: replace SCAN with sorted set time index #121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
fad3046
da282d3
7934072
384c862
0ed6d06
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,4 +1,5 @@ | ||||||||||
| import datetime | ||||||||||
| import time as _time | ||||||||||
| from logging import getLogger | ||||||||||
| from typing import Any | ||||||||||
|
|
||||||||||
|
|
@@ -23,6 +24,7 @@ def __init__( | |||||||||
| serializer: TaskiqSerializer | None = None, | ||||||||||
| buffer_size: int = 50, | ||||||||||
| skip_past_schedules: bool = False, | ||||||||||
| populate_time_index: bool = False, | ||||||||||
| **connection_kwargs: Any, | ||||||||||
| ) -> None: | ||||||||||
| """ | ||||||||||
|
|
@@ -34,6 +36,11 @@ def __init__( | |||||||||
| :param serializer: Serializer to use for the schedules | ||||||||||
| :param buffer_size: Buffer size for getting schedules | ||||||||||
| :param skip_past_schedules: Skip schedules that are in the past. | ||||||||||
| :param populate_time_index: If True, on startup run a one-time SCAN | ||||||||||
| to populate the time index sorted set from existing time keys. | ||||||||||
| This is needed for migrating from an older version that did not | ||||||||||
| maintain the time index. Set this to True once to backfill the | ||||||||||
| index, then set it back to False for subsequent runs. | ||||||||||
| :param connection_kwargs: Additional connection kwargs | ||||||||||
| """ | ||||||||||
| super().__init__() | ||||||||||
|
|
@@ -47,10 +54,11 @@ def __init__( | |||||||||
| if serializer is None: | ||||||||||
| serializer = PickleSerializer() | ||||||||||
| self._serializer = serializer | ||||||||||
| self._is_first_run = True | ||||||||||
| self._previous_schedule_source: ScheduleSource | None = None | ||||||||||
| self._delete_schedules_after_migration: bool = True | ||||||||||
| self._skip_past_schedules = skip_past_schedules | ||||||||||
| self._populate_time_index = populate_time_index | ||||||||||
| self._last_cleanup_time: float = 0 | ||||||||||
|
|
||||||||||
| async def startup(self) -> None: | ||||||||||
| """ | ||||||||||
|
|
@@ -59,6 +67,9 @@ async def startup(self) -> None: | |||||||||
| By default this function does nothing. | ||||||||||
| But if the previous schedule source is set, | ||||||||||
| it will try to migrate schedules from it. | ||||||||||
|
|
||||||||||
| If populate_time_index is True, it will scan for existing | ||||||||||
| time keys and populate the time index sorted set. | ||||||||||
| """ | ||||||||||
| if self._previous_schedule_source is not None: | ||||||||||
| logger.info("Migrating schedules from previous source") | ||||||||||
|
|
@@ -74,13 +85,36 @@ async def startup(self) -> None: | |||||||||
| await self._previous_schedule_source.shutdown() | ||||||||||
| logger.info("Migration complete") | ||||||||||
|
|
||||||||||
| if self._populate_time_index: | ||||||||||
| logger.info("Populating time index from existing keys via scan") | ||||||||||
| async with Redis(connection_pool=self._connection_pool) as redis: | ||||||||||
| batch: dict[str, float] = {} | ||||||||||
| async for key in redis.scan_iter(f"{self._prefix}:time:*"): | ||||||||||
| key_str = key.decode() | ||||||||||
| key_time = self._parse_time_key(key_str) | ||||||||||
| if key_time: | ||||||||||
| batch[key_str] = key_time.timestamp() | ||||||||||
| if len(batch) >= self._buffer_size: | ||||||||||
| await redis.zadd( | ||||||||||
| self._get_time_index_key(), | ||||||||||
| batch, | ||||||||||
| ) | ||||||||||
| batch = {} | ||||||||||
| if batch: | ||||||||||
| await redis.zadd(self._get_time_index_key(), batch) | ||||||||||
| logger.info("Time index population complete") | ||||||||||
|
|
||||||||||
| def _get_time_key(self, time: datetime.datetime) -> str: | ||||||||||
| """Get the key for a time-based schedule.""" | ||||||||||
| if time.tzinfo is None: | ||||||||||
| time = time.replace(tzinfo=datetime.timezone.utc) | ||||||||||
| iso_time = time.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M") | ||||||||||
| return f"{self._prefix}:time:{iso_time}" | ||||||||||
|
|
||||||||||
| def _get_time_index_key(self) -> str: | ||||||||||
| """Get the key for the time index sorted set.""" | ||||||||||
| return f"{self._prefix}:time_index" | ||||||||||
|
|
||||||||||
| def _get_cron_key(self) -> str: | ||||||||||
| """Get the key for a cron-based schedule.""" | ||||||||||
| return f"{self._prefix}:cron" | ||||||||||
|
|
@@ -103,35 +137,78 @@ def _parse_time_key(self, key: str) -> datetime.datetime | None: | |||||||||
| logger.debug("Failed to parse time key %s", key) | ||||||||||
| return None | ||||||||||
|
|
||||||||||
| async def _get_previous_time_schedules(self) -> list[bytes]: | ||||||||||
| async def _maybe_cleanup_time_index(self, redis: Redis) -> None: # type: ignore[type-arg] | ||||||||||
| """ | ||||||||||
| Run time index cleanup at most once per minute. | ||||||||||
|
|
||||||||||
| Called from delete_schedule after removing a time-based schedule, | ||||||||||
| since that's the path where time key lists become empty. | ||||||||||
| """ | ||||||||||
| now = _time.monotonic() | ||||||||||
| if now - self._last_cleanup_time < 60: | ||||||||||
| return | ||||||||||
| self._last_cleanup_time = now | ||||||||||
| await self._cleanup_time_index(redis) | ||||||||||
|
Comment on lines
+150
to
+151
|
||||||||||
| self._last_cleanup_time = now | |
| await self._cleanup_time_index(redis) | |
| await self._cleanup_time_index(redis) | |
| self._last_cleanup_time = now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this can be calculated automatically.
If you were using a previous version of this schedule source, you definitely don't have this key set yet.
We can just check whether the time_index key is present and, if it's missing or empty, build the initial index. This will allow us to remove the new argument and make the transition seamless.