Execution

Job Execution API

Manage and monitor the execution of process mining jobs, handle asynchronous operations, and track job progress in real-time.

Features

Job Queue

Manage job queue and priorities.

View Queue

Job Tracking

Track job status and progress.

Track Jobs

Async Operations

Handle long-running asynchronous operations.

Async Operations

Get Job Status

GET /api/{tenantId}/{projectId}/execution/job/{jobId}

Retrieves the current status and details of any execution job, including progress information, execution metrics, and completion status.

Parameters

Parameter Type Location Description
tenantId GUID Path The tenant identifier
projectId GUID Path The project identifier
jobId GUID Path The job identifier

Response

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "Customer Journey Analysis",
  "jobDescription": "Comprehensive analysis of customer touchpoints and behaviors",
  "status": "Running",
  "priority": "High",
  "progress": {
    "percentage": 65,
    "currentStage": "Data Processing",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8 minutes 32 seconds"
  },
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "Customer Analytics Pipeline"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 records/second"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}

List All Jobs

GET /api/{tenantId}/{projectId}/execution/jobs

Retrieves a paginated list of all execution jobs in the project with filtering options for status, job type, and date ranges.

Query Parameters

Parameter Type Description
status string Filter by status: Queued, Running, Completed, Failed, Cancelled
jobType string Filter by job type: ProcessMining, DataEnrichment, Notebook, Analysis
priority string Filter by priority: Low, Normal, High, Critical
submittedBy string Filter by user who submitted the job
dateFrom datetime Filter jobs from this date
dateTo datetime Filter jobs to this date
page integer Page number for pagination (default: 1)
pageSize integer Number of items per page (default: 20, max: 100)

Response

{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "Customer Journey Analysis",
      "status": "Running",
      "priority": "High",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "Customer Analytics Pipeline"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "Daily Sales Enrichment",
      "status": "Completed",
      "priority": "Normal",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 minutes",
      "submittedBy": "system",
      "resourceName": "Sales Data Pipeline"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Submit New Job

POST /api/{tenantId}/{projectId}/execution/job

Submits a new execution job to the system. The job will be queued and processed based on priority and resource availability.

Request Body

{
  "jobName": "Weekly Process Analysis",
  "jobDescription": "Automated weekly analysis of process performance",
  "jobType": "ProcessMining",
  "priority": "Normal",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}

Response

{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 minutes",
  "jobName": "Weekly Process Analysis",
  "priority": "Normal",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}

Cancel Job

DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}

Cancels a queued or running job. Completed jobs cannot be cancelled. Running jobs will be stopped gracefully when possible.

Request Body (Optional)

{
  "reason": "User requested cancellation",
  "forceTermination": false,
  "preservePartialResults": true
}

Response Codes

  • 200 OK - Job cancelled successfully
  • 404 Not Found - Job not found
  • 409 Conflict - Job already completed or cannot be cancelled

Get Job Results

GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results

Retrieves the results and outputs from a completed job execution, including generated artifacts, reports, and data files.

Query Parameters

Parameter Type Description
format string Response format: summary, detailed, download (default: summary)
includeArtifacts boolean Include downloadable artifacts in response (default: true)
outputType string Filter by output type: reports, data, models, visualizations

Response

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 minutes",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "Customer Journey Analysis Report",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "Customer Journey Data Enhanced",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "Customer Journey Process Map",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "Journey Prediction Model",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 minutes",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "Some timestamps had to be inferred",
        "count": 125
      }
    ]
  }
}

Retry Failed Job

POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry

Retries a failed job execution with optional parameter modifications. The job will be re-queued with the same or updated configuration.

Request Body

{
  "retryReason": "Infrastructure issue resolved",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "High",
  "immediateExecution": false
}

Response

Returns 200 OK with a new job object containing updated job ID and retry information.

Get System Execution Status

GET /api/{tenantId}/execution/system/status

Retrieves the current system-wide execution status including resource utilization, queue health, and performance metrics.

Response

{
  "systemStatus": "Healthy",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "Active",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "Active",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 minutes",
    "estimatedProcessingTime": "23 minutes"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 minutes",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}

Example: Complete Job Management Workflow

This example demonstrates submitting a job, monitoring its progress, and retrieving results:

// 1. Submit a new job
const submitJob = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: 'Customer Behavior Analysis',
      jobDescription: 'Weekly analysis of customer interaction patterns',
      jobType: 'ProcessMining',
      priority: 'High',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });

  return await response.json();
};

// 2. Monitor job progress
const monitorJob = async (jobId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const job = await response.json();
    console.log(`Job ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`Current stage: ${job.progress.currentStage}`);
    console.log(`ETA: ${job.progress.estimatedCompletion}`);

    if (job.status === 'Running' || job.status === 'Queued') {
      setTimeout(() => checkStatus(), 30000); // Check every 30 seconds
    } else if (job.status === 'Completed') {
      console.log('Job completed successfully!');
      await getJobResults(jobId);
    } else if (job.status === 'Failed') {
      console.log('Job failed:', job.error);
    }
  };

  await checkStatus();
};

// 3. Get job results
const getJobResults = async (jobId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Job Results:', results.summary);
  console.log('Primary Output:', results.results.primaryOutput.downloadUrl);

  // Download additional outputs
  for (const output of results.results.additionalOutputs) {
    console.log(`Download ${output.type}: ${output.downloadUrl}`);
  }

  return results;
};

// 4. Get system status
const getSystemStatus = async () => {
  const response = await fetch('/api/{tenantId}/execution/system/status', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const status = await response.json();
  console.log(`System Status: ${status.systemStatus}`);
  console.log(`Queue: ${status.queueStatistics.totalQueuedJobs} jobs waiting`);
  console.log(`Average wait time: ${status.queueStatistics.averageWaitTime}`);

  return status;
};

// Execute the workflow
submitJob()
  .then(job => {
    console.log(`Submitted job: ${job.jobId}`);
    console.log(`Queue position: ${job.queuePosition}`);
    console.log(`Estimated start: ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('Job workflow failed:', error));

Python Example

import requests
import time
import json
from datetime import datetime, timedelta

class ExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """Submit a new execution job"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_job_status(self, job_id):
        """Get current job status"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """List jobs with optional filtering"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}

        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """Wait for job to complete with periodic status checks"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"Job {job_id}: {job['status']} ({job['progress']['percentage']}%)")
            print(f"  Current stage: {job['progress']['currentStage']}")
            print(f"  Elapsed time: {job['progress']['elapsedTime']}")

            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job

            time.sleep(poll_interval)

        raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """Get job execution results"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_job(self, job_id, reason="User cancellation", force=False):
        """Cancel a running or queued job"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
        """Retry a failed job"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }

        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_system_status(self):
        """Get system-wide execution status"""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        return response.json()

# Usage example
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Check system status
    system_status = manager.get_system_status()
    print(f"System Status: {system_status['systemStatus']}")
    print(f"Jobs in queue: {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"Average wait time: {system_status['queueStatistics']['averageWaitTime']}")

    # Submit a comprehensive process mining job
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }

    job = manager.submit_job(
        'Monthly Process Analytics',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        'High'
    )

    print(f"Submitted job: {job['jobId']}")
    print(f"Queue position: {job['queuePosition']}")
    print(f"Estimated start: {job['estimatedStartTime']}")

    # Wait for completion
    final_job = manager.wait_for_completion(job['jobId'])

    if final_job['status'] == 'Completed':
        # Get detailed results
        results = manager.get_job_results(job['jobId'])

        print("Job completed successfully!")
        print(f"Records processed: {results['summary']['recordsProcessed']:,}")
        print(f"Data quality score: {results['summary']['dataQualityScore']}")
        print(f"Processing efficiency: {results['summary']['processingEfficiency']}%")

        # Download primary report
        print(f"Download report: {results['results']['primaryOutput']['downloadUrl']}")

        # List all additional outputs
        for output in results['results']['additionalOutputs']:
            print(f"Download {output['type']}: {output['downloadUrl']}")

    else:
        print(f"Job failed with status: {final_job['status']}")
        if 'error' in final_job:
            print(f"Error: {final_job['error']}")

except Exception as e:
    print(f"Error in execution workflow: {e}")