-
Notifications
You must be signed in to change notification settings - Fork 107
Expand file tree
/
Copy pathcodec.py
More file actions
30 lines (24 loc) · 897 Bytes
/
codec.py
File metadata and controls
30 lines (24 loc) · 897 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import gzip
from collections.abc import Iterable
from typing import List
from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec
class CompressionCodec(PayloadCodec):
"""Gzip-based payload codec."""
ENCODING = b"binary/gzip"
async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
return [
Payload(
metadata={"encoding": self.ENCODING},
data=gzip.compress(p.SerializeToString()),
)
for p in payloads
]
async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
result: List[Payload] = []
for p in payloads:
if p.metadata.get("encoding") == self.ENCODING:
result.append(Payload.FromString(gzip.decompress(p.data)))
else:
result.append(p)
return result