Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/heartbeatClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class HeartbeatClient extends HttpClient {
}

public start(taskId: string): void {
this.logger.info(`start heartbit for taskId=${taskId}`);
this.logger.info(`[HeartbeatClient][start] taskId=${taskId}`);
if (this.intervalKey !== null) {
clearInterval(this.intervalKey);
this.intervalKey = null;
Expand All @@ -26,21 +26,20 @@ export class HeartbeatClient extends HttpClient {
}

public stop(taskId: string): void {
this.logger.info(`stop heartbit for taskId=${taskId}`);
this.logger.info(`[HeartbeatClient][stop] taskId=${taskId}`);
if (this.intervalKey !== null) {
clearInterval(this.intervalKey);
this.intervalKey = null;
}
}

public async send(taskId: number): Promise<void> {
const logFormat = `taskId=${taskId}`;
try {
this.logger.debug(`send heartbit for ${logFormat}`);
this.logger.debug(`[HeartbeatClient][send] taskId=${taskId}`);
const heartbeatUrl = `/heartbeat/${taskId}`;
await this.post(heartbeatUrl);
} catch (err) {
this.logger.error(`failed to send heartbit ${logFormat}`);
this.logger.error(`[HeartbeatClient][send] taskId=${taskId} failed error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
}
}
}
94 changes: 76 additions & 18 deletions src/jobManagerClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ILogger, HttpClient } from '@map-colonies/mc-utils';
import { NotFoundError } from '@map-colonies/error-types';
import { IJobResponse, ITaskResponse, IUpdateJobRequestPayload, IUpdateTaskRequestPayload } from './models/dataTypes';
import { ICreateJobBody, ICreateTaskBody, IJobResponse, ITaskResponse, IUpdateJobBody, IUpdateTaskBody } from './models/dataTypes';

export class JobManagerClient extends HttpClient {
public constructor(protected readonly logger: ILogger, protected jobType: string, protected taskType: string, protected jobManagerBaseUrl: string) {
Expand All @@ -12,66 +12,124 @@ export class JobManagerClient extends HttpClient {
}

public async getTask(jobId: string, taskId: string): Promise<ITaskResponse | null> {
const logFormat = `jobId=${jobId}, taskId=${taskId}`;
try {
this.logger.info(`get task ${logFormat}`);
this.logger.info(`[JobManagerClient][getTask] jobId=${jobId}, taskId=${taskId}`);
const getTaskUrl = `/jobs/${jobId}/tasks/${taskId}`;
const task = await this.get<ITaskResponse>(getTaskUrl);
return task;
} catch (err) {
this.logger.error(`failed to get task ${logFormat}`);
this.logger.error(
`[JobManagerClient][getTask] jobId=${jobId}, taskId=${taskId} failed to get task error=${JSON.stringify(
err,
Object.getOwnPropertyNames(err)
)}`
);
throw err;
}
}

public async getTasksForJob(jobId: string): Promise<ITaskResponse[] | null> {
try {
this.logger.info(`[JobManagerClient][getTasksForJob] jobId=${jobId}`);
const getTaskUrl = `/jobs/${jobId}/tasks`;
const tasks = await this.get<ITaskResponse[]>(getTaskUrl);
return tasks;
} catch (err) {
this.logger.error(`[JobManagerClient][getTasksForJob] jobId=${jobId} failed error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
throw err;
}
}

public async getJob(jobId: string): Promise<IJobResponse | undefined> {
try {
this.logger.info(`get job ${jobId}`);
this.logger.info(`[JobManagerClient][getJob] jobId=${jobId}`);
const job = await this.get<IJobResponse>(`/jobs/${jobId}`);
return job;
} catch {
this.logger.error(`failed to get job data for job: ${jobId}`);
} catch (err) {
this.logger.error(`[JobManagerClient][getJob] jobId=${jobId} failed error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
return undefined;
}
}

public async consume(): Promise<ITaskResponse | null> {
const logFormat = `JobManagerClient: jobType=${this.jobType}, taskType=${this.taskType}`;
try {
this.logger.debug(`${logFormat} consume `);
this.logger.debug(`[JobManagerClient][consume] jobType=${this.jobType}, taskType=${this.taskType}`);
const consumeTaskUrl = `/tasks/${this.jobType}/${this.taskType}/startPending`;
const taskResponse = await this.post<ITaskResponse>(consumeTaskUrl);
return taskResponse;
} catch (err) {
if (err instanceof NotFoundError) {
this.logger.debug(`${logFormat} failed to consume due empty queue`);
this.logger.debug(`[JobManagerClient][consume] jobType=${this.jobType}, taskType=${this.taskType}, failed to consume due empty queue`);
return null;
} else {
this.logger.error(`failed to consume ${logFormat}`);
this.logger.error(
`[JobManagerClient][consume] jobType=${this.jobType}, taskType=${this.taskType}, failed to consume error=${JSON.stringify(
err,
Object.getOwnPropertyNames(err)
)}`
);
throw err;
}
}
}

public async updateTask(jobId: string, taskId: string, payload: IUpdateTaskRequestPayload): Promise<void> {
const logFormat = `jobId=${jobId}, taskId=${taskId}`;
public async enqueueTask(jobId: string, payload: ICreateTaskBody): Promise<void> {
try {
this.logger.info(`update task ${logFormat} payload=${JSON.stringify(payload)}`);
this.logger.info(`[JobManagerClient][enqueueTask] jobId=${jobId}, payload=${JSON.stringify(payload)}`);
const createTaskUrl = `/jobs/${jobId}/tasks`;
await this.post(createTaskUrl, payload);
} catch (err) {
this.logger.error(
`[JobManagerClient][enqueueTask] jobId=${jobId}, payload=${JSON.stringify(payload)} failed error=${JSON.stringify(
err,
Object.getOwnPropertyNames(err)
)}`
);
throw err;
}
}

public async updateTask(jobId: string, taskId: string, payload: IUpdateTaskBody): Promise<void> {
try {
this.logger.info(`[JobManagerClient][updateTask] jobId=${jobId}, taskId=${taskId}, payload=${JSON.stringify(payload)}`);
const updateTaskUrl = `/jobs/${jobId}/tasks/${taskId}`;
await this.put(updateTaskUrl, payload);
} catch (err) {
this.logger.error(`failed to update task ${logFormat}`);
this.logger.error(
`[JobManagerClient][updateTask] jobId=${jobId}, taskId=${taskId}, payload=${JSON.stringify(payload)} failed error=${JSON.stringify(
err,
Object.getOwnPropertyNames(err)
)}`
);
throw err;
}
}

public async createJob(payload: ICreateJobBody): Promise<void> {
try {
this.logger.info(`[JobManagerClient][createJob] payload=${JSON.stringify(payload)}`);
const createJobUrl = `/jobs`;
await this.post(createJobUrl, payload);
} catch (err) {
this.logger.error(
`[JobManagerClient][createJob] payload=${JSON.stringify(payload)} failed error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`
);
throw err;
}
}

public async updateJob(jobId: string, payload: IUpdateJobRequestPayload): Promise<void> {
public async updateJob(jobId: string, payload: IUpdateJobBody): Promise<void> {
try {
this.logger.info(`update job Id=${jobId} payload=${JSON.stringify(payload)}`);
this.logger.info(`[JobManagerClient][updateJob] jobId=${jobId}, payload=${JSON.stringify(payload)}`);
const updateJobUrl = `/jobs/${jobId}`;
await this.put(updateJobUrl, payload);
} catch (err) {
this.logger.error(`failed to update job Id=${jobId}`);
this.logger.error(
`[JobManagerClient][updateJob] jobId=${jobId}, payload=${JSON.stringify(payload)} failed error=${JSON.stringify(
err,
Object.getOwnPropertyNames(err)
)}`
);
throw err;
}
}
Expand Down
53 changes: 37 additions & 16 deletions src/models/dataTypes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export enum TaskStatus {
export enum OperationStatus {
PENDING = 'Pending',
IN_PROGRESS = 'In-Progress',
COMPLETED = 'Completed',
Expand All @@ -12,21 +12,12 @@ export interface ITaskResponse {
parameters: unknown;
created: string;
updated: string;
status: TaskStatus;
status: OperationStatus;
percentage: number;
reason: string;
attempts: number;
}

export interface IUpdateTaskRequestPayload {
status: TaskStatus;
attempts?: number;
reason?: string;
description?: string;
parameters?: unknown;
percentage?: number;
}

export interface IJobResponse {
id: string;
resourceId: string;
Expand All @@ -35,19 +26,49 @@ export interface IJobResponse {
parameters: unknown;
created: string;
updated: string;
status: TaskStatus;
status: OperationStatus;
percentage: number;
reason: string;
attempts: number;
type: string;
priority: number;
tasks: IUpdateTaskRequestPayload[];
tasks: IUpdateTaskBody[];
isCleaned: boolean;
}

export interface IUpdateJobRequestPayload {
parameters?: unknown;
status: TaskStatus;
export interface ICreateJobBody {
resourceId: string;
version: string;
parameters: Record<string, unknown>;
type: string;
description?: string;
status?: OperationStatus;
reason?: string;
tasks?: ICreateTaskBody[];
priority?: number;
}

export interface ICreateTaskBody {
description?: string;
parameters: Record<string, unknown>;
reason?: string;
type?: string;
status?: OperationStatus;
attempts?: number;
}

export interface IUpdateTaskBody {
description?: string;
parameters?: Record<string, unknown>;
status: OperationStatus;
percentage?: number;
reason?: string;
attempts?: number;
}

export interface IUpdateJobBody {
parameters?: Record<string, unknown>;
status?: OperationStatus;
percentage?: number;
reason?: string;
isCleaned?: boolean;
Expand Down
54 changes: 32 additions & 22 deletions src/taskHandler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ILogger } from '@map-colonies/mc-utils';
import { JobManagerClient } from './jobManagerClient';
import { HeartbeatClient } from './heartbeatClient';
import { ITaskResponse, IUpdateTaskRequestPayload, TaskStatus } from './models/dataTypes';
import { ITaskResponse, IUpdateTaskBody, OperationStatus } from './models/dataTypes';

const minValidPrcentage = 0;
const maxValidPrcentage = 100;
Expand All @@ -26,7 +26,7 @@ export class TaskHandler {
public async waitForTask(): Promise<ITaskResponse | null> {
let task: ITaskResponse | null;
do {
this.logger.debug('TaskHandler: consuming task');
this.logger.debug(`[TaskHandler][waitForTask]`);
task = await this.dequeue();
await new Promise((resolve) => setTimeout(resolve, this.dequeueIntervalMs));
} while (!task);
Expand All @@ -39,77 +39,87 @@ export class TaskHandler {
if (response) {
const jobId = response.jobId;
const taskId = response.id;
const payload: IUpdateTaskRequestPayload = {
status: TaskStatus.IN_PROGRESS,
const payload: IUpdateTaskBody = {
status: OperationStatus.IN_PROGRESS,
};
await this.jobManagerClient.updateTask(jobId, taskId, payload);
this.heartbeatClient.start(taskId);
}
return response;
} catch (err) {
this.logger.error(`Error occurred while trying dequeue a record error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
this.logger.error(`[TaskHandler][dequeue] error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
throw err;
}
}

public async reject(jobId: string, taskId: string, isRecoverable: boolean, reason?: string): Promise<void> {
const logFormat = `TaskHandler: jobId=${jobId}, taskId=${taskId}, isRecoverable=${String(isRecoverable)}, reason=${reason as string}`;
try {
this.logger.info(`${logFormat} reject`);
this.logger.info(`[TaskHandler][reject] jobId=${jobId}, taskId=${taskId}, isRecoverable=${String(isRecoverable)}, reason=${reason as string}`);
this.heartbeatClient.stop(taskId);
let payload: IUpdateTaskRequestPayload | undefined;
let payload: IUpdateTaskBody | undefined;
if (isRecoverable) {
const task = await this.jobManagerClient.getTask(jobId, taskId);
if (task) {
payload = {
status: TaskStatus.PENDING,
status: OperationStatus.PENDING,
attempts: task.attempts + 1,
reason: reason,
};
}
} else {
payload = {
status: TaskStatus.FAILED,
status: OperationStatus.FAILED,
};
}

if (payload !== undefined) {
this.logger.info(`${logFormat} reject send update with payload ${JSON.stringify(payload)}`);
this.logger.info(
`[TaskHandler][reject] send update with payload ${JSON.stringify(payload)} jobId=${jobId}, taskId=${taskId}, isRecoverable=${String(
isRecoverable
)}, reason=${reason as string}`
);
await this.jobManagerClient.updateTask(jobId, taskId, payload);
}
} catch (err) {
this.logger.error(`${logFormat} Error occurred while trying dequeue a record ${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
this.logger.error(
`[TaskHandler][reject] failed jobId=${jobId}, taskId=${taskId}, isRecoverable=${String(isRecoverable)}, reason=${
reason as string
} error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`
);
throw err;
}
}

public async ack(jobId: string, taskId: string): Promise<void> {
const logFormat = `TaskHandler: jobId=${jobId}, taskId=${taskId}`;
try {
this.logger.info(`${logFormat} ack`);
this.logger.info(`[TaskHandler][ack] jobId=${jobId}, taskId=${taskId}`);
this.heartbeatClient.stop(taskId);
const payload: IUpdateTaskRequestPayload = {
status: TaskStatus.COMPLETED,
const payload: IUpdateTaskBody = {
status: OperationStatus.COMPLETED,
};
await this.jobManagerClient.updateTask(jobId, taskId, payload);
} catch (err) {
this.logger.error(`${logFormat} Error occurred while executing ack logic, error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
this.logger.error(`[TaskHandler][ack] failed jobId=${jobId}, taskId=${taskId} error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
throw err;
}
}

public async updateProgress(jobId: string, taskId: string, percentage: number): Promise<void> {
const logFormat = `TaskHandler: jobId=${jobId}, taskId=${taskId}`;
const percentageValidValue = Math.min(Math.max(minValidPrcentage, percentage), maxValidPrcentage);
try {
this.logger.info(`${logFormat} updateProgress`);
const payload: IUpdateTaskRequestPayload = {
status: TaskStatus.IN_PROGRESS,
this.logger.info(`[TaskHandler][updateProgress] jobId=${jobId}, taskId=${taskId}, percentageValidValue=${percentageValidValue}`);
const payload: IUpdateTaskBody = {
status: OperationStatus.IN_PROGRESS,
percentage: percentageValidValue,
};
await this.jobManagerClient.updateTask(jobId, taskId, payload);
} catch (err) {
this.logger.error(`${logFormat} Error occurred while trying to update Progress, error=${JSON.stringify(err, Object.getOwnPropertyNames(err))}`);
this.logger.error(
`[TaskHandler][updateProgress] failed jobId=${jobId}, taskId=${taskId}, percentageValidValue=${percentageValidValue} error=${JSON.stringify(
err,
Object.getOwnPropertyNames(err)
)}`
);
throw err;
}
}
Expand Down