Pipeline-Ausführung
Ausführen von Enrichment-Pipelines
Führen Sie Enrichment-Pipelines auf Datensätzen aus, überwachen Sie den Fortschritt und rufen Sie verbesserte Ergebnisse ab.
Pipeline ausführen
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute
Löst die Ausführung einer Enrichment-Pipeline auf einem angegebenen Datensatz aus. Die Ausführung läuft asynchron und gibt eine Ausführungs-ID zurück, um den Fortschritt zu verfolgen.
Parameter
| Parameter | Typ | Ort | Beschreibung |
|---|---|---|---|
tenantId |
GUID | Pfad | Die Mandantenkennung |
projectId |
GUID | Pfad | Die Projektkennung |
pipelineId |
GUID | Pfad | Die Pipeline-Kennung |
Request Body
{
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"executionName": "Monatliche Prozessanalyse",
"executionDescription": "Enrichment für die monatliche Leistungsbewertung",
"parameters": {
"timeRange": {
"startDate": "2024-01-01",
"endDate": "2024-01-31"
},
"filterCriteria": {
"includeWeekends": false,
"minCaseDuration": "1h"
},
"outputOptions": {
"includeRawData": true,
"generateSummary": true,
"exportFormat": "CSV"
}
},
"priority": "Normal",
"notifyOnCompletion": true
}
Antwort
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"status": "Queued",
"estimatedDuration": "15-20 Minuten",
"executionName": "Monatliche Prozessanalyse",
"dateSubmitted": "2024-01-20T10:30:00Z",
"priority": "Normal",
"stages": [
{
"stageId": "stage-001",
"stageName": "Datenvalidierung",
"status": "Pending",
"estimatedDuration": "2-3 Minuten"
},
{
"stageId": "stage-002",
"stageName": "Zeit-Enrichment",
"status": "Pending",
"estimatedDuration": "8-10 Minuten"
}
]
}
Ausführungsstatus abrufen
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Ruft den aktuellen Status und Fortschrittsinformationen für eine Pipeline-Ausführung ab, einschließlich detailliertem Fortschritt für jede Phase.
Antwort
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"status": "Running",
"progress": 45,
"currentStage": {
"stageId": "stage-002",
"stageName": "Zeit-Enrichment",
"status": "Running",
"progress": 60,
"startTime": "2024-01-20T10:35:00Z",
"estimatedCompletion": "2024-01-20T10:45:00Z"
},
"executionName": "Monatliche Prozessanalyse",
"dateSubmitted": "2024-01-20T10:30:00Z",
"dateStarted": "2024-01-20T10:32:00Z",
"estimatedCompletion": "2024-01-20T10:50:00Z",
"priority": "Normal",
"stages": [
{
"stageId": "stage-001",
"stageName": "Datenvalidierung",
"status": "Completed",
"progress": 100,
"startTime": "2024-01-20T10:32:00Z",
"endTime": "2024-01-20T10:35:00Z",
"duration": "3 Minuten",
"recordsProcessed": 15420,
"validationResults": {
"totalRecords": 15420,
"validRecords": 15418,
"errors": 2,
"warnings": 15
}
},
{
"stageId": "stage-002",
"stageName": "Zeit-Enrichment",
"status": "Running",
"progress": 60,
"startTime": "2024-01-20T10:35:00Z",
"recordsProcessed": 9252,
"totalRecords": 15418
}
],
"metrics": {
"totalRecords": 15420,
"processedRecords": 9252,
"errorCount": 2,
"warningCount": 15
}
}
Ausführungsergebnisse abrufen
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results
Ruft die endgültigen Ergebnisse einer abgeschlossenen Pipeline-Ausführung ab, einschließlich angereicherter Daten, Zusammenfassungsstatistiken und herunterladbarer Ausgaben.
Abfrageparameter
| Parameter | Typ | Beschreibung |
|---|---|---|
format |
string | Antwortformat: summary, full, download (Standard: summary) |
includeRawData |
boolean | Originaldatensatz in der Antwort einschließen (Standard: false) |
limit |
integer | Begrenzung der zurückgegebenen Datensätze (max: 10000) |
Antwort
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"status": "Completed",
"completionDate": "2024-01-20T10:48:00Z",
"totalDuration": "18 Minuten",
"summary": {
"originalRecords": 15420,
"enrichedRecords": 15418,
"newAttributes": 8,
"dataQualityScore": 98.7,
"enrichmentCoverage": 99.9
},
"enrichedAttributes": [
{
"attributeName": "dayOfWeek",
"attributeType": "string",
"coverage": 100,
"uniqueValues": 7,
"description": "Wochentag für jedes Ereignis"
},
{
"attributeName": "businessHours",
"attributeType": "boolean",
"coverage": 100,
"description": "Ob das Ereignis während der Geschäftszeiten stattgefunden hat"
},
{
"attributeName": "cycleTime",
"attributeType": "duration",
"coverage": 99.8,
"averageValue": "4,2 Stunden",
"description": "Zeit vom Start bis zum Abschluss des Vorgangs"
}
],
"dataQuality": {
"completeness": 99.9,
"accuracy": 98.5,
"consistency": 99.2,
"validity": 97.8,
"issues": [
{
"type": "Fehlender Zeitstempel",
"count": 2,
"severity": "Hoch"
},
{
"type": "Ungültige Dauer",
"count": 15,
"severity": "Mittel"
}
]
},
"downloadUrls": {
"enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
"summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
"dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
}
}
Liste der Pipeline-Ausführungen
GET /api/{tenantId}/{projectId}/enrichment/executions
Ruft eine Liste aller Pipeline-Ausführungen mit Filter- und Paginierungsoptionen ab. Nützlich zur Überwachung der Ausführungshistorie und der Leistung.
Abfrageparameter
| Parameter | Typ | Beschreibung |
|---|---|---|
pipelineId |
GUID | Nach spezifischer Pipeline filtern |
status |
string | Nach Status filtern: Queued, Running, Completed, Failed, Cancelled |
dateFrom |
datetime | Ausführungen ab diesem Datum filtern |
dateTo |
datetime | Ausführungen bis zu diesem Datum filtern |
page |
integer | Seitenzahl für Paginierung (Standard: 1) |
pageSize |
integer | Anzahl der Elemente pro Seite (Standard: 20, max: 100) |
Antwort
{
"executions": [
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"executionName": "Monatliche Prozessanalyse",
"status": "Completed",
"dateSubmitted": "2024-01-20T10:30:00Z",
"dateCompleted": "2024-01-20T10:48:00Z",
"duration": "18 Minuten",
"recordsProcessed": 15418,
"priority": "Normal",
"submittedBy": "user123"
}
],
"totalCount": 47,
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
Ausführung abbrechen
DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Bricht eine laufende oder in der Warteschlange befindliche Pipeline-Ausführung ab. Abgeschlossene Phasen bleiben erhalten, aber die Ausführung wird in der aktuellen Phase gestoppt.
Optionale Request Body
{
"reason": "Benutzer hat Abbruch angefordert",
"preservePartialResults": true
}
Antwortcodes
200 OK- Ausführung erfolgreich abgebrochen404 Not Found- Ausführung nicht gefunden409 Conflict- Ausführung bereits abgeschlossen oder kann nicht abgebrochen werden
Fehlgeschlagene Ausführung neu starten
POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart
Startet eine fehlgeschlagene Pipeline-Ausführung ab dem Fehlerpunkt neu. Bereits abgeschlossene Phasen werden übersprungen, sofern nicht explizit eine Wiederholung angefordert wird.
Request Body
{
"restartFromStage": "stage-003",
"rerunCompletedStages": false,
"updateParameters": {
"retryFailedRecords": true,
"increaseTimeout": true
}
}
Antwort
Gibt 200 OK mit einem neuen Ausführungsobjekt zurück, das eine aktualisierte Ausführungs-ID und Status enthält.
Beispiel: Vollständiger Ausführungsworkflow
Dieses Beispiel zeigt die Ausführung einer Pipeline und die Überwachung ihres Fortschritts:
// 1. Pipeline ausführen
const executeEnrichment = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
datasetId: '880e8400-e29b-41d4-a716-446655440000',
executionName: 'Analyse der Customer Journey',
executionDescription: 'Anreicherung von Kundendaten mit Journey-Metriken',
parameters: {
timeRange: {
startDate: '2024-01-01',
endDate: '2024-01-31'
},
outputOptions: {
includeRawData: true,
generateSummary: true,
exportFormat: 'CSV'
}
},
priority: 'High',
notifyOnCompletion: true
})
});
return await response.json();
};
// 2. Ausführungsfortschritt überwachen
const monitorExecution = async (executionId) => {
const checkStatus = async () => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const execution = await response.json();
console.log(`Status: ${execution.status}, Fortschritt: ${execution.progress}%`);
if (execution.status === 'Running' || execution.status === 'Queued') {
// Erneut in 30 Sekunden prüfen
setTimeout(() => checkStatus(), 30000);
} else if (execution.status === 'Completed') {
console.log('Ausführung erfolgreich abgeschlossen!');
await getResults(executionId);
} else if (execution.status === 'Failed') {
console.log('Ausführung fehlgeschlagen:', execution.error);
}
};
await checkStatus();
};
// 3. Ergebnisse abrufen, wenn abgeschlossen
const getResults = async (executionId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const results = await response.json();
console.log('Enrichment-Zusammenfassung:', results.summary);
console.log('Download-URLs:', results.downloadUrls);
return results;
};
// Workflow ausführen
executeEnrichment()
.then(execution => {
console.log(`Ausführung gestartet: ${execution.executionId}`);
return monitorExecution(execution.executionId);
})
.catch(error => console.error('Ausführung fehlgeschlagen:', error));
Python-Beispiel
import requests
import time
import json
from datetime import datetime, timedelta
class PipelineExecutionManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
"""Führt eine Enrichment-Pipeline aus"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
payload = {
'datasetId': dataset_id,
'executionName': execution_name,
'parameters': parameters or {},
'priority': priority,
'notifyOnCompletion': True
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_execution_status(self, execution_id):
"""Gibt den aktuellen Ausführungsstatus zurück"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
"""Wartet auf Abschluss der Ausführung mit periodischen Statusprüfungen"""
start_time = time.time()
while time.time() - start_time < timeout:
status = self.get_execution_status(execution_id)
print(f"Ausführung {execution_id}: {status['status']} ({status.get('progress', 0)}%)")
if status['status'] in ['Completed', 'Failed', 'Cancelled']:
return status
time.sleep(poll_interval)
raise TimeoutError(f"Ausführung {execution_id} wurde nicht innerhalb von {timeout} Sekunden abgeschlossen")
def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
"""Holt die Ausführungsergebnisse"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
params = {
'format': format_type,
'includeRawData': include_raw_data
}
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_execution(self, execution_id, reason="Benutzerabbruch"):
"""Bricht eine laufende Ausführung ab"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
payload = {
'reason': reason,
'preservePartialResults': True
}
response = requests.delete(url, json=payload, headers=self.headers)
return response.status_code == 200
def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
"""Listet Pipeline-Ausführungen mit Filterung auf"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
params = {'page': page, 'pageSize': page_size}
if pipeline_id:
params['pipelineId'] = pipeline_id
if status:
params['status'] = status
if date_from:
params['dateFrom'] = date_from.isoformat()
if date_to:
params['dateTo'] = date_to.isoformat()
response = requests.get(url, params=params, headers=self.headers)
return response.json()
# Beispiel für die Nutzung
manager = PipelineExecutionManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
# Pipeline mit benutzerdefinierten Parametern ausführen
execution_params = {
'timeRange': {
'startDate': '2024-01-01',
'endDate': '2024-01-31'
},
'filterCriteria': {
'includeWeekends': False,
'minCaseDuration': '1h'
},
'outputOptions': {
'includeRawData': True,
'generateSummary': True,
'exportFormat': 'CSV'
}
}
try:
# Ausführung starten
execution = manager.execute_pipeline(
'pipeline-guid',
'dataset-guid',
'Monatliche Prozessanalyse',
execution_params,
'High'
)
print(f"Ausführung gestartet: {execution['executionId']}")
print(f"Geschätzte Dauer: {execution['estimatedDuration']}")
# Auf Abschluss warten
final_status = manager.wait_for_completion(execution['executionId'])
if final_status['status'] == 'Completed':
# Ergebnisse abrufen
results = manager.get_execution_results(execution['executionId'])
print(f"Enrichment erfolgreich abgeschlossen!")
print(f"Originaldatensätze: {results['summary']['originalRecords']}")
print(f"Angereicherte Datensätze: {results['summary']['enrichedRecords']}")
print(f"Datenqualitätsbewertung: {results['summary']['dataQualityScore']}")
print(f"Download angereicherte Daten: {results['downloadUrls']['enrichedDataset']}")
else:
print(f"Ausführung fehlgeschlagen mit Status: {final_status['status']}")
except Exception as e:
print(f"Fehler bei der Pipeline-Ausführung: {e}")