Execution

Job Execution API

Manage and monitor the execution of process mining jobs, handle asynchronous operations, and track job progress in real time.

Features

Job Queue

Manage job queues and priorities.

Job Tracking

Track the status and progress of jobs.

Asynchronous Operations

Handle long-running asynchronous operations.

Get Job Status

GET /api/{tenantId}/{projectId}/execution/job/{jobId}

Retrieves the current status and details of an execution job, including progress information, execution statistics, and completion status.

Parameters

Parameter   Type   Location   Description
tenantId    GUID   Path       The tenant identifier
projectId   GUID   Path       The project identifier
jobId       GUID   Path       The job identifier

Response

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "Customer Journey Analysis",
  "jobDescription": "Comprehensive analysis of customer touchpoints and behaviors",
  "status": "Running",
  "priority": "High",
  "progress": {
    "percentage": 65,
    "currentStage": "Data Processing",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8 minutes 32 seconds"
  },
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "Customer Analytics Pipeline"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 records/second"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}
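
The response carries both a reported percentage and raw record counts, so a client can cross-check one against the other. The following is a minimal sketch, assuming the response has the shape shown above. `summarize_job` and `TERMINAL_STATUSES` are hypothetical names, and treating Completed, Failed, and Cancelled as final states is an assumption, not something this page states.

```python
# Assumption: a job in one of these states will not change state again.
TERMINAL_STATUSES = {"Completed", "Failed", "Cancelled"}


def summarize_job(job: dict) -> dict:
    """Reduce a job status response to the fields a poller usually needs."""
    metrics = job.get("metrics", {})
    processed = metrics.get("recordsProcessed", 0)
    total = metrics.get("totalRecords", 0)
    # Derive progress from record counts; None when the total is unknown.
    derived = round(100 * processed / total, 1) if total else None
    return {
        "jobId": job["jobId"],
        "status": job["status"],
        "reportedPercentage": job.get("progress", {}).get("percentage"),
        "derivedPercentage": derived,
        "isTerminal": job["status"] in TERMINAL_STATUSES,
    }


# Trimmed copy of the sample response above.
sample_job = {
    "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
    "status": "Running",
    "progress": {"percentage": 65},
    "metrics": {"recordsProcessed": 125430, "totalRecords": 192850},
}

summary = summarize_job(sample_job)
print(summary)
```

The derived value is only meaningful for job types that report record counts; for other jobs the reported percentage is the only signal.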

List All Jobs

GET /api/{tenantId}/{projectId}/execution/jobs

Retrieves a paginated list of all execution jobs in the project, with filter options for status, job type, and date range.

Query Parameters

Parameter     Type       Description
status        string     Filter by status: Queued, Running, Completed, Failed, Cancelled
jobType       string     Filter by job type: ProcessMining, DataEnrichment, Notebook, Analysis
priority      string     Filter by priority: Low, Normal, High, Critical
submittedBy   string     Filter by the user who submitted the job
dateFrom      datetime   Return jobs from this date onward
dateTo        datetime   Return jobs up to this date
page          integer    Page number for pagination (default: 1)
pageSize      integer    Number of items per page (default: 20, max: 100)
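
As an illustration of how these parameters combine, the sketch below builds a query string on the client side. `build_jobs_query` is a hypothetical helper. Clamping `pageSize` to 100 locally and sending `dateFrom` as an ISO 8601 timestamp are assumptions; the server may instead reject out-of-range values or expect a different date format.

```python
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlencode

VALID_STATUSES = {"Queued", "Running", "Completed", "Failed", "Cancelled"}
MAX_PAGE_SIZE = 100  # documented maximum for pageSize


def build_jobs_query(
    status: Optional[str] = None,
    date_from: Optional[datetime] = None,
    page: int = 1,
    page_size: int = 20,
) -> str:
    """Return a URL-encoded query string for the job list endpoint."""
    if status is not None and status not in VALID_STATUSES:
        raise ValueError(f"Unknown status: {status}")
    params = {
        "page": max(1, page),
        "pageSize": min(max(1, page_size), MAX_PAGE_SIZE),
    }
    if status is not None:
        params["status"] = status
    if date_from is not None:
        # Assumption: datetime filters accept ISO 8601 timestamps.
        params["dateFrom"] = date_from.isoformat()
    return urlencode(params)


print(build_jobs_query(status="Running", page_size=500))
print(build_jobs_query(date_from=datetime(2024, 1, 20, tzinfo=timezone.utc)))
```

Validating the status locally gives an early, readable error instead of relying on however the API reports an unknown filter value.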

Response

{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "Customer Journey Analysis",
      "status": "Running",
      "priority": "High",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "Customer Analytics Pipeline"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "Daily Sales Enrichment",
      "status": "Completed",
      "priority": "Normal",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 minutes",
      "submittedBy": "system",
      "resourceName": "Sales Data Pipeline"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}
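
Because the response reports `hasNextPage`, a client can walk the whole list without knowing the total page count. This is a minimal sketch of that loop; `iter_jobs` and its `fetch_page` callback are hypothetical, and the in-memory pages stand in for real HTTP calls so the example runs without a server.

```python
from typing import Callable, Iterator


def iter_jobs(fetch_page: Callable[[int], dict], max_pages: int = 1000) -> Iterator[dict]:
    """Yield every job by following hasNextPage.

    fetch_page(page) must return one decoded list response.
    max_pages is a safety limit against a server that never ends the list.
    """
    page = 1
    while page <= max_pages:
        body = fetch_page(page)
        yield from body.get("jobs", [])
        if not body.get("hasNextPage", False):
            return
        page += 1


# Stand-in for the HTTP layer (illustrative data only).
_FAKE_PAGES = {
    1: {"jobs": [{"jobId": "a"}, {"jobId": "b"}], "page": 1, "hasNextPage": True},
    2: {"jobs": [{"jobId": "c"}], "page": 2, "hasNextPage": False},
}

job_ids = [job["jobId"] for job in iter_jobs(lambda page: _FAKE_PAGES[page])]
print(job_ids)
```

In real use, `fetch_page` would wrap a GET request to the jobs endpoint with the `page` and `pageSize` query parameters.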

Submit New Job

POST /api/{tenantId}/{projectId}/execution/job

Submits a new execution job to the system. The job is queued and processed according to its priority and the available resources.

Request Body

{
  "jobName": "Weekly Process Analysis",
  "jobDescription": "Automated weekly analysis of process performance",
  "jobType": "ProcessMining",
  "priority": "Normal",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}

Response

{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 minutes",
  "jobName": "Weekly Process Analysis",
  "priority": "Normal",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}
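
The `scheduling` object in the request body pairs `executeImmediately` with `scheduledTime`. The sketch below shows one way a client might keep the two consistent. `build_job_payload` is a hypothetical helper, and the interpretation that a deferred start means `executeImmediately: false` plus a UTC timestamp in `scheduledTime` is an assumption inferred from the field names.

```python
from datetime import datetime, timezone
from typing import Optional

VALID_PRIORITIES = {"Low", "Normal", "High", "Critical"}


def build_job_payload(
    job_name: str,
    job_type: str,
    resource_type: str,
    resource_id: str,
    priority: str = "Normal",
    scheduled_time: Optional[datetime] = None,
    timeout_minutes: int = 120,
) -> dict:
    """Build a submission body with a self-consistent scheduling block."""
    if priority not in VALID_PRIORITIES:
        raise ValueError(f"Unknown priority: {priority}")
    if scheduled_time is not None and scheduled_time.tzinfo is None:
        raise ValueError("scheduled_time must be timezone-aware")
    # Assumption: deferred starts are sent as UTC timestamps with a Z suffix.
    scheduled = (
        None
        if scheduled_time is None
        else scheduled_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    )
    return {
        "jobName": job_name,
        "jobType": job_type,
        "priority": priority,
        "resource": {"resourceType": resource_type, "resourceId": resource_id},
        "scheduling": {
            "executeImmediately": scheduled_time is None,
            "scheduledTime": scheduled,
            "timeoutMinutes": timeout_minutes,
        },
    }


payload = build_job_payload(
    "Weekly Process Analysis",
    "ProcessMining",
    "Pipeline",
    "770e8400-e29b-41d4-a716-446655440000",
    scheduled_time=datetime(2024, 1, 21, 2, 0, tzinfo=timezone.utc),
)
print(payload["scheduling"])
```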

Cancel Job

DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}

Cancels a job that is queued or currently running. Completed jobs cannot be cancelled. Running jobs are stopped gracefully where possible.

Request Body (optional)

{
  "reason": "Cancellation requested by user",
  "forceTermination": false,
  "preservePartialResults": true
}

Response Codes

  • 200 OK - Job cancelled successfully
  • 404 Not Found - Job not found
  • 409 Conflict - Job has already completed or cannot be cancelled
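
A caller usually needs to tell these three outcomes apart rather than collapse them into success or failure. The sketch below maps the documented status codes to an enum; `CancelOutcome` and `interpret_cancel_response` are hypothetical names, and any code not listed above is treated as unexpected.

```python
from enum import Enum


class CancelOutcome(Enum):
    CANCELLED = "cancelled"              # 200 OK
    NOT_FOUND = "not_found"              # 404 Not Found
    NOT_CANCELLABLE = "not_cancellable"  # 409 Conflict
    UNEXPECTED = "unexpected"            # anything else, e.g. auth or server errors


_OUTCOMES = {
    200: CancelOutcome.CANCELLED,
    404: CancelOutcome.NOT_FOUND,
    409: CancelOutcome.NOT_CANCELLABLE,
}


def interpret_cancel_response(status_code: int) -> CancelOutcome:
    """Translate the HTTP status of a cancel request into a CancelOutcome."""
    return _OUTCOMES.get(status_code, CancelOutcome.UNEXPECTED)


for code in (200, 404, 409, 500):
    print(code, interpret_cancel_response(code).value)
```

A 409 is often not an error for the caller: if the job has already completed, its results may still be worth fetching.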

Get Job Results

GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results

Retrieves the results and outputs of a completed job execution, including generated artifacts, reports, and data files.

Query Parameters

Parameter          Type      Description
format             string    Response format: summary, detailed, download (default: summary)
includeArtifacts   boolean   Include downloadable artifacts in the response (default: true)
outputType         string    Filter by output type: reports, data, models, visualizations

Response

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 minutes",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "Customer Journey Analysis Report",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "Customer Journey Data Enhanced",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "Customer Journey Process Map",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "Journey Prediction Model",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 minutes",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "Some timestamps had to be inferred",
        "count": 125
      }
    ]
  }
}
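
To show how a client might consume this response, the sketch below flattens the primary and additional outputs into one list and checks that the validation counts add up. Both helpers are hypothetical. The check assumes that valid, duplicate, and invalid records together account for `totalRecords`; that holds for the sample above but is not stated as a guarantee.

```python
def list_artifacts(results: dict) -> list:
    """Return (type, format, downloadUrl) for the primary and additional outputs."""
    outputs = results.get("results", {})
    items = []
    primary = outputs.get("primaryOutput")
    if primary:
        items.append(primary)
    items.extend(outputs.get("additionalOutputs", []))
    return [(o["type"], o["format"], o["downloadUrl"]) for o in items]


def validation_balances(results: dict) -> bool:
    """Check that valid + duplicate + invalid records equal the total (assumed invariant)."""
    v = results["qualityMetrics"]["dataValidation"]
    return v["validRecords"] + v["duplicatesRemoved"] + v["invalidRecords"] == v["totalRecords"]


# Trimmed copy of the sample response above.
sample_results = {
    "results": {
        "primaryOutput": {
            "type": "ProcessMiningReport",
            "format": "html",
            "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html",
        },
        "additionalOutputs": [
            {
                "type": "ProcessMap",
                "format": "svg",
                "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg",
            }
        ],
    },
    "qualityMetrics": {
        "dataValidation": {
            "totalRecords": 195000,
            "validRecords": 192850,
            "duplicatesRemoved": 1890,
            "invalidRecords": 260,
        }
    },
}

artifacts = list_artifacts(sample_results)
for artifact_type, artifact_format, url in artifacts:
    print(artifact_type, artifact_format, url)
print("validation balances:", validation_balances(sample_results))
```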

Retry Failed Job

POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry

Retries a failed job execution, optionally with modified parameters. The job is requeued with the same or an updated configuration.

Request Body

{
  "retryReason": "Infrastructure issue resolved",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "High",
  "immediateExecution": false
}

Response

Returns 200 OK with a new job object containing an updated job ID and information about the retry.
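
Since the endpoint is described for failed jobs, a client can guard against retrying a job in any other state before sending the request. The following is a sketch of such a guard together with the request body; `build_retry_payload` is a hypothetical helper, and rejecting non-Failed jobs on the client side is an assumption about how the API behaves.

```python
from typing import Optional


def build_retry_payload(
    job_status: str,
    reason: str,
    updated_parameters: Optional[dict] = None,
    priority: Optional[str] = None,
) -> dict:
    """Build a retry request body, refusing jobs that have not failed."""
    if job_status != "Failed":
        # Assumption: only Failed jobs are eligible for retry.
        raise ValueError(f"Cannot retry a job with status {job_status!r}")
    payload = {
        "retryReason": reason,
        # modifyParameters mirrors whether any overrides are being sent.
        "modifyParameters": bool(updated_parameters),
        "immediateExecution": False,
    }
    if updated_parameters:
        payload["updatedParameters"] = dict(updated_parameters)
    if priority is not None:
        payload["priority"] = priority
    return payload


retry_body = build_retry_payload(
    "Failed",
    "Infrastructure issue resolved",
    updated_parameters={"timeoutMinutes": 180},
    priority="High",
)
print(retry_body)
```

Because the response contains a new job ID, any polling that follows the retry should switch to that ID rather than the original one.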

Get System Execution Status

GET /api/{tenantId}/execution/system/status

Retrieves the current system-wide execution status, including resource usage, queue status, and performance statistics.

Response

{
  "systemStatus": "Healthy",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "Active",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "Active",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 minutes",
    "estimatedProcessingTime": "23 minutes"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 minutes",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}
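
One use of this response is deciding whether to submit more work right now. The sketch below derives the number of free job slots and the overall CPU utilization from the fields shown above. Both helpers are hypothetical, and the reading that `jobCapacity - activeJobs` is the free capacity of a node, and that the two CPU capacity fields share a unit, is an assumption.

```python
def free_job_slots(status: dict) -> int:
    """Sum the unused job capacity across nodes reported as Active."""
    return sum(
        max(0, node["jobCapacity"] - node["activeJobs"])
        for node in status.get("executionNodes", [])
        if node.get("status") == "Active"
    )


def cpu_utilization_percent(status: dict) -> float:
    """Used CPU capacity as a percentage of total CPU capacity."""
    usage = status["resourceUtilization"]
    return 100 * usage["usedCpuCapacity"] / usage["totalCpuCapacity"]


# Trimmed copy of the sample response above.
sample_status = {
    "executionNodes": [
        {"nodeId": "worker-node-01", "status": "Active", "activeJobs": 2, "jobCapacity": 4},
        {"nodeId": "worker-node-02", "status": "Active", "activeJobs": 1, "jobCapacity": 4},
    ],
    "resourceUtilization": {"totalCpuCapacity": 1600, "usedCpuCapacity": 896},
}

print("free slots:", free_job_slots(sample_status))
print("cpu utilization:", cpu_utilization_percent(sample_status))
```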

Example: Complete Job Management Workflow

This example demonstrates submitting a job, tracking its progress, and retrieving the results:

// 1. Submit a new job
const submitJob = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: 'Customer Behavior Analysis',
      jobDescription: 'Weekly analysis of customer interaction patterns',
      jobType: 'ProcessMining',
      priority: 'High',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });

  return await response.json();
};

// 2. Monitor job progress
const monitorJob = async (jobId) => {
  // Poll until the job leaves the Queued/Running states, so the returned
  // promise only resolves once the job has actually finished
  while (true) {
    const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const job = await response.json();
    console.log(`Job ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`Current stage: ${job.progress.currentStage}`);
    console.log(`Estimated completion: ${job.progress.estimatedCompletion}`);

    if (job.status === 'Completed') {
      console.log('Job completed successfully!');
      return getJobResults(jobId);
    }
    if (job.status !== 'Running' && job.status !== 'Queued') {
      console.log(`Job ended with status ${job.status}:`, job.error);
      return job;
    }

    // Wait 30 seconds before checking again
    await new Promise((resolve) => setTimeout(resolve, 30000));
  }
};

// 3. Get job results
const getJobResults = async (jobId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Job results:', results.summary);
  console.log('Primary output:', results.results.primaryOutput.downloadUrl);

  // List the download URLs of the additional outputs
  for (const output of results.results.additionalOutputs) {
    console.log(`Download ${output.type}: ${output.downloadUrl}`);
  }

  return results;
};

// 4. Get system status
const getSystemStatus = async () => {
  const response = await fetch('/api/{tenantId}/execution/system/status', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const status = await response.json();
  console.log(`System status: ${status.systemStatus}`);
  console.log(`Queue: ${status.queueStatistics.totalQueuedJobs} jobs queued`);
  console.log(`Average wait time: ${status.queueStatistics.averageWaitTime}`);

  return status;
};

// Run the workflow
submitJob()
  .then(job => {
    console.log(`Submitted job: ${job.jobId}`);
    console.log(`Queue position: ${job.queuePosition}`);
    console.log(`Estimated start: ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('Job workflow failed:', error));

Python Example

import requests
import time
import json
from datetime import datetime, timedelta

class ExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """Submit a new execution job."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_job_status(self, job_id):
        """Get the current status of a job."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """List jobs with optional filters."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}

        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """Wait for a job to finish, polling its status periodically."""
        start_time = time.time()

        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"Job {job_id}: {job['status']} ({job['progress']['percentage']}%)")
            print(f"  Current stage: {job['progress']['currentStage']}")
            print(f"  Elapsed time: {job['progress']['elapsedTime']}")

            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job

            time.sleep(poll_interval)

        raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """Get the results of a job execution."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_job(self, job_id, reason="User cancellation", force=False):
        """Cancel a running or queued job."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
        """Retry a failed job."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }

        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_system_status(self):
        """Get the system-wide execution status."""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        return response.json()

# Usage example
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Check system status
    system_status = manager.get_system_status()
    print(f"System status: {system_status['systemStatus']}")
    print(f"Queued jobs: {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"Average wait time: {system_status['queueStatistics']['averageWaitTime']}")

    # Submit a comprehensive process mining job
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }

    job = manager.submit_job(
        'Monthly Process Analytics',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        'High'
    )

    print(f"Submitted job: {job['jobId']}")
    print(f"Queue position: {job['queuePosition']}")
    print(f"Estimated start: {job['estimatedStartTime']}")

    # Wait for completion
    final_job = manager.wait_for_completion(job['jobId'])

    if final_job['status'] == 'Completed':
        # Fetch detailed results
        results = manager.get_job_results(job['jobId'])

        print("Job completed successfully!")
        print(f"Records processed: {results['summary']['recordsProcessed']:,}")
        print(f"Data quality score: {results['summary']['dataQualityScore']}")
        print(f"Processing efficiency: {results['summary']['processingEfficiency']}%")

        # Print the download URL of the primary report
        print(f"Download report: {results['results']['primaryOutput']['downloadUrl']}")

        # List all additional outputs
        for output in results['results']['additionalOutputs']:
            print(f"Download {output['type']}: {output['downloadUrl']}")

    else:
        print(f"Job ended with status: {final_job['status']}")
        if 'error' in final_job:
            print(f"Error: {final_job['error']}")

except Exception as e:
    print(f"Error in execution workflow: {e}")