Task Management System
Overview
Wafra implements a comprehensive async task management system that handles long-running operations, retries, and background processing. This system ensures reliable execution of critical operations like payments, blockchain transactions, and external API calls.
Architecture Components
Task Framework
Task Interface:
/**
 * A unit of background work tracked by the task system.
 *
 * @typeParam TData - Shape of the handler-specific payload stored in `data`.
 *   Defaults to `TaskData`, so a bare `Task` keeps its previous meaning while
 *   handlers can declare `Task<SomePayload>`.
 */
interface Task<TData = TaskData> {
  id: string;
  type: TaskType;
  status: TaskStatus;
  /** Handler-specific payload. */
  data: TData;
  /** Number of failed attempts recorded so far. */
  retryCount: number;
  /** Attempt budget; the task is marked failed once `retryCount` reaches it. */
  maxRetries: number;
  /** When the task (or its next retry) is due. */
  scheduledAt: Date;
  startedAt?: Date;
  completedAt?: Date;
  /** Message of the most recent error, if any attempt failed. */
  error?: string;
  priority: TaskPriority;
  /**
   * Owning user, used to address real-time task notifications.
   * Optional because `tasks.user_id` is nullable in the storage schema.
   */
  userId?: string;
}
/** Lifecycle states a task moves through while being processed. */
enum TaskStatus {
  /** Not yet picked up (presumably the initial state; not assigned by the code shown here). */
  PENDING = "pending",
  /** Set by the processor immediately before the task is executed. */
  RUNNING = "running",
  /** Set after execution resolves without throwing. */
  COMPLETED = "completed",
  /** Set once the retry count reaches the task's `maxRetries`. */
  FAILED = "failed",
  /** Set when a failed attempt has been rescheduled for another try. */
  RETRYING = "retrying",
}
enum TaskType {
PAYMENT_PROCESSING = "payment_processing",
BLOCKCHAIN_TRANSACTION = "blockchain_transaction",
KYC_VERIFICATION = "kyc_verification",
NOTIFICATION_DELIVERY = "notification_delivery",
DATA_SYNC = "data_sync",
}

Task Processing Engine
Current Implementation:
class TaskProcessor {
private processingQueue: Task[] = [];
private maxConcurrent: number = 10;
private retryDelays: number[] = [1000, 5000, 15000, 60000]; // ms
/**
 * Runs a single task: marks it RUNNING, executes it, persists COMPLETED and
 * then announces completion.
 *
 * Failures while starting, executing or persisting the task are routed to
 * `handleTaskError`, which decides between a retry and a terminal failure.
 * A failure of the completion notification alone is only logged: by then the
 * task has already been persisted as COMPLETED, and sending it through the
 * retry path would re-execute finished work (for example a payment).
 *
 * @param task - The task to process; mutated in place as its state advances.
 * @returns `{ success: true, result }` on success, otherwise whatever
 *   `handleTaskError` reports.
 */
async processTask(task: Task): Promise<TaskResult> {
  try {
    task.status = TaskStatus.RUNNING;
    task.startedAt = new Date();
    await this.saveTask(task);

    const result = await this.executeTask(task);

    task.status = TaskStatus.COMPLETED;
    task.completedAt = new Date();
    await this.saveTask(task);

    try {
      await this.notifyCompletion(task, result);
    } catch (notifyError: unknown) {
      // Best effort only: the task itself succeeded and is already saved.
      console.error(
        `Task ${task.id} completed but its completion notification failed`,
        notifyError,
      );
    }

    return { success: true, result };
  } catch (error: unknown) {
    // Anything can be thrown; normalize so the error handler always gets an Error.
    const failure = error instanceof Error ? error : new Error(String(error));
    return await this.handleTaskError(task, failure);
  }
}
/**
 * Records a failed attempt and either gives up or schedules another try.
 *
 * Increments `retryCount` and stores the error message on the task. When the
 * count has reached `maxRetries` the task is marked FAILED, persisted, and
 * `notifyFailure` is called. Otherwise the task is marked RETRYING, given a
 * new `scheduledAt`, persisted, and handed to `scheduleRetry`.
 *
 * @param task - The task whose attempt failed; mutated in place.
 * @param error - The error raised by the failed attempt.
 * @returns `{ success: false, error }` on terminal failure, or
 *   `{ success: false, retry: true }` when a retry was scheduled.
 */
private async handleTaskError(task: Task, error: Error): Promise<TaskResult> {
  task.retryCount++;
  task.error = error.message;

  if (task.retryCount >= task.maxRetries) {
    task.status = TaskStatus.FAILED;
    task.completedAt = new Date();
    await this.saveTask(task);
    await this.notifyFailure(task, error);
    return { success: false, error: error.message };
  }

  // Escalating backoff from a fixed delay table: attempt N uses entry N-1, and
  // the last entry is reused once the table is exhausted.
  const delayIndex = Math.min(task.retryCount - 1, this.retryDelays.length - 1);
  // An empty table yields no entry; retry immediately instead of producing an
  // Invalid Date for `scheduledAt`.
  const delay = this.retryDelays[delayIndex] ?? 0;

  task.scheduledAt = new Date(Date.now() + delay);
  task.status = TaskStatus.RETRYING;
  await this.saveTask(task);
  await this.scheduleRetry(task, delay);
  return { success: false, retry: true };
}
}

Task Types & Handlers
Payment Processing Tasks
DTR Payment Task:
/** Payload of a DTR payment task, as consumed by `DTRPaymentHandler.execute`. */
interface DTRPaymentTaskData {
  /** User the payment is created for; also used when updating payment status. */
  userId: string;
  /** Amount to charge (unit/precision not specified here — confirm against the DTR service). */
  amount: number;
  currency: string;
  paymentMethodId: string;
  /** Additional payment context; not read by the handler shown in this document. */
  metadata: PaymentMetadata;
}
class DTRPaymentHandler implements TaskHandler {
/**
 * Creates a DTR payment from the task payload, records the outcome for the
 * user, and queues the follow-up blockchain task when the payment completed.
 *
 * @param task - Task carrying the DTR payment payload.
 */
async execute(task: Task<DTRPaymentTaskData>): Promise<void> {
  const {
    userId: payerId,
    amount: chargeAmount,
    currency: chargeCurrency,
    paymentMethodId: methodId,
  } = task.data;

  const paymentResult = await dtrService.createPayment({
    userId: payerId,
    amount: chargeAmount,
    currency: chargeCurrency,
    paymentMethodId: methodId,
  });

  // Persist whatever the provider reported, successful or not.
  await this.updatePaymentStatus(payerId, paymentResult);

  // Only a completed payment leads on to a blockchain transaction.
  if (paymentResult.status !== 'completed') {
    return;
  }
  await this.createBlockchainTask(payerId, chargeAmount);
}
}

Blockchain Transaction Tasks
Safe Transaction Task:
/** Payload of a Safe transaction task, as consumed by `SafeTransactionHandler.execute`. */
interface SafeTransactionTaskData {
  /** Address used to obtain the Safe client. */
  safeAddress: string;
  /** Transaction that the handler signs and executes. */
  transaction: SafeTransaction;
  /** Signature supplied by the user; NOTE(review): not applied by the handler shown in this document. */
  userSignature: string;
  /** User the executed transaction is tracked against. */
  userId: string;
}
class SafeTransactionHandler implements TaskHandler {
/**
 * Signs and executes a Safe transaction, then records the resulting hash.
 *
 * Resolves with no value, matching the declared `Promise<void>`; the
 * transaction hash is handed to `trackTransaction`.
 *
 * @param task - Task carrying the Safe address, transaction and user id.
 */
async execute(task: Task<SafeTransactionTaskData>): Promise<void> {
  // NOTE(review): `task.data.userSignature` is part of the payload but is not
  // attached to the transaction here — confirm whether the Safe client is
  // expected to receive it before execution.
  const { safeAddress, transaction, userId } = task.data;

  const safeClient = await getSafeClient(safeAddress);

  // Add the server-side signature.
  const signedTransaction = await safeClient.signTransaction(transaction);

  const result = await safeClient.executeTransaction(signedTransaction);

  await trackTransaction(result.hash, userId, 'safe_execution');
}
}

KYC Verification Tasks
KYC Document Processing:
/** Payload of a KYC document task, as consumed by `KYCDocumentHandler.execute`. */
interface KYCDocumentTaskData {
  /** User whose KYC status is updated and who is notified of the outcome. */
  userId: string;
  /** Document handed to the provider for processing. */
  documentId: string;
  /** Provider that processes the document. */
  provider: 'onramp' | 'dtr';
  /** Kind of document; not used by the handler shown in this document. */
  documentType: string;
}
class KYCDocumentHandler implements TaskHandler {
/**
 * Processes a KYC document with its provider, stores the resulting KYC status
 * for the user, and notifies the user of the change.
 *
 * @param task - Task carrying the user, document and provider identifiers.
 */
async execute(task: Task<KYCDocumentTaskData>): Promise<void> {
  const { userId, documentId, provider } = task.data;

  const result = await this.processDocument(provider, documentId);

  await this.updateKYCStatus(userId, result);

  await this.notifyKYCUpdate(userId, result);
}
}

Event Integration
Task Events
Event Types:
/** Record published whenever a task changes state. */
interface TaskEvent {
  /** Id of the task the event refers to. */
  taskId: string;
  type: TaskEventType;
  /** Time the event object was created by the publisher. */
  timestamp: Date;
  /** Event payload; the publisher falls back to the task itself when none is supplied. */
  data: any;
}
enum TaskEventType {
TASK_CREATED = "task_created",
TASK_STARTED = "task_started",
TASK_COMPLETED = "task_completed",
TASK_FAILED = "task_failed",
TASK_RETRIED = "task_retried",
}

Event Publishing:
class TaskEventPublisher {
/**
 * Builds a `TaskEvent` for the given task, publishes it to the event store,
 * and pushes a real-time update when the event type calls for one.
 *
 * @param task - Task the event is about.
 * @param eventType - Kind of lifecycle event being published.
 * @param data - Optional payload; when `null` or `undefined` the task itself
 *   is used. Falsy payloads such as `0`, `''` or `false` are kept as given.
 */
async publishTaskEvent(task: Task, eventType: TaskEventType, data?: any): Promise<void> {
  const event: TaskEvent = {
    taskId: task.id,
    type: eventType,
    timestamp: new Date(),
    // `??` rather than `||` so a legitimate falsy payload is not replaced.
    data: data ?? task,
  };

  await this.eventStore.publish(event);

  if (this.shouldNotifyRealTime(eventType)) {
    // NOTE(review): relies on the task exposing `userId` — confirm the Task
    // type declares it.
    await this.socketService.notifyTaskUpdate(task.userId, event);
  }
}
}

Monitoring & Analytics
Task Metrics
Performance Tracking:
/** Aggregate figures over the tasks found in a time range. */
interface TaskMetrics {
  /** Number of tasks in the range. */
  totalTasks: number;
  /** Tasks whose status is COMPLETED. */
  completedTasks: number;
  /** Tasks whose status is FAILED. */
  failedTasks: number;
  /** Mean execution time (unit not specified here — presumably milliseconds; confirm). */
  averageExecutionTime: number;
  /** Retry rate (exact definition lives in the collector's helper; confirm ratio vs. percentage). */
  retryRate: number;
  /** Success rate (exact definition lives in the collector's helper; confirm ratio vs. percentage). */
  successRate: number;
  tasksByType: Record<TaskType, number>;
  tasksByStatus: Record<TaskStatus, number>;
}
class TaskMetricsCollector {
/**
 * Loads the tasks in the given time range and summarizes them.
 *
 * @param timeRange - Range of tasks to include.
 * @returns Counts, rates and groupings for the tasks in that range.
 */
async getTaskMetrics(timeRange: TimeRange): Promise<TaskMetrics> {
  const tasksInRange = await this.getTasksInRange(timeRange);

  // Counts the loaded tasks that are currently in the given status.
  const countWithStatus = (wanted: TaskStatus): number =>
    tasksInRange.filter((candidate) => candidate.status === wanted).length;

  const totalTasks = tasksInRange.length;
  const completedTasks = countWithStatus(TaskStatus.COMPLETED);
  const failedTasks = countWithStatus(TaskStatus.FAILED);
  const averageExecutionTime = this.calculateAverageExecutionTime(tasksInRange);
  const retryRate = this.calculateRetryRate(tasksInRange);
  const successRate = this.calculateSuccessRate(tasksInRange);
  const tasksByType = this.groupTasksByType(tasksInRange);
  const tasksByStatus = this.groupTasksByStatus(tasksInRange);

  return {
    totalTasks,
    completedTasks,
    failedTasks,
    averageExecutionTime,
    retryRate,
    successRate,
    tasksByType,
    tasksByStatus,
  };
}
}

Error Tracking
Failure Analysis:
/** Summary of one group of task failures, as produced by `TaskErrorAnalyzer.analyzeFailures`. */
interface TaskFailureAnalysis {
  /** Task type of the failure group. */
  taskType: TaskType;
  /** Error the failures in the group were grouped under. */
  errorType: string;
  /** Number of failures in the group. */
  frequency: number;
  /** When the group's error was last seen. */
  lastOccurrence: Date;
  /** Users associated with the group's failures (presumably user ids — confirm). */
  affectedUsers: string[];
  /** Recommendation derived from the group by the analyzer. */
  suggestedAction: string;
}
class TaskErrorAnalyzer {
/**
 * Loads the failed tasks in the given time range, groups them by error, and
 * turns each group into a failure analysis entry.
 *
 * @param timeRange - Range of failed tasks to analyze.
 * @returns One analysis entry per failure group.
 */
async analyzeFailures(timeRange: TimeRange): Promise<TaskFailureAnalysis[]> {
  const failures = await this.getFailedTasks(timeRange);
  const failureGroups = this.groupFailuresByError(failures);

  return failureGroups.map((failureGroup) => {
    return {
      taskType: failureGroup.type,
      errorType: failureGroup.error,
      frequency: failureGroup.count,
      lastOccurrence: failureGroup.lastSeen,
      affectedUsers: failureGroup.users,
      suggestedAction: this.suggestAction(failureGroup),
    };
  });
}
}

Configuration & Scaling
Task Configuration
Environment Settings:
/** Runtime settings for the task system. */
interface TaskConfig {
  /** Upper bound on tasks processed at once. */
  maxConcurrentTasks: number;
  /** Retry budget applied when a task does not specify its own (presumably — confirm against task creation). */
  defaultMaxRetries: number;
  /** Delays between retries, in milliseconds. */
  retryDelays: number[];
  /** Per-task timeout in milliseconds. */
  taskTimeout: number;
  /** Whether task priority is enabled. */
  queuePriority: boolean;
  /** Whether tasks are persisted. */
  persistTasks: boolean;
}
const taskConfig: TaskConfig = {
maxConcurrentTasks: parseInt(process.env.MAX_CONCURRENT_TASKS || '10'),
defaultMaxRetries: parseInt(process.env.DEFAULT_MAX_RETRIES || '3'),
retryDelays: [1000, 5000, 15000, 60000], // Exponential backoff
taskTimeout: parseInt(process.env.TASK_TIMEOUT || '300000'), // 5 minutes
queuePriority: process.env.ENABLE_TASK_PRIORITY === 'true',
persistTasks: process.env.PERSIST_TASKS === 'true',
};

Database Schema
Task Storage:
-- Persistent store for background tasks (PostgreSQL).
-- Timestamps use TIMESTAMPTZ so stored instants are unambiguous regardless of
-- the session time zone. Counter columns are NOT NULL because the application
-- model treats them as required numbers.
-- NOTE: nothing here maintains updated_at automatically; writers must set it.
CREATE TABLE tasks (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  type VARCHAR(50) NOT NULL,
  status VARCHAR(20) NOT NULL,
  data JSONB NOT NULL,
  retry_count INTEGER NOT NULL DEFAULT 0,
  max_retries INTEGER NOT NULL DEFAULT 3,
  priority INTEGER NOT NULL DEFAULT 0,
  scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  error TEXT,
  user_id UUID REFERENCES users(id),
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Lookup indexes for filtering by state, kind and due time.
CREATE INDEX idx_tasks_status ON tasks(status);
CREATE INDEX idx_tasks_type ON tasks(type);
CREATE INDEX idx_tasks_scheduled_at ON tasks(scheduled_at);
CREATE INDEX idx_tasks_user_id ON tasks(user_id);

This task management system provides reliable, scalable background processing for all critical Wafra operations with comprehensive monitoring and error handling.