Implementing the Event Queue System

Build reliable analytics with Forge Events for async processing and error handling

What's covered here

  • Why queues improve analytics reliability
  • How to set up Forge Events for analytics
  • Queue consumer implementation
  • Error handling and retry logic
  • Monitoring queue health
📦

Working Example Available

The following code is available in our sample app available on GitHub.

🔗 View the full Forge Analytics Example on GitHub

Why Use Queues for Analytics?

The Problem with Direct Calls

// ❌ Direct approach - fragile
resolver.define('create-todo', async ({ payload, context }) => {
  const todo = await createTodo(payload);
  
  // If analytics fails, the whole operation fails
  await sendAnalytics('Todo Created', context.accountId);
  
  return todo;
});

Issues:

  • Analytics failures break core functionality
  • No retry mechanism
  • User waits for analytics calls
  • All-or-nothing processing

The Queue Solution

// ✅ Queue approach - reliable
resolver.define('create-todo', async ({ payload, context }) => {
  const todo = await createTodo(payload);
  
  // Queue the event (fast, reliable)
  await analyticsQueue.push({
    type: 'track',
    userId: context.accountId,
    event: 'Todo Created'
  });
  
  return todo; // User gets immediate response
});

Benefits:

  • Core functionality never blocked by analytics
  • Automatic retry on failures
  • Batch processing efficiency
  • Decoupled architecture

Step 1: Queue Configuration

Update Manifest

Add queue and consumer to your manifest.yml:

modules:
  jira:issuePanel:
    - key: todo-panel
      resource: main
      resolver:
        function: resolver

  # Queue consumer for processing analytics events
  consumer:
    - key: analytics-consumer
      queue: analytics-queue
      resolver:
        function: analytics-consumer-func
        method: analytics-listener

  function:
    - key: resolver
      handler: index.handler
    - key: analytics-consumer-func
      handler: analytics/consumer.handler

permissions:
  external:
    fetch:
      backend:
        - address: "in.accoil.com"
          category: analytics
          inScopeEUD: false

Step 2: Queue Producer

Enhanced Events Module (src/analytics/events.js)

import { Queue } from '@forge/events';

// Initialize the analytics queue
const analyticsQueue = new Queue({ key: 'analytics-queue' });

/**
 * Get privacy-safe identifiers from context
 */
const getAnalyticsIds = (context) => {
  // Option 1: Individual user tracking
  const userId = context.accountId;
  
  // Option 2: Account-level tracking (cost optimization)
  // const userId = context.cloudId;
  
  const groupId = context.cloudId;
  
  return { userId, groupId };
};

/**
 * Core tracking function that queues events
 * 
 * Bundles identify, group, and track events for complete user profile.
 * All events are processed asynchronously via queue consumer.
 */
export const track = async (context, eventName) => {
  const { userId, groupId } = getAnalyticsIds(context);
  
  // Minimal traits to avoid PII
  const identifyTraits = { name: userId };
  const groupTraits = { name: groupId };

  // Bundle all event types for atomic processing
  const events = [
    {
      type: 'identify',
      userId: userId,
      groupId: groupId,
      traits: identifyTraits
    },
    {
      type: 'group',
      groupId: groupId,
      traits: groupTraits
    },
    {
      type: 'track',
      userId: userId,
      event: eventName
    }
  ];

  // Queue events for async processing
  await analyticsQueue.push(events);
};

/**
 * BACKEND EVENT DEFINITIONS
 * All events use the central track() function
 */
export const trackTodoCreated = (context) => track(context, 'Todo Created');
export const trackTodoUpdated = (context) => track(context, 'Todo Updated');
export const trackTodoDeleted = (context) => track(context, 'Todo Deleted');
export const trackTodosCleared = (context) => track(context, 'Todos Cleared');

Step 3: Queue Consumer

Consumer Implementation (src/analytics/consumer.js)

import Resolver from '@forge/resolver';
import { sendAnalytics } from './dispatcher';

const resolver = new Resolver();

/**
 * Process analytics events from the queue
 * 
 * This consumer runs asynchronously and handles different event types.
 * Events are processed reliably with automatic retry on failures.
 */
resolver.define('analytics-listener', async ({ payload }) => {
  try {
    await processAnalyticsEvent(payload);
  } catch (error) {
    // Log error but let Forge Events handle retry
    console.error('[Analytics Consumer] Processing failed:', error.message);
    throw error; // Re-throw to trigger retry
  }
});

/**
 * Route different event types to appropriate handlers
 */
const processAnalyticsEvent = async (event) => {
  switch (event.type) {
    case 'identify':
      await handleIdentifyEvent(event);
      break;
      
    case 'group':
      await handleGroupEvent(event);
      break;
      
    case 'track':
      await handleTrackEvent(event);
      break;
      
    default:
      console.warn(`[Analytics Consumer] Unknown event type: ${event.type}`);
  }
};

/**
 * Handle user identification events
 */
const handleIdentifyEvent = async (event) => {
  await sendAnalytics('users', {
    user_id: event.userId,
    group_id: event.groupId,
    traits: event.traits || {}
  });
};

/**
 * Handle group/organization events
 */
const handleGroupEvent = async (event) => {
  await sendAnalytics('groups', {
    group_id: event.groupId,
    traits: event.traits || {}
  });
};

/**
 * Handle tracking events
 */
const handleTrackEvent = async (event) => {
  await sendAnalytics('events', {
    user_id: event.userId,
    event: event.event
  });
};

export const handler = resolver.getDefinitions();

Enhanced Dispatcher (src/analytics/dispatcher.js)

import { fetch } from '@forge/api';

/**
 * Send analytics events to provider with retry logic
 */
export const sendAnalytics = async (endpoint, payload) => {
  const apiKey = process.env.ANALYTICS_API_KEY;
  
  if (!apiKey) {
    throw new Error('Analytics API key not configured');
  }

  // Debug mode
  if (process.env.ANALYTICS_DEBUG === 'true') {
    console.log(`[Analytics Debug] ${endpoint}:`, JSON.stringify(payload, null, 2));
    return;
  }

  const url = `https://in.accoil.com/v1/${endpoint}`;
  const requestPayload = {
    ...payload,
    api_key: apiKey,
    timestamp: Date.now()
  };

  // Retry configuration
  const maxRetries = 3;
  const baseDelay = 1000; // 1 second

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json'
        },
        body: JSON.stringify(requestPayload)
      });

      if (response.ok) {
        return; // Success
      }

      // Non-retryable errors (client errors)
      if (response.status >= 400 && response.status < 500) {
        throw new Error(`Client error ${response.status}: ${await response.text()}`);
      }

      // Server errors - retry
      if (attempt === maxRetries) {
        throw new Error(`Server error ${response.status} after ${maxRetries} attempts`);
      }

    } catch (error) {
      if (attempt === maxRetries) {
        throw error;
      }

      // Exponential backoff
      const delay = baseDelay * Math.pow(2, attempt - 1);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
};

Step 4: Integration with Resolvers

Enhanced Resolver Integration (src/analytics/resolvers.js)

import { track } from './events';

/**
 * Resolver for frontend-triggered events
 * Routes events through the queue system
 */
export const trackEvent = async ({ payload, context }) => {
  await track(context, payload.event);
};

// Optional: Direct identify/group resolvers (rarely needed)
export const identify = async ({ context }) => {
  // Implementation if needed for special cases
};

export const group = async ({ context }) => {
  // Implementation if needed for special cases
};

Update Main Resolver (src/index.js)

import Resolver from '@forge/resolver';
import { trackEvent } from './analytics/resolvers';
import { trackTodoCreated, trackTodoUpdated, trackTodoDeleted, trackTodosCleared } from './analytics/events';

const resolver = new Resolver();

// Business logic with analytics
resolver.define('create-todo', async ({ payload, context }) => {
  const todo = await createTodo(payload);
  
  // Queue analytics event (non-blocking)
  await trackTodoCreated(context);
  
  return todo;
});

resolver.define('update-todo', async ({ payload, context }) => {
  const todo = await updateTodo(payload);
  await trackTodoUpdated(context);
  return todo;
});

resolver.define('delete-todo', async ({ payload, context }) => {
  const result = await deleteTodo(payload);
  await trackTodoDeleted(context);
  return result;
});

resolver.define('delete-all-todos', async ({ context }) => {
  const result = await deleteAllTodos(context);
  await trackTodosCleared(context);
  return result;
});

// Frontend analytics resolver
resolver.define('track-event', trackEvent);

export const handler = resolver.getDefinitions();

Step 5: Testing the Queue System

Local Testing

  1. Enable Debug Mode:
forge variables set ANALYTICS_DEBUG true
forge deploy
  1. Monitor Logs:
forge logs --tail
  1. Trigger Events:
    Use your app and watch for queue processing:
[Analytics Debug] events: {
  "user_id": "557058:...",
  "event": "Todo Created",
  "timestamp": 1704067200000
}

Queue Health Monitoring

Add monitoring to your consumer:

// Enhanced consumer with monitoring
resolver.define('analytics-listener', async ({ payload }) => {
  const startTime = Date.now();
  
  try {
    await processAnalyticsEvent(payload);
    
    const duration = Date.now() - startTime;
    console.log(`[Analytics Consumer] Processed ${payload.type} in ${duration}ms`);
    
  } catch (error) {
    console.error(`[Analytics Consumer] Failed to process ${payload.type}:`, error.message);
    throw error;
  }
});

Advanced Patterns

Dead Letter Handling

Handle events that consistently fail:

resolver.define('analytics-listener', async ({ payload, context }) => {
  try {
    await processAnalyticsEvent(payload);
  } catch (error) {
    // Check retry count
    const retryCount = context.retryCount || 0;
    
    if (retryCount >= 5) {
      // Send to dead letter queue or log for manual review
      console.error('[Analytics] Event failed after 5 retries:', payload);
      await deadLetterQueue.push(payload);
      return; // Don't re-throw
    }
    
    throw error; // Trigger retry
  }
});

Production Checklist

  • Consumer function deployed and registered
  • Queue key matches in producer and consumer
  • Error handling prevents infinite retries
  • Debug mode disabled in production
  • Monitoring in place for queue health
  • Dead letter queue for failed events
  • API rate limiting handled

Key Takeaways

  1. Non blocking - Analytics events can't hold up your application
  2. Decouple analytics from business logic - Core functionality unaffected
  3. Automatic retry handling - Forge Events handles failures
  4. Async processing - Better user experience
  5. Monitoring is essential - Know when things break