Get Started
Features
Services
Implementation
Webhook Implementation Guide
This guide shows you how to implement webhook handling for ContactsManager events in different languages.
Setup Steps
- Create webhook credentials in your dashboard at dash.contactsmanager.io/webhooks
- Set up a server endpoint to receive webhook events
- Verify webhook signatures to ensure security
- Handle different event types appropriately
Using Our SDKs
For the easiest implementation, use our official SDKs for Python (`contactsmanager`) and Node.js (`@contactsmanager/server`):
Both SDKs provide built-in webhook signature verification.
Handling Webhook Retries
ContactsManager implements an automatic retry mechanism to ensure reliable delivery of webhook events:
- We make up to 3 delivery attempts for each webhook (initial attempt + 2 retries)
- If your endpoint returns a non-2xx response code or times out, we retry automatically
- Retry schedule:
  - First retry: 3 seconds after the initial attempt
  - Second retry: 7 seconds after the first retry
Implementing Idempotency
Since a webhook may be delivered multiple times due to retries, your webhook handler should be idempotent (processing the same event multiple times should not cause duplicate effects). Here’s how to implement idempotency:
- Use the unique webhook `id` field for deduplication
- Before processing a webhook, check if you’ve already processed that ID
- If the ID has been processed, return a 200 response without further processing
Implementation Examples
First, install the ContactsManager SDK:
pip install contactsmanager
Then implement your webhook handler:
from fastapi import FastAPI, Request, HTTPException, Header
from contactsmanager import ContactsManagerClient
import json
import asyncio
# FastAPI application that exposes the webhook endpoint
app = FastAPI()

# IDs of webhooks already handled, kept in memory (use a database in production)
processed_webhook_ids = set()

# ContactsManager client built from your credentials
client = ContactsManagerClient(
    api_key="your_api_key",
    api_secret="your_api_secret",
    org_id="your_org_id",
)

# Register the webhook secret shown in your dashboard
client.set_webhook_secret("your_webhook_secret")
# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_background_tasks = set()


@app.post("/webhooks/contactsmanager")
async def webhook_handler(request: Request, x_webhook_signature: str = Header(None)):
    """Verify an incoming webhook and schedule it for background processing.

    Returns ``{"status": "received"}`` both for new webhooks and for IDs that
    were already seen, so retried deliveries are acknowledged without being
    processed twice.

    Raises:
        HTTPException: 400 when the signature header is missing or the body is
            not a JSON object; 401 when the signature does not verify.
    """
    # Read the raw request body
    body = await request.body()

    if not x_webhook_signature:
        raise HTTPException(status_code=400, detail="No signature provided")

    # Verify the signature before parsing, so unauthenticated input is never
    # interpreted
    if not client.verify_webhook_signature(body, x_webhook_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        payload = json.loads(body)
    except ValueError:
        # Covers json.JSONDecodeError and invalid UTF-8
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from None
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    # Check for duplicate webhook delivery
    webhook_id = payload.get("id")
    if webhook_id in processed_webhook_ids:
        # Already processed this webhook, return success
        return {"status": "received"}

    # Mark as processed before scheduling so a concurrent retry is deduplicated
    processed_webhook_ids.add(webhook_id)

    # Process webhook without blocking the response
    task = asyncio.create_task(
        process_webhook_event(payload.get("event"), payload.get("payload"))
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    # Return success immediately
    return {"status": "received"}
async def process_webhook_event(event_type, event_data):
    """Handle one webhook event according to its type.

    Args:
        event_type: Value of the webhook's ``event`` field, e.g. ``"user.new"``.
        event_data: Value of the webhook's ``payload`` field. ``None`` is
            treated as an empty dict so missing payloads do not raise.
    """
    # Missing fields fall back to defaults instead of raising, because an
    # exception in a fire-and-forget task is never reported to the sender.
    event_data = event_data or {}

    # Handle different event types
    if event_type == "user.new":
        # Handle new user event
        print(f"New user created: {event_data.get('email')}")
    elif event_type == "user.is_contact":
        # Handle user is contact event
        matched_user_ids = event_data.get('matched_user_ids') or []
        print(f"User {event_data.get('email')} matched with {len(matched_user_ids)} contacts")
    elif event_type == "user.follow":
        # Handle user follow event
        print(f"{event_data.get('follower_name')} followed {event_data.get('followed_name')}")
    elif event_type == "user.unfollow":
        # Handle user unfollow event
        print(f"{event_data.get('follower_name')} unfollowed {event_data.get('followed_name')}")
    elif event_type == "user.create_event":
        # Handle event creation
        print(f"New event created: {event_data.get('title')}")
    elif event_type == "user.update_event":
        # Handle event update
        print(f"Event updated: {event_data.get('title')}")
    elif event_type == "user.delete_event":
        # Handle event deletion
        print(f"Event deleted: {event_data.get('event_id')}")
    elif event_type == "user.deleted":
        # Handle user deletion
        print(f"User deleted: {event_data.get('email')} - {event_data.get('user_id')}")
        # Clean up any external systems or data
    else:
        print(f"Unhandled event type: {event_type}")
First, install the ContactsManager SDK:
pip install contactsmanager
Then implement your webhook handler:
from fastapi import FastAPI, Request, HTTPException, Header
from contactsmanager import ContactsManagerClient
import json
import asyncio
# FastAPI application that exposes the webhook endpoint
app = FastAPI()

# IDs of webhooks already handled, kept in memory (use a database in production)
processed_webhook_ids = set()

# ContactsManager client built from your credentials
client = ContactsManagerClient(
    api_key="your_api_key",
    api_secret="your_api_secret",
    org_id="your_org_id",
)

# Register the webhook secret shown in your dashboard
client.set_webhook_secret("your_webhook_secret")
# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_background_tasks = set()


@app.post("/webhooks/contactsmanager")
async def webhook_handler(request: Request, x_webhook_signature: str = Header(None)):
    """Verify an incoming webhook and schedule it for background processing.

    Returns ``{"status": "received"}`` both for new webhooks and for IDs that
    were already seen, so retried deliveries are acknowledged without being
    processed twice.

    Raises:
        HTTPException: 400 when the signature header is missing or the body is
            not a JSON object; 401 when the signature does not verify.
    """
    # Read the raw request body
    body = await request.body()

    if not x_webhook_signature:
        raise HTTPException(status_code=400, detail="No signature provided")

    # Verify the signature before parsing, so unauthenticated input is never
    # interpreted
    if not client.verify_webhook_signature(body, x_webhook_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        payload = json.loads(body)
    except ValueError:
        # Covers json.JSONDecodeError and invalid UTF-8
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from None
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    # Check for duplicate webhook delivery
    webhook_id = payload.get("id")
    if webhook_id in processed_webhook_ids:
        # Already processed this webhook, return success
        return {"status": "received"}

    # Mark as processed before scheduling so a concurrent retry is deduplicated
    processed_webhook_ids.add(webhook_id)

    # Process webhook without blocking the response
    task = asyncio.create_task(
        process_webhook_event(payload.get("event"), payload.get("payload"))
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    # Return success immediately
    return {"status": "received"}
async def process_webhook_event(event_type, event_data):
    """Handle one webhook event according to its type.

    Args:
        event_type: Value of the webhook's ``event`` field, e.g. ``"user.new"``.
        event_data: Value of the webhook's ``payload`` field. ``None`` is
            treated as an empty dict so missing payloads do not raise.
    """
    # Missing fields fall back to defaults instead of raising, because an
    # exception in a fire-and-forget task is never reported to the sender.
    event_data = event_data or {}

    # Handle different event types
    if event_type == "user.new":
        # Handle new user event
        print(f"New user created: {event_data.get('email')}")
    elif event_type == "user.is_contact":
        # Handle user is contact event
        matched_user_ids = event_data.get('matched_user_ids') or []
        print(f"User {event_data.get('email')} matched with {len(matched_user_ids)} contacts")
    elif event_type == "user.follow":
        # Handle user follow event
        print(f"{event_data.get('follower_name')} followed {event_data.get('followed_name')}")
    elif event_type == "user.unfollow":
        # Handle user unfollow event
        print(f"{event_data.get('follower_name')} unfollowed {event_data.get('followed_name')}")
    elif event_type == "user.create_event":
        # Handle event creation
        print(f"New event created: {event_data.get('title')}")
    elif event_type == "user.update_event":
        # Handle event update
        print(f"Event updated: {event_data.get('title')}")
    elif event_type == "user.delete_event":
        # Handle event deletion
        print(f"Event deleted: {event_data.get('event_id')}")
    elif event_type == "user.deleted":
        # Handle user deletion
        print(f"User deleted: {event_data.get('email')} - {event_data.get('user_id')}")
        # Clean up any external systems or data
    else:
        print(f"Unhandled event type: {event_type}")
First, install the ContactsManager SDK:
npm install @contactsmanager/server
Then implement your webhook handler:
const express = require('express');
const { ContactsManagerClient } = require('@contactsmanager/server');
// Express application with JSON body parsing enabled
const app = express();
app.use(express.json());

// IDs of webhooks already handled, kept in memory (use a database in production)
const processedWebhookIds = new Set();

// ContactsManager client built from your credentials
const client = new ContactsManagerClient({
  apiKey: 'your_api_key',
  apiSecret: 'your_api_secret',
  orgId: 'your_org_id'
});

// Register the webhook secret shown in your dashboard
client.setWebhookSecret('your_webhook_secret');
// Webhook handler: verifies the signature, deduplicates by webhook id,
// acknowledges with 200, then processes the event.
app.post('/webhooks/contactsmanager', (req, res) => {
  const signature = req.headers['x-webhook-signature'];
  if (!signature) {
    return res.status(400).json({ error: 'No signature provided' });
  }

  // Verify the webhook signature
  if (!client.verifyWebhookSignature(req.body, signature)) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  const { id, event } = req.body;
  // Fall back to an empty object so a webhook without a payload cannot throw below
  const payload = req.body.payload || {};

  // Check for duplicate webhook
  if (processedWebhookIds.has(id)) {
    return res.status(200).json({ received: true, duplicate: true });
  }

  // Add to processed list
  processedWebhookIds.add(id);

  // Return success response immediately
  res.status(200).json({ received: true });

  // Process the webhook after acknowledging it. The response is already sent,
  // so failures are logged here instead of being passed to Express.
  try {
    switch (event) {
      case 'user.new':
        // Handle new user
        console.log(`New user created: ${payload.email}`);
        break;
      case 'user.is_contact':
        // Handle user is contact
        console.log(`User ${payload.email} matched with ${(payload.matched_user_ids || []).length} contacts`);
        break;
      case 'user.follow':
        // Handle user follow
        console.log(`${payload.follower_name} followed ${payload.followed_name}`);
        break;
      case 'user.unfollow':
        // Handle user unfollow
        console.log(`${payload.follower_name} unfollowed ${payload.followed_name}`);
        break;
      case 'user.create_event':
        // Handle event creation
        console.log(`New event created: ${payload.title}`);
        break;
      case 'user.update_event':
        // Handle event update
        console.log(`Event updated: ${payload.title}`);
        break;
      case 'user.delete_event':
        // Handle event deletion
        console.log(`Event deleted: ${payload.event_id}`);
        break;
      case 'user.deleted':
        // Handle user deletion
        console.log(`User deleted: ${payload.email} - ${payload.user_id}`);
        // Clean up any external systems or references to this user
        break;
      default:
        console.log(`Unhandled event type: ${event}`);
    }
  } catch (error) {
    console.error(`Failed to process webhook ${id}:`, error);
  }
});
// Start the HTTP server on the PORT environment variable, or 3000 when it is unset
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => console.log(`Server running on port ${PORT}`));
If you prefer not to use our SDK, here’s how to implement webhook verification manually:
const crypto = require('crypto');
const express = require('express');
// Your webhook secret from your dashboard
const WEBHOOK_SECRET = 'your_webhook_secret';

const app = express();

// Parse JSON bodies, keeping the unparsed buffer on the request as well
app.use(express.json({
  verify(req, res, buf) {
    // Store the raw body for signature verification
    req.rawBody = buf;
  }
}));
// Verify the signature from ContactsManager.
//
// payload: the raw request body exactly as received (Buffer or string). A
//   parsed object is still accepted and re-serialized with JSON.stringify, but
//   that only verifies when the result matches the sender's bytes exactly.
// signatureHeader: header value of the form "t=<unix seconds>,v1=<hex HMAC-SHA256>".
// Returns true only when the header is well formed, the timestamp is inside
// the 15 minute window, and the HMAC matches.
function verifySignature(payload, signatureHeader) {
  try {
    // Parse the signature header, tolerating whitespace around each part
    const components = {};
    signatureHeader.split(',').forEach(part => {
      const separator = part.indexOf('=');
      if (separator > 0) {
        components[part.slice(0, separator).trim()] = part.slice(separator + 1).trim();
      }
    });
    if (!components.t || !components.v1) {
      return false;
    }
    const timestamp = components.t;
    const providedSignature = components.v1;

    // Check if timestamp is not too old (15 minute window). A non-numeric
    // timestamp is rejected explicitly: comparisons against NaN are always
    // false and would otherwise skip this check.
    const signedAt = Number.parseInt(timestamp, 10);
    const currentTime = Math.floor(Date.now() / 1000);
    if (Number.isNaN(signedAt) || currentTime - signedAt > 900) {
      return false;
    }

    // Create the signature over the exact bytes that were received
    const rawPayload = Buffer.isBuffer(payload) || typeof payload === 'string'
      ? payload
      : JSON.stringify(payload);
    const expectedSignature = crypto
      .createHmac('sha256', WEBHOOK_SECRET)
      .update(`${timestamp}.`)
      .update(rawPayload)
      .digest('hex');

    // Compare signatures using constant-time comparison. timingSafeEqual
    // throws when the lengths differ, so check the lengths first.
    const provided = Buffer.from(providedSignature);
    const expected = Buffer.from(expectedSignature);
    return provided.length === expected.length &&
      crypto.timingSafeEqual(provided, expected);
  } catch (error) {
    console.error('Signature verification error:', error);
    return false;
  }
}

// For deduplication (use a database in production)
const processedWebhookIds = new Set();

app.post('/webhooks/contactsmanager', (req, res) => {
  const signature = req.headers['x-webhook-signature'];
  if (!signature) {
    return res.status(400).json({ error: 'No signature provided' });
  }

  // Verify against the raw bytes stored by the express.json() verify hook;
  // re-serializing the parsed body is not guaranteed to reproduce them.
  if (!verifySignature(req.rawBody, signature)) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  const { id, event, payload } = req.body;

  // Check for duplicate webhook
  if (processedWebhookIds.has(id)) {
    return res.status(200).json({ received: true, duplicate: true });
  }

  // Add to processed list
  processedWebhookIds.add(id);

  // Return success response immediately
  res.status(200).json({ received: true });

  // Process the webhook after acknowledging it
  switch (event) {
    case 'user.new':
      // Handle new user
      break;
    case 'user.is_contact':
      // Handle user is contact
      break;
    case 'user.follow':
      // Handle user follow
      break;
    case 'user.unfollow':
      // Handle user unfollow
      break;
    case 'user.create_event':
      // Handle event creation
      break;
    case 'user.update_event':
      // Handle event update
      break;
    case 'user.delete_event':
      // Handle event deletion
      break;
    case 'user.deleted':
      // Handle user deletion event
      // This is a good place to clean up any external data related to this user
      break;
    default:
      console.log(`Unhandled event type: ${event}`);
  }
});
// Start the HTTP server on the PORT environment variable, or 3000 when it is unset
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => console.log(`Server running on port ${PORT}`));
If you prefer not to use our SDK, here’s how to implement webhook verification manually:
import hmac
import hashlib
import time
import json
from fastapi import FastAPI, Request, HTTPException, Header
# Your webhook secret from your dashboard
WEBHOOK_SECRET = "your_webhook_secret"

# FastAPI application that exposes the webhook endpoint
app = FastAPI()
# Verify the signature from ContactsManager
def verify_signature(payload, signature_header, secret=None, tolerance=900):
    """Check a webhook signature header against the raw request body.

    Args:
        payload: Raw request body as ``bytes``; a ``str`` is encoded as UTF-8.
        signature_header: Header value of the form
            ``t=<timestamp>,v1=<hex digest>``. Whitespace around the parts is
            ignored.
        secret: Signing secret. Defaults to the module-level ``WEBHOOK_SECRET``.
        tolerance: Maximum accepted age of the timestamp in seconds
            (default 900, i.e. 15 minutes).

    Returns:
        ``True`` when the header is well formed, the timestamp is not older
        than ``tolerance`` and the HMAC-SHA256 digest of
        ``<timestamp>.<payload>`` matches ``v1``; ``False`` otherwise,
        including for a missing or malformed header.
    """
    if secret is None:
        secret = WEBHOOK_SECRET

    try:
        # Parse the signature header
        components = {}
        for part in signature_header.split(','):
            key, separator, value = part.partition('=')
            if separator:
                components[key.strip()] = value.strip()
        if 't' not in components or 'v1' not in components:
            return False
        timestamp = components['t']
        given_signature = components['v1']

        # Check if the timestamp is not too old
        if int(time.time()) - int(timestamp) > tolerance:
            return False

        # Compute the expected signature over the received bytes directly,
        # without a decode/encode round trip
        if isinstance(payload, str):
            payload = payload.encode('utf-8')
        signed_payload = timestamp.encode('utf-8') + b'.' + payload
        expected_signature = hmac.new(
            secret.encode('utf-8'),
            signed_payload,
            hashlib.sha256,
        ).hexdigest()

        # Compare signatures (constant-time comparison)
        return hmac.compare_digest(given_signature, expected_signature)
    except (AttributeError, TypeError, ValueError):
        # Malformed header, non-numeric timestamp, or unsupported argument type
        return False
# For deduplication (use a database in production)
processed_webhook_ids = set()


@app.post("/webhooks/contactsmanager")
async def webhook_handler(request: Request, x_webhook_signature: str = Header(None)):
    """Verify an incoming webhook, deduplicate it by ID and handle it inline.

    Returns ``{"status": "received"}`` after handling a new webhook, or
    ``{"status": "received", "duplicate": True}`` for an ID already handled.

    Raises:
        HTTPException: 400 when the signature header is missing or the body is
            not a JSON object; 401 when the signature does not verify.
    """
    body = await request.body()
    if not x_webhook_signature:
        raise HTTPException(status_code=400, detail="No signature provided")
    if not verify_signature(body, x_webhook_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        payload = json.loads(body)
    except ValueError:
        # Covers json.JSONDecodeError and invalid UTF-8
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from None
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    webhook_id = payload.get("id")
    # Check for duplicate webhook
    if webhook_id in processed_webhook_ids:
        return {"status": "received", "duplicate": True}

    # Process the webhook inline, before the response is returned
    event_type = payload.get("event")
    event_data = payload.get("payload")

    # Handle different event types
    if event_type == "user.new":
        # Handle new user event
        pass
    elif event_type == "user.is_contact":
        # Handle user is contact event
        pass
    elif event_type == "user.follow":
        # Handle user follow event
        pass
    elif event_type == "user.unfollow":
        # Handle user unfollow event
        pass
    elif event_type == "user.create_event":
        # Handle event creation
        pass
    elif event_type == "user.update_event":
        # Handle event update
        pass
    elif event_type == "user.delete_event":
        # Handle event deletion
        pass
    elif event_type == "user.deleted":
        # Handle user deletion
        # Perform any cleanup operations in external systems
        pass
    else:
        print(f"Unhandled event type: {event_type}")

    # Add to processed list only after handling succeeded. If handling raised,
    # the request fails and the retried delivery must not be skipped as a
    # duplicate.
    processed_webhook_ids.add(webhook_id)
    return {"status": "received"}
Testing Your Webhook Implementation
- Deploy your webhook endpoint to a publicly accessible URL
- Register this URL in your ContactsManager dashboard
- Generate test events in the dashboard to validate your implementation
- Check the logs to ensure events are being processed correctly