Enrichment Pipelines
Datenanreicherungs-Workflows erstellen
Erstellen und verwalten Sie Enrichment-Pipelines, um Ihre Process-Mining-Datensätze zu transformieren und zu verbessern.
Pipeline-Details abrufen
GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Ruft umfassende Informationen zu einer bestimmten Enrichment-Pipeline ab, einschließlich deren Stufen, Konfiguration und Ausführungsmetadaten.
Parameter
| Parameter | Typ | Ort | Beschreibung |
|---|---|---|---|
tenantId |
GUID | Pfad | Die Mandantenkennung |
projectId |
GUID | Pfad | Die Projektkennung |
pipelineId |
GUID | Pfad | Die Pipeline-Kennung |
Antwort
{
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"projectId": "660e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"pipelineDescription": "Erweitert Ereignisprotokolle mit zusätzlichen Attributen und Berechnungen",
"status": "Active",
"stages": [
{
"stageId": "stage-001",
"stageName": "Datenvalidierung",
"stageType": "Validation",
"order": 1,
"configuration": {
"validateCaseId": true,
"validateTimestamps": true,
"requireActivityNames": true
}
},
{
"stageId": "stage-002",
"stageName": "Zeitliche Anreicherung",
"stageType": "TimeCalculation",
"order": 2,
"configuration": {
"addDayOfWeek": true,
"addBusinessHours": true,
"timezoneId": "UTC"
}
}
],
"triggers": {
"automatic": true,
"schedule": "0 2 * * *",
"onDataUpdate": true
},
"dateCreated": "2024-01-15T10:30:00Z",
"dateModified": "2024-01-20T14:45:00Z",
"createdBy": "user123",
"lastExecutionDate": "2024-01-20T02:00:00Z",
"lastExecutionStatus": "Success",
"executionCount": 45
}
Alle Pipelines auflisten
GET /api/{tenantId}/{projectId}/enrichment/pipelines
Ruft eine Liste aller Enrichment-Pipelines im Projekt mit grundlegenden Metadaten und Statusinformationen ab.
Abfrageparameter
| Parameter | Typ | Beschreibung |
|---|---|---|
status |
string | Filter nach Pipeline-Status: Active, Inactive, Failed |
page |
integer | Seitenzahl für Pagination (Standard: 1) |
pageSize |
integer | Anzahl der Elemente pro Seite (Standard: 20, max.: 100) |
Antwort
{
"pipelines": [
{
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"status": "Active",
"stageCount": 5,
"lastExecutionDate": "2024-01-20T02:00:00Z",
"lastExecutionStatus": "Success",
"dateCreated": "2024-01-15T10:30:00Z"
}
],
"totalCount": 12,
"page": 1,
"pageSize": 20,
"hasNextPage": false
}
Neue Pipeline erstellen
POST /api/{tenantId}/{projectId}/enrichment/pipeline
Erstellt eine neue Enrichment-Pipeline mit angegebenen Stufen und Konfiguration. Die Pipeline kann so konfiguriert werden, dass sie automatisch oder manuell ausgeführt wird.
Anfragetext
{
"pipelineName": "Customer Journey Enrichment",
"pipelineDescription": "Erweitert Kundendaten der Customer Journey mit Demografie- und Verhaltensmustern",
"stages": [
{
"stageName": "Kundendaten-Abfrage",
"stageType": "DataLookup",
"order": 1,
"configuration": {
"lookupTable": "customer_demographics",
"joinKey": "customerId",
"selectFields": ["age", "segment", "region"]
}
},
{
"stageName": "Journey-Metriken",
"stageType": "Calculation",
"order": 2,
"configuration": {
"calculations": [
{
"fieldName": "journeyDuration",
"formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
"groupBy": "caseId"
},
{
"fieldName": "touchpointCount",
"formula": "COUNT(*)",
"groupBy": "caseId"
}
]
}
}
],
"triggers": {
"automatic": false,
"schedule": null,
"onDataUpdate": true
}
}
Antwort
Gibt 201 Created mit dem vollständigen Pipeline-Objekt inklusive generierter IDs und Zeitstempel zurück.
Pipeline aktualisieren
PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Aktualisiert die Konfiguration, Stufen oder Trigger einer existierenden Pipeline. Änderungen werden bei der nächsten Ausführung wirksam.
Anfragetext
{
"pipelineName": "Aktualisierte Customer Journey Enrichment",
"pipelineDescription": "Erweiterte Anreicherung der Customer Journey Daten mit ML-Erkenntnissen",
"status": "Active",
"triggers": {
"automatic": true,
"schedule": "0 3 * * *",
"onDataUpdate": true
}
}
Antwort
Gibt das aktualisierte Pipeline-Objekt mit derselben Struktur wie der GET-Endpunkt zurück.
Pipeline löschen
DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Entfernt eine Pipeline und deren gesamte Ausführungshistorie dauerhaft. Dieser Vorgang kann nicht rückgängig gemacht werden und stoppt alle aktuell laufenden Ausführungen.
Antwortcodes
204 No Content- Pipeline erfolgreich gelöscht404 Not Found- Pipeline nicht gefunden oder Zugriff verweigert409 Conflict- Pipeline wird aktuell ausgeführt und kann nicht gelöscht werden
Stufe zur Pipeline hinzufügen
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage
Fügt einer bestehenden Pipeline eine neue Verarbeitungsetappe hinzu. Die Stufe wird an der angegebenen Reihenfolge eingefügt.
Anfragetext
{
"stageName": "Leistungskennzahlen des Prozesses",
"stageType": "PerformanceCalculation",
"order": 3,
"configuration": {
"metrics": [
{
"name": "cycleTime",
"calculation": "CASE_DURATION",
"unit": "hours"
},
{
"name": "waitTime",
"calculation": "ACTIVITY_WAITING_TIME",
"unit": "hours"
}
],
"aggregations": ["AVG", "MAX", "MIN", "P95"]
}
}
Antwort
Gibt 201 Created mit dem vollständigen Stufenobjekt inklusive generierter Stufen-ID zurück.
Stufe aus Pipeline entfernen
DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}
Entfernt eine spezifische Stufe aus der Pipeline. Nachfolgende Stufen werden automatisch neu sortiert.
Antwortcodes
204 No Content- Stufe erfolgreich entfernt404 Not Found- Stufe in Pipeline nicht gefunden409 Conflict- Stufe kann nicht entfernt werden, solange die Pipeline ausgeführt wird
Beispiel: Komplett-Workflow einer Pipeline
Dieses Beispiel zeigt die Erstellung und Verwaltung einer Enrichment-Pipeline:
// 1. Erstelle eine neue Enrichment-Pipeline
const createPipeline = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
pipelineName: 'Order Processing Enrichment',
pipelineDescription: 'Erweitert Bestelldaten mit Fulfillment-Kennzahlen',
stages: [
{
stageName: 'Auftragsvalidierung',
stageType: 'Validation',
order: 1,
configuration: {
validateOrderId: true,
validateCustomerId: true,
validateAmounts: true
}
},
{
stageName: 'Berechnung der Erfüllungszeit',
stageType: 'TimeCalculation',
order: 2,
configuration: {
startActivity: 'Order Received',
endActivity: 'Order Shipped',
outputField: 'fulfillmentTime'
}
}
],
triggers: {
automatic: true,
onDataUpdate: true
}
})
});
return await response.json();
};
// 2. Neue Stufe zu bestehender Pipeline hinzufügen
const addStage = async (pipelineId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}/stage`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
stageName: 'Kundensegmentierung',
stageType: 'Classification',
order: 3,
configuration: {
segmentationRules: [
{
segment: 'VIP',
condition: 'orderValue > 1000'
},
{
segment: 'Regular',
condition: 'orderValue <= 1000'
}
]
}
})
});
return await response.json();
};
// 3. Status der Pipeline abrufen
const getPipelineStatus = async (pipelineId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
return await response.json();
};
Python-Beispiel
import requests
import json
from datetime import datetime
class EnrichmentPipelineManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def create_pipeline(self, name, description, stages, triggers=None):
"""Erstellt eine neue Enrichment-Pipeline"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline"
payload = {
'pipelineName': name,
'pipelineDescription': description,
'stages': stages,
'triggers': triggers or {'automatic': False, 'onDataUpdate': True}
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_pipeline(self, pipeline_id):
"""Ermittelt Pipeline-Details"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def list_pipelines(self, status=None, page=1, page_size=20):
"""Listet alle Pipelines mit optionaler Filterung"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipelines"
params = {'page': page, 'pageSize': page_size}
if status:
params['status'] = status
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
"""Fügt einer bestehenden Pipeline eine neue Stufe hinzu"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/stage"
payload = {
'stageName': stage_name,
'stageType': stage_type,
'order': order,
'configuration': configuration
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
"""Aktualisiert die Pipeline-Konfiguration"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
payload = {}
if name:
payload['pipelineName'] = name
if description:
payload['pipelineDescription'] = description
if status:
payload['status'] = status
if triggers:
payload['triggers'] = triggers
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def delete_pipeline(self, pipeline_id):
"""Löscht eine Pipeline"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
response = requests.delete(url, headers=self.headers)
return response.status_code == 204
# Beispiel zur Verwendung
manager = EnrichmentPipelineManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
# Erstelle eine umfassende Enrichment-Pipeline
stages = [
{
'stageName': 'Datenqualitätsprüfung',
'stageType': 'Validation',
'order': 1,
'configuration': {
'checkDuplicates': True,
'validateTimestamps': True,
'checkMissingValues': True
}
},
{
'stageName': 'Process Mining Kennzahlen',
'stageType': 'ProcessCalculation',
'order': 2,
'configuration': {
'calculateCycleTime': True,
'calculateWaitingTime': True,
'calculateResourceUtilization': True,
'detectBottlenecks': True
}
},
{
'stageName': 'Anomalieerkennung',
'stageType': 'AnomalyDetection',
'order': 3,
'configuration': {
'algorithm': 'isolation_forest',
'threshold': 0.1,
'features': ['duration', 'cost', 'resourceCount']
}
}
]
pipeline = manager.create_pipeline(
'Umfassende Prozessanalyse',
'End-to-End Prozessanalyse mit Anomalieerkennung',
stages,
{'automatic': True, 'schedule': '0 1 * * *', 'onDataUpdate': True}
)
print(f"Erstellte Pipeline: {pipeline['pipelineId']}")
# Liste alle aktiven Pipelines auf
active_pipelines = manager.list_pipelines(status='Active')
print(f"Gefundene aktive Pipelines: {active_pipelines['totalCount']}")