Incoming Webhooks
Section titled “Incoming Webhooks”
Comprehensive guide to developing incoming webhook handlers for receiving alerts and notifications from external monitoring platforms.
Development Pattern
Section titled “Development Pattern”
FastAPI Endpoint Structure
Section titled “FastAPI Endpoint Structure”
All incoming webhooks follow this production-ready pattern:
from fastapi import APIRouter, Depends, HTTPException, Request, BackgroundTasks, Header
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Dict, Any, Optional
from app.core.database import get_session
from app.core.logging import logger
from app.models.integration import Integration, IntegrationType
from app.schemas.common import SuccessResponse
router = APIRouter()
@router.post( "/webhooks/{platform}/{integration_id}", response_model=SuccessResponse, summary="{Platform} Webhook", description="Receive alerts from {Platform} monitoring")async def platform_webhook( *, integration_id: str, request: Request, background_tasks: BackgroundTasks, session: AsyncSession = Depends(get_session), x_platform_signature: Optional[str] = Header(None)) -> SuccessResponse: """Handle {Platform} webhook alerts.""" try: payload = await request.json() logger.info(f"Received {Platform} webhook for integration {integration_id}")
# 1. Find integration integration = await session.get(Integration, integration_id) if not integration: raise HTTPException( status_code=404, detail="Integration not found" )
# 2. Validate integration type if integration.integration_type != IntegrationType.PLATFORM: raise HTTPException( status_code=400, detail="Integration is not a {Platform} integration" )
# 3. Process webhook in background (non-blocking) background_tasks.add_task( process_platform_webhook, payload, integration, x_platform_signature, session )
# 4. Return success immediately (< 100ms) return SuccessResponse( message="Webhook received successfully", data={ "integration_id": integration_id, "alert_id": payload.get("id") } )
except HTTPException: raise except Exception as e: logger.error(f"Failed to process {Platform} webhook: {e}") raise HTTPException( status_code=400, detail="Failed to process webhook" )Background Processing
Section titled “Background Processing”
Heavy lifting happens asynchronously to avoid blocking webhook response:
async def process_platform_webhook( payload: Dict[str, Any], integration: Integration, signature: Optional[str], session: AsyncSession): """Background task to process {Platform} webhook.""" try: logger.info(f"Processing {Platform} webhook for integration {integration.id}")
# Initialize webhook processor processor = PlatformWebhookProcessor(session)
# Validate signature if provided if signature: webhook_secret = integration.get_config_dict().get("webhook_secret") if webhook_secret: is_valid = await processor.validate_signature( payload, signature, webhook_secret ) if not is_valid: logger.error("{Platform} webhook signature validation failed") return
# Validate payload structure validation_result = processor.validate_webhook_payload(payload) if not validation_result["is_valid"]: logger.error(f"{Platform} webhook validation failed: {validation_result['errors']}") return
# Process webhook and create incident incident = await processor.process_webhook(payload, integration)
if incident: logger.info(f"Created/updated incident {incident.id} from {Platform} webhook") else: logger.warning("No incident created from {Platform} webhook")
except Exception as e: logger.error(f"Failed to process {Platform} webhook: {e}")Webhook Processor Pattern
Section titled “Webhook Processor Pattern”
Base Processor Class
Section titled “Base Processor Class”
All webhook processors inherit from a base class:
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.integration import Integration
from app.models.incident import Incident
class BaseWebhookProcessor(ABC): """Base class for webhook processors."""
def __init__(self, session: AsyncSession): self.session = session
@abstractmethod def validate_webhook_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]: """ Validate webhook payload structure.
Returns: { "is_valid": bool, "errors": List[str] } """ pass
@abstractmethod async def process_webhook( self, payload: Dict[str, Any], integration: Integration ) -> Optional[Incident]: """ Process webhook and create incident.
Returns: Created or updated incident, or None if processing failed """ pass
def validate_signature( self, payload: Dict[str, Any], signature: str, secret: str ) -> bool: """Validate HMAC signature.""" import hmac import hashlib import json
# Convert payload to string payload_str = json.dumps(payload, separators=(',', ':'))
# Calculate expected signature expected_signature = hmac.new( secret.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest()
# Compare signatures return hmac.compare_digest( f"sha256={expected_signature}", signature )Platform-Specific Processor
Section titled “Platform-Specific Processor”
Implement platform-specific logic:
from app.services.webhook_processors.base import BaseWebhookProcessor
from app.models.incident import Incident, IncidentSeverity, IncidentStatus
class DatadogWebhookProcessor(BaseWebhookProcessor): """Process Datadog webhook alerts."""
def validate_webhook_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]: """Validate Datadog payload structure.""" errors = []
# Required fields required_fields = ["id", "alert_name", "alert_type"] for field in required_fields: if field not in payload: errors.append(f"Missing required field: {field}")
# Validate severity valid_severities = ["low", "medium", "high", "critical"] if "severity" in payload and payload["severity"] not in valid_severities: errors.append(f"Invalid severity: {payload['severity']}")
return { "is_valid": len(errors) == 0, "errors": errors }
async def process_webhook( self, payload: Dict[str, Any], integration: Integration ) -> Optional[Incident]: """Process Datadog webhook and create incident.""" try: # Map Datadog severity to Overwatch severity severity_mapping = { "low": IncidentSeverity.LOW, "medium": IncidentSeverity.MEDIUM, "high": IncidentSeverity.HIGH, "critical": IncidentSeverity.CRITICAL }
# Create incident incident = Incident( organization_id=integration.organization_id, integration_id=integration.id, external_id=payload["id"], title=payload["alert_name"], description=payload.get("message", ""), severity=severity_mapping.get( payload.get("severity", "medium"), IncidentSeverity.MEDIUM ), status=IncidentStatus.INVESTIGATING, metadata={ "alert_type": payload.get("alert_type"), "host": payload.get("host"), "tags": payload.get("tags", []), "snapshot": payload.get("snapshot"), "link": payload.get("link") } )
self.session.add(incident) await self.session.commit() await self.session.refresh(incident)
return incident
except Exception as e: logger.error(f"Failed to create incident from Datadog webhook: {e}") await self.session.rollback() return NoneSignature Validation
Section titled “Signature Validation”
HMAC Signature Validation
Section titled “HMAC Signature Validation”
Standard HMAC-SHA256:
import hmac
import hashlib
import json
def validate_hmac_signature( payload: Dict[str, Any], signature: str, secret: str) -> bool: """Validate HMAC-SHA256 signature.""" # Serialize payload payload_str = json.dumps(payload, separators=(',', ':'))
# Calculate expected signature expected_signature = hmac.new( secret.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest()
# Compare with constant-time comparison return hmac.compare_digest( f"sha256={expected_signature}", signature )GitHub-style HMAC-SHA1:
import hmac
import hashlib
def validate_github_signature( payload: bytes, signature: str, secret: str) -> bool: """Validate GitHub-style HMAC-SHA1 signature.""" expected_signature = hmac.new( secret.encode(), payload, hashlib.sha1 ).hexdigest()
return hmac.compare_digest( f"sha1={expected_signature}", signature )PagerDuty V1 Signature:
import hmac
import hashlib
def validate_pagerduty_signature( raw_body: bytes, signature: str, secret: str) -> bool: """Validate PagerDuty V1 signature.""" expected_signature = hmac.new( secret.encode(), raw_body, hashlib.sha256 ).hexdigest()
# PagerDuty uses "v1=" prefix return hmac.compare_digest( f"v1={expected_signature}", signature )Timestamp Validation
Section titled “Timestamp Validation”
Prevent replay attacks with timestamp validation:
from datetime import datetime, timedelta
def validate_webhook_timestamp( timestamp: str, max_age_minutes: int = 5) -> bool: """Validate webhook timestamp to prevent replay attacks.""" try: # Parse timestamp (Unix epoch) webhook_time = datetime.fromtimestamp(int(timestamp))
# Get current time current_time = datetime.now()
# Check if webhook is within acceptable time window age = current_time - webhook_time
return age < timedelta(minutes=max_age_minutes)
except (ValueError, TypeError): return FalsePayload Validation
Section titled “Payload Validation”
Schema Validation with Pydantic
Section titled “Schema Validation with Pydantic”
Use Pydantic for robust payload validation:
from pydantic import BaseModel, Field, validator
from typing import List, Optional
from enum import Enum
class AlertSeverity(str, Enum): """Alert severity levels.""" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical"
class DatadogWebhookPayload(BaseModel):
    """Datadog webhook payload schema.

    Required identity fields are strict; the descriptive fields are
    optional with length limits to bound stored data.
    """

    id: str = Field(..., description="Unique alert ID")
    alert_name: str = Field(..., min_length=1, max_length=255)
    alert_type: str = Field(..., description="Alert type")
    alert_status: str = Field(..., description="Alert status")
    severity: AlertSeverity
    message: Optional[str] = Field(None, max_length=5000)
    host: Optional[str] = None
    tags: List[str] = Field(default_factory=list)
    snapshot: Optional[str] = None
    link: Optional[str] = None

    @validator('tags')
    def validate_tags(cls, v):
        """Reject unreasonably large tag lists."""
        if len(v) > 50:
            raise ValueError("Too many tags (max 50)")
        return v

    @validator('link')
    def validate_link(cls, v):
        """Require http(s) URLs for the alert link."""
        if v and not v.startswith(('http://', 'https://')):
            raise ValueError("Link must be a valid URL")
        return v
# Usagetry: validated_payload = DatadogWebhookPayload(**payload)except ValidationError as e: logger.error(f"Payload validation failed: {e.errors()}") raise HTTPException( status_code=400, detail={"validation_errors": e.errors()} )Error Handling
Section titled “Error Handling”
Graceful Error Handling
Section titled “Graceful Error Handling”from app.core.logging import loggerfrom app.core.exceptions import WebhookProcessingError
async def process_webhook_with_error_handling( payload: Dict[str, Any], integration: Integration): """Process webhook with comprehensive error handling.""" try: # Validate payload if not payload: raise WebhookProcessingError("Empty payload")
# Process webhook incident = await create_incident_from_webhook(payload, integration)
# Record success await record_webhook_success(integration.id)
return incident
except WebhookProcessingError as e: # Specific webhook error logger.error(f"Webhook processing error: {e}") await record_webhook_failure(integration.id, str(e)) return None
except ValidationError as e: # Validation error logger.error(f"Webhook validation error: {e.errors()}") await record_webhook_failure(integration.id, "validation_error") return None
except Exception as e: # Unexpected error logger.exception(f"Unexpected webhook processing error: {e}") await record_webhook_failure(integration.id, "unexpected_error") return NoneCircuit Breaker Pattern
Section titled “Circuit Breaker Pattern”
Auto-disable failing webhooks:
class WebhookCircuitBreaker: """Circuit breaker for webhook processing."""
def __init__(self, integration: Integration): self.integration = integration self.failure_threshold = 10 self.reset_timeout = 3600 # 1 hour
async def record_failure(self): """Record webhook failure.""" self.integration.failure_count += 1 self.integration.last_error_at = datetime.utcnow()
if self.integration.failure_count >= self.failure_threshold: # Open circuit breaker self.integration.status = IntegrationStatus.ERROR logger.error( f"Circuit breaker opened for integration {self.integration.id}" )
async def record_success(self): """Record webhook success.""" self.integration.failure_count = 0 self.integration.last_success_at = datetime.utcnow()
# Close circuit breaker if it was open if self.integration.status == IntegrationStatus.ERROR: self.integration.status = IntegrationStatus.ACTIVE logger.info( f"Circuit breaker closed for integration {self.integration.id}" )Testing Webhooks
Section titled “Testing Webhooks”
Unit Tests
Section titled “Unit Tests”
-
Test Payload Validation
import pytestfrom app.services.datadog_webhook_processor import DatadogWebhookProcessor@pytest.mark.asyncioasync def test_datadog_payload_validation(db_session):"""Test Datadog payload validation."""processor = DatadogWebhookProcessor(db_session)# Valid payloadvalid_payload = {"id": "123","alert_name": "Test Alert","alert_type": "error","severity": "high"}result = processor.validate_webhook_payload(valid_payload)assert result["is_valid"] is True# Invalid payload (missing required field)invalid_payload = {"alert_name": "Test Alert"}result = processor.validate_webhook_payload(invalid_payload)assert result["is_valid"] is Falseassert "Missing required field: id" in result["errors"] -
Test Signature Validation
@pytest.mark.asyncioasync def test_signature_validation(db_session):"""Test HMAC signature validation."""processor = DatadogWebhookProcessor(db_session)payload = {"id": "123", "alert_name": "Test"}secret = "webhook_secret_key"# Generate valid signatureimport hmacimport hashlibimport jsonpayload_str = json.dumps(payload, separators=(',', ':'))valid_signature = "sha256=" + hmac.new(secret.encode(),payload_str.encode(),hashlib.sha256).hexdigest()# Test valid signatureassert processor.validate_signature(payload, valid_signature, secret) is True# Test invalid signatureinvalid_signature = "sha256=invalid"assert processor.validate_signature(payload, invalid_signature, secret) is False -
Test Incident Creation
@pytest.mark.asyncioasync def test_incident_creation(db_session, test_integration):"""Test incident creation from webhook."""processor = DatadogWebhookProcessor(db_session)payload = {"id": "alert-123","alert_name": "High CPU Usage","alert_type": "error","severity": "critical","message": "CPU usage above 90%"}incident = await processor.process_webhook(payload, test_integration)assert incident is not Noneassert incident.title == "High CPU Usage"assert incident.severity == IncidentSeverity.CRITICALassert incident.external_id == "alert-123"
Integration Tests
Section titled “Integration Tests”
Test complete webhook flow:
import pytest
from httpx import AsyncClient
@pytest.mark.asyncioasync def test_datadog_webhook_endpoint(client: AsyncClient, test_integration): """Test Datadog webhook endpoint end-to-end.""" payload = { "id": "alert-456", "alert_name": "Test Alert", "alert_type": "error", "alert_status": "triggered", "severity": "high" }
response = await client.post( f"/api/v1/webhooks/datadog/{test_integration.id}", json=payload )
# Check webhook accepted assert response.status_code == 200 assert response.json()["success"] is True
# Wait for background processing await asyncio.sleep(1)
# Verify incident created incidents = await get_incidents_by_external_id("alert-456") assert len(incidents) == 1 assert incidents[0].title == "Test Alert"Performance Optimization
Section titled “Performance Optimization”
Background Task Queue
Section titled “Background Task Queue”
Use task queue for heavy processing:
from celery import Celery
celery_app = Celery('overwatch', broker='redis://localhost:6379/0')
@celery_app.taskasync def process_webhook_async( payload: Dict[str, Any], integration_id: str): """Process webhook asynchronously via Celery.""" # Heavy processing here pass
# In webhook endpointbackground_tasks.add_task( celery_app.send_task, 'process_webhook_async', args=[payload, integration_id])Batch Processing
Section titled “Batch Processing”
Process multiple webhooks in batches:
from typing import List
async def process_webhooks_batch(
    webhooks: List[Dict[str, Any]],
    integration: Integration
):
    """Process multiple webhooks in batch.

    Each webhook is processed independently — one failure is logged and
    skipped — then all resulting incidents are persisted in one commit.

    NOTE(review): relies on an AsyncSession named ``session`` being in
    scope; inject or pass it explicitly in real code.
    """
    incidents = []

    for webhook in webhooks:
        try:
            incident = await process_single_webhook(webhook, integration)
            if incident:
                incidents.append(incident)
        except Exception as e:
            logger.error(f"Failed to process webhook: {e}")
            continue

    # Bulk insert incidents.
    # FIX: AsyncSession has no ``bulk_save_objects`` (that is the sync
    # Session API); ``add_all`` + a single commit batches the inserts.
    if incidents:
        session.add_all(incidents)
        await session.commit()

    return incidents

Monitoring and Metrics
Section titled “Monitoring and Metrics”
Prometheus Metrics
Section titled “Prometheus Metrics”
Track webhook performance:
from prometheus_client import Counter, Histogram, Gauge
webhook_requests_total = Counter( 'webhook_requests_total', 'Total webhook requests', ['platform', 'status'])
webhook_processing_duration = Histogram( 'webhook_processing_seconds', 'Webhook processing duration', ['platform'])
webhook_failures_total = Counter( 'webhook_failures_total', 'Total webhook processing failures', ['platform', 'error_type'])
# Usagewebhook_requests_total.labels(platform='datadog', status='success').inc()
with webhook_processing_duration.labels(platform='datadog').time(): await process_webhook(payload, integration)Next Steps
Section titled “Next Steps”
- Building Integrations - Complete integration development
- Testing Guide - Local testing with Docker Compose
- Webhooks Overview - Webhook architecture and patterns
Need Help?
Section titled “Need Help?”
For webhook development assistance, contact support@overwatch-observability.com.