Skip to content

Building Integrations

Complete development guide for creating custom platform integrations with Overwatch, covering API clients, webhook processors, and alert parsers.

API Client

Communication with external platform

  • REST/GraphQL client
  • Authentication handling
  • Rate limiting and retries
  • Response parsing

Webhook Processor

Incoming alert processing

  • Signature validation
  • Payload normalization
  • Incident creation
  • Background processing

Alert Parser

Alert format detection

  • Factory pattern for routing
  • Platform-specific parsing
  • Field mapping
  • Severity normalization

Data Transformer

Data normalization

  • Metric correlation
  • Log aggregation
  • Context enrichment
  • Time series handling
backend/app/services/
├── integrations/
│   └── {platform}/
│       ├── __init__.py
│       ├── client.py              # API client
│       ├── webhook_processor.py   # Webhook handler
│       ├── data_transformer.py    # Data normalization
│       └── exceptions.py          # Custom exceptions
└── alert_parsers/
    ├── base.py                    # Abstract parser
    ├── factory.py                 # Parser factory
    └── {platform}.py              # Platform parser

Create a client for communicating with the external platform:

backend/app/services/integrations/custom_platform/client.py
import httpx
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.core.logging import logger
from app.services.integrations.custom_platform.exceptions import (
CustomPlatformAPIError,
CustomPlatformAuthError,
CustomPlatformRateLimitError
)
class CustomPlatformClient:
    """Async API client for the Custom Platform integration.

    Wraps an ``httpx.AsyncClient`` that is pre-configured with the base URL,
    bearer-token authentication and a request timeout. Prefer using it as an
    async context manager so the underlying connection pool is closed::

        async with CustomPlatformClient(url, key) as client:
            alerts = await client.get_alerts()

    Every request method converts httpx errors (error status codes and
    transport failures alike) into exceptions from the integration's own
    hierarchy, with the original httpx error chained as ``__cause__``.
    """

    def __init__(
        self,
        api_url: str,
        api_key: str,
        timeout: int = 30
    ):
        """
        Args:
            api_url: Base URL of the platform API; a trailing slash is removed.
            api_key: API key sent as a bearer token on every request.
            timeout: Timeout passed to the underlying httpx client.
        """
        self.api_url = api_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        # Create HTTP client
        self.client = httpx.AsyncClient(
            base_url=self.api_url,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
                "User-Agent": "Overwatch-Integration/1.0"
            },
            timeout=timeout
        )

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the underlying HTTP client."""
        await self.aclose()

    async def aclose(self) -> None:
        """Close the underlying HTTP client.

        Lets callers that do not use ``async with`` release the client
        explicitly.
        """
        await self.client.aclose()

    async def test_connection(self) -> bool:
        """
        Test API connection and authentication.

        Returns:
            True if the health endpoint answered with status 200.

        Raises:
            CustomPlatformAuthError: If the platform answers 401.
            CustomPlatformAPIError: If the request fails for any other reason.
        """
        try:
            response = await self.client.get("/api/v1/health")
            response.raise_for_status()
            return response.status_code == 200
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                raise CustomPlatformAuthError("Invalid API key") from e
            raise CustomPlatformAPIError(f"Connection test failed: {e}") from e
        except httpx.RequestError as e:
            raise CustomPlatformAPIError(f"Connection error: {e}") from e

    async def get_alerts(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        severity: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Retrieve alerts from platform.

        Args:
            start_time: Filter alerts after this time
            end_time: Filter alerts before this time
            severity: Filter by severity levels (sent comma-separated)

        Returns:
            List of alert dictionaries (the ``alerts`` key of the response)

        Raises:
            CustomPlatformRateLimitError: If the platform answers 429.
            CustomPlatformAPIError: If the request fails for any other reason.
        """
        # Only send the filters the caller actually supplied.
        params = {}
        if start_time:
            params["start_time"] = start_time.isoformat()
        if end_time:
            params["end_time"] = end_time.isoformat()
        if severity:
            params["severity"] = ",".join(severity)

        try:
            response = await self.client.get(
                "/api/v1/alerts",
                params=params
            )
            response.raise_for_status()
            return response.json()["alerts"]
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                raise CustomPlatformRateLimitError("Rate limit exceeded") from e
            raise CustomPlatformAPIError(f"Failed to fetch alerts: {e}") from e
        except httpx.RequestError as e:
            # Transport-level failure (DNS, connect, timeout, ...): surface it
            # through the integration's own hierarchy, like test_connection().
            raise CustomPlatformAPIError(f"Failed to fetch alerts: {e}") from e

    async def get_metrics(
        self,
        query: str,
        start_time: datetime,
        end_time: datetime
    ) -> Dict[str, Any]:
        """
        Query metrics from platform.

        Args:
            query: Platform-specific query string
            start_time: Query start time
            end_time: Query end time

        Returns:
            Metrics data dictionary (the decoded JSON response)

        Raises:
            CustomPlatformAPIError: If the request fails.
        """
        payload = {
            "query": query,
            "start": start_time.isoformat(),
            "end": end_time.isoformat()
        }
        try:
            response = await self.client.post(
                "/api/v1/metrics/query",
                json=payload
            )
            response.raise_for_status()
            return response.json()
        except (httpx.HTTPStatusError, httpx.RequestError) as e:
            raise CustomPlatformAPIError(f"Metrics query failed: {e}") from e

    async def create_alert_rule(
        self,
        name: str,
        condition: Dict[str, Any],
        actions: List[Dict[str, Any]]
    ) -> str:
        """
        Create alert rule on platform.

        Args:
            name: Alert rule name
            condition: Alert condition configuration
            actions: Alert actions (notifications)

        Returns:
            Created alert rule ID (the ``id`` key of the response)

        Raises:
            CustomPlatformAPIError: If the request fails.
        """
        payload = {
            "name": name,
            "condition": condition,
            "actions": actions
        }
        try:
            response = await self.client.post(
                "/api/v1/alert-rules",
                json=payload
            )
            response.raise_for_status()
            return response.json()["id"]
        except (httpx.HTTPStatusError, httpx.RequestError) as e:
            raise CustomPlatformAPIError(f"Failed to create alert rule: {e}") from e

Define platform-specific exceptions:

backend/app/services/integrations/custom_platform/exceptions.py
from typing import List, Optional


class CustomPlatformError(Exception):
    """Base exception for Custom Platform integration."""


class CustomPlatformAPIError(CustomPlatformError):
    """API request failed."""


class CustomPlatformAuthError(CustomPlatformError):
    """Authentication failed."""


class CustomPlatformRateLimitError(CustomPlatformError):
    """Rate limit exceeded."""


class CustomPlatformWebhookValidationError(CustomPlatformError):
    """Webhook validation failed.

    Attributes:
        validation_errors: Human-readable description of each validation
            problem; empty when none were supplied.
    """

    def __init__(
        self,
        message: str,
        validation_errors: Optional[List[str]] = None
    ):
        super().__init__(message)
        # Copy so later mutation of the caller's list does not alter the
        # errors recorded on this exception.
        self.validation_errors = list(validation_errors) if validation_errors else []

Handle incoming webhooks from the platform:

backend/app/services/integrations/custom_platform/webhook_processor.py
import hmac
import hashlib
import json
from typing import Dict, Any, Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import logger
from app.models.integration import Integration
from app.models.incident import Incident, IncidentSeverity, IncidentStatus
from app.services.integrations.custom_platform.exceptions import (
CustomPlatformWebhookValidationError
)
class CustomPlatformWebhookProcessor:
    """Validate Custom Platform webhooks and create or update incidents."""

    # Fields every webhook payload must carry.
    REQUIRED_FIELDS = ("id", "title", "severity", "status")
    # Accepted values for the "severity" and "status" payload fields.
    VALID_SEVERITIES = ("low", "medium", "high", "critical")
    VALID_STATUSES = ("firing", "resolved")

    def __init__(self, session: "AsyncSession"):
        """
        Args:
            session: Database session used to look up and persist incidents.
        """
        self.session = session

    def validate_webhook_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate webhook payload structure.

        Checks that all required fields are present and that ``severity`` and
        ``status``, when present, hold one of the accepted values.

        Returns:
            {
                "is_valid": bool,
                "errors": List[str]
            }
        """
        errors = [
            f"Missing required field: {field}"
            for field in self.REQUIRED_FIELDS
            if field not in payload
        ]
        # Validate severity
        if "severity" in payload and payload["severity"] not in self.VALID_SEVERITIES:
            errors.append(f"Invalid severity: {payload['severity']}")
        # Validate status
        if "status" in payload and payload["status"] not in self.VALID_STATUSES:
            errors.append(f"Invalid status: {payload['status']}")
        return {
            "is_valid": not errors,
            "errors": errors
        }

    def validate_webhook_signature(
        self,
        payload: Dict[str, Any],
        signature: str,
        secret: str,
        raw_body: Optional[bytes] = None
    ) -> bool:
        """
        Validate HMAC signature for webhook authenticity.

        Args:
            payload: Webhook payload
            signature: Received signature, expected as ``sha256=<hex digest>``
            secret: Webhook secret
            raw_body: Exact request body bytes as received. When given, the
                HMAC is computed over these bytes; otherwise the payload is
                re-serialized as compact JSON, which only matches if the
                sender serialized it the same way.

        Returns:
            True if signature is valid
        """
        if not signature:
            return False
        if raw_body is not None:
            message = raw_body
        else:
            # Serialize payload
            message = json.dumps(payload, separators=(',', ':')).encode()
        # Calculate expected signature
        expected_signature = hmac.new(
            secret.encode(),
            message,
            hashlib.sha256
        ).hexdigest()
        # Constant-time comparison. Compare bytes, not str: compare_digest()
        # raises TypeError for str arguments containing non-ASCII characters,
        # and the received signature is attacker-controlled.
        return hmac.compare_digest(
            f"sha256={expected_signature}".encode(),
            signature.encode()
        )

    async def process_webhook(
        self,
        payload: Dict[str, Any],
        integration: "Integration",
        signature: Optional[str] = None,
        raw_body: Optional[bytes] = None
    ) -> Optional["Incident"]:
        """
        Process webhook and create/update incident.

        When the integration has a ``webhook_secret`` configured, a valid
        signature is mandatory: a missing or wrong signature rejects the
        webhook. Without a configured secret the signature is not checked.

        Args:
            payload: Webhook payload
            integration: Integration instance
            signature: Webhook signature received with the request, if any
            raw_body: Exact request body bytes, forwarded to
                validate_webhook_signature()

        Returns:
            Created or updated incident, or None if processing failed
        """
        try:
            webhook_secret = integration.get_config_dict().get("webhook_secret")
            if webhook_secret:
                # A configured secret means the sender must authenticate;
                # omitting the signature must not bypass the check.
                signature_ok = bool(signature) and self.validate_webhook_signature(
                    payload, signature, webhook_secret, raw_body
                )
                if not signature_ok:
                    logger.error("Webhook signature validation failed")
                    return None
            # Validate payload structure
            validation_result = self.validate_webhook_payload(payload)
            if not validation_result["is_valid"]:
                raise CustomPlatformWebhookValidationError(
                    "Webhook validation failed",
                    validation_result["errors"]
                )
            # Map severity
            severity_mapping = {
                "low": IncidentSeverity.LOW,
                "medium": IncidentSeverity.MEDIUM,
                "high": IncidentSeverity.HIGH,
                "critical": IncidentSeverity.CRITICAL
            }
            # Map status
            status_mapping = {
                "firing": IncidentStatus.INVESTIGATING,
                "resolved": IncidentStatus.RESOLVED
            }
            # Check if incident already exists (by external_id)
            from sqlalchemy import select
            stmt = select(Incident).where(
                Incident.external_id == payload["id"],
                Incident.integration_id == integration.id
            )
            result = await self.session.execute(stmt)
            incident = result.scalar_one_or_none()
            # NOTE(review): `metadata` is a reserved attribute name on
            # SQLAlchemy declarative models - confirm how the Incident model
            # exposes this field.
            if incident:
                # Update existing incident (status and metadata only)
                incident.status = status_mapping.get(
                    payload.get("status", "firing"),
                    IncidentStatus.INVESTIGATING
                )
                incident.metadata = self._extract_metadata(payload)
            else:
                # Create new incident
                incident = Incident(
                    organization_id=integration.organization_id,
                    integration_id=integration.id,
                    external_id=payload["id"],
                    title=payload["title"],
                    description=payload.get("description", ""),
                    severity=severity_mapping.get(
                        payload.get("severity", "medium"),
                        IncidentSeverity.MEDIUM
                    ),
                    status=status_mapping.get(
                        payload.get("status", "firing"),
                        IncidentStatus.INVESTIGATING
                    ),
                    metadata=self._extract_metadata(payload)
                )
                self.session.add(incident)
            await self.session.commit()
            await self.session.refresh(incident)
            logger.info(f"Processed webhook for incident {incident.id}")
            return incident
        except CustomPlatformWebhookValidationError as e:
            # Raised before any database work, so there is nothing to roll back.
            logger.error(f"Webhook validation failed: {e.validation_errors}")
            return None
        except Exception as e:
            logger.error(f"Failed to process webhook: {e}")
            await self.session.rollback()
            return None

    def _extract_metadata(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Extract metadata from webhook payload (includes the raw payload)."""
        return {
            "external_link": payload.get("link"),
            "tags": payload.get("tags", []),
            "source": payload.get("source"),
            "environment": payload.get("environment"),
            "service": payload.get("service"),
            "raw_payload": payload
        }

Implement a platform-specific alert parser, then register it with the parser factory so incoming payloads are routed to it automatically:

backend/app/services/alert_parsers/custom_platform.py
from typing import Dict, Any, Optional
from app.services.alert_parsers.base import BaseAlertParser
from app.models.incident import IncidentSeverity
class CustomPlatformAlertParser(BaseAlertParser):
    """Alert parser for payloads emitted by Custom Platform."""

    @staticmethod
    def can_parse(payload: Dict[str, Any]) -> bool:
        """
        Tell whether a payload originates from Custom Platform.

        A payload qualifies when it carries the platform-specific keys
        ``id``, ``alert_type`` and ``platform_version`` and its ``source``
        equals ``"custom_platform"``.

        Args:
            payload: Alert payload

        Returns:
            True if this parser can handle the payload
        """
        marker_keys = ("id", "alert_type", "platform_version")
        if not all(key in payload for key in marker_keys):
            return False
        return payload.get("source") == "custom_platform"

    async def parse(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert a Custom Platform alert into the normalized alert format.

        The platform's severity code (1-4) is translated to an
        IncidentSeverity; a missing or unknown code maps to MEDIUM.

        Args:
            payload: Raw alert payload

        Returns:
            Normalized alert dictionary
        """
        severity_by_code = {
            "1": IncidentSeverity.LOW,
            "2": IncidentSeverity.MEDIUM,
            "3": IncidentSeverity.HIGH,
            "4": IncidentSeverity.CRITICAL
        }
        # Codes are compared as strings so 3 and "3" are treated alike.
        severity_code = str(payload.get("severity", 2))
        severity = severity_by_code.get(severity_code, IncidentSeverity.MEDIUM)

        details = {
            "alert_type": payload.get("alert_type"),
            "environment": payload.get("environment"),
            "service": payload.get("service"),
            "tags": payload.get("tags", []),
            "link": payload.get("link"),
            "triggered_at": payload.get("triggered_at")
        }

        normalized = {
            "external_id": payload.get("id"),
            "title": payload.get("alert_name"),
            "description": payload.get("message", ""),
            "severity": severity,
            "source": "custom_platform",
            "metadata": details
        }
        return normalized
backend/app/services/alert_parsers/factory.py
from typing import Any, Dict, List, Optional

from app.services.alert_parsers.base import BaseAlertParser
from app.services.alert_parsers.custom_platform import CustomPlatformAlertParser
class AlertParserFactory:
    """Factory for selecting an alert parser based on the payload."""

    # Registered parsers, consulted in order; the first one whose
    # can_parse() accepts the payload wins.
    _parsers: List[BaseAlertParser] = [
        # ... existing parsers
        CustomPlatformAlertParser(),
    ]

    @classmethod
    def get_parser(cls, payload: Dict[str, Any]) -> Optional[BaseAlertParser]:
        """
        Get appropriate parser for payload.

        Args:
            payload: Alert payload

        Returns:
            Parser instance or None if no parser found
        """
        # Parsers inspect the payload with dict operations; anything that is
        # not a dict (e.g. a JSON array body) cannot be handled by any of them.
        if not isinstance(payload, dict):
            return None
        return next(
            (parser for parser in cls._parsers if parser.can_parse(payload)),
            None
        )

Normalize data from the external platform:

backend/app/services/integrations/custom_platform/data_transformer.py
from typing import Dict, Any, List
from datetime import datetime
class CustomPlatformDataTransformer:
    """Transform data from Custom Platform to Overwatch format."""

    # Platform numeric severity code -> normalized log level name.
    _LOG_LEVELS = {
        "1": "DEBUG",
        "2": "INFO",
        "3": "WARNING",
        "4": "ERROR",
        "5": "CRITICAL"
    }

    def transform_metrics(
        self,
        raw_metrics: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Transform platform metrics to standard format.

        Args:
            raw_metrics: Raw metrics from platform; entries are read from
                its ``series`` key (missing or null yields an empty result)

        Returns:
            List of normalized metric dictionaries. ``timestamp`` is a
            datetime, or None when the series carries no timestamp.

        Raises:
            ValueError: If a timestamp is present but not ISO 8601.
        """
        return [
            {
                "metric_name": series.get("metric"),
                "value": series.get("value"),
                "timestamp": self._parse_timestamp(series.get("timestamp")),
                # `or {}` also covers an explicit null, not just a missing key.
                "tags": self._normalize_tags(series.get("tags") or {}),
                "unit": series.get("unit")
            }
            for series in raw_metrics.get("series") or []
        ]

    def transform_logs(
        self,
        raw_logs: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Transform platform logs to standard format.

        Args:
            raw_logs: Raw logs from platform; entries are read from its
                ``logs`` key (missing or null yields an empty result)

        Returns:
            List of normalized log dictionaries. ``timestamp`` is a
            datetime, or None when the entry carries no timestamp.

        Raises:
            ValueError: If a timestamp is present but not ISO 8601.
        """
        return [
            {
                "timestamp": self._parse_timestamp(log_entry.get("timestamp")),
                "level": self._normalize_log_level(log_entry.get("severity")),
                "message": log_entry.get("message"),
                "service": log_entry.get("service"),
                "host": log_entry.get("host"),
                "attributes": log_entry.get("attributes") or {}
            }
            for log_entry in raw_logs.get("logs") or []
        ]

    @staticmethod
    def _parse_timestamp(value: Any):
        """Parse an ISO 8601 timestamp string into a datetime.

        Returns None for a missing (None or empty) timestamp instead of
        failing, so one incomplete entry does not abort a whole batch.
        """
        if value is None or value == "":
            return None
        # datetime.fromisoformat() only accepts the "Z" UTC designator from
        # Python 3.11 on; rewrite it as an explicit offset for older versions.
        if isinstance(value, str) and value[-1:] in ("Z", "z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)

    def _normalize_tags(self, tags: Dict[str, str]) -> Dict[str, str]:
        """Normalize tag keys: lower-case, spaces replaced by underscores."""
        return {
            key.lower().replace(" ", "_"): value
            for key, value in tags.items()
        }

    def _normalize_log_level(self, severity: str) -> str:
        """Map a platform severity code (1-5) to a level name.

        Unknown or missing codes fall back to "INFO".
        """
        return self._LOG_LEVELS.get(str(severity), "INFO")

Add endpoints for platform-specific operations:

backend/app/api/v1/endpoints/integrations.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_session
from app.core.dependencies import get_current_active_user
from app.models.user import User
from app.models.integration import Integration, IntegrationType
from app.services.integrations.custom_platform.client import CustomPlatformClient
router = APIRouter()
@router.post("/integrations/custom-platform/test")
async def test_custom_platform_connection(
    *,
    api_url: str,
    api_key: str,
    current_user: User = Depends(get_current_active_user)
):
    """Test Custom Platform API connection.

    Opens a client with the supplied credentials and calls its connection
    test.

    Returns:
        ``{"success": bool, "message": str}`` describing the outcome.

    Raises:
        HTTPException: 400 if the connection test raises.
    """
    # NOTE(review): FastAPI reads plain `str` parameters like these from the
    # query string, so the API key ends up in the request URL. Consider
    # accepting the credentials in a request body instead.
    try:
        async with CustomPlatformClient(api_url, api_key) as client:
            success = await client.test_connection()
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Connection test failed: {str(e)}"
        ) from e
    # Report the real outcome rather than always claiming success.
    return {
        "success": success,
        "message": (
            "Connection successful"
            if success
            else "Connection test returned an unexpected response"
        )
    }
@router.get("/integrations/custom-platform/{integration_id}/alerts")
async def get_custom_platform_alerts(
    *,
    integration_id: str,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_active_user)
):
    """Fetch alerts from Custom Platform.

    Returns:
        ``{"alerts": [...], "count": int}``

    Raises:
        HTTPException: 404 if the integration does not exist; 400 if it is
            not a Custom Platform integration or lacks API credentials;
            502 if the platform request fails.
    """
    integration = await session.get(Integration, integration_id)
    if not integration:
        raise HTTPException(status_code=404, detail="Integration not found")
    # NOTE(review): nothing here verifies that `current_user` may access this
    # integration (e.g. by comparing organizations) - confirm that access
    # control is enforced elsewhere.
    if integration.integration_type != IntegrationType.CUSTOM_PLATFORM:
        raise HTTPException(status_code=400, detail="Invalid integration type")
    # Get API credentials
    config = integration.get_config_dict() or {}
    api_url = config.get("api_url")
    api_key = config.get("api_key")
    if not api_url or not api_key:
        # Answer with a clear client error instead of an unhandled KeyError.
        raise HTTPException(
            status_code=400,
            detail="Integration is missing API credentials"
        )
    try:
        async with CustomPlatformClient(api_url, api_key) as client:
            alerts = await client.get_alerts()
    except Exception as e:
        # The upstream platform failed; report it as a gateway error rather
        # than an unhandled server error.
        raise HTTPException(
            status_code=502,
            detail=f"Failed to fetch alerts: {str(e)}"
        ) from e
    return {
        "alerts": alerts,
        "count": len(alerts)
    }
import pytest
from app.services.integrations.custom_platform.client import CustomPlatformClient
from app.services.integrations.custom_platform.webhook_processor import CustomPlatformWebhookProcessor
@pytest.mark.asyncio
async def test_custom_platform_client():
    """Test Custom Platform API client against an in-memory transport.

    The health endpoint is served by ``httpx.MockTransport`` so the test does
    not depend on network access or on a real platform instance.
    """
    # Local import: this test module's import header does not pull in httpx.
    import httpx

    def handler(request: httpx.Request) -> httpx.Response:
        assert request.url.path == "/api/v1/health"
        return httpx.Response(200, json={"status": "ok"})

    async with CustomPlatformClient("https://api.example.com", "test-key") as client:
        # Replace the real HTTP client with one backed by the mock transport;
        # close the original first so its connection pool is not leaked.
        await client.client.aclose()
        client.client = httpx.AsyncClient(
            base_url=client.api_url,
            transport=httpx.MockTransport(handler)
        )
        # Test connection
        success = await client.test_connection()
    assert success is True
@pytest.mark.asyncio
async def test_webhook_processor(db_session):
    """Check that a well-formed webhook payload passes structural validation."""
    # Carries every required field with an accepted severity and status.
    well_formed_alert = dict(
        id="alert-123",
        title="Test Alert",
        severity="high",
        status="firing",
    )
    webhook_processor = CustomPlatformWebhookProcessor(db_session)

    outcome = webhook_processor.validate_webhook_payload(well_formed_alert)

    assert outcome["is_valid"] is True

For integration development assistance, contact support@overwatch-observability.com.