Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions temporalio/converter/_extstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ class StorageOperationMetrics:
total_duration: timedelta = dataclasses.field(default_factory=timedelta)
"""Wall-clock time spent on external storage operations."""

def record_batch(self, count: int, size: int, duration: timedelta) -> None:
driver_names: set[str] = dataclasses.field(default_factory=set)
"""Names of the drivers that participated in the operations."""

def record_batch(
self, count: int, size: int, duration: timedelta, driver_names: set[str]
) -> None:
"""Record metrics from a batch of storage operations."""
self.payload_count += count
self.total_size += size
self.total_duration += duration
self.driver_names.update(driver_names)

@contextlib.contextmanager
def track(self) -> Generator[Self, None, None]:
Expand Down Expand Up @@ -362,7 +368,7 @@ async def _store_payload(self, payload: Payload) -> Payload:
)
reference_payload.external_payloads.add().size_bytes = external_size

ExternalStorage._record_metrics(1, external_size, start_time)
ExternalStorage._record_metrics(1, external_size, start_time, {driver.name()})

return reference_payload

Expand Down Expand Up @@ -407,6 +413,7 @@ async def _store_payload_sequence(

external_count = 0
external_size = 0
driver_names: set[str] = set()
for (driver, indexed_payloads), claims in zip(driver_group_list, all_claims):
indices = [idx for idx, _ in indexed_payloads]
sizes = [p.ByteSize() for _, p in indexed_payloads]
Expand All @@ -428,8 +435,11 @@ async def _store_payload_sequence(
external_size += sizes[i]

external_count += len(claims)
driver_names.add(driver.name())

ExternalStorage._record_metrics(external_count, external_size, start_time)
ExternalStorage._record_metrics(
external_count, external_size, start_time, driver_names
)

return results

Expand All @@ -452,7 +462,9 @@ async def _retrieve_payload(self, payload: Payload) -> Payload:

stored_payload = stored_payloads[0]

ExternalStorage._record_metrics(1, stored_payload.ByteSize(), start_time)
ExternalStorage._record_metrics(
1, stored_payload.ByteSize(), start_time, {driver.name()}
)

return stored_payload

Expand Down Expand Up @@ -501,6 +513,7 @@ async def _retrieve_payload_sequence(

external_count = 0
external_size = 0
driver_names: set[str] = set()
for (driver, indexed_claims), stored_payloads in zip(
driver_claim_list, all_stored
):
Expand All @@ -517,14 +530,17 @@ async def _retrieve_payload_sequence(
external_size += stored_payload.ByteSize()

external_count += len(stored_payloads)
driver_names.add(driver.name())

retrieve_indices = sorted(stored_by_index.keys())
stored_list = [stored_by_index[idx] for idx in retrieve_indices]

for i, retrieved_payload in enumerate(stored_list):
results[retrieve_indices[i]] = retrieved_payload

ExternalStorage._record_metrics(external_count, external_size, start_time)
ExternalStorage._record_metrics(
external_count, external_size, start_time, driver_names
)

return results

Expand All @@ -545,9 +561,14 @@ def _validate_payload_length(
)

@staticmethod
def _record_metrics(count: int, size: int, start_time: float):
def _record_metrics(
count: int, size: int, start_time: float, driver_names: set[str]
):
metrics = _current_storage_metrics.get()
if metrics is not None:
metrics.record_batch(
count, size, timedelta(seconds=time.monotonic() - start_time)
count,
size,
timedelta(seconds=time.monotonic() - start_time),
driver_names,
)
48 changes: 30 additions & 18 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ async def _handle_activation(
id=workflow_id,
run_id=act.run_id,
type=(
workflow.workflow_type
workflow.get_info().workflow_type
if workflow
else (init_job.workflow_type if init_job else None)
),
Expand All @@ -326,9 +326,7 @@ async def _handle_activation(
if not workflow:
assert init_job
workflow = _RunningWorkflow(
self._create_workflow_instance(act, init_job),
workflow_id,
workflow_type=init_job.workflow_type,
self._create_workflow_instance(act, init_job), workflow_id
)
self._running_workflows[act.run_id] = workflow

Expand Down Expand Up @@ -461,8 +459,8 @@ async def _handle_activation(
act, task_start_time, download_metrics, upload_metrics
)

@staticmethod
def _log_workflow_task_duration(
self,
act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
task_start_time: float,
download_metrics: temporalio.converter._extstore.StorageOperationMetrics,
Expand All @@ -476,47 +474,60 @@ def _fmt_duration(td: timedelta) -> str:
return f"{secs:.3f}s"
return f"{secs * 1000:.3f}ms"

msg_details: dict[str, object] = {
"event_id": act.history_length,
"workflow_task_duration": _fmt_duration(task_duration),
}
extra: dict[str, object] = {
"event_id": act.history_length,
"workflow_task_duration": task_duration,
}
completed_event_id = act.history_length + 1
_running = self._running_workflows.get(act.run_id)
_info = _running.get_info() if _running is not None else None
attempt = _info.attempt if _info is not None else "unknown"
log_id = f"{act.run_id}:{completed_event_id}:{attempt}"
Comment thread
jmaeagle99 marked this conversation as resolved.
msg_details, extra = temporalio.workflow._build_log_context(
_info._logger_details() if _info is not None else None,
full_workflow_info=_info,
)
msg_details["event_id"] = completed_event_id
msg_details["workflow_task_duration"] = _fmt_duration(task_duration)
msg_details["workflow_history_size"] = act.history_size_bytes
extra["event_id"] = completed_event_id
extra["workflow_task_duration"] = task_duration
extra["workflow_history_size"] = act.history_size_bytes
if download_metrics.payload_count > 0:
msg_details["payload_download_count"] = download_metrics.payload_count
msg_details["payload_download_size"] = download_metrics.total_size
msg_details["payload_download_duration"] = _fmt_duration(
download_metrics.total_duration
)
msg_details["payload_download_drivers"] = sorted(
download_metrics.driver_names
)
extra["payload_download_count"] = download_metrics.payload_count
extra["payload_download_size"] = download_metrics.total_size
extra["payload_download_duration"] = download_metrics.total_duration
extra["payload_download_drivers"] = sorted(download_metrics.driver_names)
if upload_metrics.payload_count > 0:
msg_details["payload_upload_count"] = upload_metrics.payload_count
msg_details["payload_upload_size"] = upload_metrics.total_size
msg_details["payload_upload_duration"] = _fmt_duration(
upload_metrics.total_duration
)
msg_details["payload_upload_drivers"] = sorted(upload_metrics.driver_names)
extra["payload_upload_count"] = upload_metrics.payload_count
extra["payload_upload_size"] = upload_metrics.total_size
extra["payload_upload_duration"] = upload_metrics.total_duration
extra["payload_upload_drivers"] = sorted(upload_metrics.driver_names)
if task_duration.total_seconds() > 10:
logger.warning(
"[TMPRL1104] Workflow task exceeded 10 seconds (%s)",
f"[TMPRL1104] {log_id} Workflow task exceeded 10 seconds (%s)",
msg_details,
extra=extra,
)
elif task_duration.total_seconds() > 5:
logger.info(
"[TMPRL1104] Workflow task exceeded 5 seconds (%s)",
f"[TMPRL1104] {log_id} Workflow task exceeded 5 seconds (%s)",
msg_details,
extra=extra,
)
else:
logger.debug(
"[TMPRL1104] Workflow task duration information (%s)",
f"[TMPRL1104] {log_id} Workflow task duration information (%s)",
msg_details,
extra=extra,
)
Expand Down Expand Up @@ -822,15 +833,16 @@ def __init__(
self,
instance: WorkflowInstance,
workflow_id: str,
workflow_type: str | None = None,
):
self.instance = instance
self.workflow_id = workflow_id
self.workflow_type = workflow_type
self.deadlocked_activation_task: Awaitable | None = None
self._deadlock_can_be_interrupted_lock = threading.Lock()
self._deadlock_can_be_interrupted = False

def get_info(self) -> temporalio.workflow.Info:
return self.instance.get_info()

def activate(
self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion:
Expand Down
8 changes: 8 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ def get_external_store_context(
"""
raise NotImplementedError

@abstractmethod
def get_info(self) -> temporalio.workflow.Info:
"""Return the workflow info for this instance."""
raise NotImplementedError

def get_thread_id(self) -> int | None:
"""Return the thread identifier that this workflow is running on.

Expand Down Expand Up @@ -1202,6 +1207,9 @@ def workflow_get_current_deployment_version(
deployment_name=self._deployment_version_for_current_task.deployment_name,
)

def get_info(self) -> temporalio.workflow.Info:
return self._info

def workflow_get_current_history_length(self) -> int:
return self._current_history_length

Expand Down
3 changes: 3 additions & 0 deletions temporalio/worker/workflow_sandbox/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ def _run_code(self, code: str, **extra_globals: Any) -> None:
for k, v in extra_globals.items():
self.globals_and_locals.pop(k, None)

def get_info(self) -> temporalio.workflow.Info:
return self.instance_details.info

def get_thread_id(self) -> int | None:
return self._current_thread_id

Expand Down
60 changes: 45 additions & 15 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,41 @@ def sandbox_import_notification_policy(
_sandbox_import_notification_policy_override.value = original_policy


def _build_log_context(
workflow_details: Mapping[str, Any] | None,
update_details: Mapping[str, Any] | None = None,
*,
workflow_info_on_message: bool = True,
workflow_info_on_extra: bool = True,
full_workflow_info: Info | None = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Build the msg_extra suffix and extra dict entries for a temporal log record.

Returns:
(msg_extra, extra) where msg_extra should be appended to the log message
and extra should be merged into the log record's extra dict.
"""
msg_extra: dict[str, Any] = {}
extra: dict[str, Any] = {}

if workflow_details is not None:
if workflow_info_on_message:
msg_extra.update(workflow_details)
if workflow_info_on_extra:
extra["temporal_workflow"] = dict(workflow_details)

if update_details is not None:
if workflow_info_on_message:
msg_extra.update(update_details)
if workflow_info_on_extra:
extra.setdefault("temporal_workflow", {}).update(update_details)

if full_workflow_info is not None:
extra["workflow_info"] = full_workflow_info

return msg_extra, extra


class LoggerAdapter(logging.LoggerAdapter):
"""Adapter that adds details to the log about the running workflow.

Expand Down Expand Up @@ -1671,30 +1706,25 @@ def process(
self, msg: Any, kwargs: MutableMapping[str, Any]
) -> tuple[Any, MutableMapping[str, Any]]:
"""Override to add workflow details."""
extra: dict[str, Any] = {}
msg_extra: dict[str, Any] = {}
extra: dict[str, Any] = {}

if (
self.workflow_info_on_message
or self.workflow_info_on_extra
or self.full_workflow_info_on_extra
):
runtime = _Runtime.maybe_current()
if runtime:
workflow_details = runtime.logger_details
if self.workflow_info_on_message:
msg_extra.update(workflow_details)
if self.workflow_info_on_extra:
extra["temporal_workflow"] = workflow_details
if self.full_workflow_info_on_extra:
extra["workflow_info"] = runtime.workflow_info()
update_info = current_update_info()
if update_info:
update_details = update_info._logger_details
if self.workflow_info_on_message:
msg_extra.update(update_details)
if self.workflow_info_on_extra:
extra.setdefault("temporal_workflow", {}).update(update_details)
msg_extra, extra = _build_log_context(
runtime.logger_details if runtime else None,
update_info._logger_details if update_info else None,
workflow_info_on_message=self.workflow_info_on_message,
workflow_info_on_extra=self.workflow_info_on_extra,
full_workflow_info=runtime.workflow_info()
if runtime and self.full_workflow_info_on_extra
else None,
)

kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})}
if msg_extra:
Expand Down
Loading
Loading