Ausführung

Job-Execution-API

Verwalten und Überwachen der Ausführung von Process Mining-Jobs, Handhabung asynchroner Operationen und Verfolgung des Jobfortschritts in Echtzeit.

Funktionen

Job-Warteschlange

Verwalten der Job-Warteschlange und Prioritäten.

Warteschlange anzeigen

Job-Tracking

Verfolgen des Jobstatus und Fortschritts.

Jobs verfolgen

Async-Operationen

Handhabung lang andauernder asynchroner Operationen.

Async-Operationen

Job-Status abrufen

GET /api/{tenantId}/{projectId}/execution/job/{jobId}

Ruft den aktuellen Status und Details eines Ausführungsjobs ab, einschließlich Fortschrittsinformationen, Ausführungsmetriken und Abschlussstatus.

Parameter

Parameter Typ Standort Beschreibung
tenantId GUID Pfad Die Mandanten-ID
projectId GUID Pfad Die Projekt-ID
jobId GUID Pfad Die Job-ID

Antwort

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "Customer Journey Analysis",
  "jobDescription": "Umfassende Analyse von Kundenkontaktpunkten und -verhalten",
  "status": "Running",
  "priority": "High",
  "progress": {
    "percentage": 65,
    "currentStage": "Data Processing",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8 minutes 32 seconds"
  },
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "Customer Analytics Pipeline"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 records/second"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}

Alle Jobs auflisten

GET /api/{tenantId}/{projectId}/execution/jobs

Ruft eine paginierte Liste aller Ausführungsjobs im Projekt ab mit Filteroptionen für Status, Jobtyp und Datumsbereiche.

Query-Parameter

Parameter Typ Beschreibung
status string Nach Status filtern: Queued, Running, Completed, Failed, Cancelled
jobType string Nach Jobtyp filtern: ProcessMining, DataEnrichment, Notebook, Analysis
priority string Nach Priorität filtern: Low, Normal, High, Critical
submittedBy string Nach Benutzer filtern, der den Job eingereicht hat
dateFrom datetime Jobs ab diesem Datum filtern
dateTo datetime Jobs bis zu diesem Datum filtern
page integer Seitenzahl für Paginierung (Standard: 1)
pageSize integer Anzahl der Elemente pro Seite (Standard: 20, max: 100)

Antwort

{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "Customer Journey Analysis",
      "status": "Running",
      "priority": "High",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "Customer Analytics Pipeline"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "Daily Sales Enrichment",
      "status": "Completed",
      "priority": "Normal",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 minutes",
      "submittedBy": "system",
      "resourceName": "Sales Data Pipeline"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Neuen Job einreichen

POST /api/{tenantId}/{projectId}/execution/job

Reicht einen neuen Ausführungsjob im System ein. Der Job wird in die Warteschlange gestellt und basierend auf Priorität und Ressourcenverfügbarkeit verarbeitet.

Anfragetext

{
  "jobName": "Weekly Process Analysis",
  "jobDescription": "Automatisierte wöchentliche Analyse der Prozessleistung",
  "jobType": "ProcessMining",
  "priority": "Normal",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}

Antwort

{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 minutes",
  "jobName": "Weekly Process Analysis",
  "priority": "Normal",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}

Job abbrechen

DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}

Bricht einen wartenden oder laufenden Job ab. Abgeschlossene Jobs können nicht abgebrochen werden. Laufende Jobs werden wenn möglich schonend gestoppt.

Anfragetext (Optional)

{
  "reason": "User requested cancellation",
  "forceTermination": false,
  "preservePartialResults": true
}

Antwortcodes

  • 200 OK - Job erfolgreich abgebrochen
  • 404 Not Found - Job nicht gefunden
  • 409 Conflict - Job bereits abgeschlossen oder kann nicht abgebrochen werden

Job-Ergebnisse abrufen

GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results

Ruft die Ergebnisse und Ausgaben eines abgeschlossenen Ausführungsjobs ab, einschließlich erzeugter Artefakte, Berichte und Datendateien.

Query-Parameter

Parameter Typ Beschreibung
format string Antwortformat: summary, detailed, download (Standard: summary)
includeArtifacts boolean Beinhaltet herunterladbare Artefakte in der Antwort (Standard: true)
outputType string Nach Ausgabetyp filtern: reports, data, models, visualizations

Antwort

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 minutes",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "Customer Journey Analysis Report",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "Customer Journey Data Enhanced",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "Customer Journey Process Map",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "Journey Prediction Model",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 minutes",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "Some timestamps had to be inferred",
        "count": 125
      }
    ]
  }
}

Fehlgeschlagenen Job erneut versuchen

POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry

Versucht eine fehlgeschlagene Jobausführung mit optionalen Parameteränderungen erneut. Der Job wird mit gleicher oder aktualisierter Konfiguration wieder in die Warteschlange gestellt.

Anfragetext

{
  "retryReason": "Infrastructure issue resolved",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "High",
  "immediateExecution": false
}

Antwort

Gibt 200 OK mit einem neuen Job-Objekt zurück, das die aktualisierte Job-ID und Retry-Informationen enthält.

Systemausführungsstatus abrufen

GET /api/{tenantId}/execution/system/status

Ruft den aktuellen systemweiten Ausführungsstatus ab, einschließlich Ressourcenauslastung, Warteschlangen-Gesundheit und Leistungsmetriken.

Antwort

{
  "systemStatus": "Healthy",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "Active",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "Active",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 minutes",
    "estimatedProcessingTime": "23 minutes"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 minutes",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}

Beispiel: Komplett-Workflow zur Jobverwaltung

Dieses Beispiel zeigt das Einreichen eines Jobs, die Überwachung des Fortschritts und das Abrufen der Ergebnisse:

// 1. Neuen Job einreichen
const submitJob = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: 'Customer Behavior Analysis',
      jobDescription: 'Wöchentliche Analyse von Kundeninteraktionsmustern',
      jobType: 'ProcessMining',
      priority: 'High',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });

  return await response.json();
};

// 2. Job-Fortschritt überwachen
const monitorJob = async (jobId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const job = await response.json();
    console.log(`Job ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`Aktuelle Phase: ${job.progress.currentStage}`);
    console.log(`Geschätzte Fertigstellung: ${job.progress.estimatedCompletion}`);

    if (job.status === 'Running' || job.status === 'Queued') {
      setTimeout(() => checkStatus(), 30000); // Alle 30 Sekunden prüfen
    } else if (job.status === 'Completed') {
      console.log('Job erfolgreich abgeschlossen!');
      await getJobResults(jobId);
    } else if (job.status === 'Failed') {
      console.log('Job fehlgeschlagen:', job.error);
    }
  };

  await checkStatus();
};

// 3. Job-Ergebnisse abrufen
const getJobResults = async (jobId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Job-Ergebnisse:', results.summary);
  console.log('Hauptergebnis:', results.results.primaryOutput.downloadUrl);

  // Zusätzliche Ergebnisse herunterladen
  for (const output of results.results.additionalOutputs) {
    console.log(`Download ${output.type}: ${output.downloadUrl}`);
  }

  return results;
};

// 4. Systemstatus abrufen
const getSystemStatus = async () => {
  const response = await fetch('/api/{tenantId}/execution/system/status', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const status = await response.json();
  console.log(`Systemstatus: ${status.systemStatus}`);
  console.log(`Warteschlange: ${status.queueStatistics.totalQueuedJobs} wartende Jobs`);
  console.log(`Durchschnittliche Wartezeit: ${status.queueStatistics.averageWaitTime}`);

  return status;
};

// Workflow ausführen
submitJob()
  .then(job => {
    console.log(`Job eingereicht: ${job.jobId}`);
    console.log(`Position in der Warteschlange: ${job.queuePosition}`);
    console.log(`Geschätzter Beginn: ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('Job-Workflow fehlgeschlagen:', error));

Python-Beispiel

import requests
import time
import json
from datetime import datetime, timedelta

class ExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """Reicht einen neuen Ausführungsjob ein"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_job_status(self, job_id):
        """Liefert aktuellen Jobstatus"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """Listet Jobs mit optionalem Filter"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}

        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """Wartet auf Jobabschluss mit periodischen Statusprüfungen"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"Job {job_id}: {job['status']} ({job['progress']['percentage']}%)")
            print(f"  Aktuelle Phase: {job['progress']['currentStage']}")
            print(f"  Verstrichene Zeit: {job['progress']['elapsedTime']}")

            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job

            time.sleep(poll_interval)

        raise TimeoutError(f"Job {job_id} hat innerhalb von {timeout} Sekunden nicht abgeschlossen")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """Liefert Ausführungsergebnisse eines Jobs"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_job(self, job_id, reason="User cancellation", force=False):
        """Bricht einen laufenden oder wartenden Job ab"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
        """Versucht einen fehlgeschlagenen Job erneut"""
        url = f"{self.base_url}/api/{self.tenantId}/{self.projectId}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }

        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_system_status(self):
        """Liefert systemweiten Ausführungsstatus"""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        return response.json()

# Beispiel zur Verwendung
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Systemstatus abfragen
    system_status = manager.get_system_status()
    print(f"Systemstatus: {system_status['systemStatus']}")
    print(f"Jobs in Warteschlange: {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"Durchschnittliche Wartezeit: {system_status['queueStatistics']['averageWaitTime']}")

    # Einen umfassenden Process Mining-Job einreichen
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }

    job = manager.submit_job(
        'Monthly Process Analytics',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        'High'
    )

    print(f"Job eingereicht: {job['jobId']}")
    print(f"Position in Warteschlange: {job['queuePosition']}")
    print(f"Geschätzter Start: {job['estimatedStartTime']}")

    # Auf Abschluss warten
    final_job = manager.wait_for_completion(job['jobId'])

    if final_job['status'] == 'Completed':
        # Detaillierte Ergebnisse abrufen
        results = manager.get_job_results(job['jobId'])

        print("Job erfolgreich abgeschlossen!")
        print(f"Verarbeitete Datensätze: {results['summary']['recordsProcessed']:,}")
        print(f"Datenqualitätsbewertung: {results['summary']['dataQualityScore']}")
        print(f"Verarbeitungseffizienz: {results['summary']['processingEfficiency']}%")

        # Primären Bericht herunterladen
        print(f"Bericht herunterladen: {results['results']['primaryOutput']['downloadUrl']}")

        # Alle weiteren Ausgaben auflisten
        for output in results['results']['additionalOutputs']:
            print(f"Download {output['type']}: {output['downloadUrl']}")

    else:
        print(f"Job fehlgeschlagen mit Status: {final_job['status']}")
        if 'error' in final_job:
            print(f"Fehler: {final_job['error']}")

except Exception as e:
    print(f"Fehler im Ausführungsworkflow: {e}")