API Client
Communication with external platform
- REST/GraphQL client
- Authentication handling
- Rate limiting and retries
- Response parsing
Complete development guide for creating custom platform integrations with Overwatch, covering API clients, webhook processors, and alert parsers.
API Client
Communication with external platform
Webhook Processor
Incoming alert processing
Alert Parser
Alert format detection
Data Transformer
Data normalization
backend/app/services/├── integrations/│ └── {platform}/│ ├── __init__.py│ ├── client.py # API client│ ├── webhook_processor.py # Webhook handler│ ├── data_transformer.py # Data normalization│ └── exceptions.py # Custom exceptions└── alert_parsers/ ├── base.py # Abstract parser ├── factory.py # Parser factory └── {platform}.py # Platform parserCreate a client for communicating with the external platform:
import httpxfrom typing import Dict, Any, Optional, Listfrom datetime import datetime
from app.core.logging import loggerfrom app.services.integrations.custom_platform.exceptions import ( CustomPlatformAPIError, CustomPlatformAuthError, CustomPlatformRateLimitError)
class CustomPlatformClient: """API client for Custom Platform integration."""
def __init__( self, api_url: str, api_key: str, timeout: int = 30 ): self.api_url = api_url.rstrip('/') self.api_key = api_key self.timeout = timeout
# Create HTTP client self.client = httpx.AsyncClient( base_url=self.api_url, headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", "User-Agent": "Overwatch-Integration/1.0" }, timeout=timeout )
async def __aenter__(self): """Async context manager entry.""" return self
async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.client.aclose()
async def test_connection(self) -> bool: """ Test API connection and authentication.
Returns: True if connection successful
Raises: CustomPlatformAuthError: If authentication fails CustomPlatformAPIError: If API request fails """ try: response = await self.client.get("/api/v1/health") response.raise_for_status() return response.status_code == 200 except httpx.HTTPStatusError as e: if e.response.status_code == 401: raise CustomPlatformAuthError("Invalid API key") raise CustomPlatformAPIError(f"Connection test failed: {e}") except httpx.RequestError as e: raise CustomPlatformAPIError(f"Connection error: {e}")
async def get_alerts( self, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, severity: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """ Retrieve alerts from platform.
Args: start_time: Filter alerts after this time end_time: Filter alerts before this time severity: Filter by severity levels
Returns: List of alert dictionaries """ try: params = {}
if start_time: params["start_time"] = start_time.isoformat() if end_time: params["end_time"] = end_time.isoformat() if severity: params["severity"] = ",".join(severity)
response = await self.client.get( "/api/v1/alerts", params=params ) response.raise_for_status()
return response.json()["alerts"]
except httpx.HTTPStatusError as e: if e.response.status_code == 429: raise CustomPlatformRateLimitError("Rate limit exceeded") raise CustomPlatformAPIError(f"Failed to fetch alerts: {e}")
async def get_metrics( self, query: str, start_time: datetime, end_time: datetime ) -> Dict[str, Any]: """ Query metrics from platform.
Args: query: Platform-specific query string start_time: Query start time end_time: Query end time
Returns: Metrics data dictionary """ try: payload = { "query": query, "start": start_time.isoformat(), "end": end_time.isoformat() }
response = await self.client.post( "/api/v1/metrics/query", json=payload ) response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e: raise CustomPlatformAPIError(f"Metrics query failed: {e}")
async def create_alert_rule( self, name: str, condition: Dict[str, Any], actions: List[Dict[str, Any]] ) -> str: """ Create alert rule on platform.
Args: name: Alert rule name condition: Alert condition configuration actions: Alert actions (notifications)
Returns: Created alert rule ID """ try: payload = { "name": name, "condition": condition, "actions": actions }
response = await self.client.post( "/api/v1/alert-rules", json=payload ) response.raise_for_status()
return response.json()["id"]
except httpx.HTTPStatusError as e: raise CustomPlatformAPIError(f"Failed to create alert rule: {e}")Define platform-specific exceptions:
class CustomPlatformError(Exception): """Base exception for Custom Platform integration.""" pass
class CustomPlatformAPIError(CustomPlatformError): """API request failed.""" pass
class CustomPlatformAuthError(CustomPlatformError): """Authentication failed.""" pass
class CustomPlatformRateLimitError(CustomPlatformError): """Rate limit exceeded.""" pass
class CustomPlatformWebhookValidationError(CustomPlatformError): """Webhook validation failed.""" def __init__(self, message: str, validation_errors: List[str]): super().__init__(message) self.validation_errors = validation_errorsHandle incoming webhooks from the platform:
import hmacimport hashlibimport jsonfrom typing import Dict, Any, Optional, Listfrom sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import loggerfrom app.models.integration import Integrationfrom app.models.incident import Incident, IncidentSeverity, IncidentStatusfrom app.services.integrations.custom_platform.exceptions import ( CustomPlatformWebhookValidationError)
class CustomPlatformWebhookProcessor: """Process webhooks from Custom Platform."""
def __init__(self, session: AsyncSession): self.session = session
def validate_webhook_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]: """ Validate webhook payload structure.
Returns: { "is_valid": bool, "errors": List[str] } """ errors = []
# Required fields required_fields = ["id", "title", "severity", "status"] for field in required_fields: if field not in payload: errors.append(f"Missing required field: {field}")
# Validate severity valid_severities = ["low", "medium", "high", "critical"] if "severity" in payload and payload["severity"] not in valid_severities: errors.append(f"Invalid severity: {payload['severity']}")
# Validate status valid_statuses = ["firing", "resolved"] if "status" in payload and payload["status"] not in valid_statuses: errors.append(f"Invalid status: {payload['status']}")
return { "is_valid": len(errors) == 0, "errors": errors }
def validate_webhook_signature( self, payload: Dict[str, Any], signature: str, secret: str ) -> bool: """ Validate HMAC signature for webhook authenticity.
Args: payload: Webhook payload signature: Received signature secret: Webhook secret
Returns: True if signature is valid """ # Serialize payload payload_str = json.dumps(payload, separators=(',', ':'))
# Calculate expected signature expected_signature = hmac.new( secret.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest()
# Constant-time comparison return hmac.compare_digest( f"sha256={expected_signature}", signature )
async def process_webhook( self, payload: Dict[str, Any], integration: Integration, signature: Optional[str] = None ) -> Optional[Incident]: """ Process webhook and create/update incident.
Args: payload: Webhook payload integration: Integration instance signature: Optional webhook signature for validation
Returns: Created or updated incident, or None if processing failed """ try: # Validate signature if provided if signature: webhook_secret = integration.get_config_dict().get("webhook_secret") if webhook_secret: if not self.validate_webhook_signature(payload, signature, webhook_secret): logger.error("Webhook signature validation failed") return None
# Validate payload structure validation_result = self.validate_webhook_payload(payload) if not validation_result["is_valid"]: raise CustomPlatformWebhookValidationError( "Webhook validation failed", validation_result["errors"] )
# Map severity severity_mapping = { "low": IncidentSeverity.LOW, "medium": IncidentSeverity.MEDIUM, "high": IncidentSeverity.HIGH, "critical": IncidentSeverity.CRITICAL }
# Map status status_mapping = { "firing": IncidentStatus.INVESTIGATING, "resolved": IncidentStatus.RESOLVED }
# Check if incident already exists (by external_id) from sqlalchemy import select stmt = select(Incident).where( Incident.external_id == payload["id"], Incident.integration_id == integration.id ) result = await self.session.execute(stmt) incident = result.scalar_one_or_none()
if incident: # Update existing incident incident.status = status_mapping.get( payload.get("status", "firing"), IncidentStatus.INVESTIGATING ) incident.metadata = self._extract_metadata(payload) else: # Create new incident incident = Incident( organization_id=integration.organization_id, integration_id=integration.id, external_id=payload["id"], title=payload["title"], description=payload.get("description", ""), severity=severity_mapping.get( payload.get("severity", "medium"), IncidentSeverity.MEDIUM ), status=status_mapping.get( payload.get("status", "firing"), IncidentStatus.INVESTIGATING ), metadata=self._extract_metadata(payload) ) self.session.add(incident)
await self.session.commit() await self.session.refresh(incident)
logger.info(f"Processed webhook for incident {incident.id}") return incident
except CustomPlatformWebhookValidationError as e: logger.error(f"Webhook validation failed: {e.validation_errors}") return None except Exception as e: logger.error(f"Failed to process webhook: {e}") await self.session.rollback() return None
def _extract_metadata(self, payload: Dict[str, Any]) -> Dict[str, Any]: """Extract metadata from webhook payload.""" return { "external_link": payload.get("link"), "tags": payload.get("tags", []), "source": payload.get("source"), "environment": payload.get("environment"), "service": payload.get("service"), "raw_payload": payload }Implement factory pattern for automatic platform detection:
from typing import Dict, Any, Optionalfrom app.services.alert_parsers.base import BaseAlertParserfrom app.models.incident import IncidentSeverity
class CustomPlatformAlertParser(BaseAlertParser): """Parse alerts from Custom Platform."""
@staticmethod def can_parse(payload: Dict[str, Any]) -> bool: """ Detect if payload is from Custom Platform.
Args: payload: Alert payload
Returns: True if this parser can handle the payload """ # Check for platform-specific fields required_fields = ["id", "alert_type", "platform_version"] has_required = all(field in payload for field in required_fields)
# Check for platform identifier platform_match = payload.get("source") == "custom_platform"
return has_required and platform_match
async def parse(self, payload: Dict[str, Any]) -> Dict[str, Any]: """ Parse Custom Platform alert into standard format.
Args: payload: Raw alert payload
Returns: Normalized alert dictionary """ # Map severity severity_mapping = { "1": IncidentSeverity.LOW, "2": IncidentSeverity.MEDIUM, "3": IncidentSeverity.HIGH, "4": IncidentSeverity.CRITICAL }
return { "external_id": payload.get("id"), "title": payload.get("alert_name"), "description": payload.get("message", ""), "severity": severity_mapping.get( str(payload.get("severity", 2)), IncidentSeverity.MEDIUM ), "source": "custom_platform", "metadata": { "alert_type": payload.get("alert_type"), "environment": payload.get("environment"), "service": payload.get("service"), "tags": payload.get("tags", []), "link": payload.get("link"), "triggered_at": payload.get("triggered_at") } }from app.services.alert_parsers.custom_platform import CustomPlatformAlertParser
class AlertParserFactory: """Factory for creating alert parsers based on payload."""
_parsers = [ # ... existing parsers CustomPlatformAlertParser(), ]
@classmethod def get_parser(cls, payload: Dict[str, Any]) -> Optional[BaseAlertParser]: """ Get appropriate parser for payload.
Args: payload: Alert payload
Returns: Parser instance or None if no parser found """ for parser in cls._parsers: if parser.can_parse(payload): return parser return NoneNormalize data from external platform:
from typing import Dict, Any, Listfrom datetime import datetime
class CustomPlatformDataTransformer: """Transform data from Custom Platform to Overwatch format."""
def transform_metrics( self, raw_metrics: Dict[str, Any] ) -> List[Dict[str, Any]]: """ Transform platform metrics to standard format.
Args: raw_metrics: Raw metrics from platform
Returns: List of normalized metric dictionaries """ normalized_metrics = []
for series in raw_metrics.get("series", []): normalized_metrics.append({ "metric_name": series.get("metric"), "value": series.get("value"), "timestamp": datetime.fromisoformat(series.get("timestamp")), "tags": self._normalize_tags(series.get("tags", {})), "unit": series.get("unit") })
return normalized_metrics
def transform_logs( self, raw_logs: Dict[str, Any] ) -> List[Dict[str, Any]]: """ Transform platform logs to standard format.
Args: raw_logs: Raw logs from platform
Returns: List of normalized log dictionaries """ normalized_logs = []
for log_entry in raw_logs.get("logs", []): normalized_logs.append({ "timestamp": datetime.fromisoformat(log_entry.get("timestamp")), "level": self._normalize_log_level(log_entry.get("severity")), "message": log_entry.get("message"), "service": log_entry.get("service"), "host": log_entry.get("host"), "attributes": log_entry.get("attributes", {}) })
return normalized_logs
def _normalize_tags(self, tags: Dict[str, str]) -> Dict[str, str]: """Normalize tag format.""" return { key.lower().replace(" ", "_"): value for key, value in tags.items() }
def _normalize_log_level(self, severity: str) -> str: """Normalize log level.""" mapping = { "1": "DEBUG", "2": "INFO", "3": "WARNING", "4": "ERROR", "5": "CRITICAL" } return mapping.get(str(severity), "INFO")Add endpoints for platform-specific operations:
from fastapi import APIRouter, Depends, HTTPExceptionfrom sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_sessionfrom app.core.dependencies import get_current_active_userfrom app.models.user import Userfrom app.models.integration import Integration, IntegrationTypefrom app.services.integrations.custom_platform.client import CustomPlatformClient
router = APIRouter()
@router.post("/integrations/custom-platform/test")async def test_custom_platform_connection( *, api_url: str, api_key: str, current_user: User = Depends(get_current_active_user)): """Test Custom Platform API connection.""" try: async with CustomPlatformClient(api_url, api_key) as client: success = await client.test_connection()
return { "success": success, "message": "Connection successful" } except Exception as e: raise HTTPException( status_code=400, detail=f"Connection test failed: {str(e)}" )
@router.get("/integrations/custom-platform/{integration_id}/alerts")async def get_custom_platform_alerts( *, integration_id: str, session: AsyncSession = Depends(get_session), current_user: User = Depends(get_current_active_user)): """Fetch alerts from Custom Platform.""" integration = await session.get(Integration, integration_id)
if not integration: raise HTTPException(status_code=404, detail="Integration not found")
if integration.integration_type != IntegrationType.CUSTOM_PLATFORM: raise HTTPException(status_code=400, detail="Invalid integration type")
# Get API credentials config = integration.get_config_dict()
async with CustomPlatformClient(config["api_url"], config["api_key"]) as client: alerts = await client.get_alerts()
return { "alerts": alerts, "count": len(alerts) }import pytestfrom app.services.integrations.custom_platform.client import CustomPlatformClientfrom app.services.integrations.custom_platform.webhook_processor import CustomPlatformWebhookProcessor
@pytest.mark.asyncioasync def test_custom_platform_client(): """Test Custom Platform API client.""" async with CustomPlatformClient("https://api.example.com", "test-key") as client: # Test connection success = await client.test_connection() assert success is True
@pytest.mark.asyncioasync def test_webhook_processor(db_session): """Test webhook processing.""" processor = CustomPlatformWebhookProcessor(db_session)
payload = { "id": "alert-123", "title": "Test Alert", "severity": "high", "status": "firing" }
# Validate payload result = processor.validate_webhook_payload(payload) assert result["is_valid"] is TrueFor integration development assistance, contact support@overwatch-observability.com.