Ausführung
Job-Execution-API
Verwalten und Überwachen der Ausführung von Process Mining-Jobs, Handhabung asynchroner Operationen und Verfolgung des Jobfortschritts in Echtzeit.
Funktionen
Job-Warteschlange
Verwalten der Job-Warteschlange und Prioritäten.
Job-Tracking
Verfolgen des Jobstatus und Fortschritts.
Async-Operationen
Handhabung lang andauernder asynchroner Operationen.
Job-Status abrufen
GET /api/{tenantId}/{projectId}/execution/job/{jobId}
Ruft den aktuellen Status und Details eines Ausführungsjobs ab, einschließlich Fortschrittsinformationen, Ausführungsmetriken und Abschlussstatus.
Parameter
| Parameter | Typ | Standort | Beschreibung |
|---|---|---|---|
tenantId |
GUID | Pfad | Die Mandanten-ID |
projectId |
GUID | Pfad | Die Projekt-ID |
jobId |
GUID | Pfad | Die Job-ID |
Antwort
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"projectId": "660e8400-e29b-41d4-a716-446655440000",
"jobType": "ProcessMining",
"jobName": "Customer Journey Analysis",
"jobDescription": "Umfassende Analyse von Kundenkontaktpunkten und -verhalten",
"status": "Running",
"priority": "High",
"progress": {
"percentage": 65,
"currentStage": "Data Processing",
"estimatedCompletion": "2024-01-20T11:15:00Z",
"elapsedTime": "8 minutes 32 seconds"
},
"resource": {
"resourceType": "Pipeline",
"resourceId": "770e8400-e29b-41d4-a716-446655440000",
"resourceName": "Customer Analytics Pipeline"
},
"execution": {
"startTime": "2024-01-20T10:30:00Z",
"submittedBy": "user123",
"executionNode": "worker-node-02",
"memoryUsage": "2.1 GB",
"cpuUsage": "45%",
"diskUsage": "890 MB"
},
"metrics": {
"recordsProcessed": 125430,
"totalRecords": 192850,
"errorCount": 3,
"warningCount": 12,
"averageProcessingRate": "1250 records/second"
},
"dateCreated": "2024-01-20T10:28:00Z",
"lastUpdated": "2024-01-20T10:38:45Z"
}
Alle Jobs auflisten
GET /api/{tenantId}/{projectId}/execution/jobs
Ruft eine paginierte Liste aller Ausführungsjobs im Projekt ab mit Filteroptionen für Status, Jobtyp und Datumsbereiche.
Query-Parameter
| Parameter | Typ | Beschreibung |
|---|---|---|
status |
string | Nach Status filtern: Queued, Running, Completed, Failed, Cancelled |
jobType |
string | Nach Jobtyp filtern: ProcessMining, DataEnrichment, Notebook, Analysis |
priority |
string | Nach Priorität filtern: Low, Normal, High, Critical |
submittedBy |
string | Nach Benutzer filtern, der den Job eingereicht hat |
dateFrom |
datetime | Jobs ab diesem Datum filtern |
dateTo |
datetime | Jobs bis zu diesem Datum filtern |
page |
integer | Seitenzahl für Paginierung (Standard: 1) |
pageSize |
integer | Anzahl der Elemente pro Seite (Standard: 20, max: 100) |
Antwort
{
"jobs": [
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"jobType": "ProcessMining",
"jobName": "Customer Journey Analysis",
"status": "Running",
"priority": "High",
"progress": 65,
"startTime": "2024-01-20T10:30:00Z",
"estimatedCompletion": "2024-01-20T11:15:00Z",
"submittedBy": "user123",
"resourceName": "Customer Analytics Pipeline"
},
{
"jobId": "dd0e8400-e29b-41d4-a716-446655440000",
"jobType": "DataEnrichment",
"jobName": "Daily Sales Enrichment",
"status": "Completed",
"priority": "Normal",
"progress": 100,
"startTime": "2024-01-20T09:00:00Z",
"endTime": "2024-01-20T09:23:00Z",
"duration": "23 minutes",
"submittedBy": "system",
"resourceName": "Sales Data Pipeline"
}
],
"summary": {
"totalJobs": 156,
"runningJobs": 3,
"queuedJobs": 7,
"completedJobs": 142,
"failedJobs": 4
},
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
Neuen Job einreichen
POST /api/{tenantId}/{projectId}/execution/job
Reicht einen neuen Ausführungsjob im System ein. Der Job wird in die Warteschlange gestellt und basierend auf Priorität und Ressourcenverfügbarkeit verarbeitet.
Anfragetext
{
"jobName": "Weekly Process Analysis",
"jobDescription": "Automatisierte wöchentliche Analyse der Prozessleistung",
"jobType": "ProcessMining",
"priority": "Normal",
"resource": {
"resourceType": "Pipeline",
"resourceId": "770e8400-e29b-41d4-a716-446655440000"
},
"parameters": {
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"analysisType": "comprehensive",
"timeWindow": {
"startDate": "2024-01-01",
"endDate": "2024-01-07"
},
"includeAnomalyDetection": true,
"outputFormat": "detailed_report"
},
"scheduling": {
"executeImmediately": true,
"scheduledTime": null,
"timeoutMinutes": 120
},
"notifications": {
"onCompletion": true,
"onFailure": true,
"emailRecipients": ["analyst@company.com"]
}
}
Antwort
{
"jobId": "ee0e8400-e29b-41d4-a716-446655440000",
"status": "Queued",
"queuePosition": 3,
"estimatedStartTime": "2024-01-20T10:45:00Z",
"estimatedDuration": "45-60 minutes",
"jobName": "Weekly Process Analysis",
"priority": "Normal",
"dateSubmitted": "2024-01-20T10:30:00Z",
"submittedBy": "user123"
}
Job abbrechen
DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}
Bricht einen wartenden oder laufenden Job ab. Abgeschlossene Jobs können nicht abgebrochen werden. Laufende Jobs werden wenn möglich schonend gestoppt.
Anfragetext (Optional)
{
"reason": "User requested cancellation",
"forceTermination": false,
"preservePartialResults": true
}
Antwortcodes
200 OK- Job erfolgreich abgebrochen404 Not Found- Job nicht gefunden409 Conflict- Job bereits abgeschlossen oder kann nicht abgebrochen werden
Job-Ergebnisse abrufen
GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results
Ruft die Ergebnisse und Ausgaben eines abgeschlossenen Ausführungsjobs ab, einschließlich erzeugter Artefakte, Berichte und Datendateien.
Query-Parameter
| Parameter | Typ | Beschreibung |
|---|---|---|
format |
string | Antwortformat: summary, detailed, download (Standard: summary) |
includeArtifacts |
boolean | Beinhaltet herunterladbare Artefakte in der Antwort (Standard: true) |
outputType |
string | Nach Ausgabetyp filtern: reports, data, models, visualizations |
Antwort
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"status": "Completed",
"completionTime": "2024-01-20T11:12:00Z",
"totalDuration": "42 minutes",
"success": true,
"summary": {
"recordsProcessed": 192850,
"outputsGenerated": 7,
"dataQualityScore": 94.2,
"processingEfficiency": 87.5
},
"results": {
"primaryOutput": {
"type": "ProcessMiningReport",
"title": "Customer Journey Analysis Report",
"format": "html",
"size": "2.3 MB",
"downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
},
"additionalOutputs": [
{
"type": "EnrichedDataset",
"title": "Customer Journey Data Enhanced",
"format": "csv",
"recordCount": 192850,
"size": "45.7 MB",
"downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
},
{
"type": "ProcessMap",
"title": "Customer Journey Process Map",
"format": "svg",
"size": "890 KB",
"downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
},
{
"type": "AnalyticsModel",
"title": "Journey Prediction Model",
"format": "pkl",
"accuracy": 0.89,
"size": "12.4 MB",
"downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
}
]
},
"executionMetrics": {
"totalCpuTime": "38.5 minutes",
"peakMemoryUsage": "3.2 GB",
"diskIoOperations": 45672,
"networkDataTransfer": "567 MB"
},
"qualityMetrics": {
"dataValidation": {
"totalRecords": 195000,
"validRecords": 192850,
"duplicatesRemoved": 1890,
"invalidRecords": 260
},
"processingErrors": [],
"warnings": [
{
"type": "DataQuality",
"message": "Some timestamps had to be inferred",
"count": 125
}
]
}
}
Fehlgeschlagenen Job erneut versuchen
POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry
Versucht eine fehlgeschlagene Jobausführung mit optionalen Parameteränderungen erneut. Der Job wird mit gleicher oder aktualisierter Konfiguration wieder in die Warteschlange gestellt.
Anfragetext
{
"retryReason": "Infrastructure issue resolved",
"modifyParameters": true,
"updatedParameters": {
"timeoutMinutes": 180,
"retryFailedRecords": true,
"increaseMemoryLimit": true
},
"priority": "High",
"immediateExecution": false
}
Antwort
Gibt 200 OK mit einem neuen Job-Objekt zurück, das die aktualisierte Job-ID und Retry-Informationen enthält.
Systemausführungsstatus abrufen
GET /api/{tenantId}/execution/system/status
Ruft den aktuellen systemweiten Ausführungsstatus ab, einschließlich Ressourcenauslastung, Warteschlangen-Gesundheit und Leistungsmetriken.
Antwort
{
"systemStatus": "Healthy",
"timestamp": "2024-01-20T10:45:00Z",
"executionNodes": [
{
"nodeId": "worker-node-01",
"status": "Active",
"cpuUsage": 67,
"memoryUsage": 78,
"activeJobs": 2,
"jobCapacity": 4
},
{
"nodeId": "worker-node-02",
"status": "Active",
"cpuUsage": 45,
"memoryUsage": 56,
"activeJobs": 1,
"jobCapacity": 4
}
],
"queueStatistics": {
"totalQueuedJobs": 15,
"highPriorityJobs": 3,
"normalPriorityJobs": 10,
"lowPriorityJobs": 2,
"averageWaitTime": "4.2 minutes",
"estimatedProcessingTime": "23 minutes"
},
"performanceMetrics": {
"jobsCompletedToday": 847,
"averageJobDuration": "18.5 minutes",
"successRate": 97.8,
"throughputPerHour": 35.2
},
"resourceUtilization": {
"totalCpuCapacity": 1600,
"usedCpuCapacity": 896,
"totalMemoryCapacity": "64 GB",
"usedMemoryCapacity": "38.4 GB",
"diskSpaceAvailable": "2.3 TB"
}
}
Beispiel: Komplett-Workflow zur Jobverwaltung
Dieses Beispiel zeigt das Einreichen eines Jobs, die Überwachung des Fortschritts und das Abrufen der Ergebnisse:
// 1. Neuen Job einreichen
const submitJob = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
jobName: 'Customer Behavior Analysis',
jobDescription: 'Wöchentliche Analyse von Kundeninteraktionsmustern',
jobType: 'ProcessMining',
priority: 'High',
resource: {
resourceType: 'Pipeline',
resourceId: '770e8400-e29b-41d4-a716-446655440000'
},
parameters: {
datasetId: '880e8400-e29b-41d4-a716-446655440000',
analysisType: 'comprehensive',
timeWindow: {
startDate: '2024-01-13',
endDate: '2024-01-19'
},
includeAnomalyDetection: true,
outputFormat: 'detailed_report'
},
scheduling: {
executeImmediately: true,
timeoutMinutes: 90
},
notifications: {
onCompletion: true,
onFailure: true,
emailRecipients: ['analyst@company.com']
}
})
});
return await response.json();
};
// 2. Job-Fortschritt überwachen
const monitorJob = async (jobId) => {
const checkStatus = async () => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const job = await response.json();
console.log(`Job ${jobId}: ${job.status} (${job.progress.percentage}%)`);
console.log(`Aktuelle Phase: ${job.progress.currentStage}`);
console.log(`Geschätzte Fertigstellung: ${job.progress.estimatedCompletion}`);
if (job.status === 'Running' || job.status === 'Queued') {
setTimeout(() => checkStatus(), 30000); // Alle 30 Sekunden prüfen
} else if (job.status === 'Completed') {
console.log('Job erfolgreich abgeschlossen!');
await getJobResults(jobId);
} else if (job.status === 'Failed') {
console.log('Job fehlgeschlagen:', job.error);
}
};
await checkStatus();
};
// 3. Job-Ergebnisse abrufen
const getJobResults = async (jobId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const results = await response.json();
console.log('Job-Ergebnisse:', results.summary);
console.log('Hauptergebnis:', results.results.primaryOutput.downloadUrl);
// Zusätzliche Ergebnisse herunterladen
for (const output of results.results.additionalOutputs) {
console.log(`Download ${output.type}: ${output.downloadUrl}`);
}
return results;
};
// 4. Systemstatus abrufen
const getSystemStatus = async () => {
const response = await fetch('/api/{tenantId}/execution/system/status', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const status = await response.json();
console.log(`Systemstatus: ${status.systemStatus}`);
console.log(`Warteschlange: ${status.queueStatistics.totalQueuedJobs} wartende Jobs`);
console.log(`Durchschnittliche Wartezeit: ${status.queueStatistics.averageWaitTime}`);
return status;
};
// Workflow ausführen
submitJob()
.then(job => {
console.log(`Job eingereicht: ${job.jobId}`);
console.log(`Position in der Warteschlange: ${job.queuePosition}`);
console.log(`Geschätzter Beginn: ${job.estimatedStartTime}`);
return monitorJob(job.jobId);
})
.catch(error => console.error('Job-Workflow fehlgeschlagen:', error));
Python-Beispiel
import requests
import time
import json
from datetime import datetime, timedelta
class ExecutionManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
"""Reicht einen neuen Ausführungsjob ein"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
payload = {
'jobName': job_name,
'jobType': job_type,
'priority': priority,
'resource': {
'resourceType': resource_type,
'resourceId': resource_id
},
'parameters': parameters or {},
'scheduling': {
'executeImmediately': True,
'timeoutMinutes': 120
},
'notifications': {
'onCompletion': True,
'onFailure': True
}
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_job_status(self, job_id):
"""Liefert aktuellen Jobstatus"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
"""Listet Jobs mit optionalem Filter"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
params = {'page': page, 'pageSize': page_size}
if status:
params['status'] = status
if job_type:
params['jobType'] = job_type
if date_from:
params['dateFrom'] = date_from.isoformat()
if date_to:
params['dateTo'] = date_to.isoformat()
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
"""Wartet auf Jobabschluss mit periodischen Statusprüfungen"""
start_time = time.time()
while time.time() - start_time < timeout:
job = self.get_job_status(job_id)
print(f"Job {job_id}: {job['status']} ({job['progress']['percentage']}%)")
print(f" Aktuelle Phase: {job['progress']['currentStage']}")
print(f" Verstrichene Zeit: {job['progress']['elapsedTime']}")
if job['status'] in ['Completed', 'Failed', 'Cancelled']:
return job
time.sleep(poll_interval)
raise TimeoutError(f"Job {job_id} hat innerhalb von {timeout} Sekunden nicht abgeschlossen")
def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
"""Liefert Ausführungsergebnisse eines Jobs"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
params = {
'format': format_type,
'includeArtifacts': str(include_artifacts).lower()
}
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_job(self, job_id, reason="User cancellation", force=False):
"""Bricht einen laufenden oder wartenden Job ab"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
payload = {
'reason': reason,
'forceTermination': force,
'preservePartialResults': True
}
response = requests.delete(url, json=payload, headers=self.headers)
return response.status_code == 200
def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
"""Versucht einen fehlgeschlagenen Job erneut"""
url = f"{self.base_url}/api/{self.tenantId}/{self.projectId}/execution/job/{job_id}/retry"
payload = {
'retryReason': reason,
'modifyParameters': modify_params is not None,
'immediateExecution': False
}
if priority:
payload['priority'] = priority
if modify_params:
payload['updatedParameters'] = modify_params
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_system_status(self):
"""Liefert systemweiten Ausführungsstatus"""
url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
response = requests.get(url, headers=self.headers)
return response.json()
# Beispiel zur Verwendung
manager = ExecutionManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
try:
# Systemstatus abfragen
system_status = manager.get_system_status()
print(f"Systemstatus: {system_status['systemStatus']}")
print(f"Jobs in Warteschlange: {system_status['queueStatistics']['totalQueuedJobs']}")
print(f"Durchschnittliche Wartezeit: {system_status['queueStatistics']['averageWaitTime']}")
# Einen umfassenden Process Mining-Job einreichen
job_params = {
'datasetId': 'dataset-guid',
'analysisType': 'comprehensive',
'timeWindow': {
'startDate': '2024-01-01',
'endDate': '2024-01-31'
},
'includeAnomalyDetection': True,
'includeProcessVariants': True,
'generateInsights': True,
'outputFormat': 'detailed_report',
'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
'qualityChecks': {
'validateTimestamps': True,
'checkDuplicates': True,
'validateActivities': True
}
}
job = manager.submit_job(
'Monthly Process Analytics',
'ProcessMining',
'Pipeline',
'pipeline-guid',
job_params,
'High'
)
print(f"Job eingereicht: {job['jobId']}")
print(f"Position in Warteschlange: {job['queuePosition']}")
print(f"Geschätzter Start: {job['estimatedStartTime']}")
# Auf Abschluss warten
final_job = manager.wait_for_completion(job['jobId'])
if final_job['status'] == 'Completed':
# Detaillierte Ergebnisse abrufen
results = manager.get_job_results(job['jobId'])
print("Job erfolgreich abgeschlossen!")
print(f"Verarbeitete Datensätze: {results['summary']['recordsProcessed']:,}")
print(f"Datenqualitätsbewertung: {results['summary']['dataQualityScore']}")
print(f"Verarbeitungseffizienz: {results['summary']['processingEfficiency']}%")
# Primären Bericht herunterladen
print(f"Bericht herunterladen: {results['results']['primaryOutput']['downloadUrl']}")
# Alle weiteren Ausgaben auflisten
for output in results['results']['additionalOutputs']:
print(f"Download {output['type']}: {output['downloadUrl']}")
else:
print(f"Job fehlgeschlagen mit Status: {final_job['status']}")
if 'error' in final_job:
print(f"Fehler: {final_job['error']}")
except Exception as e:
print(f"Fehler im Ausführungsworkflow: {e}")