Blockchain Integration

Overview

Wafra implements a comprehensive real-time blockchain monitoring system that tracks all on-chain activity, maintains accurate balance information, and provides instant notifications for blockchain events. The system operates on Base network with robust error handling and data consistency guarantees.

Architecture Components

Real-Time Transaction Listener

Current Implementation:

  • Base Network Monitoring: Continuous block processing with 5-block safety margin
  • Event-Driven Architecture: Handlers for specific contract events
  • Automatic Recovery: Resilient to network interruptions and RPC failures
  • Data Consistency: Ensures database remains in sync with blockchain state

Block Processing System

Current Architecture:

interface BlockProcessor {
  // Core processing
  currentBlock: number;
  safetyMargin: number; // 5 blocks
  processingState: "running" | "paused" | "error";
 
  // Event handling
  eventHandlers: Map<string, EventHandler>;
  contractAddresses: Set<Address>;
 
  // Recovery mechanisms
  lastHealthCheck: Date;
  consecutiveFailures: number;
  maxRetries: number;
}

Processing Flow:

  1. Block Detection: Monitor for new blocks with 5-block confirmation
  2. Event Extraction: Parse relevant events from confirmed blocks
  3. Handler Dispatch: Route events to appropriate handlers
  4. Database Update: Atomic updates to maintain consistency
  5. Notification Broadcast: Real-time updates via Socket.IO
  6. State Persistence: Save processing state for recovery

Event Handler System

Supported Events

Current Event Handlers:

EventContractPurpose
TransferUSDCTrack USDC transfers to/from user wallets
DepositFundContractMonitor fund deposits
WithdrawalFundContractTrack fund withdrawals
StrategyUpdatedFundContractStrategy allocation changes
InterestAccruedFundContractDaily yield accrual
OwnerAddedSafeNew wallet owners
OwnerRemovedSafeRemoved wallet owners

Event Handler Implementation

Handler Architecture:

interface EventHandler {
  signature: string;
  contractAddress: Address;
  handler: (event: ParsedEvent, context: ProcessingContext) => Promise<void>;
}
 
// Example: USDC Transfer Handler
const usdcTransferHandler: EventHandler = {
  signature: "Transfer(address,address,uint256)",
  contractAddress: USDC_CONTRACT_ADDRESS,
  handler: async (event, context) => {
    const { from, to, value } = event.args;
 
    // Check if transfer involves monitored wallets
    const userWallets = await getUserWalletAddresses();
 
    if (userWallets.includes(to)) {
      // Incoming transfer
      await handleIncomingTransfer(to, value, event.transactionHash);
    }
 
    if (userWallets.includes(from)) {
      // Outgoing transfer
      await handleOutgoingTransfer(from, value, event.transactionHash);
    }
  },
};

Balance Tracking System

Real-Time Balance Updates:

interface BalanceUpdate {
  walletAddress: Address;
  tokenAddress: Address;
  previousBalance: bigint;
  newBalance: bigint;
  change: bigint;
  transactionHash: string;
  blockNumber: number;
  timestamp: Date;
}
 
// Balance calculation with conversion rates
async function calculateUserBalance(
  walletAddress: Address,
  blockNumber: number
): Promise<UserBalance> {
  // Get raw token balances
  const usdcBalance = await getTokenBalance(
    walletAddress,
    USDC_ADDRESS,
    blockNumber
  );
  const wstBalance = await getTokenBalance(
    walletAddress,
    WST_ADDRESS,
    blockNumber
  );
 
  // Calculate USD values
  const exchangeRate = await getExchangeRate(blockNumber);
  const totalValueUSD =
    (usdcBalance + (wstBalance * exchangeRate) / PRECISION) / 1e6;
 
  return {
    usdc: usdcBalance,
    wst: wstBalance,
    totalValueUSD,
    exchangeRate,
    lastUpdated: new Date(),
  };
}

Gas Cost Monitoring

Transaction Cost Tracking

Current Implementation:

interface GasCostTracker {
  // Gas usage tracking
  transactionHash: string;
  gasUsed: bigint;
  gasPrice: bigint;
  maxFeePerGas: bigint;
  maxPriorityFeePerGas: bigint;
 
  // Cost calculations
  totalCostETH: bigint;
  totalCostUSD: number;
  ethPriceUSD: number;
 
  // Performance metrics
  estimatedGas: bigint;
  actualGas: bigint;
  estimationAccuracy: number; // percentage
}

Gas Cost Analysis:

  • Real-Time ETH Price: Live ETH/USD conversion for accurate cost calculation
  • Gas Efficiency Tracking: Compare estimated vs actual gas usage
  • Cost Optimization: Identify opportunities for gas savings
  • User Transparency: Detailed gas cost breakdown for users

ETH Price Service

Price Update System:

class ETHPriceService {
  private currentPrice: number = 0;
  private lastUpdate: Date = new Date(0);
  private updateInterval: number = 60000; // 1 minute
 
  async getCurrentPrice(): Promise<number> {
    if (this.shouldUpdate()) {
      await this.updatePrice();
    }
    return this.currentPrice;
  }
 
  private async updatePrice(): Promise<void> {
    // Fetch from multiple sources for reliability
    const sources = ["coingecko", "coinbase", "binance"];
 
    const prices = await Promise.allSettled(
      sources.map((source) => this.fetchFromSource(source))
    );
 
    // Use median price for accuracy
    this.currentPrice = this.calculateMedianPrice(prices);
    this.lastUpdate = new Date();
  }
}

Real-Time Notifications

Socket.IO Event System

Current Events:

interface SocketEvents {
  // Balance updates
  "balance-updated": {
    userId: string;
    walletAddress: Address;
    newBalance: UserBalance;
    change: BalanceChange;
  };
 
  // Transaction status
  "transaction-status-changed": {
    transactionHash: string;
    status: TransactionStatus;
    confirmations: number;
  };
 
  // Yield accrual
  "yield-accrued": {
    userId: string;
    amount: bigint;
    newTotalValue: number;
    apy: number;
  };
 
  // System events
  "system-maintenance": {
    message: string;
    estimatedDuration: number;
  };
}

User-Specific Notifications

Targeted Broadcasting:

// User-specific room management
class NotificationService {
  async joinUserRoom(userId: string, socketId: string): Promise<void> {
    await this.socket.join(`user:${userId}`);
 
    // Send current status immediately
    const currentBalance = await this.getLatestBalance(userId);
    this.socket.emit("balance-updated", currentBalance);
  }
 
  async notifyBalanceUpdate(
    userId: string,
    update: BalanceUpdate
  ): Promise<void> {
    // Broadcast to all user's connected devices
    this.io.to(`user:${userId}`).emit("balance-updated", update);
 
    // Trigger push notification if user is offline
    if (!this.isUserOnline(userId)) {
      await this.sendPushNotification(userId, update);
    }
  }
}

Data Consistency & Recovery

Consistency Guarantees

Database Transactions:

async function processBlockEvents(
  blockNumber: number,
  events: ParsedEvent[]
): Promise<void> {
  await db.$transaction(async (tx) => {
    // Process all events in single transaction
    for (const event of events) {
      await processEvent(event, tx);
    }
 
    // Update processing state
    await tx.blockProcessingState.upsert({
      where: { id: "main" },
      create: { id: "main", lastProcessedBlock: blockNumber },
      update: { lastProcessedBlock: blockNumber },
    });
  });
 
  // Only notify after successful database commit
  await notifyBalanceUpdates(events);
}

Error Recovery Mechanisms

Automatic Recovery:

class BlockchainMonitor {
  private async handleProcessingError(
    error: Error,
    blockNumber: number
  ): Promise<void> {
    logger.error("Block processing failed", {
      blockNumber,
      error: error.message,
      stack: error.stack,
    });
 
    this.consecutiveFailures++;
 
    if (this.consecutiveFailures >= this.maxRetries) {
      // Enter recovery mode
      await this.enterRecoveryMode();
    } else {
      // Retry with exponential backoff
      const delay = Math.pow(2, this.consecutiveFailures) * 1000;
      setTimeout(() => this.processBlock(blockNumber), delay);
    }
  }
 
  private async enterRecoveryMode(): Promise<void> {
    // Pause processing
    this.processingState = "error";
 
    // Alert monitoring systems
    await this.alertOpsTeam("Blockchain monitoring in recovery mode");
 
    // Attempt to recover from last known good state
    const lastGoodBlock = await this.getLastProcessedBlock();
    await this.reprocessFromBlock(lastGoodBlock);
  }
}

Historical Data Management

Balance History Tracking

Historical Balance Storage:

interface BalanceHistory {
  userId: string;
  walletAddress: Address;
  balance: bigint;
  balanceUSD: number;
  exchangeRate: bigint;
  blockNumber: number;
  transactionHash?: string;
  timestamp: Date;
}
 
// Efficient historical queries
async function getBalanceHistory(
  userId: string,
  fromDate: Date,
  toDate: Date
): Promise<BalanceHistory[]> {
  return await db.balanceHistory.findMany({
    where: {
      userId,
      timestamp: {
        gte: fromDate,
        lte: toDate,
      },
    },
    orderBy: { timestamp: "asc" },
  });
}

Performance Analytics

Yield Tracking:

interface YieldMetrics {
  userId: string;
  period: "daily" | "weekly" | "monthly" | "yearly";
  startValue: number;
  endValue: number;
  yield: number;
  yieldPercentage: number;
  apy: number;
  fees: number;
  netYield: number;
}
 
// Calculate performance metrics
async function calculateYieldMetrics(
  userId: string,
  period: string
): Promise<YieldMetrics> {
  const [startBalance, endBalance] = await Promise.all([
    getBalanceAtDate(userId, getPeriodStart(period)),
    getCurrentBalance(userId),
  ]);
 
  const yield = endBalance.totalValueUSD - startBalance.totalValueUSD;
  const yieldPercentage = (yield / startBalance.totalValueUSD) * 100;
  const apy = calculateAPY(yieldPercentage, period);
 
  return {
    userId,
    period,
    startValue: startBalance.totalValueUSD,
    endValue: endBalance.totalValueUSD,
    yield,
    yieldPercentage,
    apy,
    fees: await calculateFees(userId, period),
    netYield: yield - (await calculateFees(userId, period)),
  };
}

Network Health Monitoring

RPC Health Checks

Connection Monitoring:

class RPCHealthMonitor {
  private providers: JsonRpcProvider[] = [];
  private currentProviderIndex: number = 0;
  private healthCheckInterval: number = 30000; // 30 seconds
 
  async performHealthCheck(): Promise<void> {
    const results = await Promise.allSettled(
      this.providers.map((provider) => this.testProvider(provider))
    );
 
    // Update provider health status
    results.forEach((result, index) => {
      this.providers[index].isHealthy = result.status === "fulfilled";
    });
 
    // Switch to healthy provider if current is unhealthy
    if (!this.getCurrentProvider().isHealthy) {
      await this.switchToHealthyProvider();
    }
  }
 
  private async testProvider(provider: JsonRpcProvider): Promise<boolean> {
    try {
      const blockNumber = await provider.getBlockNumber();
      const latestBlock = await provider.getBlock("latest");
 
      // Check if provider is in sync
      const blockAge = Date.now() - latestBlock.timestamp * 1000;
      return blockAge < 60000; // Block should be less than 1 minute old
    } catch (error) {
      return false;
    }
  }
}

Performance Metrics

System Monitoring:

interface SystemMetrics {
  // Processing performance
  blocksProcessedPerSecond: number;
  averageBlockProcessingTime: number;
  eventProcessingBacklog: number;
 
  // Network health
  rpcResponseTime: number;
  blockSyncLag: number;
  connectionUptime: number;
 
  // Data quality
  balanceAccuracy: number;
  eventMissedRate: number;
  consistencyScore: number;
}

Future Enhancements

Multi-Chain Support

Planned Expansions:

  • Ethereum Mainnet: Support for ETH-based strategies
  • Arbitrum: Layer 2 scaling for reduced gas costs
  • Polygon: Alternative scaling solution
  • Cross-Chain Bridges: Asset movement between chains

Advanced Analytics

Enhanced Features:

  • Predictive Analytics: ML-based yield predictions
  • Risk Metrics: Real-time portfolio risk assessment
  • Performance Attribution: Strategy-level performance analysis
  • Compliance Monitoring: Automated regulatory reporting

This blockchain integration system provides reliable, real-time monitoring with robust error handling and comprehensive data tracking for all on-chain activity.