Pipelines d'enrichissement
Créer des workflows d'enrichissement de données
Créez et gérez des pipelines d'enrichissement pour transformer et améliorer vos ensembles de données de process mining.
Obtenir les détails d'un pipeline
GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Récupère des informations complètes sur un pipeline d'enrichissement spécifique, y compris ses étapes, sa configuration et les métadonnées d'exécution.
Paramètres
| Paramètre | Type | Emplacement | Description |
|---|---|---|---|
tenantId |
GUID | Chemin | L'identifiant du tenant |
projectId |
GUID | Chemin | L'identifiant du projet |
pipelineId |
GUID | Chemin | L'identifiant du pipeline |
Réponse
{
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"projectId": "660e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"pipelineDescription": "Enriches event logs with additional attributes and calculations",
"status": "Active",
"stages": [
{
"stageId": "stage-001",
"stageName": "Data Validation",
"stageType": "Validation",
"order": 1,
"configuration": {
"validateCaseId": true,
"validateTimestamps": true,
"requireActivityNames": true
}
},
{
"stageId": "stage-002",
"stageName": "Time Enrichment",
"stageType": "TimeCalculation",
"order": 2,
"configuration": {
"addDayOfWeek": true,
"addBusinessHours": true,
"timezoneId": "UTC"
}
}
],
"triggers": {
"automatic": true,
"schedule": "0 2 * * *",
"onDataUpdate": true
},
"dateCreated": "2024-01-15T10:30:00Z",
"dateModified": "2024-01-20T14:45:00Z",
"createdBy": "user123",
"lastExecutionDate": "2024-01-20T02:00:00Z",
"lastExecutionStatus": "Success",
"executionCount": 45
}
Lister tous les pipelines
GET /api/{tenantId}/{projectId}/enrichment/pipelines
Récupère la liste de tous les pipelines d'enrichissement du projet avec les métadonnées de base et les informations de statut.
Paramètres de requête
| Paramètre | Type | Description |
|---|---|---|
status |
chaîne de caractères | Filtrer par statut du pipeline : Active, Inactive, Failed |
page |
entier | Numéro de page pour la pagination (par défaut : 1) |
pageSize |
entier | Nombre d'éléments par page (par défaut : 20, max : 100) |
Réponse
{
"pipelines": [
{
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"status": "Active",
"stageCount": 5,
"lastExecutionDate": "2024-01-20T02:00:00Z",
"lastExecutionStatus": "Success",
"dateCreated": "2024-01-15T10:30:00Z"
}
],
"totalCount": 12,
"page": 1,
"pageSize": 20,
"hasNextPage": false
}
Créer un nouveau pipeline
POST /api/{tenantId}/{projectId}/enrichment/pipeline
Crée un nouveau pipeline d'enrichissement avec les étapes et la configuration spécifiées. Le pipeline peut être configuré pour s'exécuter automatiquement ou manuellement.
Corps de la requête
{
"pipelineName": "Customer Journey Enrichment",
"pipelineDescription": "Enriches customer journey data with demographics and behavior patterns",
"stages": [
{
"stageName": "Customer Data Lookup",
"stageType": "DataLookup",
"order": 1,
"configuration": {
"lookupTable": "customer_demographics",
"joinKey": "customerId",
"selectFields": ["age", "segment", "region"]
}
},
{
"stageName": "Journey Metrics",
"stageType": "Calculation",
"order": 2,
"configuration": {
"calculations": [
{
"fieldName": "journeyDuration",
"formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
"groupBy": "caseId"
},
{
"fieldName": "touchpointCount",
"formula": "COUNT(*)",
"groupBy": "caseId"
}
]
}
}
],
"triggers": {
"automatic": false,
"schedule": null,
"onDataUpdate": true
}
}
Réponse
Retourne 201 Created avec l'objet complet du pipeline incluant les IDs et horodatages générés.
Mettre à jour un pipeline
PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Met à jour la configuration, les étapes ou les déclencheurs d’un pipeline existant. Les modifications prennent effet lors de la prochaine exécution.
Corps de la requête
{
"pipelineName": "Updated Customer Journey Enrichment",
"pipelineDescription": "Enhanced customer journey data enrichment with ML insights",
"status": "Active",
"triggers": {
"automatic": true,
"schedule": "0 3 * * *",
"onDataUpdate": true
}
}
Réponse
Retourne l’objet pipeline mis à jour avec la même structure que le endpoint GET.
Supprimer un pipeline
DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Supprime définitivement un pipeline ainsi que tout son historique d’exécution. Cette opération est irréversible et arrêtera toute exécution en cours.
Codes de réponse
204 No Content- Pipeline supprimé avec succès404 Not Found- Pipeline introuvable ou accès refusé409 Conflict- Pipeline en cours d'exécution et ne peut pas être supprimé
Ajouter une étape au pipeline
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage
Ajoute une nouvelle étape de traitement à un pipeline existant. L’étape sera insérée à la position d’ordre spécifiée.
Corps de la requête
{
"stageName": "Process Performance Metrics",
"stageType": "PerformanceCalculation",
"order": 3,
"configuration": {
"metrics": [
{
"name": "cycleTime",
"calculation": "CASE_DURATION",
"unit": "hours"
},
{
"name": "waitTime",
"calculation": "ACTIVITY_WAITING_TIME",
"unit": "hours"
}
],
"aggregations": ["AVG", "MAX", "MIN", "P95"]
}
}
Réponse
Retourne 201 Created avec l’objet complet de l’étape incluant l’ID généré.
Supprimer une étape du pipeline
DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}
Supprime une étape spécifique du pipeline. Les étapes suivantes seront réordonnées automatiquement.
Codes de réponse
204 No Content- Étape supprimée avec succès404 Not Found- Étape introuvable dans le pipeline409 Conflict- Impossible de supprimer l’étape pendant l’exécution du pipeline
Exemple : Workflow complet de pipeline
Cet exemple montre la création et la gestion d’un pipeline d’enrichissement :
// 1. Créer un nouveau pipeline d'enrichissement
const createPipeline = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
pipelineName: 'Order Processing Enrichment',
pipelineDescription: 'Enriches order data with fulfillment metrics',
stages: [
{
stageName: 'Order Validation',
stageType: 'Validation',
order: 1,
configuration: {
validateOrderId: true,
validateCustomerId: true,
validateAmounts: true
}
},
{
stageName: 'Fulfillment Time Calculation',
stageType: 'TimeCalculation',
order: 2,
configuration: {
startActivity: 'Order Received',
endActivity: 'Order Shipped',
outputField: 'fulfillmentTime'
}
}
],
triggers: {
automatic: true,
onDataUpdate: true
}
})
});
return await response.json();
};
// 2. Ajouter une nouvelle étape à un pipeline existant
const addStage = async (pipelineId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}/stage`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
stageName: 'Customer Segmentation',
stageType: 'Classification',
order: 3,
configuration: {
segmentationRules: [
{
segment: 'VIP',
condition: 'orderValue > 1000'
},
{
segment: 'Regular',
condition: 'orderValue <= 1000'
}
]
}
})
});
return await response.json();
};
// 3. Obtenir le statut du pipeline
const getPipelineStatus = async (pipelineId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
return await response.json();
};
Exemple en Python
import requests
import json
from datetime import datetime
class EnrichmentPipelineManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def create_pipeline(self, name, description, stages, triggers=None):
"""Créer un nouveau pipeline d'enrichissement"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline"
payload = {
'pipelineName': name,
'pipelineDescription': description,
'stages': stages,
'triggers': triggers or {'automatic': False, 'onDataUpdate': True}
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_pipeline(self, pipeline_id):
"""Obtenir les détails du pipeline"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def list_pipelines(self, status=None, page=1, page_size=20):
"""Lister tous les pipelines avec filtrage optionnel"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipelines"
params = {'page': page, 'pageSize': page_size}
if status:
params['status'] = status
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
"""Ajouter une nouvelle étape à un pipeline existant"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/stage"
payload = {
'stageName': stage_name,
'stageType': stage_type,
'order': order,
'configuration': configuration
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
"""Mettre à jour la configuration du pipeline"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
payload = {}
if name:
payload['pipelineName'] = name
if description:
payload['pipelineDescription'] = description
if status:
payload['status'] = status
if triggers:
payload['triggers'] = triggers
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def delete_pipeline(self, pipeline_id):
"""Supprimer un pipeline"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
response = requests.delete(url, headers=self.headers)
return response.status_code == 204
# Exemple d'utilisation
manager = EnrichmentPipelineManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
# Créer un pipeline d'enrichissement complet
stages = [
{
'stageName': 'Data Quality Check',
'stageType': 'Validation',
'order': 1,
'configuration': {
'checkDuplicates': True,
'validateTimestamps': True,
'checkMissingValues': True
}
},
{
'stageName': 'Process Mining Metrics',
'stageType': 'ProcessCalculation',
'order': 2,
'configuration': {
'calculateCycleTime': True,
'calculateWaitingTime': True,
'calculateResourceUtilization': True,
'detectBottlenecks': True
}
},
{
'stageName': 'Anomaly Detection',
'stageType': 'AnomalyDetection',
'order': 3,
'configuration': {
'algorithm': 'isolation_forest',
'threshold': 0.1,
'features': ['duration', 'cost', 'resourceCount']
}
}
]
pipeline = manager.create_pipeline(
'Comprehensive Process Analysis',
'End-to-end process analysis with anomaly detection',
stages,
{'automatic': True, 'schedule': '0 1 * * *', 'onDataUpdate': True}
)
print(f"Pipeline créé : {pipeline['pipelineId']}")
# Lister tous les pipelines actifs
active_pipelines = manager.list_pipelines(status='Active')
print(f"{active_pipelines['totalCount']} pipelines actifs trouvés")