Incoming Webhooks

Comprehensive guide to developing incoming webhook handlers for receiving alerts and notifications from external monitoring platforms.

All incoming webhooks follow this production-ready pattern:

from fastapi import APIRouter, Depends, HTTPException, Request, BackgroundTasks, Header
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Dict, Any, Optional
from app.core.database import get_session
from app.core.logging import logger
from app.models.integration import Integration, IntegrationType
from app.schemas.common import SuccessResponse
router = APIRouter()


@router.post(
    "/webhooks/{platform}/{integration_id}",
    response_model=SuccessResponse,
    summary="{Platform} Webhook",
    description="Receive alerts from {Platform} monitoring"
)
async def platform_webhook(
    *,
    integration_id: str,
    request: Request,
    background_tasks: BackgroundTasks,
    session: AsyncSession = Depends(get_session),
    x_platform_signature: Optional[str] = Header(None)
) -> SuccessResponse:
    """Handle {Platform} webhook alerts.

    Template flow: parse the JSON body, resolve the integration, check
    its type, queue processing in a background task, and return
    immediately so the caller sees a fast response.

    NOTE(review): this is template code — the literal ``{Platform}``
    tokens are placeholders that must be substituted before use; they
    appear inside f-strings below and would raise NameError if the code
    ran unsubstituted.

    Raises:
        HTTPException: 404 if the integration does not exist, 400 if it
            is the wrong type or the payload cannot be parsed/processed.
    """
    try:
        payload = await request.json()
        logger.info(f"Received {Platform} webhook for integration {integration_id}")
        # 1. Find integration
        integration = await session.get(Integration, integration_id)
        if not integration:
            raise HTTPException(
                status_code=404,
                detail="Integration not found"
            )
        # 2. Validate integration type
        if integration.integration_type != IntegrationType.PLATFORM:
            raise HTTPException(
                status_code=400,
                detail="Integration is not a {Platform} integration"
            )
        # 3. Process webhook in background (non-blocking).
        # NOTE(review): the request-scoped `session` is handed to the
        # background task, which runs after the response is sent —
        # depending on get_session's lifetime it may already be closed;
        # confirm, or open a fresh session inside the task. Also note
        # that signature validation happens only in the background, so
        # requests with invalid signatures still receive a 200 here.
        background_tasks.add_task(
            process_platform_webhook,
            payload,
            integration,
            x_platform_signature,
            session
        )
        # 4. Return success immediately (< 100ms)
        return SuccessResponse(
            message="Webhook received successfully",
            data={
                "integration_id": integration_id,
                "alert_id": payload.get("id")  # None if the payload has no "id"
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to process {Platform} webhook: {e}")
        raise HTTPException(
            status_code=400,
            detail="Failed to process webhook"
        )

Heavy lifting happens asynchronously in a background task so the webhook response is not blocked:

async def process_platform_webhook(
    payload: Dict[str, Any],
    integration: Integration,
    signature: Optional[str],
    session: AsyncSession
):
    """Background task to process a {Platform} webhook.

    Runs after the HTTP response has been sent: verifies the optional
    HMAC signature, validates the payload structure, then delegates to
    the platform processor to create/update an incident. All failures
    are logged and swallowed — there is no caller to propagate to.

    Args:
        payload: Parsed JSON body of the webhook request.
        integration: Integration record the webhook targets.
        signature: Value of the platform signature header, if sent.
        session: Database session used by the processor.
    """
    try:
        logger.info(f"Processing {Platform} webhook for integration {integration.id}")
        # Initialize webhook processor (placeholder class name — substitute
        # the concrete platform processor, e.g. DatadogWebhookProcessor).
        processor = PlatformWebhookProcessor(session)
        # Validate signature if provided.
        if signature:
            webhook_secret = integration.get_config_dict().get("webhook_secret")
            if webhook_secret:
                # FIX: BaseWebhookProcessor.validate_signature is a plain
                # (synchronous) method returning bool — the original code
                # awaited it, which raises TypeError at runtime.
                is_valid = processor.validate_signature(
                    payload, signature, webhook_secret
                )
                if not is_valid:
                    logger.error("{Platform} webhook signature validation failed")
                    return
        # Validate payload structure before attempting to create anything.
        validation_result = processor.validate_webhook_payload(payload)
        if not validation_result["is_valid"]:
            logger.error(f"{Platform} webhook validation failed: {validation_result['errors']}")
            return
        # Process webhook and create incident.
        incident = await processor.process_webhook(payload, integration)
        if incident:
            logger.info(f"Created/updated incident {incident.id} from {Platform} webhook")
        else:
            logger.warning("No incident created from {Platform} webhook")
    except Exception as e:
        # Last-resort guard: background tasks must never raise.
        logger.error(f"Failed to process {Platform} webhook: {e}")

All webhook processors inherit from a base class:

from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.integration import Integration
from app.models.incident import Incident
class BaseWebhookProcessor(ABC):
    """Abstract base for platform-specific webhook processors.

    Subclasses supply payload validation and incident creation; shared
    HMAC-SHA256 signature verification lives here.
    """

    def __init__(self, session: AsyncSession):
        # Database session used by subclasses to persist incidents.
        self.session = session

    @abstractmethod
    def validate_webhook_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Check the payload structure.

        Returns:
            A dict of the form ``{"is_valid": bool, "errors": List[str]}``.
        """
        pass

    @abstractmethod
    async def process_webhook(
        self,
        payload: Dict[str, Any],
        integration: Integration
    ) -> Optional[Incident]:
        """Turn a validated payload into an incident.

        Returns:
            The created or updated incident, or None if processing failed.
        """
        pass

    def validate_signature(
        self,
        payload: Dict[str, Any],
        signature: str,
        secret: str
    ) -> bool:
        """Verify *signature* against the HMAC-SHA256 of the payload.

        The header value is expected as ``sha256=<hexdigest>``; the
        comparison is constant-time.

        NOTE(review): the digest is computed over a re-serialization of
        the parsed payload rather than the raw request body — platforms
        that sign the raw bytes may legitimately fail here if key order
        or whitespace differs; confirm each platform's signing scheme.
        """
        import hashlib
        import hmac
        import json

        serialized = json.dumps(payload, separators=(',', ':')).encode()
        digest = hmac.new(secret.encode(), serialized, hashlib.sha256).hexdigest()
        return hmac.compare_digest(f"sha256={digest}", signature)

Implement platform-specific logic:

from app.services.webhook_processors.base import BaseWebhookProcessor
from app.models.incident import Incident, IncidentSeverity, IncidentStatus
class DatadogWebhookProcessor(BaseWebhookProcessor):
    """Turn Datadog alert webhooks into Overwatch incidents."""

    def validate_webhook_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Check that a Datadog payload carries the fields we rely on.

        Returns:
            ``{"is_valid": bool, "errors": List[str]}``.
        """
        # Collect one error per absent required field.
        errors = [
            f"Missing required field: {field}"
            for field in ("id", "alert_name", "alert_type")
            if field not in payload
        ]
        # Severity, when present, must be one of the known levels.
        if "severity" in payload and payload["severity"] not in (
            "low", "medium", "high", "critical"
        ):
            errors.append(f"Invalid severity: {payload['severity']}")
        return {"is_valid": not errors, "errors": errors}

    async def process_webhook(
        self,
        payload: Dict[str, Any],
        integration: Integration
    ) -> Optional[Incident]:
        """Create an incident from a Datadog alert.

        Returns:
            The persisted incident, or None if creation failed (the
            transaction is rolled back in that case).
        """
        try:
            # Map Datadog severity names onto Overwatch severity levels;
            # unknown or missing severities degrade to MEDIUM.
            severity_by_name = {
                "low": IncidentSeverity.LOW,
                "medium": IncidentSeverity.MEDIUM,
                "high": IncidentSeverity.HIGH,
                "critical": IncidentSeverity.CRITICAL,
            }
            incident = Incident(
                organization_id=integration.organization_id,
                integration_id=integration.id,
                external_id=payload["id"],
                title=payload["alert_name"],
                description=payload.get("message", ""),
                severity=severity_by_name.get(
                    payload.get("severity", "medium"),
                    IncidentSeverity.MEDIUM,
                ),
                status=IncidentStatus.INVESTIGATING,
                # NOTE(review): `metadata` is a reserved attribute name on
                # SQLAlchemy declarative models — confirm the Incident model
                # actually exposes a field with this name.
                metadata={
                    "alert_type": payload.get("alert_type"),
                    "host": payload.get("host"),
                    "tags": payload.get("tags", []),
                    "snapshot": payload.get("snapshot"),
                    "link": payload.get("link"),
                },
            )
            self.session.add(incident)
            await self.session.commit()
            await self.session.refresh(incident)
            return incident
        except Exception as e:
            # `logger` is expected at module scope (app.core.logging).
            logger.error(f"Failed to create incident from Datadog webhook: {e}")
            await self.session.rollback()
            return None

Signature validation uses standard HMAC-SHA256:

import hmac
import hashlib
import json
def validate_hmac_signature(
    payload: Dict[str, Any],
    signature: str,
    secret: str
) -> bool:
    """Check *signature* against the HMAC-SHA256 of *payload*.

    The payload is serialized as compact JSON and signed with *secret*;
    the expected header value has the form ``sha256=<hexdigest>``. A
    constant-time comparison is used to avoid timing side channels.
    """
    body = json.dumps(payload, separators=(',', ':')).encode()
    computed = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(f"sha256={computed}", signature)

Prevent replay attacks with timestamp validation:

from datetime import datetime, timedelta, timezone


def validate_webhook_timestamp(
    timestamp: str,
    max_age_minutes: int = 5
) -> bool:
    """Reject webhooks whose timestamp is outside the freshness window.

    Guards against replay attacks: a captured delivery can only be
    replayed for at most *max_age_minutes* after it was issued.

    Args:
        timestamp: Unix epoch seconds, as a string.
        max_age_minutes: Maximum accepted age in minutes.

    Returns:
        True if the timestamp parses and lies within
        ``[now - max_age_minutes, now]``; False otherwise. Timestamps in
        the future are rejected too — the original code accepted them
        (negative age always passed ``age < max``), which would let an
        attacker pre-date a capture and extend its replay window.
    """
    try:
        # Compare aware UTC datetimes on both sides; the original naive
        # local-time arithmetic misbehaves around DST transitions.
        webhook_time = datetime.fromtimestamp(int(timestamp), tz=timezone.utc)
    except (ValueError, TypeError, OverflowError, OSError):
        # Non-numeric input, or an epoch outside the platform's range.
        return False
    age = datetime.now(timezone.utc) - webhook_time
    return timedelta(0) <= age < timedelta(minutes=max_age_minutes)

Use Pydantic for robust payload validation:

from pydantic import BaseModel, Field, validator
from typing import List, Optional
from enum import Enum
class AlertSeverity(str, Enum):
    """Severity levels accepted from monitoring platforms.

    Subclasses ``str`` so members compare equal to their wire values
    and serialize transparently in JSON payloads.
    """

    LOW = "low"            # informational / minor
    MEDIUM = "medium"      # default when severity is absent or unknown
    HIGH = "high"          # degraded service
    CRITICAL = "critical"  # outage / urgent
class DatadogWebhookPayload(BaseModel):
    """Datadog webhook payload schema.

    Pydantic model validating incoming Datadog alert payloads before
    processing; construction raises ValidationError on bad input.

    NOTE(review): ``@validator`` is the Pydantic v1 API (deprecated in
    v2, where ``@field_validator`` replaces it) — confirm which major
    version the project pins.
    """
    id: str = Field(..., description="Unique alert ID")
    alert_name: str = Field(..., min_length=1, max_length=255)
    alert_type: str = Field(..., description="Alert type")
    alert_status: str = Field(..., description="Alert status")
    severity: AlertSeverity
    message: Optional[str] = Field(None, max_length=5000)
    host: Optional[str] = None
    tags: List[str] = Field(default_factory=list)
    snapshot: Optional[str] = None
    link: Optional[str] = None

    @validator('tags')
    def validate_tags(cls, v):
        """Reject payloads carrying more than 50 tags."""
        if len(v) > 50:
            raise ValueError("Too many tags (max 50)")
        return v

    @validator('link')
    def validate_link(cls, v):
        """Require an http(s) scheme on the alert link, when present."""
        if v and not v.startswith(('http://', 'https://')):
            raise ValueError("Link must be a valid URL")
        return v
# Usage
# FIX: ValidationError was referenced without being imported anywhere in
# this module (the pydantic import above brings in only BaseModel, Field,
# and validator), so the except clause would raise NameError.
from pydantic import ValidationError

try:
    validated_payload = DatadogWebhookPayload(**payload)
except ValidationError as e:
    # Surface field-level errors to the caller as a structured 400.
    logger.error(f"Payload validation failed: {e.errors()}")
    raise HTTPException(
        status_code=400,
        detail={"validation_errors": e.errors()}
    )
from app.core.logging import logger
from app.core.exceptions import WebhookProcessingError
async def process_webhook_with_error_handling(
    payload: Dict[str, Any],
    integration: Integration
):
    """Process webhook with comprehensive error handling.

    Demonstrates the layered except strategy: the domain error first,
    schema validation errors next, and a catch-all last. Every failure
    path records the outcome and returns None instead of raising —
    webhook processing is best-effort.

    NOTE(review): ``ValidationError`` (pydantic) and the helpers
    ``create_incident_from_webhook`` / ``record_webhook_success`` /
    ``record_webhook_failure`` are assumed to be defined or imported
    elsewhere in the module — they are not shown in this snippet.
    """
    try:
        # Validate payload
        if not payload:
            raise WebhookProcessingError("Empty payload")
        # Process webhook
        incident = await create_incident_from_webhook(payload, integration)
        # Record success
        await record_webhook_success(integration.id)
        return incident
    except WebhookProcessingError as e:
        # Specific webhook error — expected failure mode; log and record.
        logger.error(f"Webhook processing error: {e}")
        await record_webhook_failure(integration.id, str(e))
        return None
    except ValidationError as e:
        # Validation error from the payload schema.
        logger.error(f"Webhook validation error: {e.errors()}")
        await record_webhook_failure(integration.id, "validation_error")
        return None
    except Exception as e:
        # Unexpected error — logger.exception also captures the traceback.
        logger.exception(f"Unexpected webhook processing error: {e}")
        await record_webhook_failure(integration.id, "unexpected_error")
        return None

Auto-disable failing webhooks:

class WebhookCircuitBreaker:
    """Circuit breaker for webhook processing.

    Counts consecutive failures on an Integration and flips its status
    to ERROR once the threshold is reached; a later success resets the
    count and restores ACTIVE.

    NOTE(review): changes are applied to the Integration object only —
    nothing here commits the session, so the caller must persist them.
    ``IntegrationStatus``, ``datetime``, and ``logger`` are assumed to
    be imported at module scope.
    """
    def __init__(self, integration: Integration):
        self.integration = integration
        # Consecutive failures tolerated before the breaker opens.
        self.failure_threshold = 10
        # NOTE(review): reset_timeout is never read in this snippet —
        # the half-open/auto-reset path is not implemented here.
        self.reset_timeout = 3600  # 1 hour

    async def record_failure(self):
        """Record webhook failure."""
        self.integration.failure_count += 1
        # NOTE(review): datetime.utcnow() is naive and deprecated since
        # Python 3.12 — prefer datetime.now(timezone.utc) if the column
        # accepts aware datetimes.
        self.integration.last_error_at = datetime.utcnow()
        if self.integration.failure_count >= self.failure_threshold:
            # Open circuit breaker
            self.integration.status = IntegrationStatus.ERROR
            logger.error(
                f"Circuit breaker opened for integration {self.integration.id}"
            )

    async def record_success(self):
        """Record webhook success."""
        # Any success clears the failure streak entirely.
        self.integration.failure_count = 0
        self.integration.last_success_at = datetime.utcnow()
        # Close circuit breaker if it was open
        if self.integration.status == IntegrationStatus.ERROR:
            self.integration.status = IntegrationStatus.ACTIVE
            logger.info(
                f"Circuit breaker closed for integration {self.integration.id}"
            )
  1. Test Payload Validation

    import pytest
    from app.services.datadog_webhook_processor import DatadogWebhookProcessor
@pytest.mark.asyncio
async def test_datadog_payload_validation(db_session):
    """Validation accepts a complete payload and reports missing fields.

    NOTE(review): validate_webhook_payload itself is synchronous — the
    asyncio marker is presumably needed only for the async db_session
    fixture.
    """
    processor = DatadogWebhookProcessor(db_session)
    # Valid payload
    valid_payload = {
        "id": "123",
        "alert_name": "Test Alert",
        "alert_type": "error",
        "severity": "high"
    }
    result = processor.validate_webhook_payload(valid_payload)
    assert result["is_valid"] is True
    # Invalid payload (missing required field)
    invalid_payload = {
        "alert_name": "Test Alert"
    }
    result = processor.validate_webhook_payload(invalid_payload)
    assert result["is_valid"] is False
    assert "Missing required field: id" in result["errors"]
  2. Test Signature Validation

@pytest.mark.asyncio
async def test_signature_validation(db_session):
    """HMAC validation accepts a correctly computed signature and
    rejects a bogus one.
    """
    processor = DatadogWebhookProcessor(db_session)
    payload = {"id": "123", "alert_name": "Test"}
    secret = "webhook_secret_key"
    # Generate a valid signature the same way the processor does:
    # compact JSON serialization, HMAC-SHA256, "sha256=" prefix.
    import hmac
    import hashlib
    import json
    payload_str = json.dumps(payload, separators=(',', ':'))
    valid_signature = "sha256=" + hmac.new(
        secret.encode(),
        payload_str.encode(),
        hashlib.sha256
    ).hexdigest()
    # Test valid signature (validate_signature is synchronous — no await)
    assert processor.validate_signature(payload, valid_signature, secret) is True
    # Test invalid signature
    invalid_signature = "sha256=invalid"
    assert processor.validate_signature(payload, invalid_signature, secret) is False
  3. Test Incident Creation

@pytest.mark.asyncio
async def test_incident_creation(db_session, test_integration):
    """A valid Datadog payload produces an incident with the mapped
    title, severity, and external ID.

    NOTE(review): relies on the ``test_integration`` fixture and on
    ``IncidentSeverity`` being importable in the test module.
    """
    processor = DatadogWebhookProcessor(db_session)
    payload = {
        "id": "alert-123",
        "alert_name": "High CPU Usage",
        "alert_type": "error",
        "severity": "critical",
        "message": "CPU usage above 90%"
    }
    incident = await processor.process_webhook(payload, test_integration)
    assert incident is not None
    assert incident.title == "High CPU Usage"
    assert incident.severity == IncidentSeverity.CRITICAL
    assert incident.external_id == "alert-123"

Test complete webhook flow:

import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_datadog_webhook_endpoint(client: AsyncClient, test_integration):
    """End-to-end test of the Datadog webhook endpoint.

    Posts a payload, checks the immediate 200 response, then waits for
    the background task and verifies the incident was persisted.

    NOTE(review): ``get_incidents_by_external_id`` is assumed to be a
    test helper defined elsewhere.
    """
    import asyncio  # FIX: was used below without being imported anywhere

    payload = {
        "id": "alert-456",
        "alert_name": "Test Alert",
        "alert_type": "error",
        "alert_status": "triggered",
        "severity": "high"
    }
    response = await client.post(
        f"/api/v1/webhooks/datadog/{test_integration.id}",
        json=payload
    )
    # Check webhook accepted
    assert response.status_code == 200
    assert response.json()["success"] is True
    # Wait for background processing. A fixed sleep is flaky under load;
    # polling with a deadline would be more robust.
    await asyncio.sleep(1)
    # Verify incident created
    incidents = await get_incidents_by_external_id("alert-456")
    assert len(incidents) == 1
    assert incidents[0].title == "Test Alert"

Use a task queue for heavy processing:

from celery import Celery

celery_app = Celery('overwatch', broker='redis://localhost:6379/0')


# FIX: register under an explicit short name so the send_task call below
# (which addresses the task by name) matches — by default Celery registers
# tasks under their full dotted module path.
@celery_app.task(name='process_webhook_async')
def process_webhook_async(
    payload: Dict[str, Any],
    integration_id: str
):
    """Process webhook asynchronously via Celery.

    Declared as a plain ``def``: the classic Celery worker does not
    await coroutines, so an ``async def`` task body would produce an
    un-run coroutine object instead of executing.
    """
    # Heavy processing here
    pass


# In the webhook endpoint — enqueue the Celery job without blocking the
# response; send_task only publishes the message to the broker.
background_tasks.add_task(
    celery_app.send_task,
    'process_webhook_async',
    args=[payload, integration_id]
)

Process multiple webhooks in batches:

from typing import Any, Dict, List, Optional


async def process_webhooks_batch(
    webhooks: List[Dict[str, Any]],
    integration: Integration,
    session: Optional[AsyncSession] = None
):
    """Process multiple webhooks in batch.

    Each webhook is processed independently; failures are logged and
    skipped so one bad payload cannot sink the batch. Created incidents
    are persisted in a single commit at the end.

    Args:
        webhooks: Raw webhook payloads.
        integration: Integration the payloads belong to.
        session: Database session used to persist the batch. FIX: the
            original body referenced an undefined module-level
            ``session`` (NameError) and called ``bulk_save_objects``,
            which is a legacy sync-Session API not present on
            AsyncSession — the session is now an explicit parameter and
            incidents are persisted with ``add_all()``. When None, the
            incidents are returned without being persisted.

    Returns:
        The list of incidents created from the batch.
    """
    incidents = []
    for webhook in webhooks:
        try:
            incident = await process_single_webhook(webhook, integration)
            if incident:
                incidents.append(incident)
        except Exception as e:
            # Best-effort batch: log and move on to the next payload.
            logger.error(f"Failed to process webhook: {e}")
            continue
    # Persist all created incidents in one transaction.
    if incidents and session is not None:
        session.add_all(incidents)
        await session.commit()
    return incidents

Track webhook performance:

# NOTE(review): Gauge is imported but never used in this snippet.
from prometheus_client import Counter, Histogram, Gauge

# Count of webhook requests, labeled by platform and outcome.
webhook_requests_total = Counter(
    'webhook_requests_total',
    'Total webhook requests',
    ['platform', 'status']
)
# Latency histogram; use .labels(...).time() as a context manager.
webhook_processing_duration = Histogram(
    'webhook_processing_seconds',
    'Webhook processing duration',
    ['platform']
)
# Failure counter, labeled by platform and failure category.
webhook_failures_total = Counter(
    'webhook_failures_total',
    'Total webhook processing failures',
    ['platform', 'error_type']
)

# Usage (illustrative — the `await` must run inside an async function)
webhook_requests_total.labels(platform='datadog', status='success').inc()
with webhook_processing_duration.labels(platform='datadog').time():
    await process_webhook(payload, integration)

For webhook development assistance, contact support@overwatch-observability.com.