Task Management System

Overview

Wafra's asynchronous task management system runs long-running operations in the background, retries them when they fail, and records their state. It is used for operations that must complete reliably, such as payments, blockchain transactions, and external API calls.

Architecture Components

Task Framework

Task Interface:

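// TData is generic so handlers can declare their payload, e.g. Task<DTRPaymentTaskData>.
interface Task<TData = TaskData> {
  id: string;
  type: TaskType;
  status: TaskStatus;
  data: TData;
  retryCount: number;
  maxRetries: number;
  scheduledAt: Date;
  startedAt?: Date;
  completedAt?: Date;
  error?: string;
  priority: TaskPriority;
  userId?: string; // maps to tasks.user_id; read by TaskEventPublisher
}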
 
enum TaskStatus {
  PENDING = "pending",
  RUNNING = "running",
  COMPLETED = "completed",
  FAILED = "failed",
  RETRYING = "retrying",
}
 
enum TaskType {
  PAYMENT_PROCESSING = "payment_processing",
  BLOCKCHAIN_TRANSACTION = "blockchain_transaction",
  KYC_VERIFICATION = "kyc_verification",
  NOTIFICATION_DELIVERY = "notification_delivery",
  DATA_SYNC = "data_sync",
}
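
The status values imply a lifecycle, but the allowed transitions are not spelled out. The following is a minimal sketch, assuming the transitions that can be inferred from how the processor updates `task.status`. `Status`, `ALLOWED_TRANSITIONS`, and `canTransition` are hypothetical names, and statuses are written as string literals matching the enum values so the snippet stands alone.

```typescript
// String values mirror the TaskStatus enum above.
type Status = "pending" | "running" | "completed" | "failed" | "retrying";

// Assumed lifecycle, inferred from the processor; not a confirmed rule set.
const ALLOWED_TRANSITIONS: Record<Status, readonly Status[]> = {
  pending: ["running"],
  running: ["completed", "failed", "retrying"],
  retrying: ["running"],
  completed: [], // terminal
  failed: [], // terminal
};

// Returns true when a task may move from `from` to `to`.
function canTransition(from: Status, to: Status): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```

A guard like this could run before each save so that a terminal task is never moved back to a running state by a late retry.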

Task Processing Engine

Current Implementation:

class TaskProcessor {
  private processingQueue: Task[] = [];
  private maxConcurrent: number = 10;
  private retryDelays: number[] = [1000, 5000, 15000, 60000]; // ms
 
  async processTask(task: Task): Promise<TaskResult> {
    try {
      task.status = TaskStatus.RUNNING;
      task.startedAt = new Date();
      
      await this.saveTask(task);
      
      const result = await this.executeTask(task);
      
      task.status = TaskStatus.COMPLETED;
      task.completedAt = new Date();
      
      await this.saveTask(task);
      await this.notifyCompletion(task, result);
      
      return { success: true, result };
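    } catch (error) {
      // A caught value is not guaranteed to be an Error; normalise it first.
      const err = error instanceof Error ? error : new Error(String(error));
      return await this.handleTaskError(task, err);
    }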
  }
 
  private async handleTaskError(task: Task, error: Error): Promise<TaskResult> {
    task.error = error.message;

    // All retries used up: mark the task as permanently failed.
    // Checking before the increment lets a task be retried exactly maxRetries times.
    if (task.retryCount >= task.maxRetries) {
      task.status = TaskStatus.FAILED;
      task.completedAt = new Date();

      await this.saveTask(task);
      await this.notifyFailure(task, error);

      return { success: false, error: error.message };
    }

    // Schedule the next retry using the fixed delay schedule (1s, 5s, 15s, 60s).
    // Retries beyond the end of the schedule reuse the last delay.
    task.retryCount++;
    const delay = this.retryDelays[Math.min(task.retryCount - 1, this.retryDelays.length - 1)];
    task.scheduledAt = new Date(Date.now() + delay);
    task.status = TaskStatus.RETRYING;

    await this.saveTask(task);
    await this.scheduleRetry(task, delay);

    return { success: false, retry: true };
  }
}
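
The delay lookup in `handleTaskError` can be read in isolation. The sketch below pulls it into a standalone function; `RETRY_DELAYS_MS` and `retryDelayFor` are hypothetical names, and the schedule is copied from `retryDelays` above. The input validation is an addition for illustration.

```typescript
// Delay schedule in milliseconds, copied from TaskProcessor.retryDelays.
const RETRY_DELAYS_MS: readonly number[] = [1000, 5000, 15000, 60000];

// Returns the delay before retry number `retryNumber` (1-based).
// Retries beyond the end of the schedule reuse the last (largest) delay.
function retryDelayFor(
  retryNumber: number,
  delays: readonly number[] = RETRY_DELAYS_MS,
): number {
  if (!Number.isInteger(retryNumber) || retryNumber < 1) {
    throw new RangeError("retryNumber must be a positive integer");
  }
  if (delays.length === 0) {
    throw new RangeError("delays must not be empty");
  }
  const index = Math.min(retryNumber - 1, delays.length - 1);
  return delays[index];
}
```

With the schedule above, the first retry waits 1000 ms, the second 5000 ms, and every retry from the fourth onward waits 60000 ms.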

Task Types & Handlers

Payment Processing Tasks

DTR Payment Task:

interface DTRPaymentTaskData {
  userId: string;
  amount: number;
  currency: string;
  paymentMethodId: string;
  metadata: PaymentMetadata;
}
 
class DTRPaymentHandler implements TaskHandler {
  async execute(task: Task<DTRPaymentTaskData>): Promise<void> {
    const { userId, amount, currency, paymentMethodId } = task.data;
    
    // Initialize DTR payment
    const paymentResult = await dtrService.createPayment({
      userId,
      amount,
      currency,
      paymentMethodId,
    });
    
    // Update user payment status
    await this.updatePaymentStatus(userId, paymentResult);
    
    // Trigger blockchain transaction if payment successful
    if (paymentResult.status === 'completed') {
      await this.createBlockchainTask(userId, amount);
    }
  }
}

Blockchain Transaction Tasks

Safe Transaction Task:

interface SafeTransactionTaskData {
  safeAddress: string;
  transaction: SafeTransaction;
  userSignature: string;
  userId: string;
}
 
class SafeTransactionHandler implements TaskHandler {
  async execute(task: Task<SafeTransactionTaskData>): Promise<void> {
    const { safeAddress, transaction, userSignature, userId } = task.data;
    
    // Get Safe client
    const safeClient = await getSafeClient(safeAddress);
    
    // Add server signature
    const signedTransaction = await safeClient.signTransaction(transaction);
    
    // Execute transaction
    const result = await safeClient.executeTransaction(signedTransaction);
    
    // Track transaction
    await trackTransaction(result.hash, userId, 'safe_execution');
  }
}

KYC Verification Tasks

KYC Document Processing:

interface KYCDocumentTaskData {
  userId: string;
  documentId: string;
  provider: 'onramp' | 'dtr';
  documentType: string;
}
 
class KYCDocumentHandler implements TaskHandler {
  async execute(task: Task<KYCDocumentTaskData>): Promise<void> {
    const { userId, documentId, provider } = task.data;
    
    // Process document with provider
    const result = await this.processDocument(provider, documentId);
    
    // Update KYC status
    await this.updateKYCStatus(userId, result);
    
    // Notify user of status change
    await this.notifyKYCUpdate(userId, result);
  }
}
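
The handlers above each serve one task type, but the step that routes a task to its handler (`executeTask` in the processor) is not shown. One possible shape is a registry keyed by task type. This is a sketch under assumptions: `HandlerRegistry`, `MinimalTask`, and `MinimalHandler` are hypothetical, and task types are string literals matching the `TaskType` enum values.

```typescript
// String values mirror the TaskType enum.
type TaskTypeName =
  | "payment_processing"
  | "blockchain_transaction"
  | "kyc_verification"
  | "notification_delivery"
  | "data_sync";

// Reduced stand-ins for Task and TaskHandler, so the snippet is self-contained.
interface MinimalTask {
  id: string;
  type: TaskTypeName;
  data: unknown;
}

interface MinimalHandler {
  execute(task: MinimalTask): Promise<void>;
}

class HandlerRegistry {
  private readonly handlers = new Map<TaskTypeName, MinimalHandler>();

  // Registers one handler per task type; a second registration is an error.
  register(type: TaskTypeName, handler: MinimalHandler): void {
    if (this.handlers.has(type)) {
      throw new Error(`Handler already registered for ${type}`);
    }
    this.handlers.set(type, handler);
  }

  // Looks up the handler for the task's type and runs it.
  async dispatch(task: MinimalTask): Promise<void> {
    const handler = this.handlers.get(task.type);
    if (!handler) {
      throw new Error(`No handler registered for task type ${task.type}`);
    }
    await handler.execute(task);
  }
}
```

Rejecting an unknown task type explicitly means a misconfigured task surfaces as a normal task failure instead of being dropped silently.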

Event Integration

Task Events

Event Types:

interface TaskEvent {
  taskId: string;
  type: TaskEventType;
  timestamp: Date;
  data: any;
}
 
enum TaskEventType {
  TASK_CREATED = "task_created",
  TASK_STARTED = "task_started",
  TASK_COMPLETED = "task_completed",
  TASK_FAILED = "task_failed",
  TASK_RETRIED = "task_retried",
}

Event Publishing:

class TaskEventPublisher {
  async publishTaskEvent(task: Task, eventType: TaskEventType, data?: any): Promise<void> {
    const event: TaskEvent = {
      taskId: task.id,
      type: eventType,
      timestamp: new Date(),
      data: data ?? task, // fall back to the task only when no payload is given
    };

    // Publish to event store
    await this.eventStore.publish(event);

    // Send real-time notification if needed; a task without a user has no recipient
    if (task.userId && this.shouldNotifyRealTime(eventType)) {
      await this.socketService.notifyTaskUpdate(task.userId, event);
    }
  }
}
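
`shouldNotifyRealTime` is called above but not defined. The sketch below shows one possible policy, assuming that only final outcomes are worth a real-time push. Both that policy and the `REAL_TIME_EVENTS` name are assumptions; event types are string literals matching the `TaskEventType` enum values.

```typescript
// String values mirror the TaskEventType enum.
type EventTypeName =
  | "task_created"
  | "task_started"
  | "task_completed"
  | "task_failed"
  | "task_retried";

// Assumed policy: push only final outcomes to the client in real time.
const REAL_TIME_EVENTS: ReadonlySet<EventTypeName> = new Set<EventTypeName>([
  "task_completed",
  "task_failed",
]);

function shouldNotifyRealTime(eventType: EventTypeName): boolean {
  return REAL_TIME_EVENTS.has(eventType);
}
```

Keeping the policy in a single set makes it straightforward to add, say, `task_retried` later without touching the publisher.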

Monitoring & Analytics

Task Metrics

Performance Tracking:

interface TaskMetrics {
  totalTasks: number;
  completedTasks: number;
  failedTasks: number;
  averageExecutionTime: number;
  retryRate: number;
  successRate: number;
  tasksByType: Record<TaskType, number>;
  tasksByStatus: Record<TaskStatus, number>;
}
 
class TaskMetricsCollector {
  async getTaskMetrics(timeRange: TimeRange): Promise<TaskMetrics> {
    const tasks = await this.getTasksInRange(timeRange);
    
    return {
      totalTasks: tasks.length,
      completedTasks: tasks.filter(t => t.status === TaskStatus.COMPLETED).length,
      failedTasks: tasks.filter(t => t.status === TaskStatus.FAILED).length,
      averageExecutionTime: this.calculateAverageExecutionTime(tasks),
      retryRate: this.calculateRetryRate(tasks),
      successRate: this.calculateSuccessRate(tasks),
      tasksByType: this.groupTasksByType(tasks),
      tasksByStatus: this.groupTasksByStatus(tasks),
    };
  }
}
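
The collector delegates to three calculation helpers that are not shown. The definitions below are one plausible reading, not the confirmed formulas: success rate is taken over finished tasks only, retry rate is the share of tasks retried at least once, and execution time is `completedAt - startedAt` in milliseconds. `TimedTask` is a hypothetical reduced view of `Task`.

```typescript
// Reduced view of Task with only the fields the calculations need.
interface TimedTask {
  status: "pending" | "running" | "completed" | "failed" | "retrying";
  retryCount: number;
  startedAt?: Date;
  completedAt?: Date;
}

// Share of finished tasks (completed or failed) that completed; 0 if none finished.
function calculateSuccessRate(tasks: TimedTask[]): number {
  const completed = tasks.filter(t => t.status === "completed").length;
  const failed = tasks.filter(t => t.status === "failed").length;
  const finished = completed + failed;
  return finished === 0 ? 0 : completed / finished;
}

// Share of all tasks that were retried at least once; 0 for an empty list.
function calculateRetryRate(tasks: TimedTask[]): number {
  if (tasks.length === 0) return 0;
  return tasks.filter(t => t.retryCount > 0).length / tasks.length;
}

// Mean duration in ms over tasks that have both timestamps; 0 if there are none.
function calculateAverageExecutionTime(tasks: TimedTask[]): number {
  const durations = tasks.flatMap(t =>
    t.startedAt && t.completedAt
      ? [t.completedAt.getTime() - t.startedAt.getTime()]
      : [],
  );
  if (durations.length === 0) return 0;
  return durations.reduce((sum, d) => sum + d, 0) / durations.length;
}
```

Because the processor overwrites `startedAt` on every attempt, a duration computed this way covers only the final attempt of a retried task.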

Error Tracking

Failure Analysis:

interface TaskFailureAnalysis {
  taskType: TaskType;
  errorType: string;
  frequency: number;
  lastOccurrence: Date;
  affectedUsers: string[];
  suggestedAction: string;
}
 
class TaskErrorAnalyzer {
  async analyzeFailures(timeRange: TimeRange): Promise<TaskFailureAnalysis[]> {
    const failedTasks = await this.getFailedTasks(timeRange);
    
    const grouped = this.groupFailuresByError(failedTasks);
    
    return grouped.map(group => ({
      taskType: group.type,
      errorType: group.error,
      frequency: group.count,
      lastOccurrence: group.lastSeen,
      affectedUsers: group.users,
      suggestedAction: this.suggestAction(group),
    }));
  }
}
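
The analyzer relies on `groupFailuresByError`, which is not shown. A minimal sketch follows, assuming failures are grouped by the pair of task type and error message and that `completedAt` marks when the failure occurred. `FailedTaskRecord` and `FailureGroup` are hypothetical types whose fields match what `analyzeFailures` reads.

```typescript
interface FailedTaskRecord {
  type: string;
  error?: string;
  userId?: string;
  completedAt?: Date;
}

interface FailureGroup {
  type: string;
  error: string;
  count: number;
  lastSeen: Date;
  users: string[];
}

// Groups failed tasks by (type, error message), most frequent group first.
function groupFailuresByError(failedTasks: FailedTaskRecord[]): FailureGroup[] {
  const groups = new Map<string, FailureGroup>();
  for (const task of failedTasks) {
    const error = task.error ?? "unknown";
    const key = JSON.stringify([task.type, error]);
    const seenAt = task.completedAt ?? new Date(0);
    let group = groups.get(key);
    if (!group) {
      group = { type: task.type, error, count: 0, lastSeen: seenAt, users: [] };
      groups.set(key, group);
    }
    group.count += 1;
    if (seenAt.getTime() > group.lastSeen.getTime()) {
      group.lastSeen = seenAt;
    }
    // Record each affected user once per group.
    if (task.userId !== undefined && !group.users.includes(task.userId)) {
      group.users.push(task.userId);
    }
  }
  return Array.from(groups.values()).sort((a, b) => b.count - a.count);
}
```

Grouping on the raw message works only when messages are stable. Messages that embed IDs or timestamps would need normalising first.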

Configuration & Scaling

Task Configuration

Environment Settings:

interface TaskConfig {
  maxConcurrentTasks: number;
  defaultMaxRetries: number;
  retryDelays: number[];
  taskTimeout: number;
  queuePriority: boolean;
  persistTasks: boolean;
}
 
const taskConfig: TaskConfig = {
  maxConcurrentTasks: parseInt(process.env.MAX_CONCURRENT_TASKS || '10', 10),
  defaultMaxRetries: parseInt(process.env.DEFAULT_MAX_RETRIES || '3', 10),
  retryDelays: [1000, 5000, 15000, 60000], // ms; fixed escalating schedule (1s, 5s, 15s, 60s)
  taskTimeout: parseInt(process.env.TASK_TIMEOUT || '300000', 10), // ms (5 minutes)
  queuePriority: process.env.ENABLE_TASK_PRIORITY === 'true',
  persistTasks: process.env.PERSIST_TASKS === 'true',
};
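
`parseInt` returns `NaN` for a value such as `"ten"` and silently accepts `"10abc"` as 10, so a mistyped environment variable can produce an unusable setting. The helper below is a sketch of stricter parsing with a fallback. `intFromEnv` and its `min` parameter are hypothetical and not part of the configuration above.

```typescript
// Parses a base-10 integer setting, returning `fallback` when the value is
// missing, not a plain integer, or below `min`.
function intFromEnv(raw: string | undefined, fallback: number, min = 0): number {
  if (raw === undefined) return fallback;
  const trimmed = raw.trim();
  // Accept only an optional minus sign followed by digits.
  if (!/^-?\d+$/.test(trimmed)) return fallback;
  const parsed = Number.parseInt(trimmed, 10);
  return parsed < min ? fallback : parsed;
}
```

A setting could then be read as `intFromEnv(process.env.MAX_CONCURRENT_TASKS, 10, 1)`, which falls back to 10 when the variable is unset, malformed, or less than 1.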

Database Schema

Task Storage:

CREATE TABLE tasks (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  type VARCHAR(50) NOT NULL,
  status VARCHAR(20) NOT NULL,
  data JSONB NOT NULL,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3,
  priority INTEGER DEFAULT 0,
  scheduled_at TIMESTAMP NOT NULL DEFAULT NOW(),
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  error TEXT,
  user_id UUID REFERENCES users(id),
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);
 
CREATE INDEX idx_tasks_status ON tasks(status);
CREATE INDEX idx_tasks_type ON tasks(type);
CREATE INDEX idx_tasks_scheduled_at ON tasks(scheduled_at);
CREATE INDEX idx_tasks_user_id ON tasks(user_id);
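
The table uses snake_case columns while the `Task` interface uses camelCase fields, so rows need a mapping step when loaded. A minimal sketch follows, assuming the database driver returns timestamps as `Date` objects and SQL NULL as `null`. `TaskRow`, `StoredTask`, and `rowToTask` are hypothetical names.

```typescript
// Row shape for the tasks table (assumed driver behaviour: Date for
// TIMESTAMP columns, null for SQL NULL).
interface TaskRow {
  id: string;
  type: string;
  status: string;
  data: unknown;
  retry_count: number;
  max_retries: number;
  priority: number;
  scheduled_at: Date;
  started_at: Date | null;
  completed_at: Date | null;
  error: string | null;
  user_id: string | null;
}

// camelCase counterpart, with optional fields in place of nullable columns.
interface StoredTask {
  id: string;
  type: string;
  status: string;
  data: unknown;
  retryCount: number;
  maxRetries: number;
  priority: number;
  scheduledAt: Date;
  startedAt?: Date;
  completedAt?: Date;
  error?: string;
  userId?: string;
}

// Converts a database row to the in-memory shape, mapping null to undefined.
function rowToTask(row: TaskRow): StoredTask {
  return {
    id: row.id,
    type: row.type,
    status: row.status,
    data: row.data,
    retryCount: row.retry_count,
    maxRetries: row.max_retries,
    priority: row.priority,
    scheduledAt: row.scheduled_at,
    startedAt: row.started_at ?? undefined,
    completedAt: row.completed_at ?? undefined,
    error: row.error ?? undefined,
    userId: row.user_id ?? undefined,
  };
}
```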

Together, these components provide background processing for Wafra's critical operations, with automatic retries, event publishing, metrics collection, and failure analysis.