Implementing the Event Queue System

Build reliable analytics with Forge Events for async processing and error handling

What's covered here

  • Why queues improve analytics reliability
  • How to set up Forge Events for analytics
  • Queue consumer implementation
  • Error handling and retry logic
  • Monitoring queue health

📦

Working Example Available

The code in this guide is available in our sample app on GitHub.

🔗 View the full Forge Analytics Example on GitHub

Why Use Queues for Analytics?

The Problem with Direct Calls

// ❌ Direct approach - fragile
resolver.define('create-todo', async ({ payload, context }) => {
  const todo = await createTodo(payload);
  
  // If analytics fails, the whole operation fails
  await sendAnalytics('Todo Created', context.accountId);
  
  return todo;
});

Issues:

  • Analytics failures break core functionality
  • No retry mechanism
  • User waits for analytics calls
  • All-or-nothing processing

The Queue Solution

// ✅ Queue approach - reliable
resolver.define('create-todo', async ({ payload, context }) => {
  const todo = await createTodo(payload);
  
  // Queue the event (fast, reliable)
  await analyticsQueue.push({
    type: 'track',
    userId: context.accountId,
    event: 'Todo Created'
  });
  
  return todo; // User gets immediate response
});

Benefits:

  • Core functionality never blocked by analytics
  • Automatic retry on failures
  • Batch processing efficiency
  • Decoupled architecture

Step 1: Queue Configuration

Update Manifest

Add queue and consumer to your manifest.yml:

modules:
  jira:issuePanel:
    - key: todo-panel
      resource: main
      resolver:
        function: resolver

  # Queue consumer for processing analytics events
  consumer:
    - key: analytics-consumer
      queue: analytics-queue
      resolver:
        function: analytics-consumer-func
        method: analytics-listener

  function:
    - key: resolver
      handler: index.handler
    - key: analytics-consumer-func
      handler: analytics/consumer.handler

permissions:
  external:
    fetch:
      backend:
        - address: "in.accoil.com"
          category: analytics
          inScopeEUD: false

Step 2: Queue Producer

Enhanced Events Module (src/analytics/events.js)

import { Queue } from '@forge/events';

// Initialize the analytics queue
const analyticsQueue = new Queue({ key: 'analytics-queue' });

/**
 * Get privacy-safe identifiers from context
 */
const getAnalyticsIds = (context) => {
  // Option 1: Individual user tracking
  const userId = context.accountId;
  
  // Option 2: Account-level tracking (cost optimization)
  // const userId = context.cloudId;
  
  const groupId = context.cloudId;
  
  return { userId, groupId };
};

/**
 * Core tracking function that queues events
 * 
 * Bundles identify, group, and track events for complete user profile.
 * All events are processed asynchronously via queue consumer.
 */
export const track = async (context, eventName) => {
  const { userId, groupId } = getAnalyticsIds(context);
  
  // Minimal traits to avoid PII
  const identifyTraits = { name: userId };
  const groupTraits = { name: groupId };

  // Bundle all event types into one push; the consumer receives each
  // array item as its own payload and routes it by type
  const events = [
    {
      type: 'identify',
      userId: userId,
      groupId: groupId,
      traits: identifyTraits
    },
    {
      type: 'group',
      groupId: groupId,
      traits: groupTraits
    },
    {
      type: 'track',
      userId: userId,
      event: eventName
    }
  ];

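  // Queue events for async processing. A failed push is logged, not thrown,
  // so analytics can never fail the resolver that called track().
  try {
    await analyticsQueue.push(events);
  } catch (error) {
    console.error('[Analytics] Failed to queue events:', error.message);
  }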
};

/**
 * BACKEND EVENT DEFINITIONS
 * All events use the central track() function
 */
export const trackTodoCreated = (context) => track(context, 'Todo Created');
export const trackTodoUpdated = (context) => track(context, 'Todo Updated');
export const trackTodoDeleted = (context) => track(context, 'Todo Deleted');
export const trackTodosCleared = (context) => track(context, 'Todos Cleared');
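
One way to make the bundle easier to test is to separate building the events from queueing them. The sketch below assumes a hypothetical helper named buildAnalyticsEvents, which is not part of the sample app; it produces the same three payloads as track() without touching the queue.

```javascript
// Hypothetical pure helper: builds the identify/group/track bundle
// without pushing anything, so it can be unit tested in isolation.
const buildAnalyticsEvents = (context, eventName) => {
  const userId = context.accountId;
  const groupId = context.cloudId;

  return [
    { type: 'identify', userId, groupId, traits: { name: userId } },
    { type: 'group', groupId, traits: { name: groupId } },
    { type: 'track', userId, event: eventName }
  ];
};

// Example context values are made up for illustration
const events = buildAnalyticsEvents(
  { accountId: 'user-123', cloudId: 'site-abc' },
  'Todo Created'
);

console.log(events.map((e) => e.type).join(', ')); // prints "identify, group, track"
```

With this split, track() would only need to call the helper and push the result.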

Step 3: Queue Consumer

Consumer Implementation (src/analytics/consumer.js)

import Resolver from '@forge/resolver';
import { sendAnalytics } from './dispatcher';

const resolver = new Resolver();

/**
 * Process analytics events from the queue
 * 
 * This consumer runs asynchronously and handles different event types.
 * Events are processed reliably with automatic retry on failures.
 */
resolver.define('analytics-listener', async ({ payload }) => {
  try {
    await processAnalyticsEvent(payload);
  } catch (error) {
    // Log error but let Forge Events handle retry
    console.error('[Analytics Consumer] Processing failed:', error.message);
    throw error; // Re-throw to trigger retry
  }
});

/**
 * Route different event types to appropriate handlers
 */
const processAnalyticsEvent = async (event) => {
  switch (event.type) {
    case 'identify':
      await handleIdentifyEvent(event);
      break;
      
    case 'group':
      await handleGroupEvent(event);
      break;
      
    case 'track':
      await handleTrackEvent(event);
      break;
      
    default:
      console.warn(`[Analytics Consumer] Unknown event type: ${event.type}`);
  }
};

/**
 * Handle user identification events
 */
const handleIdentifyEvent = async (event) => {
  await sendAnalytics('users', {
    user_id: event.userId,
    group_id: event.groupId,
    traits: event.traits || {}
  });
};

/**
 * Handle group/organization events
 */
const handleGroupEvent = async (event) => {
  await sendAnalytics('groups', {
    group_id: event.groupId,
    traits: event.traits || {}
  });
};

/**
 * Handle tracking events
 */
const handleTrackEvent = async (event) => {
  await sendAnalytics('events', {
    user_id: event.userId,
    event: event.event
  });
};

export const handler = resolver.getDefinitions();
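
Payloads that are missing required fields will fail on every attempt, so it can help to reject them before they reach the dispatcher. A minimal sketch, assuming the field requirements implied by the handlers above; validateEvent and REQUIRED_FIELDS are hypothetical names, not part of the sample app.

```javascript
// Fields each event type needs, inferred from the handlers in this guide.
// A Map avoids accidental matches on inherited object keys like "constructor".
const REQUIRED_FIELDS = new Map([
  ['identify', ['userId', 'groupId']],
  ['group', ['groupId']],
  ['track', ['userId', 'event']]
]);

// Returns { valid: true } or { valid: false, reason } for a queue payload
const validateEvent = (event) => {
  const type = event ? event.type : undefined;
  const required = REQUIRED_FIELDS.get(type);

  if (!required) {
    return { valid: false, reason: `unknown type: ${type}` };
  }

  const missing = required.filter((field) => !event[field]);
  if (missing.length > 0) {
    return { valid: false, reason: `missing fields: ${missing.join(', ')}` };
  }

  return { valid: true };
};

console.log(validateEvent({ type: 'track', userId: 'user-123' }).reason);
// prints "missing fields: event"
```

In the consumer, an invalid result could be logged and dropped rather than re-thrown, since retrying cannot fix a malformed payload.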

Enhanced Dispatcher (src/analytics/dispatcher.js)

import { fetch } from '@forge/api';

/**
 * Send analytics events to provider with retry logic
 */
export const sendAnalytics = async (endpoint, payload) => {
  const apiKey = process.env.ANALYTICS_API_KEY;
  
  if (!apiKey) {
    throw new Error('Analytics API key not configured');
  }

  // Debug mode
  if (process.env.ANALYTICS_DEBUG === 'true') {
    console.log(`[Analytics Debug] ${endpoint}:`, JSON.stringify(payload, null, 2));
    return;
  }

  const url = `https://in.accoil.com/v1/${endpoint}`;
  const requestPayload = {
    ...payload,
    api_key: apiKey,
    timestamp: Date.now()
  };

  // Retry configuration
  const maxRetries = 3;
  const baseDelay = 1000; // 1 second

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    let failure;

    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json'
        },
        body: JSON.stringify(requestPayload)
      });

      if (response.ok) {
        return; // Success
      }

      // Client errors (4xx) will not succeed on retry; server errors might
      const isClientError = response.status >= 400 && response.status < 500;
      failure = new Error(
        isClientError
          ? `Client error ${response.status}: ${await response.text()}`
          : `Server error ${response.status} on attempt ${attempt} of ${maxRetries}`
      );
      failure.retryable = !isClientError;
    } catch (error) {
      // Network-level failure - retry
      failure = error;
    }

    // Give up immediately on client errors, and on any error after the last attempt
    if (failure.retryable === false || attempt === maxRetries) {
      throw failure;
    }

    // Exponential backoff before the next attempt: 1s, then 2s
    const delay = baseDelay * Math.pow(2, attempt - 1);
    await new Promise(resolve => setTimeout(resolve, delay));
  }
};
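
To see what the retry settings cost in time, the backoff formula can be worked through on its own. The sketch below uses the same formula and defaults as the dispatcher; backoffDelay and backoffSchedule are hypothetical helper names introduced only for illustration.

```javascript
// Delay before the next attempt, given the attempt that just failed
const backoffDelay = (attempt, baseDelay = 1000) =>
  baseDelay * Math.pow(2, attempt - 1);

// All waits for a run that fails every time. There is no wait after the
// final attempt, so maxRetries attempts produce maxRetries - 1 delays.
const backoffSchedule = (maxRetries, baseDelay = 1000) => {
  const delays = [];
  for (let attempt = 1; attempt < maxRetries; attempt++) {
    delays.push(backoffDelay(attempt, baseDelay));
  }
  return delays;
};

const delays = backoffSchedule(3);
const totalWait = delays.reduce((sum, ms) => sum + ms, 0);

console.log(delays);    // [ 1000, 2000 ]
console.log(totalWait); // 3000
```

With maxRetries set to 3, a consumer invocation can spend about three seconds waiting on top of the request time itself, which is worth keeping in mind when choosing these values.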

Step 4: Integration with Resolvers

Enhanced Resolver Integration (src/analytics/resolvers.js)

import { track } from './events';

/**
 * Resolver for frontend-triggered events
 * Routes events through the queue system
 */
export const trackEvent = async ({ payload, context }) => {
  await track(context, payload.event);
};

// Optional: Direct identify/group resolvers (rarely needed)
export const identify = async ({ context }) => {
  // Implementation if needed for special cases
};

export const group = async ({ context }) => {
  // Implementation if needed for special cases
};
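
Because payload.event arrives from the frontend, you may want to check it against a known list before queueing it. The sketch below is one possible guard; ALLOWED_EVENTS and isAllowedEvent are hypothetical, and the list simply reuses the backend event names from this guide. A real app would list its own frontend events.

```javascript
// Hypothetical allowlist of event names the resolver will accept
const ALLOWED_EVENTS = new Set([
  'Todo Created',
  'Todo Updated',
  'Todo Deleted',
  'Todos Cleared'
]);

// True only for string names that appear in the allowlist
const isAllowedEvent = (name) =>
  typeof name === 'string' && ALLOWED_EVENTS.has(name);

console.log(isAllowedEvent('Todo Created'));   // true
console.log(isAllowedEvent('Something Else')); // false
```

The trackEvent resolver could call isAllowedEvent(payload.event) and return early when it is false, so unexpected names never reach the queue.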

Update Main Resolver (src/index.js)

import Resolver from '@forge/resolver';
import { trackEvent } from './analytics/resolvers';
import { trackTodoCreated, trackTodoUpdated, trackTodoDeleted, trackTodosCleared } from './analytics/events';

const resolver = new Resolver();

// Business logic with analytics
resolver.define('create-todo', async ({ payload, context }) => {
  const todo = await createTodo(payload);
  
  // Queue analytics event (non-blocking)
  await trackTodoCreated(context);
  
  return todo;
});

resolver.define('update-todo', async ({ payload, context }) => {
  const todo = await updateTodo(payload);
  await trackTodoUpdated(context);
  return todo;
});

resolver.define('delete-todo', async ({ payload, context }) => {
  const result = await deleteTodo(payload);
  await trackTodoDeleted(context);
  return result;
});

resolver.define('delete-all-todos', async ({ context }) => {
  const result = await deleteAllTodos(context);
  await trackTodosCleared(context);
  return result;
});

// Frontend analytics resolver
resolver.define('track-event', trackEvent);

export const handler = resolver.getDefinitions();

Step 5: Testing the Queue System

Local Testing

  1. Enable Debug Mode:
forge variables set ANALYTICS_DEBUG true
forge deploy
  2. Monitor Logs:
forge logs --tail
  3. Trigger Events:
    Use your app and watch for queue processing:
[Analytics Debug] events: {
  "user_id": "557058:...",
  "event": "Todo Created"
}
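
The routing logic can also be checked without deploying, by passing the send function in as an argument. This is a sketch under that assumption: createProcessor is a hypothetical factory that mirrors the consumer's switch statement, and fakeSend stands in for the real dispatcher.

```javascript
// Hypothetical factory: takes the send function as an argument so tests
// can pass a fake instead of the real dispatcher.
const createProcessor = (send) => async (event) => {
  switch (event.type) {
    case 'identify':
      return send('users', {
        user_id: event.userId,
        group_id: event.groupId,
        traits: event.traits || {}
      });
    case 'group':
      return send('groups', {
        group_id: event.groupId,
        traits: event.traits || {}
      });
    case 'track':
      return send('events', { user_id: event.userId, event: event.event });
    default:
      console.warn(`[Analytics Consumer] Unknown event type: ${event.type}`);
  }
};

// Fake sender that records calls instead of making HTTP requests
const calls = [];
const fakeSend = async (endpoint, payload) => {
  calls.push({ endpoint, payload });
};

const processEvent = createProcessor(fakeSend);

processEvent({ type: 'track', userId: 'user-123', event: 'Todo Created' })
  .then(() => console.log(calls[0].endpoint)); // prints "events"
```

In production code the same factory would be called with the real sendAnalytics function.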

Queue Health Monitoring

Add monitoring to your consumer:

// Enhanced consumer with monitoring
resolver.define('analytics-listener', async ({ payload }) => {
  const startTime = Date.now();
  
  try {
    await processAnalyticsEvent(payload);
    
    const duration = Date.now() - startTime;
    console.log(`[Analytics Consumer] Processed ${payload.type} in ${duration}ms`);
    
  } catch (error) {
    console.error(`[Analytics Consumer] Failed to process ${payload.type}:`, error.message);
    throw error;
  }
});
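
Free-text log lines are hard to filter once volume grows, so one option is to emit each measurement as a single JSON line. The helper below is a sketch, not part of the sample app; formatMetric and its field names are assumptions chosen for illustration.

```javascript
// Builds one JSON log line per processed event.
// `now` is injectable so the output is predictable in tests.
const formatMetric = (type, startTime, outcome, now = Date.now()) =>
  JSON.stringify({
    source: 'analytics-consumer',
    type,
    outcome,
    durationMs: now - startTime
  });

console.log(formatMetric('track', 1000, 'ok', 1042));
// prints {"source":"analytics-consumer","type":"track","outcome":"ok","durationMs":42}
```

The consumer could log formatMetric(payload.type, startTime, 'ok') on success and the same call with 'failed' in the catch block, which makes failure rates and slow events easier to search for.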

Advanced Patterns

Dead Letter Handling

Handle events that consistently fail:

// Assumes deadLetterQueue is a second Queue from '@forge/events' with its own
// consumer in manifest.yml, and that the retry count is readable from context.
resolver.define('analytics-listener', async ({ payload, context }) => {
  try {
    await processAnalyticsEvent(payload);
  } catch (error) {
    // Check retry count
    const retryCount = context.retryCount || 0;
    
    if (retryCount >= 5) {
      // Send to dead letter queue or log for manual review
      console.error('[Analytics] Event failed after 5 retries:', payload);
      await deadLetterQueue.push(payload);
      return; // Don't re-throw
    }
    
    throw error; // Trigger retry
  }
});
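
The retry decision is easier to reason about, and to test, when it is pulled out of the handler. The following sketch assumes the same threshold of 5 used above; decideFailureAction and toDeadLetterRecord are hypothetical helpers, and the record shape is only a suggestion.

```javascript
const MAX_RETRIES = 5; // mirrors the threshold in the handler above

// Returns 'dead-letter' once the retry budget is spent, otherwise 'retry'
const decideFailureAction = (retryCount, maxRetries = MAX_RETRIES) =>
  retryCount >= maxRetries ? 'dead-letter' : 'retry';

// Wraps the failed payload with enough detail for manual review.
// `now` is injectable so the record is predictable in tests.
const toDeadLetterRecord = (payload, error, retryCount, now = Date.now()) => ({
  payload,
  reason: error.message,
  retryCount,
  failedAt: now
});

console.log(decideFailureAction(2)); // retry
console.log(decideFailureAction(5)); // dead-letter
```

Storing the reason and retry count alongside the payload means a reviewer does not have to dig through logs to find out why an event was abandoned.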

Production Checklist

  • Consumer function deployed and registered
  • Queue key matches in producer and consumer
  • Error handling prevents infinite retries
  • Debug mode disabled in production
  • Monitoring in place for queue health
  • Dead letter queue for failed events
  • API rate limiting handled
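
For the rate limiting item, one common approach is to honor the standard Retry-After header when a provider returns it. Whether the analytics provider used here sends that header is an assumption this guide does not confirm, so treat the helper below as a sketch; retryAfterMs is a hypothetical name.

```javascript
// Converts a Retry-After header value into a wait time in milliseconds.
// The header may be a number of seconds or an HTTP date; anything else
// falls back to the supplied default. `now` is injectable for testing.
const retryAfterMs = (headerValue, fallbackMs = 1000, now = Date.now()) => {
  if (!headerValue) {
    return fallbackMs;
  }

  const seconds = Number(headerValue);
  if (Number.isFinite(seconds) && seconds >= 0) {
    return seconds * 1000;
  }

  const date = Date.parse(headerValue);
  return Number.isNaN(date) ? fallbackMs : Math.max(0, date - now);
};

console.log(retryAfterMs('120'));     // 120000
console.log(retryAfterMs(undefined)); // 1000
```

A dispatcher could use this value in place of the exponential delay when a response has status 429, provided the wait still fits within the function's time limit.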

Key Takeaways

  1. Non-blocking - Analytics events can't hold up your application
  2. Decouple analytics from business logic - Core functionality unaffected
  3. Automatic retry handling - Forge Events handles failures
  4. Async processing - Better user experience
  5. Monitoring is essential - Know when things break