Skip to content

Commit a369184

Browse files
committed
Test that checks that AttemptError can round trip properly
Follows up #27 to add a more elaborate test that checks that non-default job properties can unmarshal to `Job` properties, including `errors`, which contains `AttemptError`. It turned out that, of course, this wasn't working properly.
1 parent 44ed566 commit a369184

5 files changed

Lines changed: 233 additions & 14 deletions

File tree

src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,59 @@ class JobInsertFastManyParams:
134134
tags: List[str]
135135

136136

137+
JOB_INSERT_FULL = """-- name: job_insert_full \\:one
138+
INSERT INTO river_job(
139+
args,
140+
attempt,
141+
attempted_at,
142+
created_at,
143+
errors,
144+
finalized_at,
145+
kind,
146+
max_attempts,
147+
metadata,
148+
priority,
149+
queue,
150+
scheduled_at,
151+
state,
152+
tags
153+
) VALUES (
154+
:p1\\:\\:jsonb,
155+
coalesce(:p2\\:\\:smallint, 0),
156+
:p3,
157+
coalesce(:p4\\:\\:timestamptz, now()),
158+
:p5\\:\\:jsonb[],
159+
:p6,
160+
:p7\\:\\:text,
161+
:p8\\:\\:smallint,
162+
coalesce(:p9\\:\\:jsonb, '{}'),
163+
:p10\\:\\:smallint,
164+
:p11\\:\\:text,
165+
coalesce(:p12\\:\\:timestamptz, now()),
166+
:p13\\:\\:river_job_state,
167+
coalesce(:p14\\:\\:varchar(255)[], '{}')
168+
) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
169+
"""
170+
171+
172+
@dataclasses.dataclass()
class JobInsertFullParams:
    """Bind parameters for JOB_INSERT_FULL, one field per :pN placeholder.

    Fields are listed in placeholder order (args -> :p1, ... tags -> :p14).
    Optional timestamp fields default server-side via coalesce(..., now())
    in the SQL when passed as None.
    """

    args: Any  # job arguments, already serialized to a JSON string (jsonb)
    attempt: int  # attempt count; SQL coalesces to 0
    attempted_at: Optional[datetime.datetime]
    created_at: Optional[datetime.datetime]  # None -> now() server-side
    errors: List[Any]  # serialized AttemptError JSON strings (jsonb[])
    finalized_at: Optional[datetime.datetime]
    kind: str
    max_attempts: int
    metadata: Any  # JSON string; SQL coalesces to '{}'
    priority: int
    queue: str
    scheduled_at: Optional[datetime.datetime]  # None -> now() server-side
    state: models.RiverJobState
    tags: List[str]  # SQL coalesces to '{}'
188+
189+
137190
class Querier:
138191
def __init__(self, conn: sqlalchemy.engine.Connection):
139192
self._conn = conn
@@ -266,6 +319,44 @@ def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int:
266319
})
267320
return result.rowcount
268321

322+
def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]:
323+
row = self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), {
324+
"p1": arg.args,
325+
"p2": arg.attempt,
326+
"p3": arg.attempted_at,
327+
"p4": arg.created_at,
328+
"p5": arg.errors,
329+
"p6": arg.finalized_at,
330+
"p7": arg.kind,
331+
"p8": arg.max_attempts,
332+
"p9": arg.metadata,
333+
"p10": arg.priority,
334+
"p11": arg.queue,
335+
"p12": arg.scheduled_at,
336+
"p13": arg.state,
337+
"p14": arg.tags,
338+
}).first()
339+
if row is None:
340+
return None
341+
return models.RiverJob(
342+
id=row[0],
343+
args=row[1],
344+
attempt=row[2],
345+
attempted_at=row[3],
346+
attempted_by=row[4],
347+
created_at=row[5],
348+
errors=row[6],
349+
finalized_at=row[7],
350+
kind=row[8],
351+
max_attempts=row[9],
352+
metadata=row[10],
353+
priority=row[11],
354+
queue=row[12],
355+
state=row[13],
356+
scheduled_at=row[14],
357+
tags=row[15],
358+
)
359+
269360

270361
class AsyncQuerier:
271362
def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection):
@@ -398,3 +489,41 @@ async def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int:
398489
"p9": arg.tags,
399490
})
400491
return result.rowcount
492+
493+
async def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]:
494+
row = (await self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), {
495+
"p1": arg.args,
496+
"p2": arg.attempt,
497+
"p3": arg.attempted_at,
498+
"p4": arg.created_at,
499+
"p5": arg.errors,
500+
"p6": arg.finalized_at,
501+
"p7": arg.kind,
502+
"p8": arg.max_attempts,
503+
"p9": arg.metadata,
504+
"p10": arg.priority,
505+
"p11": arg.queue,
506+
"p12": arg.scheduled_at,
507+
"p13": arg.state,
508+
"p14": arg.tags,
509+
})).first()
510+
if row is None:
511+
return None
512+
return models.RiverJob(
513+
id=row[0],
514+
args=row[1],
515+
attempt=row[2],
516+
attempted_at=row[3],
517+
attempted_by=row[4],
518+
created_at=row[5],
519+
errors=row[6],
520+
finalized_at=row[7],
521+
kind=row[8],
522+
max_attempts=row[9],
523+
metadata=row[10],
524+
priority=row[11],
525+
queue=row[12],
526+
state=row[13],
527+
scheduled_at=row[14],
528+
tags=row[15],
529+
)

src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,37 @@ INSERT INTO river_job(
103103

104104
-- Had trouble getting multi-dimensional arrays to play nicely with sqlc,
105105
-- but it might be possible. For now, join tags into a single string.
106-
string_to_array(unnest(@tags::text[]), ',');
106+
string_to_array(unnest(@tags::text[]), ',');
107+
108+
-- name: JobInsertFull :one
-- Inserts a job row with a full, explicitly specified set of properties,
-- used by tests that need non-default rows (e.g. with errors populated).
INSERT INTO river_job(
    args,
    attempt,
    attempted_at,
    created_at,
    errors,
    finalized_at,
    kind,
    max_attempts,
    metadata,
    priority,
    queue,
    scheduled_at,
    state,
    tags
) VALUES (
    @args::jsonb,
    coalesce(@attempt::smallint, 0),       -- default attempt to 0
    @attempted_at,
    coalesce(sqlc.narg('created_at')::timestamptz, now()),  -- default to insert time
    @errors::jsonb[],
    @finalized_at,
    @kind::text,
    @max_attempts::smallint,
    coalesce(@metadata::jsonb, '{}'),      -- default to empty object
    @priority::smallint,
    @queue::text,
    coalesce(sqlc.narg('scheduled_at')::timestamptz, now()),  -- default to immediate
    @state::river_job_state,
    coalesce(@tags::varchar(255)[], '{}')  -- default to no tags
) RETURNING *;

src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async def advisory_lock(self, key: int) -> None:
3636
await self.pg_misc_querier.pg_advisory_xact_lock(key=key)
3737

3838
async def job_insert(self, insert_params: JobInsertParams) -> Job:
39-
return _job_from_row(
39+
return job_from_row(
4040
cast( # drop Optional[] because insert always returns a row
4141
models.RiverJob,
4242
await self.job_querier.job_insert_fast(
@@ -57,7 +57,7 @@ async def job_get_by_kind_and_unique_properties(
5757
row = await self.job_querier.job_get_by_kind_and_unique_properties(
5858
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
5959
)
60-
return _job_from_row(row) if row else None
60+
return job_from_row(row) if row else None
6161

6262
@asynccontextmanager
6363
async def transaction(self) -> AsyncGenerator:
@@ -103,7 +103,7 @@ def advisory_lock(self, key: int) -> None:
103103
self.pg_misc_querier.pg_advisory_xact_lock(key=key)
104104

105105
def job_insert(self, insert_params: JobInsertParams) -> Job:
106-
return _job_from_row(
106+
return job_from_row(
107107
cast( # drop Optional[] because insert always returns a row
108108
models.RiverJob,
109109
self.job_querier.job_insert_fast(
@@ -122,7 +122,7 @@ def job_get_by_kind_and_unique_properties(
122122
row = self.job_querier.job_get_by_kind_and_unique_properties(
123123
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
124124
)
125-
return _job_from_row(row) if row else None
125+
return job_from_row(row) if row else None
126126

127127
@contextmanager
128128
def transaction(self) -> Iterator[None]:
@@ -187,7 +187,7 @@ def _build_insert_many_params(
187187
return insert_many_params
188188

189189

190-
def _job_from_row(row: models.RiverJob) -> Job:
190+
def job_from_row(row: models.RiverJob) -> Job:
191191
"""
192192
Converts an internal sqlc generated row to the top level type, issuing a few
193193
minor transformations along the way. Timestamps are changed from local
@@ -196,7 +196,7 @@ def _job_from_row(row: models.RiverJob) -> Job:
196196

197197
def attempt_error_from(data: dict[str, Any]) -> AttemptError:
198198
return AttemptError(
199-
at=data["at"],
199+
at=datetime.fromisoformat(data["at"]),
200200
attempt=data["attempt"],
201201
error=data["error"],
202202
trace=data["trace"],

src/riverqueue/job.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from dataclasses import dataclass
2-
import datetime
2+
from datetime import datetime, timezone
33
from enum import Enum
4+
import json
45
from typing import Any, Optional
56

67

@@ -106,7 +107,7 @@ class Job:
106107
if it's either snoozed or errors.
107108
"""
108109

109-
attempted_at: Optional[datetime.datetime]
110+
attempted_at: Optional[datetime]
110111
"""
111112
The time that the job was last worked. Starts out as `nil` on a new insert.
112113
"""
@@ -120,7 +121,7 @@ class Job:
120121
time when it starts up.
121122
"""
122123

123-
created_at: datetime.datetime
124+
created_at: datetime
124125
"""
125126
When the job record was created.
126127
"""
@@ -131,7 +132,7 @@ class Job:
131132
Ordered from earliest error to the latest error.
132133
"""
133134

134-
finalized_at: Optional[datetime.datetime]
135+
finalized_at: Optional[datetime]
135136
"""
136137
The time at which the job was "finalized", meaning it was either completed
137138
successfully or errored for the last time such that it'll no longer be
@@ -170,7 +171,7 @@ class Job:
170171
independently and be used to isolate jobs.
171172
"""
172173

173-
scheduled_at: datetime.datetime
174+
scheduled_at: datetime
174175
"""
175176
When the job is scheduled to become available to be worked. Jobs default to
176177
running immediately, but may be scheduled for the future when they're
@@ -199,7 +200,7 @@ class AttemptError:
199200
that occurred.
200201
"""
201202

202-
at: datetime.datetime
203+
at: datetime
203204
"""
204205
The time at which the error occurred.
205206
"""
@@ -221,3 +222,15 @@ class AttemptError:
221222
Contains a stack trace from a job that panicked. The trace is produced by
222223
invoking `debug.Trace()` in Go.
223224
"""
225+
226+
def to_json(self) -> str:
227+
return json.dumps(
228+
{
229+
"at": self.at.astimezone(timezone.utc)
230+
.isoformat()
231+
.replace("+00:00", "Z"),
232+
"attempt": self.attempt,
233+
"error": self.error,
234+
"trace": self.trace,
235+
}
236+
)

tests/driver/riversqlalchemy/sqlalchemy_driver_test.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import json
12
import pytest
23
import pytest_asyncio
4+
from riverqueue.job import AttemptError
35
import sqlalchemy
46
import sqlalchemy.ext.asyncio
57
from datetime import datetime, timezone
@@ -52,7 +54,7 @@ async def client(
5254
#
5355

5456
@pytest.mark.asyncio
55-
async def test_insert_job_from_row(self, client, simple_args):
57+
async def test_insert_job_from_row(self, client, simple_args, test_tx):
5658
insert_res = await client.insert(simple_args)
5759
job = insert_res.job
5860
assert job
@@ -70,6 +72,48 @@ async def test_insert_job_from_row(self, client, simple_args):
7072
assert job.state == JobState.AVAILABLE
7173
assert job.tags == []
7274

75+
now = datetime.now(timezone.utc)
76+
77+
job_row = await dbsqlc.river_job.AsyncQuerier(test_tx).job_insert_full(
78+
dbsqlc.river_job.JobInsertFullParams(
79+
args=json.dumps(dict(foo="args")),
80+
attempt=0,
81+
attempted_at=None,
82+
created_at=datetime.now(),
83+
errors=[
84+
AttemptError(
85+
at=now,
86+
attempt=1,
87+
error="message",
88+
trace="trace",
89+
).to_json(),
90+
],
91+
finalized_at=datetime.now(),
92+
kind="custom_kind",
93+
max_attempts=MAX_ATTEMPTS_DEFAULT,
94+
metadata=json.dumps(dict(foo="metadata")),
95+
priority=PRIORITY_DEFAULT,
96+
queue=QUEUE_DEFAULT,
97+
scheduled_at=datetime.now(),
98+
state=JobState.COMPLETED,
99+
tags=[],
100+
)
101+
)
102+
103+
job = riversqlalchemy.sql_alchemy_driver.job_from_row(job_row)
104+
assert job
105+
assert job.args == dict(foo="args")
106+
assert job.errors == [
107+
AttemptError(
108+
at=now,
109+
attempt=1,
110+
error="message",
111+
trace="trace",
112+
)
113+
]
114+
assert job.finalized_at.tzinfo == timezone.utc
115+
assert job.metadata == dict(foo="metadata")
116+
73117
#
74118
# tests below this line should match what are in the sync client tests below
75119
#

0 commit comments

Comments
 (0)