Exécution du Pipeline
Exécuter des Pipelines d'Enrichissement
Exécutez des pipelines d'enrichissement sur des jeux de données, surveillez la progression et récupérez les résultats améliorés.
Exécuter un Pipeline
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute
Déclenche l'exécution d'un pipeline d'enrichissement sur un jeu de données spécifié. L'exécution s'effectue de manière asynchrone et retourne un ID d'exécution pour suivre la progression.
Paramètres
| Paramètre | Type | Emplacement | Description |
|---|---|---|---|
tenantId |
GUID | Chemin | L'identifiant du locataire |
projectId |
GUID | Chemin | L'identifiant du projet |
pipelineId |
GUID | Chemin | L'identifiant du pipeline |
Corps de la demande
{
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"executionName": "Analyse Mensuelle du Processus",
"executionDescription": "Enrichissement pour la revue de performance mensuelle",
"parameters": {
"timeRange": {
"startDate": "2024-01-01",
"endDate": "2024-01-31"
},
"filterCriteria": {
"includeWeekends": false,
"minCaseDuration": "1h"
},
"outputOptions": {
"includeRawData": true,
"generateSummary": true,
"exportFormat": "CSV"
}
},
"priority": "Normal",
"notifyOnCompletion": true
}
Réponse
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"status": "Queued",
"estimatedDuration": "15-20 minutes",
"executionName": "Analyse Mensuelle du Processus",
"dateSubmitted": "2024-01-20T10:30:00Z",
"priority": "Normal",
"stages": [
{
"stageId": "stage-001",
"stageName": "Validation des Données",
"status": "Pending",
"estimatedDuration": "2-3 minutes"
},
{
"stageId": "stage-002",
"stageName": "Enrichissement temporel",
"status": "Pending",
"estimatedDuration": "8-10 minutes"
}
]
}
Obtenir le Statut de l'Exécution
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Récupère le statut actuel et les informations de progression pour une exécution de pipeline, y compris les détails d'avancement étape par étape.
Réponse
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"status": "Running",
"progress": 45,
"currentStage": {
"stageId": "stage-002",
"stageName": "Enrichissement temporel",
"status": "Running",
"progress": 60,
"startTime": "2024-01-20T10:35:00Z",
"estimatedCompletion": "2024-01-20T10:45:00Z"
},
"executionName": "Analyse Mensuelle du Processus",
"dateSubmitted": "2024-01-20T10:30:00Z",
"dateStarted": "2024-01-20T10:32:00Z",
"estimatedCompletion": "2024-01-20T10:50:00Z",
"priority": "Normal",
"stages": [
{
"stageId": "stage-001",
"stageName": "Validation des Données",
"status": "Completed",
"progress": 100,
"startTime": "2024-01-20T10:32:00Z",
"endTime": "2024-01-20T10:35:00Z",
"duration": "3 minutes",
"recordsProcessed": 15420,
"validationResults": {
"totalRecords": 15420,
"validRecords": 15418,
"errors": 2,
"warnings": 15
}
},
{
"stageId": "stage-002",
"stageName": "Enrichissement temporel",
"status": "Running",
"progress": 60,
"startTime": "2024-01-20T10:35:00Z",
"recordsProcessed": 9252,
"totalRecords": 15418
}
],
"metrics": {
"totalRecords": 15420,
"processedRecords": 9252,
"errorCount": 2,
"warningCount": 15
}
}
Obtenir les Résultats de l'Exécution
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results
Récupère les résultats finaux d'une exécution de pipeline terminée, incluant les données enrichies, les statistiques sommaires et les sorties téléchargeables.
Paramètres de requête
| Paramètre | Type | Description |
|---|---|---|
format |
string | Format de la réponse : summary, full, download (par défaut : summary) |
includeRawData |
boolean | Inclure le jeu de données d'origine dans la réponse (par défaut : false) |
limit |
integer | Limite du nombre d'enregistrements retournés (max : 10000) |
Réponse
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"status": "Completed",
"completionDate": "2024-01-20T10:48:00Z",
"totalDuration": "18 minutes",
"summary": {
"originalRecords": 15420,
"enrichedRecords": 15418,
"newAttributes": 8,
"dataQualityScore": 98.7,
"enrichmentCoverage": 99.9
},
"enrichedAttributes": [
{
"attributeName": "dayOfWeek",
"attributeType": "string",
"coverage": 100,
"uniqueValues": 7,
"description": "Jour de la semaine pour chaque événement"
},
{
"attributeName": "businessHours",
"attributeType": "boolean",
"coverage": 100,
"description": "Indique si l'événement a eu lieu pendant les heures ouvrables"
},
{
"attributeName": "cycleTime",
"attributeType": "duration",
"coverage": 99.8,
"averageValue": "4.2 hours",
"description": "Temps entre le début et la fin du cas"
}
],
"dataQuality": {
"completeness": 99.9,
"accuracy": 98.5,
"consistency": 99.2,
"validity": 97.8,
"issues": [
{
"type": "Timestamp manquant",
"count": 2,
"severity": "Élevé"
},
{
"type": "Durée invalide",
"count": 15,
"severity": "Moyen"
}
]
},
"downloadUrls": {
"enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
"summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
"dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
}
}
Lister les Exécutions de Pipelines
GET /api/{tenantId}/{projectId}/enrichment/executions
Récupère une liste de toutes les exécutions de pipeline avec options de filtrage et de pagination. Utile pour surveiller l'historique des exécutions et leur performance.
Paramètres de requête
| Paramètre | Type | Description |
|---|---|---|
pipelineId |
GUID | Filtrer par pipeline spécifique |
status |
string | Filtrer par statut : Queued, Running, Completed, Failed, Cancelled |
dateFrom |
datetime | Filtrer les exécutions à partir de cette date |
dateTo |
datetime | Filtrer les exécutions jusqu'à cette date |
page |
integer | Numéro de page pour la pagination (par défaut : 1) |
pageSize |
integer | Nombre d'items par page (par défaut : 20, max : 100) |
Réponse
{
"executions": [
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Enrichissement des Données de Process Mining",
"executionName": "Analyse Mensuelle du Processus",
"status": "Completed",
"dateSubmitted": "2024-01-20T10:30:00Z",
"dateCompleted": "2024-01-20T10:48:00Z",
"duration": "18 minutes",
"recordsProcessed": 15418,
"priority": "Normal",
"submittedBy": "user123"
}
],
"totalCount": 47,
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
Annuler une Exécution
DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Annule une exécution de pipeline en cours ou en file d'attente. Les étapes déjà complétées seront conservées, mais l'exécution s'arrêtera à l'étape courante.
Corps de la demande (optionnel)
{
"reason": "Annulation demandée par l'utilisateur",
"preservePartialResults": true
}
Codes de réponse
200 OK- Exécution annulée avec succès404 Not Found- Exécution non trouvée409 Conflict- Exécution déjà terminée ou ne peut être annulée
Redémarrer une Exécution Échouée
POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart
Redémarre une exécution de pipeline échouée à partir du point de défaillance. Les étapes terminées précédemment seront sautées sauf si la re-exécution est explicitement demandée.
Corps de la demande
{
"restartFromStage": "stage-003",
"rerunCompletedStages": false,
"updateParameters": {
"retryFailedRecords": true,
"increaseTimeout": true
}
}
Réponse
Retourne 200 OK avec un nouvel objet d'exécution contenant un nouvel ID d'exécution et le statut mis à jour.
Exemple : Flux Complet d'Exécution
Cet exemple montre comment exécuter un pipeline et surveiller sa progression :
// 1. Exécuter le pipeline
const executeEnrichment = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
datasetId: '880e8400-e29b-41d4-a716-446655440000',
executionName: 'Analyse du Parcours Client',
executionDescription: 'Enrichissement des données clients avec des métriques de parcours',
parameters: {
timeRange: {
startDate: '2024-01-01',
endDate: '2024-01-31'
},
outputOptions: {
includeRawData: true,
generateSummary: true,
exportFormat: 'CSV'
}
},
priority: 'High',
notifyOnCompletion: true
})
});
return await response.json();
};
// 2. Surveiller la progression de l'exécution
const monitorExecution = async (executionId) => {
const checkStatus = async () => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const execution = await response.json();
console.log(`Statut : ${execution.status}, Progression : ${execution.progress}%`);
if (execution.status === 'Running' || execution.status === 'Queued') {
// Vérifier à nouveau dans 30 secondes
setTimeout(() => checkStatus(), 30000);
} else if (execution.status === 'Completed') {
console.log('Exécution terminée avec succès !');
await getResults(executionId);
} else if (execution.status === 'Failed') {
console.log('Exécution échouée :', execution.error);
}
};
await checkStatus();
};
// 3. Obtenir les résultats une fois terminée
const getResults = async (executionId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const results = await response.json();
console.log('Résumé de l\'enrichissement :', results.summary);
console.log('URL de téléchargement :', results.downloadUrls);
return results;
};
// Exécuter le workflow
executeEnrichment()
.then(execution => {
console.log(`Exécution démarrée : ${execution.executionId}`);
return monitorExecution(execution.executionId);
})
.catch(error => console.error('Échec de l\'exécution :', error));
Exemple Python
import requests
import time
import json
from datetime import datetime, timedelta
class PipelineExecutionManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
"""Exécuter un pipeline d'enrichissement"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
payload = {
'datasetId': dataset_id,
'executionName': execution_name,
'parameters': parameters or {},
'priority': priority,
'notifyOnCompletion': True
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_execution_status(self, execution_id):
"""Obtenir le statut actuel de l'exécution"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
"""Attendre la fin de l'exécution avec des vérifications périodiques"""
start_time = time.time()
while time.time() - start_time < timeout:
status = self.get_execution_status(execution_id)
print(f"Exécution {execution_id} : {status['status']} ({status.get('progress', 0)}%)")
if status['status'] in ['Completed', 'Failed', 'Cancelled']:
return status
time.sleep(poll_interval)
raise TimeoutError(f"L'exécution {execution_id} n'a pas été terminée dans les {timeout} secondes")
def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
"""Obtenir les résultats de l'exécution"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
params = {
'format': format_type,
'includeRawData': include_raw_data
}
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_execution(self, execution_id, reason="Annulation par l'utilisateur"):
"""Annuler une exécution en cours"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
payload = {
'reason': reason,
'preservePartialResults': True
}
response = requests.delete(url, json=payload, headers=self.headers)
return response.status_code == 200
def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
"""Lister les exécutions de pipeline avec filtrage"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
params = {'page': page, 'pageSize': page_size}
if pipeline_id:
params['pipelineId'] = pipeline_id
if status:
params['status'] = status
if date_from:
params['dateFrom'] = date_from.isoformat()
if date_to:
params['dateTo'] = date_to.isoformat()
response = requests.get(url, params=params, headers=self.headers)
return response.json()
# Exemple d'utilisation
manager = PipelineExecutionManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
# Exécuter un pipeline avec paramètres personnalisés
execution_params = {
'timeRange': {
'startDate': '2024-01-01',
'endDate': '2024-01-31'
},
'filterCriteria': {
'includeWeekends': False,
'minCaseDuration': '1h'
},
'outputOptions': {
'includeRawData': True,
'generateSummary': True,
'exportFormat': 'CSV'
}
}
try:
# Démarrer l'exécution
execution = manager.execute_pipeline(
'pipeline-guid',
'dataset-guid',
'Analyse Mensuelle du Processus',
execution_params,
'High'
)
print(f"Exécution démarrée : {execution['executionId']}")
print(f"Durée estimée : {execution['estimatedDuration']}")
# Attendre la fin
final_status = manager.wait_for_completion(execution['executionId'])
if final_status['status'] == 'Completed':
# Obtenir les résultats
results = manager.get_execution_results(execution['executionId'])
print(f"Enrichissement terminé avec succès !")
print(f"Enregistrements originaux : {results['summary']['originalRecords']}")
print(f"Enregistrements enrichis : {results['summary']['enrichedRecords']}")
print(f"Score de qualité des données : {results['summary']['dataQualityScore']}")
print(f"Télécharger les données enrichies : {results['downloadUrls']['enrichedDataset']}")
else:
print(f"Exécution échouée avec le statut : {final_status['status']}")
except Exception as e:
print(f"Erreur lors de l'exécution du pipeline : {e}")