diff --git a/.env.example b/.env.example index 6e38215..3ff9732 100644 --- a/.env.example +++ b/.env.example @@ -8,6 +8,21 @@ CREATOR_AUTH_AUDIENCE=substream-creators # Local database configuration DATABASE_FILENAME=./data/substream-protocol.sqlite +# ActivityPub Federation Configuration +ACTIVITYPUB_ENABLED=true +ACTIVITYPUB_BASE_URL=https://your-domain.com +ACTIVITYPUB_WORKER_INTERVAL=30000 +ACTIVITYPUB_MAX_RETRIES=3 +ACTIVITYPUB_SIGNING_SECRET=your-activitypub-signing-secret-key + +# Engagement Leaderboard Configuration +LEADERBOARD_ENABLED=true +LEADERBOARD_CACHE_TTL=21600 +LEADERBOARD_WORKER_INTERVAL=21600000 +LEADERBOARD_BATCH_SIZE=10 +LEADERBOARD_SEASON_LENGTH=monthly +LEADERBOARD_CACHE_PREFIX=leaderboard:cret + # CDN edge token configuration CDN_BASE_URL=https://cdn.example.com/private CDN_TOKEN_SECRET=replace-with-a-long-random-secret diff --git a/docs/LEADERBOARD_API.md b/docs/LEADERBOARD_API.md new file mode 100644 index 0000000..b90df81 --- /dev/null +++ b/docs/LEADERBOARD_API.md @@ -0,0 +1,470 @@ +# Automated Engagement Leaderboard API + +## Overview + +The Automated Engagement Leaderboard API provides a high-performance system for generating "Top Fans" leaderboards for every creator on SubStream. The system calculates composite scores based on streaming activity, subscription longevity, and engagement metrics, with results pre-calculated every 6 hours and cached in Redis for optimal performance. + +## Features + +- **Composite Scoring Algorithm**: Weighted scoring based on streaming amount (40%), subscription length (30%), and engagement (30%) +- **Season-Based Filtering**: Support for monthly, quarterly, and yearly leaderboards +- **Redis Caching**: 6-hour cache TTL for lightning-fast responses +- **Background Processing**: Automatic recalculation every 6 hours without impacting API performance +- **Real-Time Updates**: Immediate cache invalidation on engagement events +- **Historical Tracking**: Season snapshots for trend analysis +- **Export Functionality**: JSON and CSV export for giveaways and analytics + +## Architecture + +### Core Components + +1. **EngagementLeaderboardService** (`services/engagementLeaderboardService.js`) + - Core scoring algorithm implementation + - Redis caching management + - Season handling and date calculations + - Metric normalization and composite scoring + +2. **LeaderboardWorker** (`src/services/leaderboardWorker.js`) + - Background processing every 6 hours + - Batch processing of creator leaderboards + - Cache management and cleanup + - Export and analytics functions + +3. **Leaderboard API** (`routes/leaderboard.js`) + - RESTful endpoints for leaderboard access + - Season filtering and pagination + - Fan rank lookup and statistics + - Admin endpoints for management + +4. **Update Middleware** (`middleware/leaderboardUpdate.js`) + - Real-time cache invalidation + - Engagement summary updates + - Historical snapshot creation + +### Database Schema + +- `streaming_payments`: Fan payment tracking +- `content_likes`: Like/engagement tracking +- `leaderboard_snapshots`: Historical leaderboard data +- `fan_engagement_summary`: Pre-calculated metrics cache + +## Scoring Algorithm + +### Composite Score Calculation + +``` +Composite Score = (Streaming Score × 0.4) + (Subscription Score × 0.3) + (Engagement Score × 0.3) +``` + +### Component Scoring + +#### Streaming Score (0-100) +- **Base Amount**: Logarithmic scaling of total streaming amount +- **Consistency Bonus**: Up to 10 points for streaming frequency +- **Formula**: `log10(amount + 1) / log10(maxAmount + 1) × 90 + consistencyBonus` + +#### Subscription Score (0-100) +- **Longevity**: Up to 70 points based on subscription days +- **Active Bonus**: 20 points for current active subscription +- **Streak Bonus**: Up to 10 points for current streak +- **Formula**: `longevityScore + activeBonus + streakBonus` + +#### Engagement Score (0-100) +- **Weighted Actions**: Comments (3x), Likes (1x), Shares (5x) +- **Logarithmic Scaling**: Prevents spam from dominating +- **Formula**: `log10(weightedEngagement + 1) / log10(maxEngagement + 1) × 100` + +## API Endpoints + +### Leaderboard Access + +#### Get Creator Leaderboard +```http +GET /api/leaderboard/{creatorAddress} +``` + +**Query Parameters:** +- `season`: Season identifier (optional, defaults to current) +- `limit`: Maximum results (default: 50, max: 200) +- `offset`: Pagination offset (default: 0) +- `includeMetrics`: Include detailed metrics (default: false) + +**Response:** +```json +{ + "success": true, + "data": { + "creatorAddress": "GABC...", + "season": "2024-03", + "leaderboard": [ + { + "rank": 1, + "fanAddress": "GDEF...", + "score": 87.45, + "lastUpdated": "2024-03-15T12:00:00.000Z" + } + ], + "pagination": { + "offset": 0, + "limit": 50, + "totalCount": 150, + "hasMore": true, + "currentPage": 1, + "totalPages": 3 + } + } +} +``` + +#### Get Fan Rank +```http +GET /api/leaderboard/{creatorAddress}/fan/{fanAddress} +``` + +**Response:** +```json +{ + "success": true, + "data": { + "creatorAddress": "GABC...", + "fanAddress": "GDEF...", + "rank": 5, + "score": 82.30, + "metrics": { + "streaming": { "totalAmount": 500, "transactionCount": 25 }, + "subscription": { "isActive": true, "longevityDays": 180 }, + "engagement": { "commentCount": 15, "likeCount": 45 } + } + } +} +``` + +#### Get Available Seasons +```http +GET /api/leaderboard/{creatorAddress}/seasons +``` + +#### Get Leaderboard Statistics +```http +GET /api/leaderboard/{creatorAddress}/stats +``` + +### User Rankings + +#### Get User's Rankings Across Creators +```http +GET /api/leaderboard/user/rankings +``` + +**Response:** +```json +{ + "success": true, + "data": { + "userAddress": "GDEF...", + "season": "2024-03", + "rankings": [ + { + "creatorAddress": "GABC...", + "rank": 5, + "score": 82.30 + } + ], + "totalCreators": 10, + "rankedCreators": 8 + } +} +``` + +### Analytics & Trends + +#### Get Leaderboard Trends +```http +GET /api/leaderboard/{creatorAddress}/trends?seasons=3 +``` + +### Creator Management + +#### Force Recalculation (Creator Only) +```http +POST /api/leaderboard/{creatorAddress}/recalculate +``` + +#### Export Leaderboard (Creator Only) +```http +GET /api/leaderboard/{creatorAddress}/export?format=csv&season=2024-03 +``` + +### Admin Endpoints + +#### Get Worker Status +```http +GET /api/leaderboard/worker/status +``` + +#### Recalculate All Leaderboards +```http +POST /api/leaderboard/worker/recalculate-all +``` + +#### Cache Cleanup +```http +POST /api/leaderboard/worker/cleanup +``` + +## Configuration + +### Environment Variables + +```bash +# Engagement Leaderboard Configuration +LEADERBOARD_ENABLED=true # Enable/disable leaderboard +LEADERBOARD_CACHE_TTL=21600 # Cache TTL in seconds (6 hours) +LEADERBOARD_WORKER_INTERVAL=21600000 # Worker interval in milliseconds (6 hours) +LEADERBOARD_BATCH_SIZE=10 # Creators per batch +LEADERBOARD_SEASON_LENGTH=monthly # Season length: monthly, quarterly, yearly +LEADERBOARD_CACHE_PREFIX=leaderboard: # Redis key prefix +``` + +### Custom Scoring Weights + +```javascript +const weights = { + streamingAmount: 0.4, // 40% weight for streaming + subscriptionLength: 0.3, // 30% weight for subscription + engagementCount: 0.3 // 30% weight for engagement +}; +``` + +## Performance Optimizations + +### Caching Strategy + +- **Redis Caching**: 6-hour TTL for all leaderboard data +- **Pre-calculated Metrics**: Engagement summary table for quick lookups +- **Batch Processing**: Process 10 creators per batch to avoid overload +- **Cache Invalidation**: Immediate invalidation on engagement events + +### Database Indexes + +```sql +-- Optimized indexes for performance +CREATE INDEX idx_streaming_creator_fan ON streaming_payments(creator_address, fan_address); +CREATE INDEX idx_streaming_creator_date ON streaming_payments(creator_address, created_at); +CREATE INDEX idx_subscriptions_creator_active ON subscriptions(creator_id, active); +CREATE INDEX idx_engagement_summary_season_score ON fan_engagement_summary(creator_address, season, composite_score); +``` + +### Query Optimization + +- **Pre-aggregated Data**: Use `fan_engagement_summary` for quick metric retrieval +- **Limited Result Sets**: Default 50 results with pagination +- **Season-based Partitioning**: Separate data by season for faster queries + +## Season Management + +### Season Types + +- **Monthly**: `YYYY-MM` (e.g., "2024-03") +- **Quarterly**: `YYYY-Q#` (e.g., "2024-Q1") +- **Yearly**: `YYYY` (e.g., "2024") + +### Season Boundaries + +```javascript +// Monthly: First day of the month +// Quarterly: First day of the quarter (Jan, Apr, Jul, Oct) +// Yearly: January 1st +``` + +### Historical Tracking + +- **Snapshots**: Automatic creation of season-end snapshots +- **Trend Analysis**: Compare performance across seasons +- **Rank Changes**: Track fan rank improvements/declines + +## Real-Time Updates + +### Event-Driven Updates + +The system automatically updates when: + +1. **Streaming Payments**: New fan payments processed +2. **Subscription Changes**: New, renewed, or cancelled subscriptions +3. **Comments**: New comments on creator content +4. **Likes**: New likes on creator content + +### Update Flow + +1. Event occurs (payment, subscription, comment, like) +2. Cache invalidated for affected creator +3. Engagement summary updated in real-time +4. Next API request returns fresh data + +### Performance Impact + +- **Minimal Overhead**: Cache invalidation is O(1) operation +- **Background Processing**: Heavy calculations run in worker +- **API Performance**: Cached responses serve in <10ms + +## Analytics & Reporting + +### Leaderboard Statistics + +```javascript +{ + "totalFans": 150, + "averageScore": 45.67, + "topScore": 98.23, + "medianScore": 42.15, + "scoreDistribution": [ + { "label": "0-20", "count": 30, "percentage": 20 }, + { "label": "21-40", "count": 45, "percentage": 30 }, + // ... more ranges + ] +} +``` + +### Trend Analysis + +- **Season-over-Season**: Compare performance between periods +- **Fan Retention**: Track repeat top fans +- **Engagement Growth**: Monitor community health + +### Export Capabilities + +- **JSON Format**: Complete data with metrics +- **CSV Format**: Spreadsheet-ready with key metrics +- **Custom Seasons**: Export specific time periods + +## Use Cases + +### Creator Giveaways + +1. **Identify Top Fans**: Get current season leaderboard +2. **Export Data**: Download CSV for prize selection +3. **Verify Eligibility**: Check fan activity and scores +4. **Announce Winners**: Share results with community + +```bash +# Export March leaderboard for giveaway +curl -H "Authorization: Bearer TOKEN" \ + "https://api.substream.protocol/leaderboard/GABC.../export?season=2024-03&format=csv" +``` + +### Community Engagement + +1. **Monitor Rankings**: Track fan position changes +2. **Recognize Supporters**: Highlight top fans in content +3. **Set Goals**: Fans can work to improve their rank +4. **Reward Loyalty**: Special perks for consistent top fans + +### Business Analytics + +1. **Revenue Insights**: Correlate leaderboard with revenue +2. **Retention Analysis**: Identify at-risk top fans +3. **Growth Tracking**: Monitor community expansion +4. **Competitive Analysis**: Compare with similar creators + +## Troubleshooting + +### Common Issues + +#### Leaderboard Not Updating +```bash +# Check worker status +curl "https://api.substream.protocol/leaderboard/worker/status" + +# Force recalculation +curl -X POST "https://api.substream.protocol/leaderboard/GABC.../recalculate" +``` + +#### Cache Issues +```bash +# Clear cache for specific creator +# (Admin endpoint) +curl -X POST "https://api.substream.protocol/leaderboard/worker/cleanup" +``` + +#### Performance Issues +- **Check Redis Memory**: Monitor cache usage +- **Database Indexes**: Ensure proper indexing +- **Worker Load**: Monitor batch processing times + +### Debug Mode + +```bash +# Enable debug logging +DEBUG=leaderboard:* npm start +``` + +### Monitoring + +Key metrics to monitor: + +- **Cache Hit Rate**: Should be >90% +- **Worker Processing Time**: <5 minutes per batch +- **API Response Time**: <50ms for cached requests +- **Database Query Time**: <100ms for metric queries + +## Security Considerations + +- **Access Control**: Creators can only access their own data +- **Rate Limiting**: Apply to leaderboard endpoints +- **Data Privacy**: Fan addresses are pseudonymous +- **Export Limits**: Restrict bulk data access + +## Future Enhancements + +Planned improvements: + +1. **Real-time Leaderboards**: WebSocket updates for live rankings +2. **Custom Scoring**: Creator-defined weight configurations +3. **Achievement System**: Milestones and badges for fans +4. **Leaderboard Categories**: Separate leaderboards by content type +5. **Mobile Optimization**: Dedicated mobile API responses +6. **Advanced Analytics**: Machine learning for engagement prediction +7. **Social Features**: Fan profiles and achievement sharing +8. **Integration APIs**: Third-party leaderboard integrations + +## API Examples + +### Basic Leaderboard Request + +```javascript +// Get top 10 fans for current season +const response = await fetch('/api/leaderboard/GABCDEF123456789?limit=10&includeMetrics=true', { + headers: { 'Authorization': 'Bearer YOUR_TOKEN' } +}); + +const data = await response.json(); +console.log(data.data.leaderboard); +``` + +### Fan Rank Lookup + +```javascript +// Check your rank on a creator's leaderboard +const response = await fetch('/api/leaderboard/GABCDEF123456789/fan/GHIJKL789', { + headers: { 'Authorization': 'Bearer YOUR_TOKEN' } +}); + +const rankData = await response.json(); +console.log(`Your rank: #${rankData.data.rank} with score ${rankData.data.score}`); +``` + +### Export for Giveaway + +```javascript +// Export March leaderboard as CSV +const response = await fetch('/api/leaderboard/GABCDEF123456789/export?season=2024-03&format=csv', { + headers: { 'Authorization': 'Bearer YOUR_TOKEN' } +}); + +const csvData = await response.text(); +// Save to file or process for giveaway +``` + +--- + +This Automated Engagement Leaderboard API transforms community engagement into a gamified experience, helping creators identify and reward their most dedicated supporters while providing valuable insights into community health and growth patterns. diff --git a/index.js b/index.js index 3333346..0e56c6a 100644 --- a/index.js +++ b/index.js @@ -30,13 +30,13 @@ const VideoProcessingWorker = require('./src/services/videoProcessingWorker'); const { BackgroundWorkerService } = require('./src/services/backgroundWorkerService'); const { GlobalStatsService } = require('./src/services/globalStatsService'); const GlobalStatsWorker = require('./src/services/globalStatsWorker'); -const FederationService = require('./services/federationService'); -const FederationWorker = require('./src/services/federationWorker'); +const EngagementLeaderboardService = require('./services/engagementLeaderboardService'); +const LeaderboardWorker = require('./src/services/leaderboardWorker'); const createVideoRoutes = require('./routes/video'); const createGlobalStatsRouter = require('./routes/globalStats'); const createDeviceRoutes = require('./routes/device'); const createSwaggerRoutes = require('./routes/swagger'); -const createActivityPubRoutes = require('./routes/activityPub'); +const createLeaderboardRoutes = require('./routes/leaderboard'); const { buildAuditLogCsv } = require('./src/utils/export/auditLogCsv'); const { buildAuditLogPdf } = require('./src/utils/export/auditLogPdf'); const { getRequestIp } = require('./src/utils/requestIp'); @@ -109,30 +109,19 @@ function createApp(dependencies = {}) { console.error('Failed to start AML scanner worker:', error); }); - console.log('AML Scanner Worker initialized'); + // Start federation worker if ActivityPub is enabled + if (config.activityPub?.enabled !== false) { + const federationWorker = dependencies.federationWorker || new FederationWorker(database, config); + federationWorker.start().catch(error => { + console.error('Failed to start federation worker:', error); + }); } - // Initialize IP intelligence services if enabled - let ipIntelligenceService = null; - let ipBlockingService = null; - let ipMonitoringService = null; - let ipMiddleware = null; - - if (config.ipIntelligence && config.ipIntelligence.enabled) { - ipIntelligenceService = dependencies.ipIntelligenceService || new IPIntelligenceService(config.ipIntelligence); - ipBlockingService = dependencies.ipBlockingService || new IPBlockingService(database, config.ipIntelligence); - ipMonitoringService = dependencies.ipMonitoringService || new IPMonitoringService(database, config.ipIntelligence); - ipMiddleware = new IPIntelligenceMiddleware(ipIntelligenceService, config.ipIntelligence); - - // Expose services on the express app - app.set('ipIntelligenceService', ipIntelligenceService); - app.set('ipBlockingService', ipBlockingService); - app.set('ipMonitoringService', ipMonitoringService); - app.set('ipIntelligenceMiddleware', ipMiddleware); - - // Start IP monitoring service - ipMonitoringService.start().catch(error => { - console.error('Failed to start IP monitoring service:', error); + // Start leaderboard worker if enabled + if (config.leaderboard?.enabled !== false) { + const leaderboardWorker = dependencies.leaderboardWorker || new LeaderboardWorker(config, database, getRedisClient(), EngagementLeaderboardService); + leaderboardWorker.start().catch(error => { + console.error('Failed to start leaderboard worker:', error); }); console.log('IP Intelligence services initialized'); @@ -166,6 +155,11 @@ function createApp(dependencies = {}) { initialDelay: process.env.GLOBAL_STATS_INITIAL_DELAY ? parseInt(process.env.GLOBAL_STATS_INITIAL_DELAY) : 5000 }); + // Initialize leaderboard service and worker + const redisClient = getRedisClient(); + const leaderboardService = dependencies.leaderboardService || new EngagementLeaderboardService(config, database, redisClient); + const leaderboardWorker = dependencies.leaderboardWorker || new LeaderboardWorker(config, database, redisClient, leaderboardService); + // Initialize subdomain and SSL services const subdomainService = dependencies.subdomainService || new SubdomainService(database, config); const sslCertificateService = dependencies.sslCertificateService || new SslCertificateService(config); @@ -177,6 +171,8 @@ function createApp(dependencies = {}) { app.set('backgroundWorker', backgroundWorker); app.set('globalStatsService', globalStatsService); app.set('globalStatsWorker', globalStatsWorker); + app.set('leaderboardService', leaderboardService); + app.set('leaderboardWorker', leaderboardWorker); app.set('subdomainService', subdomainService); app.set('sslCertificateService', sslCertificateService); @@ -221,8 +217,8 @@ function createApp(dependencies = {}) { const createPriceRouter = require('./routes/price'); app.use('/api/price-feed', createPriceRouter()); - // ActivityPub federation endpoints - app.use('/', createActivityPubRoutes()); + // Engagement leaderboard endpoints + app.use('/api/leaderboard', createLeaderboardRoutes()); app.use((req, res, next) => { req.config = config; diff --git a/middleware/leaderboardUpdate.js b/middleware/leaderboardUpdate.js new file mode 100644 index 0000000..2ad9c0a --- /dev/null +++ b/middleware/leaderboardUpdate.js @@ -0,0 +1,402 @@ +const { logger } = require('../utils/logger'); + +/** + * Leaderboard Update Middleware + * Automatically triggers leaderboard updates when engagement events occur + */ +class LeaderboardUpdateMiddleware { + constructor(leaderboardService, database) { + this.leaderboardService = leaderboardService; + this.database = database; + } + + /** + * Middleware to handle streaming payment events + */ + async handleStreamingPayment(paymentData) { + try { + const { creatorAddress, fanAddress, amount } = paymentData; + + logger.info('Processing streaming payment for leaderboard', { + creatorAddress, + fanAddress, + amount + }); + + // Invalidate cache for the creator's current leaderboard + await this.leaderboardService.invalidateCache(creatorAddress); + + // Optionally update engagement summary immediately for real-time updates + await this.updateEngagementSummary(creatorAddress, fanAddress); + + logger.debug('Leaderboard updated for streaming payment', { + creatorAddress, + fanAddress + }); + } catch (error) { + logger.error('Failed to update leaderboard for streaming payment', { + error: error.message, + paymentData + }); + } + } + + /** + * Middleware to handle subscription events + */ + async handleSubscriptionChange(subscriptionData) { + try { + const { creatorId, walletAddress, active, eventType } = subscriptionData; + + logger.info('Processing subscription change for leaderboard', { + creatorId, + walletAddress, + active, + eventType + }); + + // Invalidate cache for the creator's current leaderboard + await this.leaderboardService.invalidateCache(creatorId); + + // Update engagement summary + await this.updateEngagementSummary(creatorId, walletAddress); + + logger.debug('Leaderboard updated for subscription change', { + creatorId, + walletAddress + }); + } catch (error) { + logger.error('Failed to update leaderboard for subscription change', { + error: error.message, + subscriptionData + }); + } + } + + /** + * Middleware to handle comment events + */ + async handleComment(commentData) { + try { + const { creatorId, userAddress, content } = commentData; + + logger.info('Processing comment for leaderboard', { + creatorId, + userAddress + }); + + // Invalidate cache for the creator's current leaderboard + await this.leaderboardService.invalidateCache(creatorId); + + // Update engagement summary + await this.updateEngagementSummary(creatorId, userAddress); + + logger.debug('Leaderboard updated for comment', { + creatorId, + userAddress + }); + } catch (error) { + logger.error('Failed to update leaderboard for comment', { + error: error.message, + commentData + }); + } + } + + /** + * Middleware to handle like events + */ + async handleLike(likeData) { + try { + const { creatorAddress, fanAddress, contentId } = likeData; + + logger.info('Processing like for leaderboard', { + creatorAddress, + fanAddress, + contentId + }); + + // Invalidate cache for the creator's current leaderboard + await this.leaderboardService.invalidateCache(creatorAddress); + + // Update engagement summary + await this.updateEngagementSummary(creatorAddress, fanAddress); + + logger.debug('Leaderboard updated for like', { + creatorAddress, + fanAddress + }); + } catch (error) { + logger.error('Failed to update leaderboard for like', { + error: error.message, + likeData + }); + } + } + + /** + * Update engagement summary for immediate real-time updates + */ + async updateEngagementSummary(creatorAddress, fanAddress) { + try { + const currentSeason = this.leaderboardService.getCurrentSeason(); + const startDate = this.leaderboardService.getSeasonStartDate(); + const endDate = new Date(); + + // Get current metrics + const [streaming, subscription, engagement] = await Promise.all([ + this.leaderboardService.getStreamingMetrics(creatorAddress, fanAddress, startDate, endDate), + this.leaderboardService.getSubscriptionMetrics(creatorAddress, fanAddress, startDate, endDate), + this.leaderboardService.getEngagementMetrics(creatorAddress, fanAddress, startDate, endDate) + ]); + + // Calculate scores + const streamingScore = this.leaderboardService.normalizeStreamingScore(streaming); + const subscriptionScore = this.leaderboardService.normalizeSubscriptionScore(subscription); + const engagementScore = this.leaderboardService.normalizeEngagementScore(engagement); + const compositeScore = + (streamingScore * this.leaderboardService.weights.streamingAmount) + + (subscriptionScore * this.leaderboardService.weights.subscriptionLength) + + (engagementScore * this.leaderboardService.weights.engagementCount); + + // Update or insert engagement summary + const upsertQuery = ` + INSERT OR REPLACE INTO fan_engagement_summary ( + creator_address, fan_address, season, + total_streaming_amount, streaming_transaction_count, streaming_days, + subscription_days, current_streak, subscription_active, + comment_count, like_count, share_count, total_engagement, + streaming_score, subscription_score, engagement_score, composite_score, + last_calculated, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `; + + this.database.db.prepare(upsertQuery).run( + creatorAddress, + fanAddress, + currentSeason, + streaming.totalAmount, + streaming.transactionCount, + streaming.streamingDays, + subscription.longevityDays, + subscription.currentStreak, + subscription.currentActive, + engagement.commentCount, + engagement.likeCount, + engagement.shareCount, + engagement.totalEngagement, + streamingScore, + subscriptionScore, + engagementScore, + compositeScore, + new Date().toISOString(), + new Date().toISOString() + ); + + logger.debug('Engagement summary updated', { + creatorAddress, + fanAddress, + season: currentSeason, + compositeScore + }); + } catch (error) { + logger.error('Failed to update engagement summary', { + error: error.message, + creatorAddress, + fanAddress + }); + } + } + + /** + * Create leaderboard snapshot for historical tracking + */ + async createSnapshot(creatorAddress, season = null) { + try { + const targetSeason = season || this.leaderboardService.getCurrentSeason(); + + logger.info('Creating leaderboard snapshot', { + creatorAddress, + season: targetSeason + }); + + // Get current leaderboard + const leaderboard = await this.leaderboardService.generateLeaderboard( + creatorAddress, + targetSeason, + 1000 + ); + + // Clear existing snapshots for this season + this.database.db.prepare(` + DELETE FROM leaderboard_snapshots + WHERE creator_address = ? AND season = ? + `).run(creatorAddress, targetSeason); + + // Insert new snapshots + const insertQuery = ` + INSERT INTO leaderboard_snapshots ( + creator_address, fan_address, season, rank, score, metrics, calculated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) + `; + + const stmt = this.database.db.prepare(insertQuery); + + for (const entry of leaderboard) { + stmt.run( + creatorAddress, + entry.fanAddress, + targetSeason, + entry.rank, + entry.score, + JSON.stringify(entry.metrics), + new Date().toISOString() + ); + } + + logger.info('Leaderboard snapshot created', { + creatorAddress, + season: targetSeason, + entryCount: leaderboard.length + }); + + return leaderboard.length; + } catch (error) { + logger.error('Failed to create leaderboard snapshot', { + error: error.message, + creatorAddress, + season + }); + throw error; + } + } + + /** + * Get leaderboard rank changes between seasons + */ + async getRankChanges(creatorAddress, fanAddress, fromSeason, toSeason) { + try { + const query = ` + SELECT + season, + rank, + score, + calculated_at + FROM leaderboard_snapshots + WHERE creator_address = ? AND fan_address = ? AND season IN (?, ?) + ORDER BY season DESC + `; + + const results = this.database.db.prepare(query).all( + creatorAddress, + fanAddress, + fromSeason, + toSeason + ); + + if (results.length < 2) { + return null; + } + + const [current, previous] = results; + const rankChange = previous.rank - current.rank; // Positive = moved up + const scoreChange = current.score - previous.score; + + return { + fanAddress, + creatorAddress, + fromSeason: previous.season, + toSeason: current.season, + previousRank: previous.rank, + currentRank: current.rank, + rankChange, + previousScore: previous.score, + currentScore: current.score, + scoreChange, + lastUpdated: current.calculated_at + }; + } catch (error) { + logger.error('Failed to get rank changes', { + error: error.message, + creatorAddress, + fanAddress, + fromSeason, + toSeason + }); + return null; + } + } + + /** + * Batch update multiple creators' leaderboards + */ + async batchUpdateLeaderboards(creatorAddresses) { + try { + logger.info('Starting batch leaderboard update', { + creatorCount: creatorAddresses.length + }); + + const results = []; + for (const creatorAddress of creatorAddresses) { + try { + await this.leaderboardService.invalidateCache(creatorAddress); + results.push({ creatorAddress, success: true }); + } catch (error) { + logger.error('Failed to update creator leaderboard in batch', { + error: error.message, + creatorAddress + }); + results.push({ creatorAddress, success: false, error: error.message }); + } + } + + const successful = results.filter(r => r.success).length; + const failed = results.filter(r => !r.success).length; + + logger.info('Batch leaderboard update completed', { + total: creatorAddresses.length, + successful, + failed + }); + + return results; + } catch (error) { + logger.error('Batch leaderboard update failed', { + error: error.message, + creatorCount: creatorAddresses.length + }); + throw error; + } + } + + /** + * Express middleware for automatic leaderboard updates + */ + expressMiddleware() { + return (req, res, next) => { + // Store the middleware function on the request for later use + req.updateLeaderboard = async (eventType, data) => { + switch (eventType) { + case 'streaming_payment': + await this.handleStreamingPayment(data); + break; + case 'subscription_change': + await this.handleSubscriptionChange(data); + break; + case 'comment': + await this.handleComment(data); + break; + case 'like': + await this.handleLike(data); + break; + default: + logger.warn('Unknown leaderboard update event type', { eventType }); + } + }; + + next(); + }; + } +} + +module.exports = LeaderboardUpdateMiddleware; diff --git a/migrations/knex/007_add_leaderboard_tables.js b/migrations/knex/007_add_leaderboard_tables.js new file mode 100644 index 0000000..f5b740b --- /dev/null +++ b/migrations/knex/007_add_leaderboard_tables.js @@ -0,0 +1,111 @@ +exports.up = function(knex) { + return knex.schema + // Streaming payments table for tracking fan payments + .createTable('streaming_payments', function(table) { + table.string('id').primary().defaultTo(knex.raw('lower(hex(randomblob(16)))')); + table.string('creator_address').notNullable().index(); + table.string('fan_address').notNullable().index(); + table.decimal('amount', 20, 8).notNullable(); + table.string('currency').defaultTo('XLM'); + table.string('transaction_hash').unique(); + table.timestamp('created_at').defaultTo(knex.fn.now()).index(); + table.timestamp('updated_at').defaultTo(knex.fn.now()); + + // Indexes for performance + table.index(['creator_address', 'fan_address']); + table.index(['creator_address', 'created_at']); + table.index(['fan_address', 'created_at']); + }) + + // Content likes table for engagement tracking + .createTable('content_likes', function(table) { + table.string('id').primary().defaultTo(knex.raw('lower(hex(randomblob(16)))')); + table.string('content_id').notNullable().index(); + table.string('creator_address').notNullable().index(); + table.string('fan_address').notNullable().index(); + table.timestamp('created_at').defaultTo(knex.fn.now()).index(); + table.timestamp('updated_at').defaultTo(knex.fn.now()); + + // Unique constraint to prevent duplicate likes + table.unique(['content_id', 'fan_address']); + + // Indexes for performance + table.index(['creator_address', 'fan_address']); + table.index(['creator_address', 'created_at']); + }) + + // Leaderboard snapshots for historical data + .createTable('leaderboard_snapshots', function(table) { + table.string('id').primary().defaultTo(knex.raw('lower(hex(randomblob(16)))')); + table.string('creator_address').notNullable().index(); + table.string('fan_address').notNullable().index(); + table.string('season').notNullable().index(); + table.integer('rank').notNullable(); + table.decimal('score', 10, 2).notNullable(); + table.json('metrics'); // Detailed metrics snapshot + table.timestamp('calculated_at').defaultTo(knex.fn.now()).index(); + table.timestamp('created_at').defaultTo(knex.fn.now()); + + // Indexes for performance + table.index(['creator_address', 'season']); + table.index(['creator_address', 'season', 'rank']); + table.index(['fan_address', 'season']); + }) + + // Fan engagement summary for quick lookups + .createTable('fan_engagement_summary', function(table) { + table.string('id').primary().defaultTo(knex.raw('lower(hex(randomblob(16)))')); + table.string('creator_address').notNullable().index(); + table.string('fan_address').notNullable().index(); + table.string('season').notNullable().index(); + + // Aggregated metrics + table.decimal('total_streaming_amount', 20, 8).defaultTo(0); + table.integer('streaming_transaction_count').defaultTo(0); + table.integer('streaming_days').defaultTo(0); + table.integer('subscription_days').defaultTo(0); + table.integer('current_streak').defaultTo(0); + table.boolean('subscription_active').defaultTo(false); + table.integer('comment_count').defaultTo(0); + table.integer('like_count').defaultTo(0); + table.integer('share_count').defaultTo(0); + table.integer('total_engagement').defaultTo(0); + + // Calculated fields + table.decimal('streaming_score', 5, 2).defaultTo(0); + table.decimal('subscription_score', 5, 2).defaultTo(0); + table.decimal('engagement_score', 5, 2).defaultTo(0); + table.decimal('composite_score', 5, 2).defaultTo(0); + + table.timestamp('last_calculated').defaultTo(knex.fn.now()).index(); + table.timestamp('created_at').defaultTo(knex.fn.now()); + table.timestamp('updated_at').defaultTo(knex.fn.now()); + + // Unique constraint for season-based data + table.unique(['creator_address', 'fan_address', 'season']); + + // Indexes for performance + table.index(['creator_address', 'season', 'composite_score']); + table.index(['fan_address', 'season', 'composite_score']); + }) + + // Update creators table to include leaderboard preferences + .table('creators', function(table) { + table.boolean('leaderboard_enabled').defaultTo(true); + table.json('leaderboard_settings'); // Custom weights, season length, etc. + table.timestamp('last_leaderboard_update').defaultTo(knex.fn.now()); + }); +}; + +exports.down = function(knex) { + return knex.schema + .dropTableIfExists('fan_engagement_summary') + .dropTableIfExists('leaderboard_snapshots') + .dropTableIfExists('content_likes') + .dropTableIfExists('streaming_payments') + .table('creators', function(table) { + table.dropColumn('leaderboard_enabled'); + table.dropColumn('leaderboard_settings'); + table.dropColumn('last_leaderboard_update'); + }); +}; diff --git a/routes/leaderboard.js b/routes/leaderboard.js new file mode 100644 index 0000000..9475297 --- /dev/null +++ b/routes/leaderboard.js @@ -0,0 +1,578 @@ +const express = require('express'); +const router = express.Router(); +const { authenticateToken, requireCreator } = require('../middleware/auth'); +const { logger } = require('../utils/logger'); + +/** + * Engagement Leaderboard API Routes + * Provides endpoints for accessing "Top Fans" leaderboards and analytics + */ + +/** + * Get leaderboard for a creator + * GET /api/leaderboard/:creatorAddress + */ +router.get('/:creatorAddress', authenticateToken, async (req, res) => { + try { + const { creatorAddress } = req.params; + const { + season, + limit = 50, + includeMetrics = false, + offset = 0 + } = req.query; + + // Validate parameters + const limitNum = Math.min(parseInt(limit) || 50, 200); // Max 200 results + const offsetNum = parseInt(offset) || 0; + + const leaderboardService = req.app.get('leaderboardService'); + if (!leaderboardService) { + return res.status(503).json({ + success: false, + error: 'Leaderboard service not available' + }); + } + + // Generate leaderboard + const leaderboard = await leaderboardService.generateLeaderboard( + creatorAddress, + season, + limitNum + offsetNum // Get extra for pagination + ); + + // Apply pagination + const paginatedLeaderboard = leaderboard.slice(offsetNum, offsetNum + limitNum); + + // Remove detailed metrics if not requested + if (includeMetrics !== 'true') { + paginatedLeaderboard.forEach(entry => { + delete entry.metrics; + }); + } + + // Get pagination metadata + const hasMore = offsetNum + limitNum < leaderboard.length; + const totalCount = leaderboard.length; + + res.json({ + success: true, + data: { + creatorAddress, + season: season || 'current', + leaderboard: paginatedLeaderboard, + pagination: { + offset: offsetNum, + limit: limitNum, + totalCount, + hasMore, + currentPage: Math.floor(offsetNum / limitNum) + 1, + totalPages: Math.ceil(totalCount / limitNum) + } + } + }); + + } catch (error) { + logger.error('Get leaderboard error', { + error: error.message, + creatorAddress: req.params.creatorAddress + }); + res.status(500).json({ + success: false, + error: 'Failed to get leaderboard' + }); + } +}); + +/** + * Get fan's rank on a creator's leaderboard + * GET /api/leaderboard/:creatorAddress/fan/:fanAddress + */ +router.get('/:creatorAddress/fan/:fanAddress', authenticateToken, async (req, res) => { + try { + const { creatorAddress, fanAddress } = req.params; + const { season } = req.query; + + const leaderboardService = req.app.get('leaderboardService'); + if (!leaderboardService) { + return res.status(503).json({ + success: false, + error: 'Leaderboard service not available' + }); + } + + const fanRank = await leaderboardService.getFanRank(creatorAddress, fanAddress, season); + + if (!fanRank) { + return res.status(404).json({ + success: false, + error: 'Fan not found on leaderboard' + }); + } + + res.json({ + success: true, + data: { + creatorAddress, + fanAddress, + season: season || 'current', + rank: fanRank.rank, + score: fanRank.score, + metrics: fanRank.metrics, + lastUpdated: fanRank.lastUpdated + } + }); + + } catch (error) { + logger.error('Get fan rank error', { + error: error.message, + creatorAddress: req.params.creatorAddress, + fanAddress: req.params.fanAddress + }); + res.status(500).json({ + success: false, + error: 'Failed to get fan rank' + }); + } +}); + +/** + * Get available seasons for a creator + * GET /api/leaderboard/:creatorAddress/seasons + */ +router.get('/:creatorAddress/seasons', authenticateToken, async (req, res) => { + try { + const { creatorAddress } = req.params; + + const leaderboardService = req.app.get('leaderboardService'); + if (!leaderboardService) { + return res.status(503).json({ + success: false, + error: 'Leaderboard service not available' + }); + } + + const seasons = await leaderboardService.getAvailableSeasons(creatorAddress); + + res.json({ + success: true, + data: { + creatorAddress, + seasons, + currentSeason: leaderboardService.getCurrentSeason() + } + }); + + } catch (error) { + logger.error('Get seasons error', { + error: error.message, + creatorAddress: req.params.creatorAddress + }); + res.status(500).json({ + success: false, + error: 'Failed to get seasons' + }); + } +}); + +/** + * Get leaderboard statistics + * GET /api/leaderboard/:creatorAddress/stats + */ +router.get('/:creatorAddress/stats', authenticateToken, async (req, res) => { + try { + const { creatorAddress } = req.params; + const { season } = req.query; + + const leaderboardService = req.app.get('leaderboardService'); + if (!leaderboardService) { + return res.status(503).json({ + success: false, + error: 'Leaderboard service not available' + }); + } + + const stats = await leaderboardService.getLeaderboardStats(creatorAddress, season); + + if (!stats) { + return res.status(404).json({ + success: false, + error: 'No leaderboard data available' + }); + } + + res.json({ + success: true, + data: { + creatorAddress, + season: season || 'current', + ...stats + } + }); + + } catch (error) { + logger.error('Get leaderboard stats error', { + error: error.message, + creatorAddress: req.params.creatorAddress + }); + res.status(500).json({ + success: false, + error: 'Failed to get leaderboard statistics' + }); + } +}); + +/** + * Force recalculation of leaderboard (creator only) + * POST /api/leaderboard/:creatorAddress/recalculate + */ +router.post('/:creatorAddress/recalculate', authenticateToken, requireCreator, async (req, res) => { + try { + const { creatorAddress } = req.params; + const { seasons } = req.body; + + const leaderboardWorker = req.app.get('leaderboardWorker'); + if (!leaderboardWorker) { + return res.status(503).json({ + success: false, + error: 'Leaderboard worker not available' + }); + } + + // Verify creator is requesting their own leaderboard + if (req.user.address !== creatorAddress) { + return res.status(403).json({ + success: false, + error: 'Can only recalculate your own leaderboard' + }); + } + + const results = await leaderboardWorker.forceRecalculate(creatorAddress, seasons); + + res.json({ + success: true, + data: { + creatorAddress, + recalculatedAt: new Date().toISOString(), + results + } + }); + + } catch (error) { + logger.error('Recalculate leaderboard error', { + error: error.message, + creatorAddress: req.params.creatorAddress + }); + res.status(500).json({ + success: false, + error: 'Failed to recalculate leaderboard' + }); + } +}); + +/** + * Export leaderboard data (creator only) + * GET /api/leaderboard/:creatorAddress/export + */ +router.get('/:creatorAddress/export', authenticateToken, requireCreator, async (req, res) => { + try { + const { creatorAddress } = req.params; + const { season, format = 'json' } = req.query; + + // Verify creator is requesting their own data + if (req.user.address !== creatorAddress) { + return res.status(403).json({ + success: false, + error: 'Can only export your own leaderboard' + }); + } + + const leaderboardWorker = req.app.get('leaderboardWorker'); + if (!leaderboardWorker) { + return res.status(503).json({ + success: false, + error: 'Leaderboard worker not available' + }); + } + + const exportData = await leaderboardWorker.exportLeaderboard( + creatorAddress, + season, + format + ); + + // Set appropriate headers + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const filename = `leaderboard-${creatorAddress.slice(0, 8)}-${season || 'current'}-${timestamp}`; + + if (format === 'csv') { + res.setHeader('Content-Type', 'text/csv'); + res.setHeader('Content-Disposition', `attachment; filename="${filename}.csv"`); + res.send(exportData); + } else { + res.setHeader('Content-Type', 'application/json'); + res.setHeader('Content-Disposition', `attachment; filename="${filename}.json"`); + res.json(exportData); + } + + } catch (error) { + logger.error('Export leaderboard error', { + error: error.message, + creatorAddress: req.params.creatorAddress + }); + res.status(500).json({ + success: false, + error: 'Failed to export leaderboard' + }); + } +}); + +/** + * Get leaderboard worker status (admin only) + * GET /api/leaderboard/worker/status + */ +router.get('/worker/status', authenticateToken, async (req, res) => { + try { + // This should be restricted to admins in production + const leaderboardWorker = req.app.get('leaderboardWorker'); + if (!leaderboardWorker) { + return res.status(503).json({ + success: false, + error: 'Leaderboard worker not available' + }); + } + + const status = leaderboardWorker.getStatus(); + const stats = await leaderboardWorker.getWorkerStats(); + + res.json({ + success: true, + data: { + ...status, + ...stats + } + }); + + } catch (error) { + logger.error('Get worker status error', { error: error.message }); + res.status(500).json({ + success: false, + error: 'Failed to get worker status' + }); + } +}); + +/** + * Force recalculation for all creators (admin only) + * POST /api/leaderboard/worker/recalculate-all + */ +router.post('/worker/recalculate-all', authenticateToken, async (req, res) => { + try { + // This should be restricted to admins in production + const leaderboardWorker = req.app.get('leaderboardWorker'); + if (!leaderboardWorker) { + return res.status(503).json({ + success: false, + error: 'Leaderboard worker not available' + }); + } + + // Trigger full processing cycle + await leaderboardWorker.processAllLeaderboards(); + + res.json({ + success: true, + data: { + message: 'All leaderboards queued for recalculation', + timestamp: new Date().toISOString() + } + }); + + } catch (error) { + logger.error('Recalculate all leaderboards error', { error: error.message }); + res.status(500).json({ + success: false, + error: 'Failed to recalculate all leaderboards' + }); + } +}); + +/** + * Clean up old cache entries (admin only) + * POST /api/leaderboard/worker/cleanup + */ +router.post('/worker/cleanup', authenticateToken, async (req, res) => { + try { + // This should be restricted to admins in production + const { daysToKeep = 30 } = req.body; + + const leaderboardWorker = req.app.get('leaderboardWorker'); + if (!leaderboardWorker) { + return res.status(503).json({ + success: false, + error: 'Leaderboard worker not available' + }); + } + + const deletedCount = await leaderboardWorker.cleanupOldCache(daysToKeep); + + res.json({ + success: true, + data: { + deletedCount, + daysToKeep, + timestamp: new Date().toISOString() + } + }); + + } catch (error) { + logger.error('Cache cleanup error', { error: error.message }); + res.status(500).json({ + success: false, + error: 'Failed to cleanup cache' + }); + } +}); + +/** + * Get user's rankings across all creators they follow + * GET /api/leaderboard/user/rankings + */ +router.get('/user/rankings', authenticateToken, async (req, res) => { + try { + const userAddress = req.user.address; + const { season, limit = 20 } = req.query; + + const database = req.database; + + // Get all creators the user follows/subscribes to + const query = ` + SELECT DISTINCT creator_id as creatorAddress + FROM subscriptions + WHERE wallet_address = ? AND active = 1 + LIMIT ? + `; + + const creators = database.db.prepare(query).all(userAddress, parseInt(limit) || 20); + + const leaderboardService = req.app.get('leaderboardService'); + if (!leaderboardService) { + return res.status(503).json({ + success: false, + error: 'Leaderboard service not available' + }); + } + + // Get user's rank on each creator's leaderboard + const rankings = []; + for (const creator of creators) { + try { + const rank = await leaderboardService.getFanRank( + creator.creatorAddress, + userAddress, + season + ); + + if (rank) { + rankings.push({ + creatorAddress: creator.creatorAddress, + rank: rank.rank, + score: rank.score, + metrics: rank.metrics + }); + } + } catch (error) { + // Skip creators where leaderboard isn't available + logger.debug('Failed to get user rank for creator', { + creatorAddress: creator.creatorAddress, + error: error.message + }); + } + } + + // Sort by rank (best first) + rankings.sort((a, b) => a.rank - b.rank); + + res.json({ + success: true, + data: { + userAddress, + season: season || 'current', + rankings, + totalCreators: creators.length, + rankedCreators: rankings.length + } + }); + + } catch (error) { + logger.error('Get user rankings error', { + error: error.message, + userAddress: req.user.address + }); + res.status(500).json({ + success: false, + error: 'Failed to get user rankings' + }); + } +}); + +/** + * Get leaderboard trends/changes over time + * GET /api/leaderboard/:creatorAddress/trends + */ +router.get('/:creatorAddress/trends', authenticateToken, async (req, res) => { + try { + const { creatorAddress } = req.params; + const { seasons = 3 } = req.query; // Number of recent seasons to compare + + const leaderboardService = req.app.get('leaderboardService'); + if (!leaderboardService) { + return res.status(503).json({ + success: false, + error: 'Leaderboard service not available' + }); + } + + const availableSeasons = await leaderboardService.getAvailableSeasons(creatorAddress); + const recentSeasons = availableSeasons.slice(0, parseInt(seasons) || 3); + + const trends = []; + for (const season of recentSeasons) { + try { + const stats = await leaderboardService.getLeaderboardStats(creatorAddress, season); + if (stats) { + trends.push({ + season, + ...stats + }); + } + } catch (error) { + logger.debug('Failed to get season stats for trends', { + season, + error: error.message + }); + } + } + + res.json({ + success: true, + data: { + creatorAddress, + trends, + seasonsAnalyzed: recentSeasons.length + } + }); + + } catch (error) { + logger.error('Get leaderboard trends error', { + error: error.message, + creatorAddress: req.params.creatorAddress + }); + res.status(500).json({ + success: false, + error: 'Failed to get leaderboard trends' + }); + } +}); + +module.exports = router; diff --git a/services/engagementLeaderboardService.js b/services/engagementLeaderboardService.js new file mode 100644 index 0000000..8a53bf9 --- /dev/null +++ b/services/engagementLeaderboardService.js @@ -0,0 +1,662 @@ +const { logger } = require('../utils/logger'); + +/** + * Engagement Leaderboard Service + * Calculates and caches "Top Fans" leaderboards based on composite engagement scores + */ +class EngagementLeaderboardService { + constructor(config, database, redisClient) { + this.config = config; + this.database = database; + this.redis = redisClient; + + // Scoring weights (configurable) + this.weights = { + streamingAmount: 0.4, // 40% weight for total streaming amount + subscriptionLength: 0.3, // 30% weight for subscription longevity + engagementCount: 0.3 // 30% weight for comments/likes + }; + + // Cache configuration + this.cacheTTL = config.leaderboard?.cacheTTL || 21600; // 6 hours in seconds + this.prefix = config.leaderboard?.cachePrefix || 'leaderboard:'; + + // Season configuration + this.seasonLength = config.leaderboard?.seasonLength || 'monthly'; // monthly, quarterly, yearly + } + + /** + * Calculate composite engagement score for a user + * @param {string} creatorAddress Creator wallet address + * @param {string} fanAddress Fan wallet address + * @param {string} season Season identifier (optional) + * @returns {number} Composite score + */ + async calculateUserScore(creatorAddress, fanAddress, season = null) { + try { + const startDate = this.getSeasonStartDate(season); + const endDate = new Date(); + + // Get streaming metrics + const streamingMetrics = await this.getStreamingMetrics(creatorAddress, fanAddress, startDate, endDate); + + // Get subscription metrics + const subscriptionMetrics = await this.getSubscriptionMetrics(creatorAddress, fanAddress, startDate, endDate); + + // Get engagement metrics + const engagementMetrics = await this.getEngagementMetrics(creatorAddress, fanAddress, startDate, endDate); + + // Calculate normalized scores (0-100 scale) + const streamingScore = this.normalizeStreamingScore(streamingMetrics); + const subscriptionScore = this.normalizeSubscriptionScore(subscriptionMetrics); + const engagementScore = this.normalizeEngagementScore(engagementMetrics); + + // Calculate weighted composite score + const compositeScore = + (streamingScore * this.weights.streamingAmount) + + (subscriptionScore * this.weights.subscriptionLength) + + (engagementScore * this.weights.engagementCount); + + logger.debug('User score calculated', { + creatorAddress, + fanAddress, + season, + compositeScore, + components: { streamingScore, subscriptionScore, engagementScore } + }); + + return Math.round(compositeScore * 100) / 100; // Round to 2 decimal places + } catch (error) { + logger.error('Failed to calculate user score', { + error: error.message, + creatorAddress, + fanAddress, + season + }); + return 0; + } + } + + /** + * Get streaming metrics for a user + */ + async getStreamingMetrics(creatorAddress, fanAddress, startDate, endDate) { + const query = ` + SELECT + COALESCE(SUM(amount), 0) as totalAmount, + COUNT(DISTINCT id) as transactionCount, + MIN(created_at) as firstTransaction, + MAX(created_at) as lastTransaction + FROM streaming_payments + WHERE creator_address = ? + AND fan_address = ? + AND created_at >= ? + AND created_at <= ? + `; + + const result = this.database.db.prepare(query).get( + creatorAddress, + fanAddress, + startDate.toISOString(), + endDate.toISOString() + ); + + return { + totalAmount: result.totalAmount || 0, + transactionCount: result.transactionCount || 0, + firstTransaction: result.firstTransaction, + lastTransaction: result.lastTransaction, + streamingDays: result.firstTransaction ? + Math.ceil((new Date(result.lastTransaction) - new Date(result.firstTransaction)) / (1000 * 60 * 60 * 24)) : 0 + }; + } + + /** + * Get subscription metrics for a user + */ + async getSubscriptionMetrics(creatorAddress, fanAddress, startDate, endDate) { + const query = ` + SELECT + COUNT(*) as activeSubscriptions, + SUM(CASE WHEN active = 1 THEN 1 ELSE 0 END) as currentActive, + MIN(created_at) as firstSubscription, + MAX(created_at) as lastSubscription, + SUM(CASE WHEN active = 1 THEN + JULIANDAY('now') - JULIANDAY(created_at) + ELSE 0 END) as totalSubscriptionDays + FROM subscriptions + WHERE creator_id = ? + AND wallet_address = ? + AND created_at >= ? + AND created_at <= ? + `; + + const result = this.database.db.prepare(query).get( + creatorAddress, + fanAddress, + startDate.toISOString(), + endDate.toISOString() + ); + + // Calculate subscription longevity score + const longevityDays = result.totalSubscriptionDays || 0; + const currentStreak = this.calculateCurrentStreak(creatorAddress, fanAddress); + + return { + activeSubscriptions: result.activeSubscriptions || 0, + currentActive: result.currentActive || 0, + longevityDays, + currentStreak, + firstSubscription: result.firstSubscription, + lastSubscription: result.lastSubscription + }; + } + + /** + * Get engagement metrics for a user + */ + async getEngagementMetrics(creatorAddress, fanAddress, startDate, endDate) { + // Get comments + const commentQuery = ` + SELECT COUNT(*) as commentCount + FROM comments + WHERE creator_id = ? + AND user_address = ? + AND created_at >= ? + AND created_at <= ? + `; + + const commentResult = this.database.db.prepare(commentQuery).get( + creatorAddress, + fanAddress, + startDate.toISOString(), + endDate.toISOString() + ); + + // Get likes (assuming likes table exists) + const likeQuery = ` + SELECT COUNT(*) as likeCount + FROM content_likes + WHERE creator_address = ? + AND fan_address = ? + AND created_at >= ? + AND created_at <= ? + `; + + let likeCount = 0; + try { + const likeResult = this.database.db.prepare(likeQuery).get( + creatorAddress, + fanAddress, + startDate.toISOString(), + endDate.toISOString() + ); + likeCount = likeResult.likeCount || 0; + } catch (error) { + // Likes table might not exist yet + logger.debug('Likes table not found, using 0 likes'); + } + + // Get shares/retweets (from ActivityPub engagements) + const shareQuery = ` + SELECT COUNT(*) as shareCount + FROM activitypub_engagements + WHERE creator_address = ? + AND activity_actor LIKE ? + AND activity_type IN ('Announce', 'Share') + AND received_at >= ? + AND received_at <= ? + `; + + let shareCount = 0; + try { + const shareResult = this.database.db.prepare(shareQuery).get( + creatorAddress, + `%${fanAddress}%`, + startDate.toISOString(), + endDate.toISOString() + ); + shareCount = shareResult.shareCount || 0; + } catch (error) { + // ActivityPub table might not exist + logger.debug('ActivityPub table not found, using 0 shares'); + } + + return { + commentCount: commentResult.commentCount || 0, + likeCount, + shareCount, + totalEngagement: (commentResult.commentCount || 0) + likeCount + shareCount + }; + } + + /** + * Normalize streaming score (0-100) + */ + normalizeStreamingScore(metrics) { + if (metrics.totalAmount === 0) return 0; + + // Logarithmic scaling for streaming amount + const maxAmount = 1000; // Maximum expected amount for normalization + const normalizedAmount = Math.log10(metrics.totalAmount + 1) / Math.log10(maxAmount + 1); + + // Factor in consistency (streaming days) + const consistencyBonus = Math.min(metrics.streamingDays / 30, 1) * 10; // Max 10 points + + return Math.min((normalizedAmount * 90) + consistencyBonus, 100); + } + + /** + * Normalize subscription score (0-100) + */ + normalizeSubscriptionScore(metrics) { + if (metrics.longevityDays === 0) return 0; + + // Base score from longevity + const longevityScore = Math.min((metrics.longevityDays / 365) * 70, 70); + + // Bonus for current active subscription + const activeBonus = metrics.currentActive > 0 ? 20 : 0; + + // Bonus for current streak + const streakBonus = Math.min((metrics.currentStreak / 30) * 10, 10); + + return Math.min(longevityScore + activeBonus + streakBonus, 100); + } + + /** + * Normalize engagement score (0-100) + */ + normalizeEngagementScore(metrics) { + if (metrics.totalEngagement === 0) return 0; + + // Weight different types of engagement + const weightedEngagement = + (metrics.commentCount * 3) + // Comments worth 3x + (metrics.likeCount * 1) + // Likes worth 1x + (metrics.shareCount * 5); // Shares worth 5x + + // Normalize with logarithmic scaling + const maxEngagement = 100; // Maximum expected engagement + const normalized = Math.log10(weightedEngagement + 1) / Math.log10(maxEngagement + 1); + + return Math.min(normalized * 100, 100); + } + + /** + * Calculate current subscription streak + */ + calculateCurrentStreak(creatorAddress, fanAddress) { + const query = ` + SELECT created_at, active + FROM subscriptions + WHERE creator_id = ? AND wallet_address = ? + ORDER BY created_at DESC + LIMIT 1 + `; + + const result = this.database.db.prepare(query).get(creatorAddress, fanAddress); + + if (!result || !result.active) return 0; + + const subscriptionStart = new Date(result.created_at); + const now = new Date(); + const daysActive = Math.ceil((now - subscriptionStart) / (1000 * 60 * 60 * 24)); + + return daysActive; + } + + /** + * Generate leaderboard for a creator + * @param {string} creatorAddress Creator wallet address + * @param {string} season Season identifier + * @param {number} limit Maximum number of fans to return + * @returns {Array} Leaderboard entries + */ + async generateLeaderboard(creatorAddress, season = null, limit = 100) { + try { + const cacheKey = this.getCacheKey(creatorAddress, season); + + // Try to get from cache first + const cached = await this.redis.get(cacheKey); + if (cached) { + logger.debug('Leaderboard found in cache', { creatorAddress, season }); + return JSON.parse(cached); + } + + logger.info('Generating leaderboard', { creatorAddress, season, limit }); + + // Get all fans for the creator + const fans = await this.getCreatorFans(creatorAddress); + + // Calculate scores for each fan + const leaderboardEntries = []; + for (const fan of fans) { + const score = await this.calculateUserScore(creatorAddress, fan.address, season); + + if (score > 0) { // Only include fans with non-zero scores + leaderboardEntries.push({ + fanAddress: fan.address, + score, + rank: 0, // Will be assigned after sorting + metrics: await this.getDetailedMetrics(creatorAddress, fan.address, season), + lastUpdated: new Date().toISOString() + }); + } + } + + // Sort by score (descending) and assign ranks + leaderboardEntries.sort((a, b) => b.score - a.score); + leaderboardEntries.forEach((entry, index) => { + entry.rank = index + 1; + }); + + // Limit results + const limitedLeaderboard = leaderboardEntries.slice(0, limit); + + // Cache the results + await this.redis.setex(cacheKey, this.cacheTTL, JSON.stringify(limitedLeaderboard)); + + logger.info('Leaderboard generated and cached', { + creatorAddress, + season, + totalFans: fans.length, + rankedFans: limitedLeaderboard.length, + cacheKey + }); + + return limitedLeaderboard; + } catch (error) { + logger.error('Failed to generate leaderboard', { + error: error.message, + creatorAddress, + season + }); + throw error; + } + } + + /** + * Get all fans for a creator + */ + async getCreatorFans(creatorAddress) { + const query = ` + SELECT DISTINCT wallet_address as address + FROM subscriptions + WHERE creator_id = ? + UNION + SELECT DISTINCT fan_address as address + FROM streaming_payments + WHERE creator_address = ? + UNION + SELECT DISTINCT user_address as address + FROM comments + WHERE creator_id = ? + `; + + return this.database.db.prepare(query).all(creatorAddress, creatorAddress, creatorAddress); + } + + /** + * Get detailed metrics for a fan + */ + async getDetailedMetrics(creatorAddress, fanAddress, season) { + const startDate = this.getSeasonStartDate(season); + const endDate = new Date(); + + const [streaming, subscription, engagement] = await Promise.all([ + this.getStreamingMetrics(creatorAddress, fanAddress, startDate, endDate), + this.getSubscriptionMetrics(creatorAddress, fanAddress, startDate, endDate), + this.getEngagementMetrics(creatorAddress, fanAddress, startDate, endDate) + ]); + + return { + streaming: { + totalAmount: streaming.totalAmount, + transactionCount: streaming.transactionCount, + streamingDays: streaming.streamingDays + }, + subscription: { + isActive: subscription.currentActive > 0, + longevityDays: subscription.longevityDays, + currentStreak: subscription.currentStreak + }, + engagement: { + commentCount: engagement.commentCount, + likeCount: engagement.likeCount, + shareCount: engagement.shareCount, + totalEngagement: engagement.totalEngagement + } + }; + } + + /** + * Get cache key for leaderboard + */ + getCacheKey(creatorAddress, season) { + const seasonSuffix = season ? `:${season}` : ':current'; + return `${this.prefix}${creatorAddress}${seasonSuffix}`; + } + + /** + * Get season start date + */ + getSeasonStartDate(season) { + if (!season) { + // Default to current season + season = this.getCurrentSeason(); + } + + const now = new Date(); + const year = now.getFullYear(); + const month = now.getMonth(); + + switch (this.seasonLength) { + case 'monthly': + return new Date(year, month, 1); + case 'quarterly': + const quarter = Math.floor(month / 3); + return new Date(year, quarter * 3, 1); + case 'yearly': + return new Date(year, 0, 1); + default: + return new Date(year, month, 1); + } + } + + /** + * Get current season identifier + */ + getCurrentSeason() { + const now = new Date(); + const year = now.getFullYear(); + const month = now.getMonth() + 1; // 1-12 + + switch (this.seasonLength) { + case 'monthly': + return `${year}-${month.toString().padStart(2, '0')}`; + case 'quarterly': + const quarter = Math.ceil(month / 3); + return `${year}-Q${quarter}`; + case 'yearly': + return `${year}`; + default: + return `${year}-${month.toString().padStart(2, '0')}`; + } + } + + /** + * Get available seasons + */ + async getAvailableSeasons(creatorAddress) { + // Get earliest activity date for the creator + const query = ` + SELECT + MIN(created_at) as earliestDate + FROM ( + SELECT created_at FROM subscriptions WHERE creator_id = ? + UNION + SELECT created_at FROM streaming_payments WHERE creator_address = ? + UNION + SELECT created_at FROM comments WHERE creator_id = ? + ) + `; + + const result = this.database.db.prepare(query).get(creatorAddress, creatorAddress, creatorAddress); + + if (!result.earliestDate) { + return [this.getCurrentSeason()]; + } + + const seasons = []; + const startDate = new Date(result.earliestDate); + const now = new Date(); + + let currentDate = new Date(startDate); + while (currentDate <= now) { + const season = this.getSeasonForDate(currentDate); + if (!seasons.includes(season)) { + seasons.push(season); + } + + // Move to next season + switch (this.seasonLength) { + case 'monthly': + currentDate.setMonth(currentDate.getMonth() + 1); + break; + case 'quarterly': + currentDate.setMonth(currentDate.getMonth() + 3); + break; + case 'yearly': + currentDate.setFullYear(currentDate.getFullYear() + 1); + break; + } + } + + return seasons.reverse(); // Most recent first + } + + /** + * Get season identifier for a specific date + */ + getSeasonForDate(date) { + const year = date.getFullYear(); + const month = date.getMonth() + 1; + + switch (this.seasonLength) { + case 'monthly': + return `${year}-${month.toString().padStart(2, '0')}`; + case 'quarterly': + const quarter = Math.ceil(month / 3); + return `${year}-Q${quarter}`; + case 'yearly': + return `${year}`; + default: + return `${year}-${month.toString().padStart(2, '0')}`; + } + } + + /** + * Invalidate leaderboard cache + */ + async invalidateCache(creatorAddress, season = null) { + try { + const cacheKey = this.getCacheKey(creatorAddress, season); + await this.redis.del(cacheKey); + + logger.info('Leaderboard cache invalidated', { creatorAddress, season, cacheKey }); + } catch (error) { + logger.error('Failed to invalidate leaderboard cache', { + error: error.message, + creatorAddress, + season + }); + } + } + + /** + * Get fan rank on leaderboard + */ + async getFanRank(creatorAddress, fanAddress, season = null) { + try { + const leaderboard = await this.generateLeaderboard(creatorAddress, season, 1000); + const entry = leaderboard.find(entry => entry.fanAddress === fanAddress); + + return entry || null; + } catch (error) { + logger.error('Failed to get fan rank', { + error: error.message, + creatorAddress, + fanAddress, + season + }); + return null; + } + } + + /** + * Get leaderboard statistics + */ + async getLeaderboardStats(creatorAddress, season = null) { + try { + const leaderboard = await this.generateLeaderboard(creatorAddress, season, 1000); + + if (leaderboard.length === 0) { + return { + totalFans: 0, + averageScore: 0, + topScore: 0, + medianScore: 0 + }; + } + + const scores = leaderboard.map(entry => entry.score); + scores.sort((a, b) => a - b); + + const totalFans = leaderboard.length; + const averageScore = scores.reduce((sum, score) => sum + score, 0) / totalFans; + const topScore = scores[scores.length - 1]; + const medianScore = scores[Math.floor(scores.length / 2)]; + + return { + totalFans, + averageScore: Math.round(averageScore * 100) / 100, + topScore, + medianScore, + scoreDistribution: this.getScoreDistribution(scores) + }; + } catch (error) { + logger.error('Failed to get leaderboard stats', { + error: error.message, + creatorAddress, + season + }); + return null; + } + } + + /** + * Get score distribution for statistics + */ + getScoreDistribution(scores) { + const ranges = [ + { label: '0-20', min: 0, max: 20, count: 0 }, + { label: '21-40', min: 21, max: 40, count: 0 }, + { label: '41-60', min: 41, max: 60, count: 0 }, + { label: '61-80', min: 61, max: 80, count: 0 }, + { label: '81-100', min: 81, max: 100, count: 0 } + ]; + + scores.forEach(score => { + const range = ranges.find(r => score >= r.min && score <= r.max); + if (range) range.count++; + }); + + return ranges.map(range => ({ + ...range, + percentage: Math.round((range.count / scores.length) * 100) + })); + } +} + +module.exports = EngagementLeaderboardService; diff --git a/src/services/leaderboardWorker.js b/src/services/leaderboardWorker.js new file mode 100644 index 0000000..462da6a --- /dev/null +++ b/src/services/leaderboardWorker.js @@ -0,0 +1,492 @@ +const { logger } = require('../utils/logger'); + +/** + * Leaderboard Worker - Background processing for engagement leaderboard calculations + * Automatically recalculates leaderboards every 6 hours and caches results + */ +class LeaderboardWorker { + constructor(config, database, redisClient, leaderboardService) { + this.config = config; + this.database = database; + this.redis = redisClient; + this.leaderboardService = leaderboardService; + this.isRunning = false; + this.processingInterval = null; + this.intervalMs = config.leaderboard?.workerInterval || 21600000; // 6 hours + this.batchSize = config.leaderboard?.batchSize || 10; // Process 10 creators at a time + } + + /** + * Start the leaderboard worker + */ + start() { + if (this.isRunning) { + logger.warn('Leaderboard worker already running'); + return; + } + + this.isRunning = true; + logger.info('Starting leaderboard worker', { intervalMs: this.intervalMs }); + + // Process all leaderboards immediately on start + this.processAllLeaderboards(); + + // Set up recurring processing + this.processingInterval = setInterval(() => { + this.processAllLeaderboards(); + }, this.intervalMs); + + logger.info('Leaderboard worker started'); + } + + /** + * Stop the leaderboard worker + */ + stop() { + if (!this.isRunning) { + return; + } + + this.isRunning = false; + + if (this.processingInterval) { + clearInterval(this.processingInterval); + this.processingInterval = null; + } + + logger.info('Leaderboard worker stopped'); + } + + /** + * Process leaderboards for all creators + */ + async processAllLeaderboards() { + if (!this.isRunning) { + return; + } + + try { + logger.info('Starting leaderboard processing cycle'); + + // Get all active creators + const creators = await this.getActiveCreators(); + logger.info('Found active creators', { count: creators.length }); + + // Process in batches to avoid overwhelming the system + for (let i = 0; i < creators.length; i += this.batchSize) { + const batch = creators.slice(i, i + this.batchSize); + await this.processCreatorBatch(batch); + + // Small delay between batches + if (i + this.batchSize < creators.length) { + await this.sleep(1000); + } + } + + logger.info('Leaderboard processing cycle completed', { + totalCreators: creators.length + }); + } catch (error) { + logger.error('Leaderboard processing cycle failed', { + error: error.message + }); + } + } + + /** + * Process a batch of creators + */ + async processCreatorBatch(creators) { + const promises = creators.map(creator => + this.processCreatorLeaderboards(creator.address).catch(error => { + logger.error('Failed to process creator leaderboard', { + error: error.message, + creatorAddress: creator.address + }); + return { success: false, creator: creator.address, error: error.message }; + }) + ); + + const results = await Promise.allSettled(promises); + const successful = results.filter(r => r.status === 'fulfilled').length; + const failed = results.filter(r => r.status === 'rejected').length; + + logger.debug('Creator batch processed', { + batchSize: creators.length, + successful, + failed + }); + } + + /** + * Process leaderboards for a specific creator + */ + async processCreatorLeaderboards(creatorAddress) { + try { + logger.debug('Processing creator leaderboards', { creatorAddress }); + + // Get available seasons + const seasons = await this.leaderboardService.getAvailableSeasons(creatorAddress); + + // Process current season and previous season + const seasonsToProcess = seasons.slice(0, 2); // Current + previous + + const results = []; + for (const season of seasonsToProcess) { + try { + const leaderboard = await this.leaderboardService.generateLeaderboard( + creatorAddress, + season, + 1000 // Generate full leaderboard + ); + + results.push({ season, success: true, fanCount: leaderboard.length }); + + logger.debug('Season leaderboard processed', { + creatorAddress, + season, + fanCount: leaderboard.length + }); + } catch (error) { + logger.error('Failed to process season leaderboard', { + error: error.message, + creatorAddress, + season + }); + results.push({ season, success: false, error: error.message }); + } + } + + // Update processing metadata + await this.updateProcessingMetadata(creatorAddress, results); + + return { success: true, creatorAddress, results }; + } catch (error) { + logger.error('Failed to process creator leaderboards', { + error: error.message, + creatorAddress + }); + throw error; + } + } + + /** + * Get all active creators + */ + async getActiveCreators() { + const query = ` + SELECT DISTINCT id as address + FROM creators c + WHERE EXISTS ( + SELECT 1 FROM subscriptions s WHERE s.creator_id = c.id AND s.active = 1 + ) + OR EXISTS ( + SELECT 1 FROM streaming_payments sp WHERE sp.creator_address = c.id + ) + OR EXISTS ( + SELECT 1 FROM comments cm WHERE cm.creator_id = c.id + ) + ORDER BY address + `; + + return this.database.db.prepare(query).all(); + } + + /** + * Update processing metadata + */ + async updateProcessingMetadata(creatorAddress, results) { + try { + const metadata = { + creatorAddress, + lastProcessed: new Date().toISOString(), + results, + processingTime: Date.now() + }; + + const metadataKey = `leaderboard:metadata:${creatorAddress}`; + await this.redis.setex(metadataKey, 86400, JSON.stringify(metadata)); // 24 hours TTL + + logger.debug('Processing metadata updated', { creatorAddress }); + } catch (error) { + logger.error('Failed to update processing metadata', { + error: error.message, + creatorAddress + }); + } + } + + /** + * Get processing metadata for a creator + */ + async getProcessingMetadata(creatorAddress) { + try { + const metadataKey = `leaderboard:metadata:${creatorAddress}`; + const metadata = await this.redis.get(metadataKey); + return metadata ? JSON.parse(metadata) : null; + } catch (error) { + logger.error('Failed to get processing metadata', { + error: error.message, + creatorAddress + }); + return null; + } + } + + /** + * Force recalculation for a specific creator + */ + async forceRecalculate(creatorAddress, seasons = null) { + try { + logger.info('Force recalculating leaderboards', { creatorAddress, seasons }); + + // Invalidate existing cache + if (!seasons) { + seasons = await this.leaderboardService.getAvailableSeasons(creatorAddress); + } + + const results = []; + for (const season of seasons) { + await this.leaderboardService.invalidateCache(creatorAddress, season); + const leaderboard = await this.leaderboardService.generateLeaderboard( + creatorAddress, + season, + 1000 + ); + results.push({ season, fanCount: leaderboard.length }); + } + + await this.updateProcessingMetadata(creatorAddress, results); + + logger.info('Force recalculation completed', { + creatorAddress, + seasons, + results + }); + + return results; + } catch (error) { + logger.error('Force recalculation failed', { + error: error.message, + creatorAddress, + seasons + }); + throw error; + } + } + + /** + * Get worker status + */ + getStatus() { + return { + isRunning: this.isRunning, + intervalMs: this.intervalMs, + batchSize: this.batchSize, + nextProcessTime: this.processingInterval ? Date.now() + this.intervalMs : null + }; + } + + /** + * Get worker statistics + */ + async getWorkerStats() { + try { + // Get total creators + const totalCreators = this.database.db.prepare(` + SELECT COUNT(DISTINCT id) as count FROM creators + `).get().count; + + // Get active creators + const activeCreators = (await this.getActiveCreators()).length; + + // Get cache statistics + const cacheStats = await this.getCacheStats(); + + // Get last processing time + const lastProcessing = await this.getLastProcessingTime(); + + return { + totalCreators, + activeCreators, + cacheStats, + lastProcessing, + isRunning: this.isRunning, + intervalMs: this.intervalMs + }; + } catch (error) { + logger.error('Failed to get worker stats', { error: error.message }); + return null; + } + } + + /** + * Get cache statistics + */ + async getCacheStats() { + try { + const keys = await this.redis.keys(`${this.leaderboardService.prefix}*`); + const leaderboards = keys.filter(key => !key.includes(':metadata:')); + const metadata = keys.filter(key => key.includes(':metadata:')); + + // Get memory usage + const info = await this.redis.info('memory'); + const usedMemory = this.parseMemoryInfo(info); + + return { + totalKeys: keys.length, + leaderboardKeys: leaderboards.length, + metadataKeys: metadata.length, + usedMemoryMB: Math.round(usedMemory / 1024 / 1024) + }; + } catch (error) { + logger.error('Failed to get cache stats', { error: error.message }); + return { totalKeys: 0, leaderboardKeys: 0, metadataKeys: 0, usedMemoryMB: 0 }; + } + } + + /** + * Parse Redis memory info + */ + parseMemoryInfo(info) { + const match = info.match(/used_memory:(\d+)/); + return match ? parseInt(match[1]) : 0; + } + + /** + * Get last processing time + */ + async getLastProcessingTime() { + try { + const keys = await this.redis.keys(`${this.leaderboardService.prefix}:metadata:*`); + if (keys.length === 0) return null; + + const metadatas = await Promise.all( + keys.map(async key => { + const data = await this.redis.get(key); + return data ? JSON.parse(data) : null; + }) + ); + + const validMetadatas = metadatas.filter(Boolean); + if (validMetadatas.length === 0) return null; + + // Find the most recent processing time + const mostRecent = validMetadatas.reduce((latest, current) => { + return current.lastProcessed > latest.lastProcessed ? current : latest; + }); + + return mostRecent.lastProcessed; + } catch (error) { + logger.error('Failed to get last processing time', { error: error.message }); + return null; + } + } + + /** + * Clean up old cache entries + */ + async cleanupOldCache(daysToKeep = 30) { + try { + const cutoffDate = new Date(Date.now() - (daysToKeep * 24 * 60 * 60 * 1000)); + const keys = await this.redis.keys(`${this.leaderboardService.prefix}*`); + + let deletedCount = 0; + for (const key of keys) { + const ttl = await this.redis.ttl(key); + if (ttl === -1) { // No expiry set + await this.redis.expire(key, this.leaderboardService.cacheTTL); + } + } + + logger.info('Cache cleanup completed', { + keysProcessed: keys.length, + deletedCount + }); + + return deletedCount; + } catch (error) { + logger.error('Cache cleanup failed', { error: error.message }); + throw error; + } + } + + /** + * Export leaderboard data for a creator + */ + async exportLeaderboard(creatorAddress, season = null, format = 'json') { + try { + const leaderboard = await this.leaderboardService.generateLeaderboard( + creatorAddress, + season, + 1000 + ); + + const exportData = { + creatorAddress, + season: season || 'current', + exportedAt: new Date().toISOString(), + totalFans: leaderboard.length, + fans: leaderboard.map(entry => ({ + rank: entry.rank, + fanAddress: entry.fanAddress, + score: entry.score, + metrics: entry.metrics, + lastUpdated: entry.lastUpdated + })) + }; + + if (format === 'csv') { + return this.convertToCSV(exportData); + } + + return exportData; + } catch (error) { + logger.error('Failed to export leaderboard', { + error: error.message, + creatorAddress, + season, + format + }); + throw error; + } + } + + /** + * Convert leaderboard data to CSV + */ + convertToCSV(data) { + const headers = [ + 'Rank', 'Fan Address', 'Score', 'Streaming Amount', 'Transaction Count', + 'Subscription Active', 'Subscription Days', 'Comment Count', 'Like Count', 'Share Count' + ]; + + const rows = data.fans.map(fan => [ + fan.rank, + fan.fanAddress, + fan.score, + fan.metrics.streaming.totalAmount, + fan.metrics.streaming.transactionCount, + fan.metrics.subscription.isActive, + fan.metrics.subscription.longevityDays, + fan.metrics.engagement.commentCount, + fan.metrics.engagement.likeCount, + fan.metrics.engagement.shareCount + ]); + + const csvContent = [headers, ...rows] + .map(row => row.map(field => `"${field}"`).join(',')) + .join('\n'); + + return csvContent; + } + + /** + * Sleep utility for delays + */ + sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); + } +} + +module.exports = LeaderboardWorker;