Enrichment Pipelines

Datenanreicherungs-Workflows erstellen

Erstellen und verwalten Sie Enrichment-Pipelines, um Ihre Process-Mining-Datensätze zu transformieren und zu verbessern.

Pipeline-Details abrufen

GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Ruft umfassende Informationen zu einer bestimmten Enrichment-Pipeline ab, einschließlich deren Stufen, Konfiguration und Ausführungsmetadaten.

Parameter

Parameter Typ Ort Beschreibung
tenantId GUID Pfad Die Mandantenkennung
projectId GUID Pfad Die Projektkennung
pipelineId GUID Pfad Die Pipeline-Kennung

Antwort

{
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "pipelineName": "Process Mining Data Enrichment",
  "pipelineDescription": "Erweitert Ereignisprotokolle mit zusätzlichen Attributen und Berechnungen",
  "status": "Active",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Datenvalidierung",
      "stageType": "Validation",
      "order": 1,
      "configuration": {
        "validateCaseId": true,
        "validateTimestamps": true,
        "requireActivityNames": true
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Zeitliche Anreicherung",
      "stageType": "TimeCalculation",
      "order": 2,
      "configuration": {
        "addDayOfWeek": true,
        "addBusinessHours": true,
        "timezoneId": "UTC"
      }
    }
  ],
  "triggers": {
    "automatic": true,
    "schedule": "0 2 * * *",
    "onDataUpdate": true
  },
  "dateCreated": "2024-01-15T10:30:00Z",
  "dateModified": "2024-01-20T14:45:00Z",
  "createdBy": "user123",
  "lastExecutionDate": "2024-01-20T02:00:00Z",
  "lastExecutionStatus": "Success",
  "executionCount": 45
}

Alle Pipelines auflisten

GET /api/{tenantId}/{projectId}/enrichment/pipelines

Ruft eine Liste aller Enrichment-Pipelines im Projekt mit grundlegenden Metadaten und Statusinformationen ab.

Abfrageparameter

Parameter Typ Beschreibung
status string Filter nach Pipeline-Status: Active, Inactive, Failed
page integer Seitenzahl für Pagination (Standard: 1)
pageSize integer Anzahl der Elemente pro Seite (Standard: 20, max.: 100)

Antwort

{
  "pipelines": [
    {
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "status": "Active",
      "stageCount": 5,
      "lastExecutionDate": "2024-01-20T02:00:00Z",
      "lastExecutionStatus": "Success",
      "dateCreated": "2024-01-15T10:30:00Z"
    }
  ],
  "totalCount": 12,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": false
}

Neue Pipeline erstellen

POST /api/{tenantId}/{projectId}/enrichment/pipeline

Erstellt eine neue Enrichment-Pipeline mit angegebenen Stufen und Konfiguration. Die Pipeline kann so konfiguriert werden, dass sie automatisch oder manuell ausgeführt wird.

Anfragetext

{
  "pipelineName": "Customer Journey Enrichment",
  "pipelineDescription": "Erweitert Kundendaten der Customer Journey mit Demografie- und Verhaltensmustern",
  "stages": [
    {
      "stageName": "Kundendaten-Abfrage",
      "stageType": "DataLookup",
      "order": 1,
      "configuration": {
        "lookupTable": "customer_demographics",
        "joinKey": "customerId",
        "selectFields": ["age", "segment", "region"]
      }
    },
    {
      "stageName": "Journey-Metriken",
      "stageType": "Calculation",
      "order": 2,
      "configuration": {
        "calculations": [
          {
            "fieldName": "journeyDuration",
            "formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
            "groupBy": "caseId"
          },
          {
            "fieldName": "touchpointCount",
            "formula": "COUNT(*)",
            "groupBy": "caseId"
          }
        ]
      }
    }
  ],
  "triggers": {
    "automatic": false,
    "schedule": null,
    "onDataUpdate": true
  }
}

Antwort

Gibt 201 Created mit dem vollständigen Pipeline-Objekt inklusive generierter IDs und Zeitstempel zurück.

Pipeline aktualisieren

PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Aktualisiert die Konfiguration, Stufen oder Trigger einer existierenden Pipeline. Änderungen werden bei der nächsten Ausführung wirksam.

Anfragetext

{
  "pipelineName": "Aktualisierte Customer Journey Enrichment",
  "pipelineDescription": "Erweiterte Anreicherung der Customer Journey Daten mit ML-Erkenntnissen",
  "status": "Active",
  "triggers": {
    "automatic": true,
    "schedule": "0 3 * * *",
    "onDataUpdate": true
  }
}

Antwort

Gibt das aktualisierte Pipeline-Objekt mit derselben Struktur wie der GET-Endpunkt zurück.

Pipeline löschen

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Entfernt eine Pipeline und deren gesamte Ausführungshistorie dauerhaft. Dieser Vorgang kann nicht rückgängig gemacht werden und stoppt alle aktuell laufenden Ausführungen.

Antwortcodes

  • 204 No Content - Pipeline erfolgreich gelöscht
  • 404 Not Found - Pipeline nicht gefunden oder Zugriff verweigert
  • 409 Conflict - Pipeline wird aktuell ausgeführt und kann nicht gelöscht werden

Stufe zur Pipeline hinzufügen

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage

Fügt einer bestehenden Pipeline eine neue Verarbeitungsetappe hinzu. Die Stufe wird an der angegebenen Reihenfolge eingefügt.

Anfragetext

{
  "stageName": "Leistungskennzahlen des Prozesses",
  "stageType": "PerformanceCalculation",
  "order": 3,
  "configuration": {
    "metrics": [
      {
        "name": "cycleTime",
        "calculation": "CASE_DURATION",
        "unit": "hours"
      },
      {
        "name": "waitTime",
        "calculation": "ACTIVITY_WAITING_TIME",
        "unit": "hours"
      }
    ],
    "aggregations": ["AVG", "MAX", "MIN", "P95"]
  }
}

Antwort

Gibt 201 Created mit dem vollständigen Stufenobjekt inklusive generierter Stufen-ID zurück.

Stufe aus Pipeline entfernen

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}

Entfernt eine spezifische Stufe aus der Pipeline. Nachfolgende Stufen werden automatisch neu sortiert.

Antwortcodes

  • 204 No Content - Stufe erfolgreich entfernt
  • 404 Not Found - Stufe in Pipeline nicht gefunden
  • 409 Conflict - Stufe kann nicht entfernt werden, solange die Pipeline ausgeführt wird

Beispiel: Komplett-Workflow einer Pipeline

Dieses Beispiel zeigt die Erstellung und Verwaltung einer Enrichment-Pipeline:

// 1. Erstelle eine neue Enrichment-Pipeline
const createPipeline = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      pipelineName: 'Order Processing Enrichment',
      pipelineDescription: 'Erweitert Bestelldaten mit Fulfillment-Kennzahlen',
      stages: [
        {
          stageName: 'Auftragsvalidierung',
          stageType: 'Validation',
          order: 1,
          configuration: {
            validateOrderId: true,
            validateCustomerId: true,
            validateAmounts: true
          }
        },
        {
          stageName: 'Berechnung der Erfüllungszeit',
          stageType: 'TimeCalculation',
          order: 2,
          configuration: {
            startActivity: 'Order Received',
            endActivity: 'Order Shipped',
            outputField: 'fulfillmentTime'
          }
        }
      ],
      triggers: {
        automatic: true,
        onDataUpdate: true
      }
    })
  });

  return await response.json();
};

// 2. Neue Stufe zu bestehender Pipeline hinzufügen
const addStage = async (pipelineId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}/stage`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      stageName: 'Kundensegmentierung',
      stageType: 'Classification',
      order: 3,
      configuration: {
        segmentationRules: [
          {
            segment: 'VIP',
            condition: 'orderValue > 1000'
          },
          {
            segment: 'Regular',
            condition: 'orderValue <= 1000'
          }
        ]
      }
    })
  });

  return await response.json();
};

// 3. Status der Pipeline abrufen
const getPipelineStatus = async (pipelineId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  return await response.json();
};

Python-Beispiel

import requests
import json
from datetime import datetime

class EnrichmentPipelineManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def create_pipeline(self, name, description, stages, triggers=None):
        """Erstellt eine neue Enrichment-Pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline"
        payload = {
            'pipelineName': name,
            'pipelineDescription': description,
            'stages': stages,
            'triggers': triggers or {'automatic': False, 'onDataUpdate': True}
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_pipeline(self, pipeline_id):
        """Ermittelt Pipeline-Details"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_pipelines(self, status=None, page=1, page_size=20):
        """Listet alle Pipelines mit optionaler Filterung"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipelines"
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
        """Fügt einer bestehenden Pipeline eine neue Stufe hinzu"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/stage"
        payload = {
            'stageName': stage_name,
            'stageType': stage_type,
            'order': order,
            'configuration': configuration
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
        """Aktualisiert die Pipeline-Konfiguration"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        payload = {}
        if name:
            payload['pipelineName'] = name
        if description:
            payload['pipelineDescription'] = description
        if status:
            payload['status'] = status
        if triggers:
            payload['triggers'] = triggers

        response = requests.put(url, json=payload, headers=self.headers)
        return response.json()

    def delete_pipeline(self, pipeline_id):
        """Löscht eine Pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.delete(url, headers=self.headers)
        return response.status_code == 204

# Beispiel zur Verwendung
manager = EnrichmentPipelineManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Erstelle eine umfassende Enrichment-Pipeline
stages = [
    {
        'stageName': 'Datenqualitätsprüfung',
        'stageType': 'Validation',
        'order': 1,
        'configuration': {
            'checkDuplicates': True,
            'validateTimestamps': True,
            'checkMissingValues': True
        }
    },
    {
        'stageName': 'Process Mining Kennzahlen',
        'stageType': 'ProcessCalculation',
        'order': 2,
        'configuration': {
            'calculateCycleTime': True,
            'calculateWaitingTime': True,
            'calculateResourceUtilization': True,
            'detectBottlenecks': True
        }
    },
    {
        'stageName': 'Anomalieerkennung',
        'stageType': 'AnomalyDetection',
        'order': 3,
        'configuration': {
            'algorithm': 'isolation_forest',
            'threshold': 0.1,
            'features': ['duration', 'cost', 'resourceCount']
        }
    }
]

pipeline = manager.create_pipeline(
    'Umfassende Prozessanalyse',
    'End-to-End Prozessanalyse mit Anomalieerkennung',
    stages,
    {'automatic': True, 'schedule': '0 1 * * *', 'onDataUpdate': True}
)

print(f"Erstellte Pipeline: {pipeline['pipelineId']}")

# Liste alle aktiven Pipelines auf
active_pipelines = manager.list_pipelines(status='Active')
print(f"Gefundene aktive Pipelines: {active_pipelines['totalCount']}")