Ejecución de Pipeline
Ejecutar Pipelines de Enriquecimiento
Ejecute pipelines de enriquecimiento sobre conjuntos de datos, monitoree el progreso y recupere resultados mejorados.
Ejecutar Pipeline
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute
Inicia la ejecución de un pipeline de enriquecimiento en un conjunto de datos especificado. La ejecución se realiza de forma asincrónica y retorna un ID de ejecución para el seguimiento del progreso.
Parámetros
| Parámetro | Tipo | Ubicación | Descripción |
|---|---|---|---|
tenantId |
GUID | Path | El identificador del tenant |
projectId |
GUID | Path | El identificador del proyecto |
pipelineId |
GUID | Path | El identificador del pipeline |
Cuerpo de la Solicitud
{
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"executionName": "Análisis del Proceso Mensual",
"executionDescription": "Enriquecimiento para la revisión mensual de desempeño",
"parameters": {
"timeRange": {
"startDate": "2024-01-01",
"endDate": "2024-01-31"
},
"filterCriteria": {
"includeWeekends": false,
"minCaseDuration": "1h"
},
"outputOptions": {
"includeRawData": true,
"generateSummary": true,
"exportFormat": "CSV"
}
},
"priority": "Normal",
"notifyOnCompletion": true
}
Respuesta
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"status": "Queued",
"estimatedDuration": "15-20 minutos",
"executionName": "Análisis del Proceso Mensual",
"dateSubmitted": "2024-01-20T10:30:00Z",
"priority": "Normal",
"stages": [
{
"stageId": "stage-001",
"stageName": "Validación de Datos",
"status": "Pending",
"estimatedDuration": "2-3 minutos"
},
{
"stageId": "stage-002",
"stageName": "Enriquecimiento Temporal",
"status": "Pending",
"estimatedDuration": "8-10 minutos"
}
]
}
Obtener Estado de Ejecución
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Recupera el estado actual e información del progreso de una ejecución de pipeline, incluyendo progreso detallado por etapa.
Respuesta
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"status": "Running",
"progress": 45,
"currentStage": {
"stageId": "stage-002",
"stageName": "Enriquecimiento Temporal",
"status": "Running",
"progress": 60,
"startTime": "2024-01-20T10:35:00Z",
"estimatedCompletion": "2024-01-20T10:45:00Z"
},
"executionName": "Análisis del Proceso Mensual",
"dateSubmitted": "2024-01-20T10:30:00Z",
"dateStarted": "2024-01-20T10:32:00Z",
"estimatedCompletion": "2024-01-20T10:50:00Z",
"priority": "Normal",
"stages": [
{
"stageId": "stage-001",
"stageName": "Validación de Datos",
"status": "Completed",
"progress": 100,
"startTime": "2024-01-20T10:32:00Z",
"endTime": "2024-01-20T10:35:00Z",
"duration": "3 minutos",
"recordsProcessed": 15420,
"validationResults": {
"totalRecords": 15420,
"validRecords": 15418,
"errors": 2,
"warnings": 15
}
},
{
"stageId": "stage-002",
"stageName": "Enriquecimiento Temporal",
"status": "Running",
"progress": 60,
"startTime": "2024-01-20T10:35:00Z",
"recordsProcessed": 9252,
"totalRecords": 15418
}
],
"metrics": {
"totalRecords": 15420,
"processedRecords": 9252,
"errorCount": 2,
"warningCount": 15
}
}
Obtener Resultados de Ejecución
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results
Recupera los resultados finales de una ejecución de pipeline completada, incluyendo datos enriquecidos, estadísticas resumidas y salidas para descargar.
Parámetros de Consulta
| Parámetro | Tipo | Descripción |
|---|---|---|
format |
string | Formato de respuesta: summary, full, download (por defecto: summary) |
includeRawData |
boolean | Incluir el conjunto de datos original en la respuesta (por defecto: false) |
limit |
integer | Limitar número de registros devueltos (máximo: 10000) |
Respuesta
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"status": "Completed",
"completionDate": "2024-01-20T10:48:00Z",
"totalDuration": "18 minutos",
"summary": {
"originalRecords": 15420,
"enrichedRecords": 15418,
"newAttributes": 8,
"dataQualityScore": 98.7,
"enrichmentCoverage": 99.9
},
"enrichedAttributes": [
{
"attributeName": "dayOfWeek",
"attributeType": "string",
"coverage": 100,
"uniqueValues": 7,
"description": "Día de la semana para cada evento"
},
{
"attributeName": "businessHours",
"attributeType": "boolean",
"coverage": 100,
"description": "Si el evento ocurrió durante horas laborales"
},
{
"attributeName": "cycleTime",
"attributeType": "duration",
"coverage": 99.8,
"averageValue": "4.2 horas",
"description": "Tiempo desde el inicio hasta la finalización del caso"
}
],
"dataQuality": {
"completeness": 99.9,
"accuracy": 98.5,
"consistency": 99.2,
"validity": 97.8,
"issues": [
{
"type": "Marca de tiempo faltante",
"count": 2,
"severity": "Alta"
},
{
"type": "Duración inválida",
"count": 15,
"severity": "Media"
}
]
},
"downloadUrls": {
"enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
"summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
"dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
}
}
Listar Ejecuciones de Pipeline
GET /api/{tenantId}/{projectId}/enrichment/executions
Recupera una lista de todas las ejecuciones de pipelines con opciones de filtrado y paginación. Útil para monitorear historial y rendimiento de ejecuciones.
Parámetros de Consulta
| Parámetro | Tipo | Descripción |
|---|---|---|
pipelineId |
GUID | Filtrar por pipeline específico |
status |
string | Filtrar por estado: Queued, Running, Completed, Failed, Cancelled |
dateFrom |
datetime | Filtrar ejecuciones a partir de esta fecha |
dateTo |
datetime | Filtrar ejecuciones hasta esta fecha |
page |
integer | Número de página para paginación (por defecto: 1) |
pageSize |
integer | Número de elementos por página (por defecto: 20, máximo: 100) |
Respuesta
{
"executions": [
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Enriquecimiento de Datos de Minería de Procesos",
"executionName": "Análisis del Proceso Mensual",
"status": "Completed",
"dateSubmitted": "2024-01-20T10:30:00Z",
"dateCompleted": "2024-01-20T10:48:00Z",
"duration": "18 minutos",
"recordsProcessed": 15418,
"priority": "Normal",
"submittedBy": "user123"
}
],
"totalCount": 47,
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
Cancelar Ejecución
DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Cancela una ejecución de pipeline en ejecución o en cola. Las etapas completadas se preservarán, pero la ejecución se detendrá en la etapa actual.
Cuerpo de la Solicitud (Opcional)
{
"reason": "Cancelación solicitada por el usuario",
"preservePartialResults": true
}
Códigos de Respuesta
200 OK- Ejecución cancelada exitosamente404 Not Found- Ejecución no encontrada409 Conflict- Ejecución ya completada o no puede ser cancelada
Reiniciar Ejecución Fallida
POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart
Reinicia una ejecución de pipeline fallida desde el punto de fallo. Las etapas previamente completadas serán omitidas a menos que se solicite explícitamente reejecutarlas.
Cuerpo de la Solicitud
{
"restartFromStage": "stage-003",
"rerunCompletedStages": false,
"updateParameters": {
"retryFailedRecords": true,
"increaseTimeout": true
}
}
Respuesta
Retorna 200 OK con un nuevo objeto de ejecución que contiene ID de ejecución actualizado y estado.
Ejemplo: Flujo Completo de Ejecución
Este ejemplo muestra cómo ejecutar un pipeline y monitorear su progreso:
// 1. Ejecutar pipeline
const executeEnrichment = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
datasetId: '880e8400-e29b-41d4-a716-446655440000',
executionName: 'Análisis del Viaje del Cliente',
executionDescription: 'Enriqueciendo datos del cliente con métricas del viaje',
parameters: {
timeRange: {
startDate: '2024-01-01',
endDate: '2024-01-31'
},
outputOptions: {
includeRawData: true,
generateSummary: true,
exportFormat: 'CSV'
}
},
priority: 'High',
notifyOnCompletion: true
})
});
return await response.json();
};
// 2. Monitorear progreso de ejecución
const monitorExecution = async (executionId) => {
const checkStatus = async () => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const execution = await response.json();
console.log(`Estado: ${execution.status}, Progreso: ${execution.progress}%`);
if (execution.status === 'Running' || execution.status === 'Queued') {
// Verificar nuevamente en 30 segundos
setTimeout(() => checkStatus(), 30000);
} else if (execution.status === 'Completed') {
console.log('¡Ejecución completada exitosamente!');
await getResults(executionId);
} else if (execution.status === 'Failed') {
console.log('Ejecución fallida:', execution.error);
}
};
await checkStatus();
};
// 3. Obtener resultados cuando esté completado
const getResults = async (executionId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const results = await response.json();
console.log('Resumen de Enriquecimiento:', results.summary);
console.log('URLs de Descarga:', results.downloadUrls);
return results;
};
// Ejecutar flujo
executeEnrichment()
.then(execution => {
console.log(`Ejecución iniciada: ${execution.executionId}`);
return monitorExecution(execution.executionId);
})
.catch(error => console.error('Ejecución fallida:', error));
Ejemplo en Python
import requests
import time
import json
from datetime import datetime, timedelta
class PipelineExecutionManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
"""Ejecutar un pipeline de enriquecimiento"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
payload = {
'datasetId': dataset_id,
'executionName': execution_name,
'parameters': parameters or {},
'priority': priority,
'notifyOnCompletion': True
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_execution_status(self, execution_id):
"""Obtener estado actual de la ejecución"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
"""Esperar a que la ejecución termine consultando periódicamente el estado"""
start_time = time.time()
while time.time() - start_time < timeout:
status = self.get_execution_status(execution_id)
print(f"Ejecución {execution_id}: {status['status']} ({status.get('progress', 0)}%)")
if status['status'] in ['Completed', 'Failed', 'Cancelled']:
return status
time.sleep(poll_interval)
raise TimeoutError(f"La ejecución {execution_id} no terminó en {timeout} segundos")
def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
"""Obtener resultados de la ejecución"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
params = {
'format': format_type,
'includeRawData': include_raw_data
}
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_execution(self, execution_id, reason="Cancelación por usuario"):
"""Cancelar una ejecución en curso"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
payload = {
'reason': reason,
'preservePartialResults': True
}
response = requests.delete(url, json=payload, headers=self.headers)
return response.status_code == 200
def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
"""Listar ejecuciones de pipeline con filtro"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
params = {'page': page, 'pageSize': page_size}
if pipeline_id:
params['pipelineId'] = pipeline_id
if status:
params['status'] = status
if date_from:
params['dateFrom'] = date_from.isoformat()
if date_to:
params['dateTo'] = date_to.isoformat()
response = requests.get(url, params=params, headers=self.headers)
return response.json()
# Ejemplo de uso
manager = PipelineExecutionManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
# Ejecutar pipeline con parámetros personalizados
execution_params = {
'timeRange': {
'startDate': '2024-01-01',
'endDate': '2024-01-31'
},
'filterCriteria': {
'includeWeekends': False,
'minCaseDuration': '1h'
},
'outputOptions': {
'includeRawData': True,
'generateSummary': True,
'exportFormat': 'CSV'
}
}
try:
# Iniciar ejecución
execution = manager.execute_pipeline(
'pipeline-guid',
'dataset-guid',
'Análisis del Proceso Mensual',
execution_params,
'High'
)
print(f"Ejecución iniciada: {execution['executionId']}")
print(f"Duración estimada: {execution['estimatedDuration']}")
# Esperar a que termine
final_status = manager.wait_for_completion(execution['executionId'])
if final_status['status'] == 'Completed':
# Obtener resultados
results = manager.get_execution_results(execution['executionId'])
print(f"¡Enriquecimiento completado exitosamente!")
print(f"Registros originales: {results['summary']['originalRecords']}")
print(f"Registros enriquecidos: {results['summary']['enrichedRecords']}")
print(f"Puntuación de calidad de datos: {results['summary']['dataQualityScore']}")
print(f"Descargar datos enriquecidos: {results['downloadUrls']['enrichedDataset']}")
else:
print(f"Ejecución fallida con estado: {final_status['status']}")
except Exception as e:
print(f"Error ejecutando pipeline: {e}")