diff --git a/taskiq_faststream/__about__.py b/taskiq_faststream/__about__.py index 67739d1..e6aa654 100644 --- a/taskiq_faststream/__about__.py +++ b/taskiq_faststream/__about__.py @@ -1,2 +1,3 @@ """FastStream - taskiq integration to schedule FastStream tasks.""" + __version__ = "0.1.8" diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index f295316..1d44461 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -10,6 +10,7 @@ from taskiq.decor import AsyncTaskiqDecoratedTask from typing_extensions import TypeAlias, override +from taskiq_faststream.formatter import PatchedFormatter from taskiq_faststream.serializer import PatchedSerializer from taskiq_faststream.types import ScheduledTask from taskiq_faststream.utils import resolve_msg @@ -34,6 +35,7 @@ class BrokerWrapper(AsyncBroker): def __init__(self, broker: Any) -> None: super().__init__() self.serializer = PatchedSerializer() + self.formatter = PatchedFormatter(self) self.broker = broker async def startup(self) -> None: diff --git a/taskiq_faststream/formatter.py b/taskiq_faststream/formatter.py new file mode 100644 index 0000000..235fbd8 --- /dev/null +++ b/taskiq_faststream/formatter.py @@ -0,0 +1,48 @@ +from typing import Any, Dict + +from taskiq.abc.broker import AsyncBroker +from taskiq.abc.formatter import TaskiqFormatter +from taskiq.compat import IS_PYDANTIC2, Model, model_dump, model_validate +from taskiq.message import BrokerMessage, TaskiqMessage + +if IS_PYDANTIC2: + + def model_dump(instance: Model) -> Dict[str, Any]: + """Model dump.""" + return instance.model_dump() + +else: + + def model_dump(instance: Model) -> Dict[str, Any]: + """Model dump.""" + return instance.dict() + + +class PatchedFormatter(TaskiqFormatter): + """Default taskiq formatter.""" + + def __init__(self, broker: AsyncBroker) -> None: + self.broker = broker + + def dumps(self, message: TaskiqMessage) -> BrokerMessage: + """ + Dumps taskiq message to some broker message format. + + :param message: message to send. + :return: Dumped message. + """ + return BrokerMessage( + task_id=message.task_id, + task_name=message.task_name, + message=self.broker.serializer.dumpb(model_dump(message)), + labels=message.labels, + ) + + def loads(self, message: bytes) -> TaskiqMessage: + """ + Loads json from message. + + :param message: broker's message. + :return: parsed taskiq message. + """ + return model_validate(TaskiqMessage, self.broker.serializer.loadb(message))