Blockchain Integration
Overview
Wafra monitors on-chain activity on the Base network in real time. The system tracks contract events, keeps wallet balance records up to date, and pushes notifications to connected clients as events are confirmed. Blocks are processed behind a 5-block safety margin, database updates are applied atomically, and the listener recovers automatically from network interruptions and RPC failures.
Architecture Components
Real-Time Transaction Listener
Current Implementation:
- Base Network Monitoring: Continuous block processing with 5-block safety margin
- Event-Driven Architecture: Handlers for specific contract events
- Automatic Recovery: Resilient to network interruptions and RPC failures
- Data Consistency: Ensures database remains in sync with blockchain state
Block Processing System
Current Architecture:
interface BlockProcessor {
// Core processing
currentBlock: number;
safetyMargin: number; // 5 blocks
processingState: "running" | "paused" | "error";
// Event handling
eventHandlers: Map<string, EventHandler>;
contractAddresses: Set<Address>;
// Recovery mechanisms
lastHealthCheck: Date;
consecutiveFailures: number;
maxRetries: number;
}
Processing Flow:
- Block Detection: Monitor for new blocks with 5-block confirmation
- Event Extraction: Parse relevant events from confirmed blocks
- Handler Dispatch: Route events to appropriate handlers
- Database Update: Atomic updates to maintain consistency
- Notification Broadcast: Real-time updates via Socket.IO
- State Persistence: Save processing state for recovery
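The block detection step can be illustrated with a small helper that decides which blocks are safe to process. This is a minimal sketch, assuming the 5-block safety margin means "stay five blocks behind the chain head"; `confirmedRange` and `BlockRange` are hypothetical names, not part of the Wafra codebase.

```typescript
// Hypothetical helper: inclusive range of blocks that are safe to process,
// or null when no new block has enough confirmations yet.
interface BlockRange {
  from: number;
  to: number;
}

function confirmedRange(
  lastProcessedBlock: number,
  chainHead: number,
  safetyMargin: number = 5 // assumed meaning: stay this many blocks behind the head
): BlockRange | null {
  // Highest block that is at least `safetyMargin` blocks behind the head.
  const highestConfirmed = chainHead - safetyMargin;
  const from = lastProcessedBlock + 1;
  if (highestConfirmed < from) {
    return null; // nothing new is confirmed yet
  }
  return { from, to: highestConfirmed };
}

const readyRange = confirmedRange(100, 110); // { from: 101, to: 105 }
const notReady = confirmedRange(100, 104); // null
```

Returning `null` instead of an empty range makes the "nothing to do yet" case explicit for the caller, which can then wait for the next block.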
Event Handler System
Supported Events
Current Event Handlers:
| Event | Contract | Purpose |
|---|---|---|
| Transfer | USDC | Track USDC transfers to/from user wallets |
| Deposit | FundContract | Monitor fund deposits |
| Withdrawal | FundContract | Track fund withdrawals |
| StrategyUpdated | FundContract | Strategy allocation changes |
| InterestAccrued | FundContract | Daily yield accrual |
| OwnerAdded | Safe | New wallet owners |
| OwnerRemoved | Safe | Removed wallet owners |
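One way to route the events in the table to their handlers is a registry keyed by contract address and event signature. The sketch below is illustrative only: `HandlerRegistry`, the simplified `ParsedEvent` shape, and the placeholder address are assumptions, and the handlers are synchronous here for brevity, whereas the document's handlers are async.

```typescript
type Address = string; // simplified stand-in for the document's Address type

interface ParsedEvent {
  signature: string;
  contractAddress: Address;
  args: Record<string, unknown>;
}

type Handler = (event: ParsedEvent) => void;

// Hypothetical registry keyed by "<lowercased contract address>:<event signature>".
class HandlerRegistry {
  private handlers = new Map<string, Handler>();

  private key(contractAddress: Address, signature: string): string {
    // Lowercasing makes lookups insensitive to checksum casing.
    return `${contractAddress.toLowerCase()}:${signature}`;
  }

  register(contractAddress: Address, signature: string, handler: Handler): void {
    this.handlers.set(this.key(contractAddress, signature), handler);
  }

  // Returns true when a handler was found and invoked, false otherwise.
  dispatch(event: ParsedEvent): boolean {
    const handler = this.handlers.get(
      this.key(event.contractAddress, event.signature)
    );
    if (handler === undefined) {
      return false; // no handler registered for this contract/event pair
    }
    handler(event);
    return true;
  }
}

const registry = new HandlerRegistry();
const handledSignatures: string[] = [];
registry.register(
  "0x00000000000000000000000000000000000000aB", // placeholder address
  "Transfer(address,address,uint256)",
  (event) => {
    handledSignatures.push(event.signature);
  }
);
```

Keying on both address and signature matters because `Transfer` is emitted by many token contracts; matching on the signature alone would also pick up transfers of unrelated tokens.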
Event Handler Implementation
Handler Architecture:
interface EventHandler {
signature: string;
contractAddress: Address;
handler: (event: ParsedEvent, context: ProcessingContext) => Promise<void>;
}
// Example: USDC Transfer Handler
const usdcTransferHandler: EventHandler = {
signature: "Transfer(address,address,uint256)",
contractAddress: USDC_CONTRACT_ADDRESS,
handler: async (event, context) => {
const { from, to, value } = event.args;
// Check if transfer involves monitored wallets
const userWallets = await getUserWalletAddresses();
if (userWallets.includes(to)) {
// Incoming transfer
await handleIncomingTransfer(to, value, event.transactionHash);
}
if (userWallets.includes(from)) {
// Outgoing transfer
await handleOutgoingTransfer(from, value, event.transactionHash);
}
},
};
Balance Tracking System
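Balances in this section are held as `bigint` base units and converted to a display number only at the end. The sketch below works through that conversion with concrete numbers. It assumes the exchange rate is scaled by 10^18 and that the wrapped token uses the same 6-decimal base units as USDC; neither scale is stated in this document, so treat both as placeholders. `totalValueUSD` is a hypothetical helper name.

```typescript
const USDC_DECIMALS = 6; // USDC uses 6 decimals
const PRECISION = 1_000_000_000_000_000_000n; // assumed exchange-rate scale (10^18)

function totalValueUSD(
  usdcBalance: bigint,
  wstBalance: bigint,
  exchangeRate: bigint
): number {
  // Stay in bigint for the multiplication and division so no precision is
  // lost, then convert once at the end.
  const totalBaseUnits = usdcBalance + (wstBalance * exchangeRate) / PRECISION;
  return Number(totalBaseUnits) / 10 ** USDC_DECIMALS;
}

// 250 USDC plus 100 wrapped tokens at an assumed rate of 1.05:
const exampleValue = totalValueUSD(
  250_000_000n,
  100_000_000n,
  1_050_000_000_000_000_000n
); // 355
```

Note that `bigint` and `number` cannot be mixed in one arithmetic expression, so the explicit `Number(...)` conversion is required. The conversion is exact only while the total stays below 2^53 base units.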
Real-Time Balance Updates:
interface BalanceUpdate {
walletAddress: Address;
tokenAddress: Address;
previousBalance: bigint;
newBalance: bigint;
change: bigint;
transactionHash: string;
blockNumber: number;
timestamp: Date;
}
// Balance calculation with conversion rates
async function calculateUserBalance(
walletAddress: Address,
blockNumber: number
): Promise<UserBalance> {
// Get raw token balances
const usdcBalance = await getTokenBalance(
walletAddress,
USDC_ADDRESS,
blockNumber
);
const wstBalance = await getTokenBalance(
walletAddress,
WST_ADDRESS,
blockNumber
);
// Calculate USD values
const exchangeRate = await getExchangeRate(blockNumber);
// bigint and number cannot be mixed, so convert after the bigint arithmetic
const totalValueUSD =
Number(usdcBalance + (wstBalance * exchangeRate) / PRECISION) / 1e6;
return {
usdc: usdcBalance,
wst: wstBalance,
totalValueUSD,
exchangeRate,
lastUpdated: new Date(),
};
}
Gas Cost Monitoring
Transaction Cost Tracking
Current Implementation:
interface GasCostTracker {
// Gas usage tracking
transactionHash: string;
gasUsed: bigint;
gasPrice: bigint;
maxFeePerGas: bigint;
maxPriorityFeePerGas: bigint;
// Cost calculations
totalCostETH: bigint;
totalCostUSD: number;
ethPriceUSD: number;
// Performance metrics
estimatedGas: bigint;
actualGas: bigint;
estimationAccuracy: number; // percentage
}
Gas Cost Analysis:
- Real-Time ETH Price: Live ETH/USD conversion for accurate cost calculation
- Gas Efficiency Tracking: Compare estimated vs actual gas usage
- Cost Optimization: Identify opportunities for gas savings
- User Transparency: Detailed gas cost breakdown for users
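The cost and efficiency figures above can be derived from a handful of inputs. The following is a rough sketch, not the production calculation: `computeGasCost` is a hypothetical helper, and "estimation accuracy" is assumed to mean 100% minus the relative estimation error, which this document does not define. It covers only the execution fee; Base, like other OP Stack chains, also charges an L1 data fee, which is not modeled here.

```typescript
const WEI_PER_ETH = 1_000_000_000_000_000_000n;

interface GasCost {
  totalCostWei: bigint;
  totalCostUSD: number;
  estimationAccuracy: number; // percentage
}

function computeGasCost(
  gasUsed: bigint,
  effectiveGasPrice: bigint, // wei per gas unit actually paid
  ethPriceUSD: number,
  estimatedGas: bigint
): GasCost {
  // Exact fee in wei, kept as bigint.
  const totalCostWei = gasUsed * effectiveGasPrice;
  // Convert to floating point only for display purposes.
  const totalCostETH = Number(totalCostWei) / Number(WEI_PER_ETH);
  const totalCostUSD = totalCostETH * ethPriceUSD;
  // Assumed definition: 100% minus the relative estimation error.
  const relativeError =
    Math.abs(Number(estimatedGas - gasUsed)) / Number(gasUsed);
  const estimationAccuracy = (1 - relativeError) * 100;
  return { totalCostWei, totalCostUSD, estimationAccuracy };
}

// 50,000 gas at 1 gwei with ETH at $2,000, estimated at 55,000 gas:
// about $0.10 in fees and roughly 90% estimation accuracy.
const exampleCost = computeGasCost(50_000n, 1_000_000_000n, 2000, 55_000n);
```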
ETH Price Service
Price Update System:
class ETHPriceService {
private currentPrice: number = 0;
private lastUpdate: Date = new Date(0);
private updateInterval: number = 60000; // 1 minute
async getCurrentPrice(): Promise<number> {
if (this.shouldUpdate()) {
await this.updatePrice();
}
return this.currentPrice;
}
private async updatePrice(): Promise<void> {
// Fetch from multiple sources for reliability
const sources = ["coingecko", "coinbase", "binance"];
const prices = await Promise.allSettled(
sources.map((source) => this.fetchFromSource(source))
);
// Use median price for accuracy
this.currentPrice = this.calculateMedianPrice(prices);
this.lastUpdate = new Date();
}
}
Real-Time Notifications
Socket.IO Event System
Current Events:
interface SocketEvents {
// Balance updates
"balance-updated": {
userId: string;
walletAddress: Address;
newBalance: UserBalance;
change: BalanceChange;
};
// Transaction status
"transaction-status-changed": {
transactionHash: string;
status: TransactionStatus;
confirmations: number;
};
// Yield accrual
"yield-accrued": {
userId: string;
amount: bigint;
newTotalValue: number;
apy: number;
};
// System events
"system-maintenance": {
message: string;
estimatedDuration: number;
};
}
User-Specific Notifications
Targeted Broadcasting:
// User-specific room management
class NotificationService {
async joinUserRoom(userId: string, socketId: string): Promise<void> {
await this.socket.join(`user:${userId}`);
// Send current status immediately
const currentBalance = await this.getLatestBalance(userId);
this.socket.emit("balance-updated", currentBalance);
}
async notifyBalanceUpdate(
userId: string,
update: BalanceUpdate
): Promise<void> {
// Broadcast to all user's connected devices
this.io.to(`user:${userId}`).emit("balance-updated", update);
// Trigger push notification if user is offline
if (!this.isUserOnline(userId)) {
await this.sendPushNotification(userId, update);
}
}
}
Data Consistency & Recovery
Consistency Guarantees
Database Transactions:
async function processBlockEvents(
blockNumber: number,
events: ParsedEvent[]
): Promise<void> {
await db.$transaction(async (tx) => {
// Process all events in single transaction
for (const event of events) {
await processEvent(event, tx);
}
// Update processing state
await tx.blockProcessingState.upsert({
where: { id: "main" },
create: { id: "main", lastProcessedBlock: blockNumber },
update: { lastProcessedBlock: blockNumber },
});
});
// Only notify after successful database commit
await notifyBalanceUpdates(events);
}
Error Recovery Mechanisms
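Retries in this section use exponential backoff, where the delay doubles with each consecutive failure. The sketch below isolates that calculation as a pure function. The doubling with a 1-second base follows the formula used in this document; the upper cap (`maxDelayMs`) is an added assumption, and `backoffDelayMs` is a hypothetical name.

```typescript
// Delay before the next retry: 2^failures * base, capped at maxDelayMs.
function backoffDelayMs(
  consecutiveFailures: number,
  baseDelayMs: number = 1000,
  maxDelayMs: number = 60_000 // assumed cap, not taken from the document
): number {
  const uncapped = Math.pow(2, consecutiveFailures) * baseDelayMs;
  return Math.min(uncapped, maxDelayMs);
}

// Delays for the first four consecutive failures.
const retrySchedule = [1, 2, 3, 4].map((n) => backoffDelayMs(n));
// [2000, 4000, 8000, 16000]
```

Without a cap, the delay grows without bound if the failure counter is never reset, so a ceiling keeps the worst-case wait predictable.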
Automatic Recovery:
class BlockchainMonitor {
private async handleProcessingError(
error: Error,
blockNumber: number
): Promise<void> {
logger.error("Block processing failed", {
blockNumber,
error: error.message,
stack: error.stack,
});
this.consecutiveFailures++;
if (this.consecutiveFailures >= this.maxRetries) {
// Enter recovery mode
await this.enterRecoveryMode();
} else {
// Retry with exponential backoff
const delay = Math.pow(2, this.consecutiveFailures) * 1000;
setTimeout(() => this.processBlock(blockNumber), delay);
}
}
private async enterRecoveryMode(): Promise<void> {
// Pause processing
this.processingState = "error";
// Alert monitoring systems
await this.alertOpsTeam("Blockchain monitoring in recovery mode");
// Attempt to recover from last known good state
const lastGoodBlock = await this.getLastProcessedBlock();
await this.reprocessFromBlock(lastGoodBlock);
}
}
Historical Data Management
Balance History Tracking
Historical Balance Storage:
interface BalanceHistory {
userId: string;
walletAddress: Address;
balance: bigint;
balanceUSD: number;
exchangeRate: bigint;
blockNumber: number;
transactionHash?: string;
timestamp: Date;
}
// Efficient historical queries
async function getBalanceHistory(
userId: string,
fromDate: Date,
toDate: Date
): Promise<BalanceHistory[]> {
return await db.balanceHistory.findMany({
where: {
userId,
timestamp: {
gte: fromDate,
lte: toDate,
},
},
orderBy: { timestamp: "asc" },
});
}
Performance Analytics
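The yield metrics in this section rely on a `calculateAPY` helper whose formula is not shown. One common way to annualize a period return is compound growth, sketched below. This is an assumption about how such a helper could work, not a description of Wafra's actual calculation; `annualizeYield` and `PERIODS_PER_YEAR` are hypothetical names.

```typescript
type Period = "daily" | "weekly" | "monthly" | "yearly";

// Number of compounding periods per year for each reporting period.
const PERIODS_PER_YEAR: Record<Period, number> = {
  daily: 365,
  weekly: 52,
  monthly: 12,
  yearly: 1,
};

// Annualizes a period return (in percent) assuming it compounds every period:
// APY = ((1 + r)^n - 1) * 100
function annualizeYield(yieldPercentage: number, period: Period): number {
  const periodReturn = yieldPercentage / 100;
  const growth = Math.pow(1 + periodReturn, PERIODS_PER_YEAR[period]);
  return (growth - 1) * 100;
}

// A 1% monthly return compounds to roughly 12.68% per year.
const monthlyExample = annualizeYield(1, "monthly");
```

Compounding a single short period can overstate the annual figure when that period was unusually good, so short-period APY values are best read as indicative.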
Yield Tracking:
interface YieldMetrics {
userId: string;
period: "daily" | "weekly" | "monthly" | "yearly";
startValue: number;
endValue: number;
yield: number;
yieldPercentage: number;
apy: number;
fees: number;
netYield: number;
}
// Calculate performance metrics
async function calculateYieldMetrics(
userId: string,
period: YieldMetrics["period"]
): Promise<YieldMetrics> {
// Fetch balances and fees in parallel; fees are computed once and reused
const [startBalance, endBalance, fees] = await Promise.all([
getBalanceAtDate(userId, getPeriodStart(period)),
getCurrentBalance(userId),
calculateFees(userId, period),
]);
// `yield` is a reserved word in strict-mode code, so use a different local name
const grossYield = endBalance.totalValueUSD - startBalance.totalValueUSD;
// Guard against division by zero when the starting balance is empty
const yieldPercentage =
startBalance.totalValueUSD === 0
? 0
: (grossYield / startBalance.totalValueUSD) * 100;
const apy = calculateAPY(yieldPercentage, period);
return {
userId,
period,
startValue: startBalance.totalValueUSD,
endValue: endBalance.totalValueUSD,
yield: grossYield,
yieldPercentage,
apy,
fees,
netYield: grossYield - fees,
};
}
Network Health Monitoring
RPC Health Checks
Connection Monitoring:
class RPCHealthMonitor {
private providers: JsonRpcProvider[] = [];
private currentProviderIndex: number = 0;
private healthCheckInterval: number = 30000; // 30 seconds
async performHealthCheck(): Promise<void> {
const results = await Promise.allSettled(
this.providers.map((provider) => this.testProvider(provider))
);
// Update provider health status
results.forEach((result, index) => {
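// testProvider resolves to false on failure, so check the value as well as the status
this.providers[index].isHealthy =
result.status === "fulfilled" && result.value;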
});
// Switch to healthy provider if current is unhealthy
if (!this.getCurrentProvider().isHealthy) {
await this.switchToHealthyProvider();
}
}
private async testProvider(provider: JsonRpcProvider): Promise<boolean> {
try {
const latestBlock = await provider.getBlock("latest");
if (!latestBlock) {
return false; // Provider returned no block
}
// Check if provider is in sync
const blockAge = Date.now() - latestBlock.timestamp * 1000;
return blockAge < 60000; // Block should be less than 1 minute old
} catch (error) {
return false;
}
}
}
Performance Metrics
System Monitoring:
interface SystemMetrics {
// Processing performance
blocksProcessedPerSecond: number;
averageBlockProcessingTime: number;
eventProcessingBacklog: number;
// Network health
rpcResponseTime: number;
blockSyncLag: number;
connectionUptime: number;
// Data quality
balanceAccuracy: number;
eventMissedRate: number;
consistencyScore: number;
}
Future Enhancements
Multi-Chain Support
Planned Expansions:
- Ethereum Mainnet: Support for ETH-based strategies
- Arbitrum: Layer 2 scaling for reduced gas costs
- Polygon: Alternative scaling solution
- Cross-Chain Bridges: Asset movement between chains
Advanced Analytics
Enhanced Features:
- Predictive Analytics: ML-based yield predictions
- Risk Metrics: Real-time portfolio risk assessment
- Performance Attribution: Strategy-level performance analysis
- Compliance Monitoring: Automated regulatory reporting
This blockchain integration system provides reliable, real-time monitoring with robust error handling and comprehensive data tracking for all on-chain activity.