Implementing the Event Queue System
Build reliable analytics with Forge Events for async processing and error handling
What's covered here
- Why queues improve analytics reliability
- How to set up Forge Events for analytics
- Queue consumer implementation
- Error handling and retry logic
- Monitoring queue health
Working Example Available
The code in this guide comes from our sample app on GitHub.
View the full Forge Analytics Example on GitHub
Why Use Queues for Analytics?
The Problem with Direct Calls
// ❌ Direct approach - fragile
resolver.define('create-todo', async ({ payload, context }) => {
const todo = await createTodo(payload);
// If analytics fails, the whole operation fails
await sendAnalytics('Todo Created', context.accountId);
return todo;
});
Issues:
- Analytics failures break core functionality
- No retry mechanism
- User waits for analytics calls
- All-or-nothing processing
The Queue Solution
// ✅ Queue approach - reliable
resolver.define('create-todo', async ({ payload, context }) => {
const todo = await createTodo(payload);
// Queue the event (fast, reliable)
await analyticsQueue.push({
type: 'track',
userId: context.accountId,
event: 'Todo Created'
});
return todo; // User gets immediate response
});
Benefits:
- Core functionality never blocked by analytics
- Automatic retry on failures
- Batch processing efficiency
- Decoupled architecture
Step 1: Queue Configuration
Update Manifest
Add the queue consumer and its handler function to your manifest.yml:
modules:
  jira:issuePanel:
    - key: todo-panel
      resource: main
      resolver:
        function: resolver
  # Queue consumer for processing analytics events
  consumer:
    - key: analytics-consumer
      queue: analytics-queue
      resolver:
        function: analytics-consumer-func
        method: analytics-listener
  function:
    - key: resolver
      handler: index.handler
    - key: analytics-consumer-func
      handler: analytics/consumer.handler
permissions:
  external:
    fetch:
      backend:
        - address: "in.accoil.com"
          category: analytics
          inScopeEUD: false
Step 2: Queue Producer
Enhanced Events Module (src/analytics/events.js)
import { Queue } from '@forge/events';
// Initialize the analytics queue
const analyticsQueue = new Queue({ key: 'analytics-queue' });
/**
* Get privacy-safe identifiers from context
*/
const getAnalyticsIds = (context) => {
// Option 1: Individual user tracking
const userId = context.accountId;
// Option 2: Account-level tracking (cost optimization)
// const userId = context.cloudId;
const groupId = context.cloudId;
return { userId, groupId };
};
/**
* Core tracking function that queues events
*
* Bundles identify, group, and track events for complete user profile.
* All events are processed asynchronously via queue consumer.
*/
export const track = async (context, eventName) => {
const { userId, groupId } = getAnalyticsIds(context);
// Minimal traits to avoid PII
const identifyTraits = { name: userId };
const groupTraits = { name: groupId };
// Bundle all event types for atomic processing
const events = [
{
type: 'identify',
userId: userId,
groupId: groupId,
traits: identifyTraits
},
{
type: 'group',
groupId: groupId,
traits: groupTraits
},
{
type: 'track',
userId: userId,
event: eventName
}
];
// Queue events for async processing
await analyticsQueue.push(events);
};
/**
* BACKEND EVENT DEFINITIONS
* All events use the central track() function
*/
export const trackTodoCreated = (context) => track(context, 'Todo Created');
export const trackTodoUpdated = (context) => track(context, 'Todo Updated');
export const trackTodoDeleted = (context) => track(context, 'Todo Deleted');
export const trackTodosCleared = (context) => track(context, 'Todos Cleared');
Step 3: Queue Consumer
Consumer Implementation (src/analytics/consumer.js)
import Resolver from '@forge/resolver';
import { sendAnalytics } from './dispatcher';
const resolver = new Resolver();
/**
* Process analytics events from the queue
*
* This consumer runs asynchronously and handles different event types.
* Events are processed reliably with automatic retry on failures.
*/
resolver.define('analytics-listener', async ({ payload }) => {
try {
await processAnalyticsEvent(payload);
} catch (error) {
// Log error but let Forge Events handle retry
console.error('[Analytics Consumer] Processing failed:', error.message);
throw error; // Re-throw to trigger retry
}
});
/**
* Route different event types to appropriate handlers
*/
const processAnalyticsEvent = async (event) => {
switch (event.type) {
case 'identify':
await handleIdentifyEvent(event);
break;
case 'group':
await handleGroupEvent(event);
break;
case 'track':
await handleTrackEvent(event);
break;
default:
console.warn(`[Analytics Consumer] Unknown event type: ${event.type}`);
}
};
/**
* Handle user identification events
*/
const handleIdentifyEvent = async (event) => {
await sendAnalytics('users', {
user_id: event.userId,
group_id: event.groupId,
traits: event.traits || {}
});
};
/**
* Handle group/organization events
*/
const handleGroupEvent = async (event) => {
await sendAnalytics('groups', {
group_id: event.groupId,
traits: event.traits || {}
});
};
/**
* Handle tracking events
*/
const handleTrackEvent = async (event) => {
await sendAnalytics('events', {
user_id: event.userId,
event: event.event
});
};
export const handler = resolver.getDefinitions();
Enhanced Dispatcher (src/analytics/dispatcher.js)
import { fetch } from '@forge/api';
/**
* Send analytics events to provider with retry logic
*/
export const sendAnalytics = async (endpoint, payload) => {
const apiKey = process.env.ANALYTICS_API_KEY;
if (!apiKey) {
throw new Error('Analytics API key not configured');
}
// Debug mode
if (process.env.ANALYTICS_DEBUG === 'true') {
console.log(`[Analytics Debug] ${endpoint}:`, JSON.stringify(payload, null, 2));
return;
}
const url = `https://in.accoil.com/v1/${endpoint}`;
const requestPayload = {
...payload,
api_key: apiKey,
timestamp: Date.now()
};
// Retry configuration
const maxRetries = 3;
const baseDelay = 1000; // 1 second
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(requestPayload)
});
if (response.ok) {
return; // Success
}
// Non-retryable client errors (4xx, except 429 rate limits) - fail immediately
if (response.status >= 400 && response.status < 500 && response.status !== 429) {
const clientError = new Error(`Client error ${response.status}: ${await response.text()}`);
clientError.retryable = false;
throw clientError;
}
// Server errors (5xx) and rate limits (429) - throw so the catch block retries
throw new Error(`Request failed with status ${response.status} (attempt ${attempt} of ${maxRetries})`);
} catch (error) {
if (error.retryable === false || attempt === maxRetries) {
throw error;
}
// Exponential backoff before the next attempt
const delay = baseDelay * Math.pow(2, attempt - 1);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
};
Step 4: Integration with Resolvers
Enhanced Resolver Integration (src/analytics/resolvers.js)
import { track } from './events';
/**
* Resolver for frontend-triggered events
* Routes events through the queue system
*/
export const trackEvent = async ({ payload, context }) => {
await track(context, payload.event);
};
// Optional: Direct identify/group resolvers (rarely needed)
export const identify = async ({ context }) => {
// Implementation if needed for special cases
};
export const group = async ({ context }) => {
// Implementation if needed for special cases
};
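Because track-event accepts an event name from the frontend, you may want to validate it before queueing. A minimal sketch, assuming you keep an allowlist next to your event definitions (the ALLOWED_EVENTS set and warning message are illustrative, not part of the sample app):
import { track } from './events';
// Hypothetical allowlist - keep it in sync with the events your app defines
const ALLOWED_EVENTS = new Set([
  'Todo Created',
  'Todo Updated',
  'Todo Deleted',
  'Todos Cleared'
]);
export const trackEventValidated = async ({ payload, context }) => {
  // Ignore unknown event names instead of queueing arbitrary strings
  if (!ALLOWED_EVENTS.has(payload?.event)) {
    console.warn(`[Analytics] Ignoring unknown event: ${payload?.event}`);
    return;
  }
  await track(context, payload.event);
};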
Update Main Resolver (src/index.js)
import Resolver from '@forge/resolver';
import { trackEvent } from './analytics/resolvers';
import { trackTodoCreated, trackTodoUpdated, trackTodoDeleted, trackTodosCleared } from './analytics/events';
const resolver = new Resolver();
// Business logic with analytics
resolver.define('create-todo', async ({ payload, context }) => {
const todo = await createTodo(payload);
// Queue analytics event (non-blocking)
await trackTodoCreated(context);
return todo;
});
resolver.define('update-todo', async ({ payload, context }) => {
const todo = await updateTodo(payload);
await trackTodoUpdated(context);
return todo;
});
resolver.define('delete-todo', async ({ payload, context }) => {
const result = await deleteTodo(payload);
await trackTodoDeleted(context);
return result;
});
resolver.define('delete-all-todos', async ({ context }) => {
const result = await deleteAllTodos(context);
await trackTodosCleared(context);
return result;
});
// Frontend analytics resolver
resolver.define('track-event', trackEvent);
export const handler = resolver.getDefinitions();
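On the frontend, call the track-event resolver with invoke from @forge/bridge. A minimal sketch (the wrapper function name and usage are illustrative, not part of the sample app):
import { invoke } from '@forge/bridge';
// Fire-and-forget: the resolver only queues the event, so this resolves quickly
export const trackFromFrontend = (eventName) =>
  invoke('track-event', { event: eventName }).catch((error) => {
    // Never let an analytics failure surface to the user
    console.warn('[Analytics] track-event failed:', error);
  });
// Example usage from a UI handler:
// trackFromFrontend('Todo Created');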
Step 5: Testing the Queue System
Local Testing
- Enable Debug Mode:
forge variables set ANALYTICS_DEBUG true
forge deploy
- Monitor Logs:
forge logs --tail
- Trigger Events:
Use your app and watch for queue processing:
[Analytics Debug] events: {
"user_id": "557058:...",
"event": "Todo Created",
"timestamp": 1704067200000
}
Queue Health Monitoring
Add monitoring to your consumer:
// Enhanced consumer with monitoring
resolver.define('analytics-listener', async ({ payload }) => {
const startTime = Date.now();
try {
await processAnalyticsEvent(payload);
const duration = Date.now() - startTime;
console.log(`[Analytics Consumer] Processed ${payload.type} in ${duration}ms`);
} catch (error) {
console.error(`[Analytics Consumer] Failed to process ${payload.type}:`, error.message);
throw error;
}
});
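If you want more than log lines, one option is a rolling failure counter in Forge storage that you can inspect or alert on later. A rough sketch, assuming the @forge/api storage API, the storage:app scope in your manifest, and an arbitrary key naming scheme:
import { storage } from '@forge/api';
// Hypothetical helper: increment a per-day failure counter for later inspection
const recordFailure = async (eventType) => {
  const key = `analytics-failures-${new Date().toISOString().slice(0, 10)}`;
  const counts = (await storage.get(key)) || {};
  counts[eventType] = (counts[eventType] || 0) + 1;
  await storage.set(key, counts);
};
resolver.define('analytics-listener', async ({ payload }) => {
  try {
    await processAnalyticsEvent(payload);
  } catch (error) {
    await recordFailure(payload.type);
    throw error; // Still let Forge Events retry the event
  }
});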
Advanced Patterns
Dead Letter Handling
Handle events that consistently fail by routing them to a second queue for manual review (a consumer for that queue must also be registered in your manifest; a minimal sketch of it follows after this example):
import { Queue } from '@forge/events';
// Assumes a second queue, e.g. 'analytics-dead-letter', with its own consumer in the manifest
const deadLetterQueue = new Queue({ key: 'analytics-dead-letter' });
resolver.define('analytics-listener', async ({ payload, context }) => {
try {
await processAnalyticsEvent(payload);
} catch (error) {
// Check retry count
const retryCount = context.retryCount || 0;
if (retryCount >= 5) {
// Send to dead letter queue or log for manual review
console.error('[Analytics] Event failed after 5 retries:', payload);
await deadLetterQueue.push(payload);
return; // Don't re-throw
}
throw error; // Trigger retry
}
});
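The dead-letter queue also needs its own consumer entry in the manifest. A minimal sketch of that consumer, which only logs failed events for manual review (the file path, queue key, and method name are illustrative):
// src/analytics/deadLetterConsumer.js (hypothetical file)
import Resolver from '@forge/resolver';
const resolver = new Resolver();
// Matches a manifest consumer entry whose queue is 'analytics-dead-letter'
resolver.define('dead-letter-listener', async ({ payload }) => {
  // No retries here - just surface the event for manual follow-up
  console.error('[Analytics Dead Letter] Unprocessable event:', JSON.stringify(payload));
});
export const handler = resolver.getDefinitions();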
Production Checklist
- Consumer function deployed and registered
- Queue key matches in producer and consumer
- Error handling prevents infinite retries
- Debug mode disabled in production
- Monitoring in place for queue health
- Dead letter queue for failed events
- API rate limiting handled
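On the rate-limiting item: the dispatcher above already retries 429 responses with exponential backoff. If your provider returns a Retry-After header, you could honor it instead. A sketch, assuming the header value is in seconds (check your provider's documentation):
// Sketch: pick a wait time after a 429, preferring the Retry-After header
const rateLimitDelayMs = (response, attempt, baseDelay = 1000) => {
  const retryAfter = Number(response.headers.get('retry-after'));
  if (Number.isFinite(retryAfter) && retryAfter > 0) {
    return retryAfter * 1000; // Header assumed to be in seconds
  }
  // Fall back to the dispatcher's exponential backoff
  return baseDelay * Math.pow(2, attempt - 1);
};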
Key Takeaways
- Non-blocking - Analytics events can't hold up your application
- Decouple analytics from business logic - Core functionality unaffected
- Automatic retry handling - Forge Events handles failures
- Async processing - Better user experience
- Monitoring is essential - Know when things break