Webhook Implementation Guide

This guide shows you how to implement webhook handling for ContactsManager events in different languages.

Setup Steps

  1. Create webhook credentials in your dashboard at dash.contactsmanager.io/webhooks
  2. Set up a server endpoint to receive webhook events
  3. Verify webhook signatures so that only genuine ContactsManager requests are processed
  4. Handle different event types appropriately
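
Step 3 can be sketched without the SDK as follows. This guide does not state the signing scheme, so the sketch assumes a hex-encoded HMAC-SHA256 of the raw request body keyed with the webhook secret; that scheme and the function name `verify_signature` are assumptions for illustration, not the documented ContactsManager algorithm. Prefer the SDK's built-in verification where it is available.

```python
import hashlib
import hmac
from typing import Optional


def verify_signature(secret: str, body: bytes, signature: Optional[str]) -> bool:
    """Return True if `signature` matches the expected digest of `body`.

    ASSUMPTION: the signature is a hex-encoded HMAC-SHA256 of the raw
    request body. Check the dashboard or SDK source for the real scheme.
    """
    if not signature:
        # A missing header is treated the same as a bad signature
        return False
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, which avoids timing side channels
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

Whatever the scheme, verify against the raw bytes as received: re-serializing parsed JSON can change whitespace or key order and break the comparison.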

Using Our SDKs

For the easiest implementation, use our official SDKs. Both SDKs provide built-in webhook signature verification.

Handling Webhook Retries

ContactsManager implements an automatic retry mechanism to ensure reliable delivery of webhook events:
  • We make up to 3 delivery attempts for each webhook (initial attempt + 2 retries)
  • If your endpoint returns a non-2xx response code or times out, we retry automatically
  • Retry schedule:
    • First retry: 3 seconds after the initial attempt
    • Second retry: 7 seconds after the first retry
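
To make the schedule concrete, the short sketch below adds up the delays to show when each attempt happens relative to the first one. The helper name `delivery_offsets` is hypothetical, and the arithmetic ignores any time spent waiting for a failed request to time out, so treat the result as approximate.

```python
# Delays between consecutive attempts, taken from the retry schedule above
RETRY_DELAYS_SECONDS = [3, 7]


def delivery_offsets(retry_delays):
    """Return the offset in seconds of every attempt from the initial one."""
    offsets = [0]  # the initial attempt
    for delay in retry_delays:
        # Each retry is scheduled relative to the previous attempt
        offsets.append(offsets[-1] + delay)
    return offsets


print(delivery_offsets(RETRY_DELAYS_SECONDS))  # [0, 3, 10]
```

All three attempts therefore fall within roughly 10 seconds, so a brief outage of your endpoint can exhaust every attempt for an event.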

Implementing Idempotency

Because retries can deliver the same webhook more than once, your webhook handler should be idempotent: processing the same event multiple times must not cause duplicate effects. To implement idempotency:
  1. Use the unique webhook id field for deduplication
  2. Before processing a webhook, check if you’ve already processed that ID
  3. If the ID has been processed, return a 200 response without further processing
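
The three steps above can be sketched as a small in-memory store. The names `ProcessedWebhookStore`, `claim`, and `handle_delivery`, and the one-hour expiry, are hypothetical choices for illustration. In production, a database unique constraint or a shared cache with expiry would likely serve the same purpose across multiple server processes.

```python
import time
from typing import Any, Callable, Dict


class ProcessedWebhookStore:
    """In-memory record of processed webhook IDs with a time-to-live.

    Hypothetical helper: it is not shared between processes and is lost
    on restart, so it only illustrates the deduplication logic.
    """

    def __init__(self, ttl_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen: Dict[str, float] = {}

    def claim(self, webhook_id: str) -> bool:
        """Record webhook_id and return True only the first time it is seen."""
        now = self._clock()
        # Drop expired entries so the store does not grow without bound
        self._seen = {k: t for k, t in self._seen.items() if now - t < self._ttl}
        if webhook_id in self._seen:
            return False
        self._seen[webhook_id] = now
        return True


def handle_delivery(store: ProcessedWebhookStore, payload: Dict[str, Any]) -> str:
    """Return "processed" for a first delivery and "duplicate" for a repeat."""
    if not store.claim(payload["id"]):
        # Step 3: acknowledge with a 200 response, but skip the side effects
        return "duplicate"
    # ... perform the event's side effects here ...
    return "processed"
```

Checking and recording the ID in a single `claim` call keeps the check and the write together, which matters once deliveries can arrive concurrently.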

Implementation Examples

  • Python
First, install the ContactsManager SDK:
pip install contactsmanager
Then implement your webhook handler:
from fastapi import FastAPI, Request, HTTPException, Header
from contactsmanager import ContactsManagerClient
import json
import asyncio

app = FastAPI()

# Initialize the ContactsManager client with your credentials
client = ContactsManagerClient(
    api_key="your_api_key",
    api_secret="your_api_secret",
    org_id="your_org_id",
)

# Set webhook secret from your dashboard
client.set_webhook_secret("your_webhook_secret")

# Store processed webhook IDs (use a database in production)
processed_webhook_ids = set()

# Keep references to in-flight tasks so they are not garbage-collected mid-run
background_tasks = set()

@app.post("/webhooks/contactsmanager")
async def webhook_handler(request: Request, x_webhook_signature: str = Header(None)):
    # Read the raw request body; it is passed unmodified to signature verification
    body = await request.body()

    # Verify the webhook signature before parsing or trusting the payload
    if not x_webhook_signature or not client.verify_webhook_signature(body, x_webhook_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    payload = json.loads(body)

    # Check for duplicate webhook delivery
    webhook_id = payload.get("id")
    if webhook_id in processed_webhook_ids:
        # Already processed this webhook, return success
        return {"status": "received"}

    # Mark as processed before scheduling, so a retry that arrives meanwhile is skipped
    processed_webhook_ids.add(webhook_id)

    # Process based on event type
    event_type = payload.get("event")
    event_data = payload.get("payload")

    # Process webhook without blocking the response
    task = asyncio.create_task(process_webhook_event(event_type, event_data))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # Return success immediately
    return {"status": "received"}

async def process_webhook_event(event_type, event_data):
    # Guard against a missing payload so the .get() calls below are safe
    event_data = event_data or {}

    # Handle different event types
    if event_type == "user.new":
        # Handle new user event
        print(f"New user created: {event_data.get('email')}")
    elif event_type == "user.is_contact":
        # Handle user is contact event
        matched_user_ids = event_data.get('matched_user_ids') or []
        print(f"User {event_data.get('email')} matched with {len(matched_user_ids)} contacts")
    elif event_type == "user.follow":
        # Handle user follow event
        print(f"{event_data.get('follower_name')} followed {event_data.get('followed_name')}")
    elif event_type == "user.unfollow":
        # Handle user unfollow event
        print(f"{event_data.get('follower_name')} unfollowed {event_data.get('followed_name')}")
    elif event_type == "user.create_event":
        # Handle event creation
        print(f"New event created: {event_data.get('title')}")
    elif event_type == "user.update_event":
        # Handle event update
        print(f"Event updated: {event_data.get('title')}")
    elif event_type == "user.delete_event":
        # Handle event deletion
        print(f"Event deleted: {event_data.get('event_id')}")
    elif event_type == "user.deleted":
        # Handle user deletion; clean up any external systems or data here
        print(f"User deleted: {event_data.get('email')} - {event_data.get('user_id')}")
    else:
        print(f"Unhandled event type: {event_type}")
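
As the number of event types grows, the if/elif chain above can be replaced with a lookup table. The sketch below is one possible arrangement, not part of the SDK: the `on` decorator, the `HANDLERS` registry, and `dispatch` are hypothetical names. The event names and payload fields are the ones used in the handler above.

```python
from typing import Any, Callable, Dict

EventHandler = Callable[[Dict[str, Any]], str]

# Hypothetical registry mapping event type to handler function
HANDLERS: Dict[str, EventHandler] = {}


def on(event_type: str) -> Callable[[EventHandler], EventHandler]:
    """Register the decorated function as the handler for event_type."""
    def register(func: EventHandler) -> EventHandler:
        HANDLERS[event_type] = func
        return func
    return register


@on("user.new")
def handle_user_new(data: Dict[str, Any]) -> str:
    return f"New user created: {data.get('email')}"


@on("user.follow")
def handle_user_follow(data: Dict[str, Any]) -> str:
    return f"{data.get('follower_name')} followed {data.get('followed_name')}"


def dispatch(event_type: str, data: Dict[str, Any]) -> str:
    """Route an event to its registered handler, or report it as unhandled."""
    handler = HANDLERS.get(event_type)
    if handler is None:
        return f"Unhandled event type: {event_type}"
    # Tolerate a missing payload so handlers can call .get() safely
    return handler(data or {})
```

Each handler stays small and independently testable, and supporting a new event type means adding one decorated function rather than editing a long conditional.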

Testing Your Webhook Implementation

  1. Deploy your webhook endpoint to a publicly accessible URL
  2. Register this URL in your ContactsManager dashboard
  3. Generate test events in the dashboard to validate your implementation
  4. Check your server logs to ensure events are being processed correctly

Next Steps
