Execution
Job Execution API
Manage and monitor the execution of process mining jobs, handle asynchronous operations, and track job progress in real time.
Features
Job Queue
Manage the job queue and job priorities.
Job Tracking
Track the status and progress of jobs.
Asynchronous Operations
Handle long-running asynchronous operations.
Get Job Status
GET /api/{tenantId}/{projectId}/execution/job/{jobId}
Retrieves the current status and details of any execution job, including progress information, execution statistics, and completion status.
Parameters
| Parameter | Type | Location | Description |
|---|---|---|---|
| tenantId | GUID | Path | The tenant identifier |
| projectId | GUID | Path | The project identifier |
| jobId | GUID | Path | The job identifier |
Response
{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "Customer Journey Analysis",
  "jobDescription": "Comprehensive analysis of customer touchpoints and behaviors",
  "status": "Running",
  "priority": "High",
  "progress": {
    "percentage": 65,
    "currentStage": "Data Processing",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8 minutes 32 seconds"
  },
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "Customer Analytics Pipeline"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 records/second"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}
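The `progress` and `metrics` fields of the sample response can be cross-checked with simple arithmetic. The sketch below assumes the payload shape shown in the sample and treats `recordsProcessed / totalRecords` as the basis of the percentage, which the API does not confirm. `summarize_progress` is a hypothetical helper name, and the set of terminal statuses is inferred from the status values listed for the job list filter.

```python
# Assumption: these three statuses mean the job will not change any further.
TERMINAL_STATUSES = {"Completed", "Failed", "Cancelled"}


def summarize_progress(job: dict) -> dict:
    """Derive completion figures from a job status payload (hypothetical helper)."""
    metrics = job["metrics"]
    processed = metrics["recordsProcessed"]
    total = metrics["totalRecords"]
    # Guard against a zero total, for example a job that has not loaded data yet.
    percent = round(100 * processed / total) if total else 0
    return {
        "percent": percent,
        "remaining": total - processed,
        "is_terminal": job["status"] in TERMINAL_STATUSES,
    }


sample = {
    "status": "Running",
    "metrics": {"recordsProcessed": 125430, "totalRecords": 192850},
}
print(summarize_progress(sample))
# {'percent': 65, 'remaining': 67420, 'is_terminal': False}
```

For the sample values, the derived percentage matches the `percentage` of 65 reported in the `progress` object.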
List All Jobs
GET /api/{tenantId}/{projectId}/execution/jobs
Retrieves a paginated list of all execution jobs in the project, with filter options for status, job type, and date range.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| status | string | Filter by status: Queued, Running, Completed, Failed, Cancelled |
| jobType | string | Filter by job type: ProcessMining, DataEnrichment, Notebook, Analysis |
| priority | string | Filter by priority: Low, Normal, High, Critical |
| submittedBy | string | Filter by the user who submitted the job |
| dateFrom | datetime | Include jobs from this date onward |
| dateTo | datetime | Include jobs up to this date |
| page | integer | Page number for pagination (default: 1) |
| pageSize | integer | Number of items per page (default: 20, max: 100) |
Response
{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "Customer Journey Analysis",
      "status": "Running",
      "priority": "High",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "Customer Analytics Pipeline"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "Daily Sales Enrichment",
      "status": "Completed",
      "priority": "Normal",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 minutes",
      "submittedBy": "system",
      "resourceName": "Sales Data Pipeline"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}
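Because the list is paginated, a client usually has to follow `hasNextPage` to collect every job. The following is a minimal sketch of that loop, assuming the response shape shown in the sample. `iter_jobs` and `fetch_page` are hypothetical names, and `fetch_page` stands in for whatever function performs the actual HTTP request for a given page number.

```python
from typing import Callable, Iterator


def iter_jobs(fetch_page: Callable[[int], dict], start_page: int = 1) -> Iterator[dict]:
    """Yield every job across pages until hasNextPage is false or missing."""
    page = start_page
    while True:
        body = fetch_page(page)
        yield from body.get("jobs", [])
        if not body.get("hasNextPage"):
            break
        page += 1


# Stand-in for real HTTP calls so the sketch runs offline.
fake_pages = {
    1: {"jobs": [{"jobId": "a"}, {"jobId": "b"}], "page": 1, "hasNextPage": True},
    2: {"jobs": [{"jobId": "c"}], "page": 2, "hasNextPage": False},
}
job_ids = [job["jobId"] for job in iter_jobs(fake_pages.__getitem__)]
print(job_ids)  # ['a', 'b', 'c']
```

Passing the page fetcher in as a callable keeps the loop independent of any particular HTTP library and makes it easy to test.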
Submit New Job
POST /api/{tenantId}/{projectId}/execution/job
Submits a new execution job to the system. The job is placed in the queue and processed according to its priority and the available resources.
Request Body
{
  "jobName": "Weekly Process Analysis",
  "jobDescription": "Automated weekly analysis of process performance",
  "jobType": "ProcessMining",
  "priority": "Normal",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}
Response
{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 minutes",
  "jobName": "Weekly Process Analysis",
  "priority": "Normal",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}
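Validating a submission on the client side can catch obvious mistakes before the request is sent. The sketch below builds a request body shaped like the sample above. It rests on several assumptions that the API does not confirm: that submissions accept the same priority values as the job list filter, that `executeImmediately` and `scheduledTime` are mutually exclusive, and that `scheduledTime` uses the same UTC timestamp format as the other fields. `build_job_request` is a hypothetical helper, not part of the API.

```python
from datetime import datetime, timezone
from typing import Optional

# Assumption: submissions accept the priorities listed for the job list filter.
ALLOWED_PRIORITIES = {"Low", "Normal", "High", "Critical"}


def build_job_request(
    job_name: str,
    job_type: str,
    resource_type: str,
    resource_id: str,
    priority: str = "Normal",
    scheduled_time: Optional[datetime] = None,
    timeout_minutes: int = 120,
    parameters: Optional[dict] = None,
) -> dict:
    """Build a request body shaped like the documented sample."""
    if priority not in ALLOWED_PRIORITIES:
        raise ValueError(f"Unknown priority: {priority!r}")
    if timeout_minutes <= 0:
        raise ValueError("timeout_minutes must be positive")
    if scheduled_time is not None and scheduled_time.tzinfo is None:
        raise ValueError("scheduled_time must be timezone-aware")

    scheduled = None
    if scheduled_time is not None:
        # Normalize to UTC in the timestamp format used by the sample responses.
        scheduled = scheduled_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    return {
        "jobName": job_name,
        "jobType": job_type,
        "priority": priority,
        "resource": {"resourceType": resource_type, "resourceId": resource_id},
        "parameters": parameters or {},
        "scheduling": {
            # Assumption: a job runs either immediately or at scheduledTime.
            "executeImmediately": scheduled is None,
            "scheduledTime": scheduled,
            "timeoutMinutes": timeout_minutes,
        },
    }


request_body = build_job_request(
    "Weekly Process Analysis",
    "ProcessMining",
    "Pipeline",
    "770e8400-e29b-41d4-a716-446655440000",
    scheduled_time=datetime(2024, 1, 20, 10, 45, tzinfo=timezone.utc),
)
print(request_body["scheduling"])
```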
Cancel Job
DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}
Cancels a job that is queued or currently running. Completed jobs cannot be cancelled. Running jobs are stopped gracefully where possible.
Request Body (optional)
{
  "reason": "Cancellation requested by user",
  "forceTermination": false,
  "preservePartialResults": true
}
Response Codes
| Code | Description |
|---|---|
| 200 OK | Job cancelled successfully |
| 404 Not Found | Job not found |
| 409 Conflict | Job is already completed or cannot be cancelled |
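A client can turn the three documented response codes into explicit outcomes instead of treating every non-200 response the same way. The sketch below is one way to do that. The outcome labels and both function names are hypothetical, and `can_cancel` assumes that only `Queued` and `Running` jobs are cancellable, as the description of this endpoint suggests.

```python
# Documented response codes for the cancel endpoint, mapped to illustrative labels.
CANCEL_OUTCOMES = {
    200: "cancelled",
    404: "not_found",
    409: "not_cancellable",
}


def interpret_cancel_response(status_code: int) -> str:
    """Map an HTTP status code from the cancel endpoint to an outcome label."""
    return CANCEL_OUTCOMES.get(status_code, "unexpected")


def can_cancel(job_status: str) -> bool:
    """Client-side pre-check; the server remains the final authority."""
    return job_status in {"Queued", "Running"}


print(interpret_cancel_response(409))  # not_cancellable
print(can_cancel("Completed"))         # False
```

The pre-check only avoids requests that are known to fail. A job can still finish between the check and the request, so the 409 case has to be handled either way.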
Get Job Results
GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results
Retrieves the results and output of a completed job execution, including generated artifacts, reports, and data files.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| format | string | Response format: summary, detailed, download (default: summary) |
| includeArtifacts | boolean | Include downloadable artifacts in the response (default: true) |
| outputType | string | Filter by output type: reports, data, models, visualizations |
Response
{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 minutes",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "Customer Journey Analysis Report",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "Customer Journey Data Enhanced",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "Customer Journey Process Map",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "Journey Prediction Model",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 minutes",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "Some timestamps had to be inferred",
        "count": 125
      }
    ]
  }
}
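The `dataValidation` counts in the sample reconcile exactly: 192850 valid + 1890 duplicates removed + 260 invalid = 195000 total records. The sketch below runs that check on a results payload, assuming the three categories are meant to partition `totalRecords` (the sample is consistent with this, but the API does not state it). `reconcile_validation` is a hypothetical helper name.

```python
def reconcile_validation(validation: dict) -> dict:
    """Check that valid, duplicate, and invalid counts add up to the total."""
    total = validation["totalRecords"]
    valid = validation["validRecords"]
    accounted = valid + validation["duplicatesRemoved"] + validation["invalidRecords"]
    return {
        "accounted": accounted,
        # A non-zero value here would point to records missing from the breakdown.
        "unaccounted": total - accounted,
        "valid_share_percent": round(100 * valid / total, 1) if total else 0.0,
    }


sample_validation = {
    "totalRecords": 195000,
    "validRecords": 192850,
    "duplicatesRemoved": 1890,
    "invalidRecords": 260,
}
report = reconcile_validation(sample_validation)
print(report["accounted"], report["unaccounted"])  # 195000 0
```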
Retry Failed Job
POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry
Retries a failed job execution with optional parameter changes. The job is placed in the queue again with the same or an updated configuration.
Request Body
{
  "retryReason": "Infrastructure issue resolved",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "High",
  "immediateExecution": false
}
Response
Returns 200 OK with a new job object that contains an updated job ID and information about the retry.
Get System Execution Status
GET /api/{tenantId}/execution/system/status
Retrieves the current system-wide execution status, including resource usage, queue status, and performance statistics.
Response
{
  "systemStatus": "Healthy",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "Active",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "Active",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 minutes",
    "estimatedProcessingTime": "23 minutes"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 minutes",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}
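The node and utilization figures can be reduced to a few numbers that help decide whether to submit more work. The sketch below assumes the response shape shown in the sample, treats nodes with status `Active` as the only ones that accept jobs, and reads the CPU capacity fields as plain numbers in an unspecified unit. These are all assumptions, and `capacity_summary` is a hypothetical helper name.

```python
def capacity_summary(status: dict) -> dict:
    """Summarize free job slots and CPU utilization from a system status payload."""
    # Assumption: only nodes reported as "Active" can take new jobs.
    nodes = [n for n in status["executionNodes"] if n["status"] == "Active"]
    free_slots = sum(n["jobCapacity"] - n["activeJobs"] for n in nodes)
    least_loaded = None
    if nodes:
        # Rank nodes by the fraction of their job capacity currently in use.
        least_loaded = min(nodes, key=lambda n: n["activeJobs"] / n["jobCapacity"])["nodeId"]
    util = status["resourceUtilization"]
    cpu_percent = 100 * util["usedCpuCapacity"] / util["totalCpuCapacity"]
    return {
        "free_slots": free_slots,
        "least_loaded_node": least_loaded,
        "cpu_percent": cpu_percent,
    }


sample_status = {
    "executionNodes": [
        {"nodeId": "worker-node-01", "status": "Active", "activeJobs": 2, "jobCapacity": 4},
        {"nodeId": "worker-node-02", "status": "Active", "activeJobs": 1, "jobCapacity": 4},
    ],
    "resourceUtilization": {"totalCpuCapacity": 1600, "usedCpuCapacity": 896},
}
print(capacity_summary(sample_status))
# {'free_slots': 5, 'least_loaded_node': 'worker-node-02', 'cpu_percent': 56.0}
```

In the sample, the overall CPU utilization of 56% equals the average of the two nodes' `cpuUsage` values (67 and 45).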
Example: Complete Job Management Workflow
This example demonstrates submitting a job, tracking its progress, and retrieving the results:
// Placeholder values: replace with your own tenant ID, project ID, and token
const tenantId = 'tenant-guid';
const projectId = 'project-guid';
const token = 'your-auth-token';

// Small helper so the polling loop can await a delay
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

// 1. Submit a new job
const submitJob = async () => {
  const response = await fetch(`/api/${tenantId}/${projectId}/execution/job`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: 'Customer Behavior Analysis',
      jobDescription: 'Weekly analysis of customer interaction patterns',
      jobType: 'ProcessMining',
      priority: 'High',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });
  return await response.json();
};

// 2. Monitor job progress until the job leaves the Queued/Running states
const monitorJob = async (jobId) => {
  while (true) {
    const response = await fetch(`/api/${tenantId}/${projectId}/execution/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });
    const job = await response.json();
    console.log(`Job ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`Current stage: ${job.progress.currentStage}`);
    console.log(`Estimated completion: ${job.progress.estimatedCompletion}`);
    if (job.status === 'Completed') {
      console.log('Job completed successfully!');
      return await getJobResults(jobId);
    }
    if (job.status !== 'Running' && job.status !== 'Queued') {
      // Failed or Cancelled: stop polling and report what the API returned
      console.log(`Job ended with status ${job.status}:`, job.error);
      return job;
    }
    await sleep(30000); // Check every 30 seconds
  }
};

// 3. Retrieve job results
const getJobResults = async (jobId) => {
  const response = await fetch(`/api/${tenantId}/${projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });
  const results = await response.json();
  console.log('Job results:', results.summary);
  console.log('Primary output:', results.results.primaryOutput.downloadUrl);
  // List the download URLs of the additional outputs
  for (const output of results.results.additionalOutputs) {
    console.log(`Download ${output.type}: ${output.downloadUrl}`);
  }
  return results;
};

// 4. Retrieve system status
const getSystemStatus = async () => {
  const response = await fetch(`/api/${tenantId}/execution/system/status`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });
  const status = await response.json();
  console.log(`System status: ${status.systemStatus}`);
  console.log(`Queue: ${status.queueStatistics.totalQueuedJobs} jobs queued`);
  console.log(`Average wait time: ${status.queueStatistics.averageWaitTime}`);
  return status;
};

// Run the workflow
submitJob()
  .then(job => {
    console.log(`Submitted job: ${job.jobId}`);
    console.log(`Queue position: ${job.queuePosition}`);
    console.log(`Estimated start: ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('Job workflow failed:', error));
Python Example
import time

import requests


class ExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """Submit a new execution job."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_job_status(self, job_id):
        """Retrieve the current status of a job."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """List jobs with optional filters; date_from and date_to are datetime objects."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """Wait for a job to finish, checking its status periodically."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"Job {job_id}: {job['status']} ({job['progress']['percentage']}%)")
            print(f"  Current stage: {job['progress']['currentStage']}")
            print(f"  Elapsed time: {job['progress']['elapsedTime']}")
            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job
            time.sleep(poll_interval)
        raise TimeoutError(f"Job {job_id} did not finish within {timeout} seconds")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """Retrieve the results of a job execution."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            # Send the boolean as lowercase "true"/"false" in the query string
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_job(self, job_id, reason="User cancellation", force=False):
        """Cancel a running or queued job; returns True on 200 OK."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
        """Retry a failed job, optionally with updated parameters."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }
        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_system_status(self):
        """Retrieve the system-wide execution status."""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        return response.json()


# Usage example
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Check the system status
    system_status = manager.get_system_status()
    print(f"System status: {system_status['systemStatus']}")
    print(f"Queued jobs: {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"Average wait time: {system_status['queueStatistics']['averageWaitTime']}")

    # Submit a comprehensive process mining job
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }
    job = manager.submit_job(
        'Monthly Process Analytics',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        'High'
    )
    print(f"Submitted job: {job['jobId']}")
    print(f"Queue position: {job['queuePosition']}")
    print(f"Estimated start: {job['estimatedStartTime']}")

    # Wait for completion
    final_job = manager.wait_for_completion(job['jobId'])
    if final_job['status'] == 'Completed':
        # Retrieve detailed results
        results = manager.get_job_results(job['jobId'])
        print("Job completed successfully!")
        print(f"Records processed: {results['summary']['recordsProcessed']:,}")
        print(f"Data quality score: {results['summary']['dataQualityScore']}")
        print(f"Processing efficiency: {results['summary']['processingEfficiency']}%")
        # Print the download URL of the primary report
        print(f"Download report: {results['results']['primaryOutput']['downloadUrl']}")
        # Show all additional outputs
        for output in results['results']['additionalOutputs']:
            print(f"Download {output['type']}: {output['downloadUrl']}")
    else:
        print(f"Job ended with status: {final_job['status']}")
        if 'error' in final_job:
            print(f"Error: {final_job['error']}")
except Exception as e:
    print(f"Error in execution workflow: {e}")
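The `wait_for_completion` method above polls at a fixed interval. For jobs whose duration varies widely, a growing interval reduces the number of requests without delaying short jobs much. The sketch below is one possible variation, not part of the API or of the class above: `poll_with_backoff` and all of its parameters are hypothetical, and `get_status` stands in for any callable that returns the job status payload. The `sleep` and `clock` arguments are injectable so the loop can be exercised without waiting.

```python
import time
from typing import Callable

# Assumption: these statuses mean the job will not change any further.
TERMINAL_STATUSES = {"Completed", "Failed", "Cancelled"}


def poll_with_backoff(
    get_status: Callable[[], dict],
    initial_delay: float = 5.0,
    max_delay: float = 60.0,
    factor: float = 2.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll until the job reaches a terminal status, growing the delay each round."""
    deadline = clock() + timeout
    delay = initial_delay
    while True:
        job = get_status()
        if job["status"] in TERMINAL_STATUSES:
            return job
        if clock() + delay > deadline:
            raise TimeoutError("Job did not finish before the timeout")
        sleep(delay)
        # Grow the delay geometrically, capped at max_delay.
        delay = min(delay * factor, max_delay)


# Offline demonstration with canned responses and a recording sleep function.
responses = iter([
    {"status": "Queued"},
    {"status": "Running"},
    {"status": "Running"},
    {"status": "Completed"},
])
delays = []
final = poll_with_backoff(lambda: next(responses), sleep=delays.append, clock=lambda: 0.0)
print(final["status"], delays)  # Completed [5.0, 10.0, 20.0]
```

With an `ExecutionManager` instance, the status callable could be `lambda: manager.get_job_status(job_id)`.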