1616)
1717
1818from ...driver import DriverProtocol , ExecutorProtocol , GetParams , JobInsertParams
19- from ...model import Job
19+ from ...model import Job , JobState
2020from .dbsqlc import models , river_job , pg_misc
2121
2222
@@ -30,11 +30,13 @@ async def advisory_lock(self, key: int) -> None:
3030 await self .pg_misc_querier .pg_advisory_xact_lock (key = key )
3131
3232 async def job_insert (self , insert_params : JobInsertParams ) -> Job :
33- return cast (
34- Job ,
35- await self .job_querier .job_insert_fast (
36- cast (river_job .JobInsertFastParams , insert_params )
37- ),
33+ return _job_from_row (
34+ cast ( # drop Optional[] because insert always returns a row
35+ models .RiverJob ,
36+ await self .job_querier .job_insert_fast (
37+ cast (river_job .JobInsertFastParams , insert_params )
38+ ),
39+ )
3840 )
3941
4042 async def job_insert_many (self , all_params : list [JobInsertParams ]) -> int :
@@ -46,12 +48,10 @@ async def job_insert_many(self, all_params: list[JobInsertParams]) -> int:
4648 async def job_get_by_kind_and_unique_properties (
4749 self , get_params : GetParams
4850 ) -> Optional [Job ]:
49- return cast (
50- Optional [Job ],
51- await self .job_querier .job_get_by_kind_and_unique_properties (
52- cast (river_job .JobGetByKindAndUniquePropertiesParams , get_params )
53- ),
51+ row = await self .job_querier .job_get_by_kind_and_unique_properties (
52+ cast (river_job .JobGetByKindAndUniquePropertiesParams , get_params )
5453 )
54+ return _job_from_row (row ) if row else None
5555
5656 @asynccontextmanager
5757 async def transaction (self ) -> AsyncGenerator :
@@ -91,10 +91,12 @@ def advisory_lock(self, key: int) -> None:
9191 self .pg_misc_querier .pg_advisory_xact_lock (key = key )
9292
9393 def job_insert (self , insert_params : JobInsertParams ) -> Job :
94- return cast (
95- Job ,
96- self .job_querier .job_insert_fast (
97- cast (river_job .JobInsertFastParams , insert_params )
94+ return _job_from_row (
95+ cast ( # drop Optional[] because insert always returns a row
96+ models .RiverJob ,
97+ self .job_querier .job_insert_fast (
98+ cast (river_job .JobInsertFastParams , insert_params )
99+ ),
98100 ),
99101 )
100102
@@ -105,12 +107,10 @@ def job_insert_many(self, all_params: list[JobInsertParams]) -> int:
105107 def job_get_by_kind_and_unique_properties (
106108 self , get_params : GetParams
107109 ) -> Optional [Job ]:
108- return cast (
109- Optional [Job ],
110- self .job_querier .job_get_by_kind_and_unique_properties (
111- cast (river_job .JobGetByKindAndUniquePropertiesParams , get_params )
112- ),
110+ row = self .job_querier .job_get_by_kind_and_unique_properties (
111+ cast (river_job .JobGetByKindAndUniquePropertiesParams , get_params )
113112 )
113+ return _job_from_row (row ) if row else None
114114
115115 @contextmanager
116116 def transaction (self ) -> Iterator [None ]:
@@ -169,3 +169,28 @@ def _build_insert_many_params(
169169 insert_many_params .tags .append ("," .join (insert_params .tags ))
170170
171171 return insert_many_params
172+
173+
174+ def _job_from_row (row : models .RiverJob ) -> Job :
175+ return Job (
176+ id = row .id ,
177+ args = row .args ,
178+ attempt = row .attempt ,
179+ attempted_at = row .attempted_at .astimezone (timezone .utc )
180+ if row .attempted_at
181+ else None ,
182+ attempted_by = row .attempted_by ,
183+ created_at = row .created_at .astimezone (timezone .utc ),
184+ errors = row .errors ,
185+ finalized_at = row .finalized_at .astimezone (timezone .utc )
186+ if row .finalized_at
187+ else None ,
188+ kind = row .kind ,
189+ max_attempts = row .max_attempts ,
190+ metadata = row .metadata ,
191+ priority = row .priority ,
192+ queue = row .queue ,
193+ state = cast (JobState , row .state ),
194+ scheduled_at = row .scheduled_at .astimezone (timezone .utc ),
195+ tags = row .tags ,
196+ )
0 commit comments