Pipelines d'enrichissement

Créer des workflows d'enrichissement de données

Créez et gérez des pipelines d'enrichissement pour transformer et améliorer vos ensembles de données de process mining.

Obtenir les détails d'un pipeline

GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Récupère des informations complètes sur un pipeline d'enrichissement spécifique, y compris ses étapes, sa configuration et les métadonnées d'exécution.

Paramètres

Paramètre Type Emplacement Description
tenantId GUID Chemin L'identifiant du tenant
projectId GUID Chemin L'identifiant du projet
pipelineId GUID Chemin L'identifiant du pipeline

Réponse

{
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "pipelineName": "Process Mining Data Enrichment",
  "pipelineDescription": "Enriches event logs with additional attributes and calculations",
  "status": "Active",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "stageType": "Validation",
      "order": 1,
      "configuration": {
        "validateCaseId": true,
        "validateTimestamps": true,
        "requireActivityNames": true
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "stageType": "TimeCalculation",
      "order": 2,
      "configuration": {
        "addDayOfWeek": true,
        "addBusinessHours": true,
        "timezoneId": "UTC"
      }
    }
  ],
  "triggers": {
    "automatic": true,
    "schedule": "0 2 * * *",
    "onDataUpdate": true
  },
  "dateCreated": "2024-01-15T10:30:00Z",
  "dateModified": "2024-01-20T14:45:00Z",
  "createdBy": "user123",
  "lastExecutionDate": "2024-01-20T02:00:00Z",
  "lastExecutionStatus": "Success",
  "executionCount": 45
}

Lister tous les pipelines

GET /api/{tenantId}/{projectId}/enrichment/pipelines

Récupère la liste de tous les pipelines d'enrichissement du projet avec les métadonnées de base et les informations de statut.

Paramètres de requête

Paramètre Type Description
status chaîne de caractères Filtrer par statut du pipeline : Active, Inactive, Failed
page entier Numéro de page pour la pagination (par défaut : 1)
pageSize entier Nombre d'éléments par page (par défaut : 20, max : 100)

Réponse

{
  "pipelines": [
    {
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "status": "Active",
      "stageCount": 5,
      "lastExecutionDate": "2024-01-20T02:00:00Z",
      "lastExecutionStatus": "Success",
      "dateCreated": "2024-01-15T10:30:00Z"
    }
  ],
  "totalCount": 12,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": false
}

Créer un nouveau pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline

Crée un nouveau pipeline d'enrichissement avec les étapes et la configuration spécifiées. Le pipeline peut être configuré pour s'exécuter automatiquement ou manuellement.

Corps de la requête

{
  "pipelineName": "Customer Journey Enrichment",
  "pipelineDescription": "Enriches customer journey data with demographics and behavior patterns",
  "stages": [
    {
      "stageName": "Customer Data Lookup",
      "stageType": "DataLookup",
      "order": 1,
      "configuration": {
        "lookupTable": "customer_demographics",
        "joinKey": "customerId",
        "selectFields": ["age", "segment", "region"]
      }
    },
    {
      "stageName": "Journey Metrics",
      "stageType": "Calculation",
      "order": 2,
      "configuration": {
        "calculations": [
          {
            "fieldName": "journeyDuration",
            "formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
            "groupBy": "caseId"
          },
          {
            "fieldName": "touchpointCount",
            "formula": "COUNT(*)",
            "groupBy": "caseId"
          }
        ]
      }
    }
  ],
  "triggers": {
    "automatic": false,
    "schedule": null,
    "onDataUpdate": true
  }
}

Réponse

Retourne 201 Created avec l'objet complet du pipeline incluant les IDs et horodatages générés.

Mettre à jour un pipeline

PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Met à jour la configuration, les étapes ou les déclencheurs d’un pipeline existant. Les modifications prennent effet lors de la prochaine exécution.

Corps de la requête

{
  "pipelineName": "Updated Customer Journey Enrichment",
  "pipelineDescription": "Enhanced customer journey data enrichment with ML insights",
  "status": "Active",
  "triggers": {
    "automatic": true,
    "schedule": "0 3 * * *",
    "onDataUpdate": true
  }
}

Réponse

Retourne l’objet pipeline mis à jour avec la même structure que le endpoint GET.

Supprimer un pipeline

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Supprime définitivement un pipeline ainsi que tout son historique d’exécution. Cette opération est irréversible et arrêtera toute exécution en cours.

Codes de réponse

  • 204 No Content - Pipeline supprimé avec succès
  • 404 Not Found - Pipeline introuvable ou accès refusé
  • 409 Conflict - Pipeline en cours d'exécution et ne peut pas être supprimé

Ajouter une étape au pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage

Ajoute une nouvelle étape de traitement à un pipeline existant. L’étape sera insérée à la position d’ordre spécifiée.

Corps de la requête

{
  "stageName": "Process Performance Metrics",
  "stageType": "PerformanceCalculation",
  "order": 3,
  "configuration": {
    "metrics": [
      {
        "name": "cycleTime",
        "calculation": "CASE_DURATION",
        "unit": "hours"
      },
      {
        "name": "waitTime",
        "calculation": "ACTIVITY_WAITING_TIME",
        "unit": "hours"
      }
    ],
    "aggregations": ["AVG", "MAX", "MIN", "P95"]
  }
}

Réponse

Retourne 201 Created avec l’objet complet de l’étape incluant l’ID généré.

Supprimer une étape du pipeline

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}

Supprime une étape spécifique du pipeline. Les étapes suivantes seront réordonnées automatiquement.

Codes de réponse

  • 204 No Content - Étape supprimée avec succès
  • 404 Not Found - Étape introuvable dans le pipeline
  • 409 Conflict - Impossible de supprimer l’étape pendant l’exécution du pipeline

Exemple : Workflow complet de pipeline

Cet exemple montre la création et la gestion d’un pipeline d’enrichissement :

// 1. Créer un nouveau pipeline d'enrichissement
const createPipeline = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      pipelineName: 'Order Processing Enrichment',
      pipelineDescription: 'Enriches order data with fulfillment metrics',
      stages: [
        {
          stageName: 'Order Validation',
          stageType: 'Validation',
          order: 1,
          configuration: {
            validateOrderId: true,
            validateCustomerId: true,
            validateAmounts: true
          }
        },
        {
          stageName: 'Fulfillment Time Calculation',
          stageType: 'TimeCalculation',
          order: 2,
          configuration: {
            startActivity: 'Order Received',
            endActivity: 'Order Shipped',
            outputField: 'fulfillmentTime'
          }
        }
      ],
      triggers: {
        automatic: true,
        onDataUpdate: true
      }
    })
  });

  return await response.json();
};

// 2. Ajouter une nouvelle étape à un pipeline existant
const addStage = async (pipelineId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}/stage`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      stageName: 'Customer Segmentation',
      stageType: 'Classification',
      order: 3,
      configuration: {
        segmentationRules: [
          {
            segment: 'VIP',
            condition: 'orderValue > 1000'
          },
          {
            segment: 'Regular',
            condition: 'orderValue <= 1000'
          }
        ]
      }
    })
  });

  return await response.json();
};

// 3. Obtenir le statut du pipeline
const getPipelineStatus = async (pipelineId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  return await response.json();
};

Exemple en Python

import requests
import json
from datetime import datetime

class EnrichmentPipelineManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def create_pipeline(self, name, description, stages, triggers=None):
        """Créer un nouveau pipeline d'enrichissement"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline"
        payload = {
            'pipelineName': name,
            'pipelineDescription': description,
            'stages': stages,
            'triggers': triggers or {'automatic': False, 'onDataUpdate': True}
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_pipeline(self, pipeline_id):
        """Obtenir les détails du pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_pipelines(self, status=None, page=1, page_size=20):
        """Lister tous les pipelines avec filtrage optionnel"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipelines"
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
        """Ajouter une nouvelle étape à un pipeline existant"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/stage"
        payload = {
            'stageName': stage_name,
            'stageType': stage_type,
            'order': order,
            'configuration': configuration
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
        """Mettre à jour la configuration du pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        payload = {}
        if name:
            payload['pipelineName'] = name
        if description:
            payload['pipelineDescription'] = description
        if status:
            payload['status'] = status
        if triggers:
            payload['triggers'] = triggers

        response = requests.put(url, json=payload, headers=self.headers)
        return response.json()

    def delete_pipeline(self, pipeline_id):
        """Supprimer un pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.delete(url, headers=self.headers)
        return response.status_code == 204

# Exemple d'utilisation
manager = EnrichmentPipelineManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Créer un pipeline d'enrichissement complet
stages = [
    {
        'stageName': 'Data Quality Check',
        'stageType': 'Validation',
        'order': 1,
        'configuration': {
            'checkDuplicates': True,
            'validateTimestamps': True,
            'checkMissingValues': True
        }
    },
    {
        'stageName': 'Process Mining Metrics',
        'stageType': 'ProcessCalculation',
        'order': 2,
        'configuration': {
            'calculateCycleTime': True,
            'calculateWaitingTime': True,
            'calculateResourceUtilization': True,
            'detectBottlenecks': True
        }
    },
    {
        'stageName': 'Anomaly Detection',
        'stageType': 'AnomalyDetection',
        'order': 3,
        'configuration': {
            'algorithm': 'isolation_forest',
            'threshold': 0.1,
            'features': ['duration', 'cost', 'resourceCount']
        }
    }
]

pipeline = manager.create_pipeline(
    'Comprehensive Process Analysis',
    'End-to-end process analysis with anomaly detection',
    stages,
    {'automatic': True, 'schedule': '0 1 * * *', 'onDataUpdate': True}
)

print(f"Pipeline créé : {pipeline['pipelineId']}")

# Lister tous les pipelines actifs
active_pipelines = manager.list_pipelines(status='Active')
print(f"{active_pipelines['totalCount']} pipelines actifs trouvés")