Exécution

API d'Exécution des Tâches

Gérez et surveillez l'exécution des tâches de process mining, gérez les opérations asynchrones et suivez la progression des tâches en temps réel.

Fonctionnalités

File d'attente des tâches

Gérez la file d'attente des tâches et leurs priorités.

Voir la file d'attente

Suivi des tâches

Suivez le statut et la progression des tâches.

Suivre les tâches

Opérations asynchrones

Gérez les opérations asynchrones de longue durée.

Opérations asynchrones

Obtenir le statut d'une tâche

GET /api/{tenantId}/{projectId}/execution/job/{jobId}

Récupère le statut actuel et les détails de toute tâche d'exécution, incluant les informations de progression, les métriques d'exécution et le statut d'achèvement.

Paramètres

Paramètre Type Emplacement Description
tenantId GUID Chemin L'identifiant du locataire
projectId GUID Chemin L'identifiant du projet
jobId GUID Chemin L'identifiant de la tâche

Réponse

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "Customer Journey Analysis",
  "jobDescription": "Comprehensive analysis of customer touchpoints and behaviors",
  "status": "Running",
  "priority": "High",
  "progress": {
    "percentage": 65,
    "currentStage": "Data Processing",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8 minutes 32 seconds"
  },
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "Customer Analytics Pipeline"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 records/second"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}

Lister toutes les tâches

GET /api/{tenantId}/{projectId}/execution/jobs

Récupère une liste paginée de toutes les tâches d'exécution du projet avec des options de filtrage par statut, type de tâche et plages de dates.

Paramètres de requête

Paramètre Type Description
status string Filtrer par statut : Queued, Running, Completed, Failed, Cancelled
jobType string Filtrer par type de tâche : ProcessMining, DataEnrichment, Notebook, Analysis
priority string Filtrer par priorité : Low, Normal, High, Critical
submittedBy string Filtrer par utilisateur ayant soumis la tâche
dateFrom datetime Filtrer les tâches à partir de cette date
dateTo datetime Filtrer les tâches jusqu'à cette date
page integer Numéro de page pour la pagination (par défaut : 1)
pageSize integer Nombre d'éléments par page (par défaut : 20, max : 100)

Réponse

{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "Customer Journey Analysis",
      "status": "Running",
      "priority": "High",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "Customer Analytics Pipeline"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "Daily Sales Enrichment",
      "status": "Completed",
      "priority": "Normal",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 minutes",
      "submittedBy": "system",
      "resourceName": "Sales Data Pipeline"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Soumettre une nouvelle tâche

POST /api/{tenantId}/{projectId}/execution/job

Soumet une nouvelle tâche d'exécution au système. La tâche sera mise en file d'attente et traitée selon la priorité et la disponibilité des ressources.

Corps de la requête

{
  "jobName": "Weekly Process Analysis",
  "jobDescription": "Automated weekly analysis of process performance",
  "jobType": "ProcessMining",
  "priority": "Normal",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}

Réponse

{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 minutes",
  "jobName": "Weekly Process Analysis",
  "priority": "Normal",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}

Annuler une tâche

DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}

Annule une tâche en file d'attente ou en cours d'exécution. Les tâches terminées ne peuvent pas être annulées. Les tâches en cours seront arrêtées proprement si possible.

Corps de la requête (optionnel)

{
  "reason": "User requested cancellation",
  "forceTermination": false,
  "preservePartialResults": true
}

Codes de réponse

  • 200 OK - Tâche annulée avec succès
  • 404 Not Found - Tâche non trouvée
  • 409 Conflict - Tâche déjà terminée ou ne peut pas être annulée

Obtenir les résultats d'une tâche

GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results

Récupère les résultats et sorties d'une tâche d'exécution terminée, incluant les artefacts générés, rapports et fichiers de données.

Paramètres de requête

Paramètre Type Description
format string Format de réponse : summary, detailed, download (par défaut : summary)
includeArtifacts boolean Inclure les artefacts téléchargeables dans la réponse (par défaut : true)
outputType string Filtrer par type de sortie : reports, data, models, visualizations

Réponse

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 minutes",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "Customer Journey Analysis Report",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "Customer Journey Data Enhanced",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "Customer Journey Process Map",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "Journey Prediction Model",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 minutes",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "Some timestamps had to be inferred",
        "count": 125
      }
    ]
  }
}

Retenter une tâche échouée

POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry

Retente l'exécution d'une tâche échouée avec une modification optionnelle des paramètres. La tâche sera remise en file d'attente avec la même ou une configuration mise à jour.

Corps de la requête

{
  "retryReason": "Infrastructure issue resolved",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "High",
  "immediateExecution": false
}

Réponse

Retourne 200 OK avec un nouvel objet tâche contenant un nouvel ID de tâche et les informations de tentative.

Obtenir le statut d'exécution du système

GET /api/{tenantId}/execution/system/status

Récupère le statut courant d'exécution à l'échelle du système incluant l'utilisation des ressources, la santé de la file d'attente et les métriques de performance.

Réponse

{
  "systemStatus": "Healthy",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "Active",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "Active",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 minutes",
    "estimatedProcessingTime": "23 minutes"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 minutes",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}

Exemple : Flux complet de gestion des tâches

Cet exemple illustre la soumission d'une tâche, le suivi de sa progression et la récupération des résultats :

// 1. Soumettre une nouvelle tâche
const submitJob = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: 'Customer Behavior Analysis',
      jobDescription: 'Weekly analysis of customer interaction patterns',
      jobType: 'ProcessMining',
      priority: 'High',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });

  return await response.json();
};

// 2. Surveiller la progression de la tâche
const monitorJob = async (jobId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const job = await response.json();
    console.log(`Job ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`Current stage: ${job.progress.currentStage}`);
    console.log(`ETA: ${job.progress.estimatedCompletion}`);

    if (job.status === 'Running' || job.status === 'Queued') {
      setTimeout(() => checkStatus(), 30000); // Vérifier toutes les 30 secondes
    } else if (job.status === 'Completed') {
      console.log('Tâche terminée avec succès !');
      await getJobResults(jobId);
    } else if (job.status === 'Failed') {
      console.log('Tâche échouée :', job.error);
    }
  };

  await checkStatus();
};

// 3. Obtenir les résultats de la tâche
const getJobResults = async (jobId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Résultats de la tâche :', results.summary);
  console.log('Sortie principale :', results.results.primaryOutput.downloadUrl);

  // Télécharger les sorties additionnelles
  for (const output of results.results.additionalOutputs) {
    console.log(`Télécharger ${output.type} : ${output.downloadUrl}`);
  }

  return results;
};

// 4. Obtenir le statut système
const getSystemStatus = async () => {
  const response = await fetch('/api/{tenantId}/execution/system/status', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const status = await response.json();
  console.log(`Statut du système : ${status.systemStatus}`);
  console.log(`File d'attente : ${status.queueStatistics.totalQueuedJobs} tâches en attente`);
  console.log(`Temps d'attente moyen : ${status.queueStatistics.averageWaitTime}`);

  return status;
};

// Exécuter le flux de travail
submitJob()
  .then(job => {
    console.log(`Tâche soumise : ${job.jobId}`);
    console.log(`Position dans la file d'attente : ${job.queuePosition}`);
    console.log(`Début estimé : ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('Échec du flux de tâches :', error));

Exemple Python

import requests
import time
import json
from datetime import datetime, timedelta

class ExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """Soumettre une nouvelle tâche d'exécution"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_job_status(self, job_id):
        """Obtenir le statut actuel de la tâche"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """Lister les tâches avec filtrage optionnel"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}

        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """Attendre la complétion de la tâche avec vérifications périodiques de statut"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"Tâche {job_id} : {job['status']} ({job['progress']['percentage']}%)")
            print(f"  Étape actuelle : {job['progress']['currentStage']}")
            print(f"  Temps écoulé : {job['progress']['elapsedTime']}")

            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job

            time.sleep(poll_interval)

        raise TimeoutError(f"La tâche {job_id} n'a pas terminé dans les {timeout} secondes")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """Obtenir les résultats de l'exécution de la tâche"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_job(self, job_id, reason="User cancellation", force=False):
        """Annuler une tâche en cours ou en file d'attente"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
        """Retenter une tâche échouée"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }

        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_system_status(self):
        """Obtenir le statut d'exécution global du système"""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        return response.json()

# Exemple d'utilisation
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Vérifier le statut du système
    system_status = manager.get_system_status()
    print(f"Statut du système : {system_status['systemStatus']}")
    print(f"Tâches en file d'attente : {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"Temps d'attente moyen : {system_status['queueStatistics']['averageWaitTime']}")

    # Soumettre une tâche de process mining complète
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }

    job = manager.submit_job(
        'Monthly Process Analytics',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        'High'
    )

    print(f"Tâche soumise : {job['jobId']}")
    print(f"Position dans la file d'attente : {job['queuePosition']}")
    print(f"Début estimé : {job['estimatedStartTime']}")

    # Attendre la complétion
    final_job = manager.wait_for_completion(job['jobId'])

    if final_job['status'] == 'Completed':
        # Obtenir les résultats détaillés
        results = manager.get_job_results(job['jobId'])

        print("Tâche terminée avec succès !")
        print(f"Enregistrements traités : {results['summary']['recordsProcessed']:,}")
        print(f"Score de qualité des données : {results['summary']['dataQualityScore']}")
        print(f"Efficacité du traitement : {results['summary']['processingEfficiency']}%")

        # Télécharger le rapport principal
        print(f"Télécharger le rapport : {results['results']['primaryOutput']['downloadUrl']}")

        # Lister toutes les sorties additionnelles
        for output in results['results']['additionalOutputs']:
            print(f"Télécharger {output['type']} : {output['downloadUrl']}")

    else:
        print(f"Tâche échouée avec le statut : {final_job['status']}")
        if 'error' in final_job:
            print(f"Erreur : {final_job['error']}")

except Exception as e:
    print(f"Erreur dans le flux d'exécution : {e}")