Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions taskiq_faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""FastStream - taskiq integration to schedule FastStream tasks."""

__version__ = "0.1.8"
2 changes: 2 additions & 0 deletions taskiq_faststream/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from taskiq.decor import AsyncTaskiqDecoratedTask
from typing_extensions import TypeAlias, override

from taskiq_faststream.formatter import PatchedFormatter
from taskiq_faststream.serializer import PatchedSerializer
from taskiq_faststream.types import ScheduledTask
from taskiq_faststream.utils import resolve_msg
Expand All @@ -34,6 +35,7 @@ class BrokerWrapper(AsyncBroker):
def __init__(self, broker: Any) -> None:
super().__init__()
self.serializer = PatchedSerializer()
self.formatter = PatchedFormatter(self)
self.broker = broker

async def startup(self) -> None:
Expand Down
48 changes: 48 additions & 0 deletions taskiq_faststream/formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Any, Dict

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.formatter import TaskiqFormatter
from taskiq.compat import IS_PYDANTIC2, Model, model_dump, model_validate
from taskiq.message import BrokerMessage, TaskiqMessage

if IS_PYDANTIC2:

def model_dump(instance: Model) -> Dict[str, Any]:
"""Model dump."""
return instance.model_dump()

else:

def model_dump(instance: Model) -> Dict[str, Any]:
"""Model dump."""
return instance.dict()


class PatchedFormatter(TaskiqFormatter):
"""Default taskiq formatter."""

def __init__(self, broker: AsyncBroker) -> None:
self.broker = broker

def dumps(self, message: TaskiqMessage) -> BrokerMessage:
"""
Dumps taskiq message to some broker message format.

:param message: message to send.
:return: Dumped message.
"""
return BrokerMessage(
task_id=message.task_id,
task_name=message.task_name,
message=self.broker.serializer.dumpb(model_dump(message)),
labels=message.labels,
)

def loads(self, message: bytes) -> TaskiqMessage:
"""
Loads json from message.

:param message: broker's message.
:return: parsed taskiq message.
"""
return model_validate(TaskiqMessage, self.broker.serializer.loadb(message))