Job Execution API
Manage and monitor the execution of process mining jobs, handle asynchronous operations, and track job progress in real-time.
Features
Job Queue
Manage job queue and priorities.
Job Tracking
Track job status and progress.
Async Operations
Handle long-running asynchronous operations.
Get Job Status
GET /api/{tenantId}/{projectId}/execution/job/{jobId}
Retrieves the current status and details of any execution job, including progress information, execution metrics, and completion status.
Parameters
| Parameter | Type | Location | Description |
|---|---|---|---|
| tenantId | GUID | Path | The tenant identifier |
| projectId | GUID | Path | The project identifier |
| jobId | GUID | Path | The job identifier |
Response
{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "Customer Journey Analysis",
  "jobDescription": "Comprehensive analysis of customer touchpoints and behaviors",
  "status": "Running",
  "priority": "High",
  "progress": {
    "percentage": 65,
    "currentStage": "Data Processing",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8 minutes 32 seconds"
  },
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "Customer Analytics Pipeline"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 records/second"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}
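A client will often want a few derived values from this response, such as the number of records still to process or the time left until the estimated completion. The sketch below is one way to compute them from the fields shown above; the helper names `parse_utc` and `summarize_progress` are hypothetical, and it assumes timestamps always arrive in the `...Z` form used in the sample.

```python
from datetime import datetime


def parse_utc(ts: str) -> datetime:
    """Parse an ISO 8601 timestamp such as '2024-01-20T11:15:00Z'."""
    # Swap the trailing 'Z' for an explicit offset so fromisoformat()
    # also accepts the value on Python versions before 3.11.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def summarize_progress(job: dict) -> dict:
    """Derive convenience values from a job status response (hypothetical helper)."""
    processed = job["metrics"]["recordsProcessed"]
    total = job["metrics"]["totalRecords"]
    eta = parse_utc(job["progress"]["estimatedCompletion"])
    updated = parse_utc(job["lastUpdated"])
    return {
        "status": job["status"],
        "records_remaining": total - processed,
        "percent_from_records": round(100 * processed / total, 1) if total else 0.0,
        "seconds_to_eta": int((eta - updated).total_seconds()),
    }


# Trimmed copy of the sample response above.
sample = {
    "status": "Running",
    "progress": {"percentage": 65, "estimatedCompletion": "2024-01-20T11:15:00Z"},
    "metrics": {"recordsProcessed": 125430, "totalRecords": 192850},
    "lastUpdated": "2024-01-20T10:38:45Z",
}
summary = summarize_progress(sample)
print(summary)
```

With the sample values, the record-based percentage agrees with the reported `progress.percentage` of 65, which is a cheap sanity check on the response.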
List All Jobs
GET /api/{tenantId}/{projectId}/execution/jobs
Retrieves a paginated list of all execution jobs in the project. Results can be filtered by status, job type, priority, submitting user, and date range.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| status | string | Filter by status: Queued, Running, Completed, Failed, Cancelled |
| jobType | string | Filter by job type: ProcessMining, DataEnrichment, Notebook, Analysis |
| priority | string | Filter by priority: Low, Normal, High, Critical |
| submittedBy | string | Filter by user who submitted the job |
| dateFrom | datetime | Filter jobs from this date |
| dateTo | datetime | Filter jobs to this date |
| page | integer | Page number for pagination (default: 1) |
| pageSize | integer | Number of items per page (default: 20, max: 100) |
Response
{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "Customer Journey Analysis",
      "status": "Running",
      "priority": "High",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "Customer Analytics Pipeline"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "Daily Sales Enrichment",
      "status": "Completed",
      "priority": "Normal",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 minutes",
      "submittedBy": "system",
      "resourceName": "Sales Data Pipeline"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}
Submit New Job
POST /api/{tenantId}/{projectId}/execution/job
Submits a new execution job to the system. The job will be queued and processed based on priority and resource availability.
Request Body
{
  "jobName": "Weekly Process Analysis",
  "jobDescription": "Automated weekly analysis of process performance",
  "jobType": "ProcessMining",
  "priority": "Normal",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}
Response
{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 minutes",
  "jobName": "Weekly Process Analysis",
  "priority": "Normal",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}
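Validating a request body on the client before submitting it can catch typos in enumerated fields early. The sketch below builds a body shaped like the request above. The function name `build_job_request` is hypothetical, and the allowed values are taken from the List All Jobs filter table, on the assumption that the submit endpoint accepts the same set.

```python
# Assumption: submit accepts the same values the list endpoint filters on.
ALLOWED_PRIORITIES = {"Low", "Normal", "High", "Critical"}
ALLOWED_JOB_TYPES = {"ProcessMining", "DataEnrichment", "Notebook", "Analysis"}


def build_job_request(job_name, job_type, resource_type, resource_id,
                      priority="Normal", parameters=None, timeout_minutes=120):
    """Build a submit-job request body after basic client-side checks (hypothetical helper)."""
    if not job_name:
        raise ValueError("jobName must not be empty")
    if job_type not in ALLOWED_JOB_TYPES:
        raise ValueError(f"Unknown jobType: {job_type!r}")
    if priority not in ALLOWED_PRIORITIES:
        raise ValueError(f"Unknown priority: {priority!r}")
    if timeout_minutes <= 0:
        raise ValueError("timeoutMinutes must be positive")
    return {
        "jobName": job_name,
        "jobType": job_type,
        "priority": priority,
        "resource": {"resourceType": resource_type, "resourceId": resource_id},
        "parameters": dict(parameters or {}),
        "scheduling": {"executeImmediately": True, "timeoutMinutes": timeout_minutes},
    }


request = build_job_request(
    "Weekly Process Analysis",
    "ProcessMining",
    "Pipeline",
    "770e8400-e29b-41d4-a716-446655440000",
)
print(request["priority"], request["scheduling"]["timeoutMinutes"])
```

The server remains the authority on what is valid; these checks only save a round trip for obvious mistakes.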
Cancel Job
DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}
Cancels a queued or running job. Completed jobs cannot be cancelled. Running jobs will be stopped gracefully when possible.
Request Body (Optional)
{
  "reason": "User requested cancellation",
  "forceTermination": false,
  "preservePartialResults": true
}
Response Codes
| Code | Description |
|---|---|
| 200 OK | Job cancelled successfully |
| 404 Not Found | Job not found |
| 409 Conflict | Job already completed or cannot be cancelled |
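A client can use the job status to avoid a pointless cancel request and map the three documented response codes to outcomes. The following is a small sketch; `can_cancel` and `describe_cancel_result` are hypothetical helpers, and treating Failed and Cancelled jobs as non-cancellable is an assumption, since the text above only rules out Completed jobs explicitly.

```python
# Per the endpoint description, only queued or running jobs can be cancelled.
# Assumption: Failed and Cancelled jobs are rejected the same way Completed ones are.
CANCELLABLE_STATUSES = {"Queued", "Running"}


def can_cancel(status: str) -> bool:
    """Return True when a cancel request is worth sending."""
    return status in CANCELLABLE_STATUSES


def describe_cancel_result(status_code: int) -> str:
    """Map the documented response codes to short outcome labels."""
    outcomes = {
        200: "cancelled",
        404: "not_found",
        409: "not_cancellable",
    }
    # Any other code is outside the documented set.
    return outcomes.get(status_code, "unexpected")


print(can_cancel("Running"), describe_cancel_result(409))
```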
Get Job Results
GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results
Retrieves the results and outputs from a completed job execution, including generated artifacts, reports, and data files.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| format | string | Response format: summary, detailed, download (default: summary) |
| includeArtifacts | boolean | Include downloadable artifacts in response (default: true) |
| outputType | string | Filter by output type: reports, data, models, visualizations |
Response
{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 minutes",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "Customer Journey Analysis Report",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "Customer Journey Data Enhanced",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "Customer Journey Process Map",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "Journey Prediction Model",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 minutes",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "Some timestamps had to be inferred",
        "count": 125
      }
    ]
  }
}
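Two things a consumer typically does with this response are gathering every download link and checking that the validation counts are self-consistent. The sketch below shows both. The helper names are hypothetical, and the rule that valid, duplicate, and invalid records sum to `totalRecords` is an assumption inferred from the sample numbers, not a documented guarantee.

```python
def list_artifacts(results: dict) -> list:
    """Return (type, format, downloadUrl) for the primary and additional outputs."""
    outputs = results.get("results", {})
    artifacts = []
    if outputs.get("primaryOutput"):
        artifacts.append(outputs["primaryOutput"])
    artifacts.extend(outputs.get("additionalOutputs", []))
    return [(a["type"], a["format"], a["downloadUrl"]) for a in artifacts]


def validation_adds_up(results: dict) -> bool:
    """Check valid + duplicates + invalid == total (assumed relationship)."""
    v = results["qualityMetrics"]["dataValidation"]
    return v["validRecords"] + v["duplicatesRemoved"] + v["invalidRecords"] == v["totalRecords"]


# Trimmed copy of the sample response above.
sample_results = {
    "results": {
        "primaryOutput": {
            "type": "ProcessMiningReport",
            "format": "html",
            "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html",
        },
        "additionalOutputs": [
            {
                "type": "ProcessMap",
                "format": "svg",
                "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg",
            }
        ],
    },
    "qualityMetrics": {
        "dataValidation": {
            "totalRecords": 195000,
            "validRecords": 192850,
            "duplicatesRemoved": 1890,
            "invalidRecords": 260,
        }
    },
}

artifacts = list_artifacts(sample_results)
for kind, fmt, url in artifacts:
    print(f"{kind} ({fmt}): {url}")
print("Validation counts consistent:", validation_adds_up(sample_results))
```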
Retry Failed Job
POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry
Retries a failed job execution with optional parameter modifications. The job will be re-queued with the same or updated configuration.
Request Body
{
  "retryReason": "Infrastructure issue resolved",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "High",
  "immediateExecution": false
}
Response
Returns 200 OK with a job object for the re-queued job, containing its new job ID and retry information.
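Since this endpoint is meant for failed jobs, a client can guard against retrying a job in any other state before building the request body. A minimal sketch follows; `build_retry_request` is a hypothetical helper, and setting `modifyParameters` from whether any updated parameters were supplied is an assumption about how that flag is intended to be used.

```python
def build_retry_request(job: dict, reason: str, updated_parameters=None, priority=None) -> dict:
    """Build a retry request body, refusing jobs that have not failed (hypothetical helper)."""
    if job.get("status") != "Failed":
        raise ValueError(f"Only failed jobs can be retried, got status {job.get('status')!r}")
    payload = {
        "retryReason": reason,
        # Assumption: the flag simply mirrors whether overrides are present.
        "modifyParameters": bool(updated_parameters),
        "immediateExecution": False,
    }
    if updated_parameters:
        payload["updatedParameters"] = dict(updated_parameters)
    if priority:
        payload["priority"] = priority
    return payload


failed_job = {"jobId": "cc0e8400-e29b-41d4-a716-446655440000", "status": "Failed"}
retry_body = build_retry_request(
    failed_job,
    "Infrastructure issue resolved",
    updated_parameters={"timeoutMinutes": 180},
    priority="High",
)
print(retry_body)
```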
Get System Execution Status
GET /api/{tenantId}/execution/system/status
Retrieves the current system-wide execution status including resource utilization, queue health, and performance metrics.
Response
{
  "systemStatus": "Healthy",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "Active",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "Active",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 minutes",
    "estimatedProcessingTime": "23 minutes"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 minutes",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}
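The status response can be reduced to a handful of numbers for deciding whether to submit more work now or wait. The sketch below is one way to do that. `capacity_summary` is a hypothetical helper, and reading `jobCapacity - activeJobs` as the number of free slots on a node is an assumption about what those fields mean.

```python
def capacity_summary(status: dict) -> dict:
    """Summarize free job slots and CPU utilization from a system status response."""
    active_nodes = [n for n in status["executionNodes"] if n["status"] == "Active"]
    # Assumption: a node can accept jobCapacity - activeJobs more jobs.
    free_slots = sum(max(n["jobCapacity"] - n["activeJobs"], 0) for n in active_nodes)
    util = status["resourceUtilization"]
    cpu_percent = round(100 * util["usedCpuCapacity"] / util["totalCpuCapacity"], 1)
    return {
        "free_slots": free_slots,
        "cpu_percent": cpu_percent,
        "queued": status["queueStatistics"]["totalQueuedJobs"],
    }


# Trimmed copy of the sample response above.
sample_status = {
    "executionNodes": [
        {"nodeId": "worker-node-01", "status": "Active", "activeJobs": 2, "jobCapacity": 4},
        {"nodeId": "worker-node-02", "status": "Active", "activeJobs": 1, "jobCapacity": 4},
    ],
    "queueStatistics": {"totalQueuedJobs": 15},
    "resourceUtilization": {"totalCpuCapacity": 1600, "usedCpuCapacity": 896},
}

capacity = capacity_summary(sample_status)
print(capacity)
```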
Example: Complete Job Management Workflow
This example demonstrates submitting a job, monitoring its progress, and retrieving results:
// Placeholder values: replace with your own tenant, project, and token.
const tenantId = 'your-tenant-guid';
const projectId = 'your-project-guid';
const token = 'your-auth-token';
const baseUrl = `/api/${tenantId}/${projectId}/execution`;

// Helper: pause between status checks
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

// 1. Submit a new job
const submitJob = async () => {
  const response = await fetch(`${baseUrl}/job`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: 'Customer Behavior Analysis',
      jobDescription: 'Weekly analysis of customer interaction patterns',
      jobType: 'ProcessMining',
      priority: 'High',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });
  if (!response.ok) {
    throw new Error(`Job submission failed: HTTP ${response.status}`);
  }
  return response.json();
};

// 2. Monitor job progress until the job leaves the Queued/Running states
const monitorJob = async (jobId) => {
  while (true) {
    const response = await fetch(`${baseUrl}/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });
    if (!response.ok) {
      throw new Error(`Status check failed: HTTP ${response.status}`);
    }
    const job = await response.json();
    console.log(`Job ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`Current stage: ${job.progress.currentStage}`);
    console.log(`ETA: ${job.progress.estimatedCompletion}`);
    if (job.status === 'Completed') {
      console.log('Job completed successfully!');
      return getJobResults(jobId);
    }
    if (job.status !== 'Running' && job.status !== 'Queued') {
      // Failed or Cancelled. The status response above does not document an
      // error field, so job.error may be undefined.
      console.log(`Job ended with status ${job.status}:`, job.error);
      return job;
    }
    await sleep(30000); // Check every 30 seconds
  }
};

// 3. Get job results
const getJobResults = async (jobId) => {
  const response = await fetch(`${baseUrl}/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });
  if (!response.ok) {
    throw new Error(`Fetching results failed: HTTP ${response.status}`);
  }
  const results = await response.json();
  console.log('Job Results:', results.summary);
  console.log('Primary Output:', results.results.primaryOutput.downloadUrl);
  // List the download links for additional outputs
  for (const output of results.results.additionalOutputs) {
    console.log(`Download ${output.type}: ${output.downloadUrl}`);
  }
  return results;
};

// 4. Get system status (tenant-level endpoint, no project ID in the path)
const getSystemStatus = async () => {
  const response = await fetch(`/api/${tenantId}/execution/system/status`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });
  if (!response.ok) {
    throw new Error(`System status request failed: HTTP ${response.status}`);
  }
  const status = await response.json();
  console.log(`System Status: ${status.systemStatus}`);
  console.log(`Queue: ${status.queueStatistics.totalQueuedJobs} jobs waiting`);
  console.log(`Average wait time: ${status.queueStatistics.averageWaitTime}`);
  return status;
};

// Execute the workflow
submitJob()
  .then(job => {
    console.log(`Submitted job: ${job.jobId}`);
    console.log(`Queue position: ${job.queuePosition}`);
    console.log(`Estimated start: ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('Job workflow failed:', error));
Python Example
import time

import requests


class ExecutionManager:
    """Thin client for the job execution endpoints described above."""

    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """Submit a new execution job"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        response.raise_for_status()  # Surface HTTP errors instead of parsing an error body
        return response.json()

    def get_job_status(self, job_id):
        """Get current job status"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """List jobs with optional filtering (date_from/date_to are datetime objects)"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()
        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """Wait for job to complete with periodic status checks"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"Job {job_id}: {job['status']} ({job['progress']['percentage']}%)")
            print(f"  Current stage: {job['progress']['currentStage']}")
            print(f"  Elapsed time: {job['progress']['elapsedTime']}")
            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job
            time.sleep(poll_interval)
        raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """Get job execution results"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            # Send the boolean as lowercase 'true'/'false' in the query string
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def cancel_job(self, job_id, reason="User cancellation", force=False):
        """Cancel a running or queued job; returns True only on 200 OK"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
        """Retry a failed job"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }
        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params
        response = requests.post(url, json=payload, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def get_system_status(self):
        """Get system-wide execution status (tenant-level endpoint)"""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        response.raise_for_status()
        return response.json()

# Usage example
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Check system status
    system_status = manager.get_system_status()
    print(f"System Status: {system_status['systemStatus']}")
    print(f"Jobs in queue: {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"Average wait time: {system_status['queueStatistics']['averageWaitTime']}")

    # Submit a comprehensive process mining job
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }
    job = manager.submit_job(
        'Monthly Process Analytics',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        'High'
    )
    print(f"Submitted job: {job['jobId']}")
    print(f"Queue position: {job['queuePosition']}")
    print(f"Estimated start: {job['estimatedStartTime']}")

    # Wait for completion
    final_job = manager.wait_for_completion(job['jobId'])
    if final_job['status'] == 'Completed':
        # Get detailed results
        results = manager.get_job_results(job['jobId'])
        print("Job completed successfully!")
        print(f"Records processed: {results['summary']['recordsProcessed']:,}")
        print(f"Data quality score: {results['summary']['dataQualityScore']}")
        print(f"Processing efficiency: {results['summary']['processingEfficiency']}%")
        # Print the download link for the primary report
        print(f"Download report: {results['results']['primaryOutput']['downloadUrl']}")
        # List all additional outputs
        for output in results['results']['additionalOutputs']:
            print(f"Download {output['type']}: {output['downloadUrl']}")
    else:
        # Failed or Cancelled
        print(f"Job ended with status: {final_job['status']}")
        if 'error' in final_job:
            print(f"Error: {final_job['error']}")
except Exception as e:
    print(f"Error in execution workflow: {e}")