Exécution du Pipeline

Exécuter des Pipelines d'Enrichissement

Exécutez des pipelines d'enrichissement sur des jeux de données, surveillez la progression et récupérez les résultats améliorés.

Exécuter un Pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute

Déclenche l'exécution d'un pipeline d'enrichissement sur un jeu de données spécifié. L'exécution s'effectue de manière asynchrone et retourne un ID d'exécution pour suivre la progression.

Paramètres

Paramètre Type Emplacement Description
tenantId GUID Chemin L'identifiant du locataire
projectId GUID Chemin L'identifiant du projet
pipelineId GUID Chemin L'identifiant du pipeline

Corps de la demande

{
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "executionName": "Analyse Mensuelle du Processus",
  "executionDescription": "Enrichissement pour la revue de performance mensuelle",
  "parameters": {
    "timeRange": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-31"
    },
    "filterCriteria": {
      "includeWeekends": false,
      "minCaseDuration": "1h"
    },
    "outputOptions": {
      "includeRawData": true,
      "generateSummary": true,
      "exportFormat": "CSV"
    }
  },
  "priority": "Normal",
  "notifyOnCompletion": true
}

Réponse

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "estimatedDuration": "15-20 minutes",
  "executionName": "Analyse Mensuelle du Processus",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Validation des Données",
      "status": "Pending",
      "estimatedDuration": "2-3 minutes"
    },
    {
      "stageId": "stage-002",
      "stageName": "Enrichissement temporel",
      "status": "Pending",
      "estimatedDuration": "8-10 minutes"
    }
  ]
}

Obtenir le Statut de l'Exécution

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Récupère le statut actuel et les informations de progression pour une exécution de pipeline, y compris les détails d'avancement étape par étape.

Réponse

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Running",
  "progress": 45,
  "currentStage": {
    "stageId": "stage-002",
    "stageName": "Enrichissement temporel",
    "status": "Running",
    "progress": 60,
    "startTime": "2024-01-20T10:35:00Z",
    "estimatedCompletion": "2024-01-20T10:45:00Z"
  },
  "executionName": "Analyse Mensuelle du Processus",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "dateStarted": "2024-01-20T10:32:00Z",
  "estimatedCompletion": "2024-01-20T10:50:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Validation des Données",
      "status": "Completed",
      "progress": 100,
      "startTime": "2024-01-20T10:32:00Z",
      "endTime": "2024-01-20T10:35:00Z",
      "duration": "3 minutes",
      "recordsProcessed": 15420,
      "validationResults": {
        "totalRecords": 15420,
        "validRecords": 15418,
        "errors": 2,
        "warnings": 15
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Enrichissement temporel",
      "status": "Running",
      "progress": 60,
      "startTime": "2024-01-20T10:35:00Z",
      "recordsProcessed": 9252,
      "totalRecords": 15418
    }
  ],
  "metrics": {
    "totalRecords": 15420,
    "processedRecords": 9252,
    "errorCount": 2,
    "warningCount": 15
  }
}

Obtenir les Résultats de l'Exécution

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results

Récupère les résultats finaux d'une exécution de pipeline terminée, incluant les données enrichies, les statistiques sommaires et les sorties téléchargeables.

Paramètres de requête

Paramètre Type Description
format string Format de la réponse : summary, full, download (par défaut : summary)
includeRawData boolean Inclure le jeu de données d'origine dans la réponse (par défaut : false)
limit integer Limite du nombre d'enregistrements retournés (max : 10000)

Réponse

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionDate": "2024-01-20T10:48:00Z",
  "totalDuration": "18 minutes",
  "summary": {
    "originalRecords": 15420,
    "enrichedRecords": 15418,
    "newAttributes": 8,
    "dataQualityScore": 98.7,
    "enrichmentCoverage": 99.9
  },
  "enrichedAttributes": [
    {
      "attributeName": "dayOfWeek",
      "attributeType": "string",
      "coverage": 100,
      "uniqueValues": 7,
      "description": "Jour de la semaine pour chaque événement"
    },
    {
      "attributeName": "businessHours",
      "attributeType": "boolean",
      "coverage": 100,
      "description": "Indique si l'événement a eu lieu pendant les heures ouvrables"
    },
    {
      "attributeName": "cycleTime",
      "attributeType": "duration",
      "coverage": 99.8,
      "averageValue": "4.2 hours",
      "description": "Temps entre le début et la fin du cas"
    }
  ],
  "dataQuality": {
    "completeness": 99.9,
    "accuracy": 98.5,
    "consistency": 99.2,
    "validity": 97.8,
    "issues": [
      {
        "type": "Timestamp manquant",
        "count": 2,
        "severity": "Élevé"
      },
      {
        "type": "Durée invalide",
        "count": 15,
        "severity": "Moyen"
      }
    ]
  },
  "downloadUrls": {
    "enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
    "summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
    "dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
  }
}

Lister les Exécutions de Pipelines

GET /api/{tenantId}/{projectId}/enrichment/executions

Récupère une liste de toutes les exécutions de pipeline avec options de filtrage et de pagination. Utile pour surveiller l'historique des exécutions et leur performance.

Paramètres de requête

Paramètre Type Description
pipelineId GUID Filtrer par pipeline spécifique
status string Filtrer par statut : Queued, Running, Completed, Failed, Cancelled
dateFrom datetime Filtrer les exécutions à partir de cette date
dateTo datetime Filtrer les exécutions jusqu'à cette date
page integer Numéro de page pour la pagination (par défaut : 1)
pageSize integer Nombre d'items par page (par défaut : 20, max : 100)

Réponse

{
  "executions": [
    {
      "executionId": "990e8400-e29b-41d4-a716-446655440000",
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Enrichissement des Données de Process Mining",
      "executionName": "Analyse Mensuelle du Processus",
      "status": "Completed",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "dateCompleted": "2024-01-20T10:48:00Z",
      "duration": "18 minutes",
      "recordsProcessed": 15418,
      "priority": "Normal",
      "submittedBy": "user123"
    }
  ],
  "totalCount": 47,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Annuler une Exécution

DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Annule une exécution de pipeline en cours ou en file d'attente. Les étapes déjà complétées seront conservées, mais l'exécution s'arrêtera à l'étape courante.

Corps de la demande (optionnel)

{
  "reason": "Annulation demandée par l'utilisateur",
  "preservePartialResults": true
}

Codes de réponse

  • 200 OK - Exécution annulée avec succès
  • 404 Not Found - Exécution non trouvée
  • 409 Conflict - Exécution déjà terminée ou ne peut être annulée

Redémarrer une Exécution Échouée

POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart

Redémarre une exécution de pipeline échouée à partir du point de défaillance. Les étapes terminées précédemment seront sautées sauf si la re-exécution est explicitement demandée.

Corps de la demande

{
  "restartFromStage": "stage-003",
  "rerunCompletedStages": false,
  "updateParameters": {
    "retryFailedRecords": true,
    "increaseTimeout": true
  }
}

Réponse

Retourne 200 OK avec un nouvel objet d'exécution contenant un nouvel ID d'exécution et le statut mis à jour.

Exemple : Flux Complet d'Exécution

Cet exemple montre comment exécuter un pipeline et surveiller sa progression :

// 1. Exécuter le pipeline
const executeEnrichment = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      executionName: 'Analyse du Parcours Client',
      executionDescription: 'Enrichissement des données clients avec des métriques de parcours',
      parameters: {
        timeRange: {
          startDate: '2024-01-01',
          endDate: '2024-01-31'
        },
        outputOptions: {
          includeRawData: true,
          generateSummary: true,
          exportFormat: 'CSV'
        }
      },
      priority: 'High',
      notifyOnCompletion: true
    })
  });

  return await response.json();
};

// 2. Surveiller la progression de l'exécution
const monitorExecution = async (executionId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const execution = await response.json();
    console.log(`Statut : ${execution.status}, Progression : ${execution.progress}%`);

    if (execution.status === 'Running' || execution.status === 'Queued') {
      // Vérifier à nouveau dans 30 secondes
      setTimeout(() => checkStatus(), 30000);
    } else if (execution.status === 'Completed') {
      console.log('Exécution terminée avec succès !');
      await getResults(executionId);
    } else if (execution.status === 'Failed') {
      console.log('Exécution échouée :', execution.error);
    }
  };

  await checkStatus();
};

// 3. Obtenir les résultats une fois terminée
const getResults = async (executionId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Résumé de l\'enrichissement :', results.summary);
  console.log('URL de téléchargement :', results.downloadUrls);

  return results;
};

// Exécuter le workflow
executeEnrichment()
  .then(execution => {
    console.log(`Exécution démarrée : ${execution.executionId}`);
    return monitorExecution(execution.executionId);
  })
  .catch(error => console.error('Échec de l\'exécution :', error));

Exemple Python

import requests
import time
import json
from datetime import datetime, timedelta

class PipelineExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
        """Exécuter un pipeline d'enrichissement"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
        payload = {
            'datasetId': dataset_id,
            'executionName': execution_name,
            'parameters': parameters or {},
            'priority': priority,
            'notifyOnCompletion': True
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_execution_status(self, execution_id):
        """Obtenir le statut actuel de l'exécution"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
        """Attendre la fin de l'exécution avec des vérifications périodiques"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            status = self.get_execution_status(execution_id)
            print(f"Exécution {execution_id} : {status['status']} ({status.get('progress', 0)}%)")

            if status['status'] in ['Completed', 'Failed', 'Cancelled']:
                return status

            time.sleep(poll_interval)

        raise TimeoutError(f"L'exécution {execution_id} n'a pas été terminée dans les {timeout} secondes")

    def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
        """Obtenir les résultats de l'exécution"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
        params = {
            'format': format_type,
            'includeRawData': include_raw_data
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_execution(self, execution_id, reason="Annulation par l'utilisateur"):
        """Annuler une exécution en cours"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
        """Lister les exécutions de pipeline avec filtrage"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
        params = {'page': page, 'pageSize': page_size}

        if pipeline_id:
            params['pipelineId'] = pipeline_id
        if status:
            params['status'] = status
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

# Exemple d'utilisation
manager = PipelineExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Exécuter un pipeline avec paramètres personnalisés
execution_params = {
    'timeRange': {
        'startDate': '2024-01-01',
        'endDate': '2024-01-31'
    },
    'filterCriteria': {
        'includeWeekends': False,
        'minCaseDuration': '1h'
    },
    'outputOptions': {
        'includeRawData': True,
        'generateSummary': True,
        'exportFormat': 'CSV'
    }
}

try:
    # Démarrer l'exécution
    execution = manager.execute_pipeline(
        'pipeline-guid',
        'dataset-guid',
        'Analyse Mensuelle du Processus',
        execution_params,
        'High'
    )

    print(f"Exécution démarrée : {execution['executionId']}")
    print(f"Durée estimée : {execution['estimatedDuration']}")

    # Attendre la fin
    final_status = manager.wait_for_completion(execution['executionId'])

    if final_status['status'] == 'Completed':
        # Obtenir les résultats
        results = manager.get_execution_results(execution['executionId'])
        print(f"Enrichissement terminé avec succès !")
        print(f"Enregistrements originaux : {results['summary']['originalRecords']}")
        print(f"Enregistrements enrichis : {results['summary']['enrichedRecords']}")
        print(f"Score de qualité des données : {results['summary']['dataQualityScore']}")
        print(f"Télécharger les données enrichies : {results['downloadUrls']['enrichedDataset']}")
    else:
        print(f"Exécution échouée avec le statut : {final_status['status']}")

except Exception as e:
    print(f"Erreur lors de l'exécution du pipeline : {e}")