Pipeline-Ausführung

Ausführen von Enrichment-Pipelines

Führen Sie Enrichment-Pipelines auf Datensätzen aus, überwachen Sie den Fortschritt und rufen Sie verbesserte Ergebnisse ab.

Pipeline ausführen

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute

Löst die Ausführung einer Enrichment-Pipeline auf einem angegebenen Datensatz aus. Die Ausführung läuft asynchron und gibt eine Ausführungs-ID zurück, um den Fortschritt zu verfolgen.

Parameter

Parameter Typ Ort Beschreibung
tenantId GUID Pfad Die Mandantenkennung
projectId GUID Pfad Die Projektkennung
pipelineId GUID Pfad Die Pipeline-Kennung

Request Body

{
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "executionName": "Monatliche Prozessanalyse",
  "executionDescription": "Enrichment für die monatliche Leistungsbewertung",
  "parameters": {
    "timeRange": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-31"
    },
    "filterCriteria": {
      "includeWeekends": false,
      "minCaseDuration": "1h"
    },
    "outputOptions": {
      "includeRawData": true,
      "generateSummary": true,
      "exportFormat": "CSV"
    }
  },
  "priority": "Normal",
  "notifyOnCompletion": true
}

Antwort

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "estimatedDuration": "15-20 Minuten",
  "executionName": "Monatliche Prozessanalyse",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Datenvalidierung",
      "status": "Pending",
      "estimatedDuration": "2-3 Minuten"
    },
    {
      "stageId": "stage-002",
      "stageName": "Zeit-Enrichment",
      "status": "Pending",
      "estimatedDuration": "8-10 Minuten"
    }
  ]
}

Ausführungsstatus abrufen

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Ruft den aktuellen Status und Fortschrittsinformationen für eine Pipeline-Ausführung ab, einschließlich detailliertem Fortschritt für jede Phase.

Antwort

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Running",
  "progress": 45,
  "currentStage": {
    "stageId": "stage-002",
    "stageName": "Zeit-Enrichment",
    "status": "Running",
    "progress": 60,
    "startTime": "2024-01-20T10:35:00Z",
    "estimatedCompletion": "2024-01-20T10:45:00Z"
  },
  "executionName": "Monatliche Prozessanalyse",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "dateStarted": "2024-01-20T10:32:00Z",
  "estimatedCompletion": "2024-01-20T10:50:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Datenvalidierung",
      "status": "Completed",
      "progress": 100,
      "startTime": "2024-01-20T10:32:00Z",
      "endTime": "2024-01-20T10:35:00Z",
      "duration": "3 Minuten",
      "recordsProcessed": 15420,
      "validationResults": {
        "totalRecords": 15420,
        "validRecords": 15418,
        "errors": 2,
        "warnings": 15
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Zeit-Enrichment",
      "status": "Running",
      "progress": 60,
      "startTime": "2024-01-20T10:35:00Z",
      "recordsProcessed": 9252,
      "totalRecords": 15418
    }
  ],
  "metrics": {
    "totalRecords": 15420,
    "processedRecords": 9252,
    "errorCount": 2,
    "warningCount": 15
  }
}

Ausführungsergebnisse abrufen

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results

Ruft die endgültigen Ergebnisse einer abgeschlossenen Pipeline-Ausführung ab, einschließlich angereicherter Daten, Zusammenfassungsstatistiken und herunterladbarer Ausgaben.

Abfrageparameter

Parameter Typ Beschreibung
format string Antwortformat: summary, full, download (Standard: summary)
includeRawData boolean Originaldatensatz in der Antwort einschließen (Standard: false)
limit integer Begrenzung der zurückgegebenen Datensätze (max: 10000)

Antwort

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionDate": "2024-01-20T10:48:00Z",
  "totalDuration": "18 Minuten",
  "summary": {
    "originalRecords": 15420,
    "enrichedRecords": 15418,
    "newAttributes": 8,
    "dataQualityScore": 98.7,
    "enrichmentCoverage": 99.9
  },
  "enrichedAttributes": [
    {
      "attributeName": "dayOfWeek",
      "attributeType": "string",
      "coverage": 100,
      "uniqueValues": 7,
      "description": "Wochentag für jedes Ereignis"
    },
    {
      "attributeName": "businessHours",
      "attributeType": "boolean",
      "coverage": 100,
      "description": "Ob das Ereignis während der Geschäftszeiten stattgefunden hat"
    },
    {
      "attributeName": "cycleTime",
      "attributeType": "duration",
      "coverage": 99.8,
      "averageValue": "4,2 Stunden",
      "description": "Zeit vom Start bis zum Abschluss des Vorgangs"
    }
  ],
  "dataQuality": {
    "completeness": 99.9,
    "accuracy": 98.5,
    "consistency": 99.2,
    "validity": 97.8,
    "issues": [
      {
        "type": "Fehlender Zeitstempel",
        "count": 2,
        "severity": "Hoch"
      },
      {
        "type": "Ungültige Dauer",
        "count": 15,
        "severity": "Mittel"
      }
    ]
  },
  "downloadUrls": {
    "enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
    "summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
    "dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
  }
}

Liste der Pipeline-Ausführungen

GET /api/{tenantId}/{projectId}/enrichment/executions

Ruft eine Liste aller Pipeline-Ausführungen mit Filter- und Paginierungsoptionen ab. Nützlich zur Überwachung der Ausführungshistorie und der Leistung.

Abfrageparameter

Parameter Typ Beschreibung
pipelineId GUID Nach spezifischer Pipeline filtern
status string Nach Status filtern: Queued, Running, Completed, Failed, Cancelled
dateFrom datetime Ausführungen ab diesem Datum filtern
dateTo datetime Ausführungen bis zu diesem Datum filtern
page integer Seitenzahl für Paginierung (Standard: 1)
pageSize integer Anzahl der Elemente pro Seite (Standard: 20, max: 100)

Antwort

{
  "executions": [
    {
      "executionId": "990e8400-e29b-41d4-a716-446655440000",
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "executionName": "Monatliche Prozessanalyse",
      "status": "Completed",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "dateCompleted": "2024-01-20T10:48:00Z",
      "duration": "18 Minuten",
      "recordsProcessed": 15418,
      "priority": "Normal",
      "submittedBy": "user123"
    }
  ],
  "totalCount": 47,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Ausführung abbrechen

DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Bricht eine laufende oder in der Warteschlange befindliche Pipeline-Ausführung ab. Abgeschlossene Phasen bleiben erhalten, aber die Ausführung wird in der aktuellen Phase gestoppt.

Optionale Request Body

{
  "reason": "Benutzer hat Abbruch angefordert",
  "preservePartialResults": true
}

Antwortcodes

  • 200 OK - Ausführung erfolgreich abgebrochen
  • 404 Not Found - Ausführung nicht gefunden
  • 409 Conflict - Ausführung bereits abgeschlossen oder kann nicht abgebrochen werden

Fehlgeschlagene Ausführung neu starten

POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart

Startet eine fehlgeschlagene Pipeline-Ausführung ab dem Fehlerpunkt neu. Bereits abgeschlossene Phasen werden übersprungen, sofern nicht explizit eine Wiederholung angefordert wird.

Request Body

{
  "restartFromStage": "stage-003",
  "rerunCompletedStages": false,
  "updateParameters": {
    "retryFailedRecords": true,
    "increaseTimeout": true
  }
}

Antwort

Gibt 200 OK mit einem neuen Ausführungsobjekt zurück, das eine aktualisierte Ausführungs-ID und Status enthält.

Beispiel: Vollständiger Ausführungsworkflow

Dieses Beispiel zeigt die Ausführung einer Pipeline und die Überwachung ihres Fortschritts:

// 1. Pipeline ausführen
const executeEnrichment = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      executionName: 'Analyse der Customer Journey',
      executionDescription: 'Anreicherung von Kundendaten mit Journey-Metriken',
      parameters: {
        timeRange: {
          startDate: '2024-01-01',
          endDate: '2024-01-31'
        },
        outputOptions: {
          includeRawData: true,
          generateSummary: true,
          exportFormat: 'CSV'
        }
      },
      priority: 'High',
      notifyOnCompletion: true
    })
  });

  return await response.json();
};

// 2. Ausführungsfortschritt überwachen
const monitorExecution = async (executionId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const execution = await response.json();
    console.log(`Status: ${execution.status}, Fortschritt: ${execution.progress}%`);

    if (execution.status === 'Running' || execution.status === 'Queued') {
      // Erneut in 30 Sekunden prüfen
      setTimeout(() => checkStatus(), 30000);
    } else if (execution.status === 'Completed') {
      console.log('Ausführung erfolgreich abgeschlossen!');
      await getResults(executionId);
    } else if (execution.status === 'Failed') {
      console.log('Ausführung fehlgeschlagen:', execution.error);
    }
  };

  await checkStatus();
};

// 3. Ergebnisse abrufen, wenn abgeschlossen
const getResults = async (executionId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Enrichment-Zusammenfassung:', results.summary);
  console.log('Download-URLs:', results.downloadUrls);

  return results;
};

// Workflow ausführen
executeEnrichment()
  .then(execution => {
    console.log(`Ausführung gestartet: ${execution.executionId}`);
    return monitorExecution(execution.executionId);
  })
  .catch(error => console.error('Ausführung fehlgeschlagen:', error));

Python-Beispiel

import requests
import time
import json
from datetime import datetime, timedelta

class PipelineExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
        """Führt eine Enrichment-Pipeline aus"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
        payload = {
            'datasetId': dataset_id,
            'executionName': execution_name,
            'parameters': parameters or {},
            'priority': priority,
            'notifyOnCompletion': True
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_execution_status(self, execution_id):
        """Gibt den aktuellen Ausführungsstatus zurück"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
        """Wartet auf Abschluss der Ausführung mit periodischen Statusprüfungen"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            status = self.get_execution_status(execution_id)
            print(f"Ausführung {execution_id}: {status['status']} ({status.get('progress', 0)}%)")

            if status['status'] in ['Completed', 'Failed', 'Cancelled']:
                return status

            time.sleep(poll_interval)

        raise TimeoutError(f"Ausführung {execution_id} wurde nicht innerhalb von {timeout} Sekunden abgeschlossen")

    def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
        """Holt die Ausführungsergebnisse"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
        params = {
            'format': format_type,
            'includeRawData': include_raw_data
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_execution(self, execution_id, reason="Benutzerabbruch"):
        """Bricht eine laufende Ausführung ab"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
        """Listet Pipeline-Ausführungen mit Filterung auf"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
        params = {'page': page, 'pageSize': page_size}

        if pipeline_id:
            params['pipelineId'] = pipeline_id
        if status:
            params['status'] = status
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

# Beispiel für die Nutzung
manager = PipelineExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Pipeline mit benutzerdefinierten Parametern ausführen
execution_params = {
    'timeRange': {
        'startDate': '2024-01-01',
        'endDate': '2024-01-31'
    },
    'filterCriteria': {
        'includeWeekends': False,
        'minCaseDuration': '1h'
    },
    'outputOptions': {
        'includeRawData': True,
        'generateSummary': True,
        'exportFormat': 'CSV'
    }
}

try:
    # Ausführung starten
    execution = manager.execute_pipeline(
        'pipeline-guid',
        'dataset-guid',
        'Monatliche Prozessanalyse',
        execution_params,
        'High'
    )

    print(f"Ausführung gestartet: {execution['executionId']}")
    print(f"Geschätzte Dauer: {execution['estimatedDuration']}")

    # Auf Abschluss warten
    final_status = manager.wait_for_completion(execution['executionId'])

    if final_status['status'] == 'Completed':
        # Ergebnisse abrufen
        results = manager.get_execution_results(execution['executionId'])
        print(f"Enrichment erfolgreich abgeschlossen!")
        print(f"Originaldatensätze: {results['summary']['originalRecords']}")
        print(f"Angereicherte Datensätze: {results['summary']['enrichedRecords']}")
        print(f"Datenqualitätsbewertung: {results['summary']['dataQualityScore']}")
        print(f"Download angereicherte Daten: {results['downloadUrls']['enrichedDataset']}")
    else:
        print(f"Ausführung fehlgeschlagen mit Status: {final_status['status']}")

except Exception as e:
    print(f"Fehler bei der Pipeline-Ausführung: {e}")