Ejecución de Pipeline

Ejecutar Pipelines de Enriquecimiento

Ejecute pipelines de enriquecimiento sobre conjuntos de datos, monitoree el progreso y recupere resultados mejorados.

Ejecutar Pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute

Inicia la ejecución de un pipeline de enriquecimiento en un conjunto de datos especificado. La ejecución se realiza de forma asincrónica y retorna un ID de ejecución para el seguimiento del progreso.

Parámetros

Parámetro Tipo Ubicación Descripción
tenantId GUID Path El identificador del tenant
projectId GUID Path El identificador del proyecto
pipelineId GUID Path El identificador del pipeline

Cuerpo de la Solicitud

{
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "executionName": "Análisis del Proceso Mensual",
  "executionDescription": "Enriquecimiento para la revisión mensual de desempeño",
  "parameters": {
    "timeRange": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-31"
    },
    "filterCriteria": {
      "includeWeekends": false,
      "minCaseDuration": "1h"
    },
    "outputOptions": {
      "includeRawData": true,
      "generateSummary": true,
      "exportFormat": "CSV"
    }
  },
  "priority": "Normal",
  "notifyOnCompletion": true
}

Respuesta

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "estimatedDuration": "15-20 minutos",
  "executionName": "Análisis del Proceso Mensual",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Validación de Datos",
      "status": "Pending",
      "estimatedDuration": "2-3 minutos"
    },
    {
      "stageId": "stage-002",
      "stageName": "Enriquecimiento Temporal",
      "status": "Pending",
      "estimatedDuration": "8-10 minutos"
    }
  ]
}

Obtener Estado de Ejecución

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Recupera el estado actual e información del progreso de una ejecución de pipeline, incluyendo progreso detallado por etapa.

Respuesta

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Running",
  "progress": 45,
  "currentStage": {
    "stageId": "stage-002",
    "stageName": "Enriquecimiento Temporal",
    "status": "Running",
    "progress": 60,
    "startTime": "2024-01-20T10:35:00Z",
    "estimatedCompletion": "2024-01-20T10:45:00Z"
  },
  "executionName": "Análisis del Proceso Mensual",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "dateStarted": "2024-01-20T10:32:00Z",
  "estimatedCompletion": "2024-01-20T10:50:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Validación de Datos",
      "status": "Completed",
      "progress": 100,
      "startTime": "2024-01-20T10:32:00Z",
      "endTime": "2024-01-20T10:35:00Z",
      "duration": "3 minutos",
      "recordsProcessed": 15420,
      "validationResults": {
        "totalRecords": 15420,
        "validRecords": 15418,
        "errors": 2,
        "warnings": 15
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Enriquecimiento Temporal",
      "status": "Running",
      "progress": 60,
      "startTime": "2024-01-20T10:35:00Z",
      "recordsProcessed": 9252,
      "totalRecords": 15418
    }
  ],
  "metrics": {
    "totalRecords": 15420,
    "processedRecords": 9252,
    "errorCount": 2,
    "warningCount": 15
  }
}

Obtener Resultados de Ejecución

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results

Recupera los resultados finales de una ejecución de pipeline completada, incluyendo datos enriquecidos, estadísticas resumidas y salidas para descargar.

Parámetros de Consulta

Parámetro Tipo Descripción
format string Formato de respuesta: summary, full, download (por defecto: summary)
includeRawData boolean Incluir el conjunto de datos original en la respuesta (por defecto: false)
limit integer Limitar número de registros devueltos (máximo: 10000)

Respuesta

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionDate": "2024-01-20T10:48:00Z",
  "totalDuration": "18 minutos",
  "summary": {
    "originalRecords": 15420,
    "enrichedRecords": 15418,
    "newAttributes": 8,
    "dataQualityScore": 98.7,
    "enrichmentCoverage": 99.9
  },
  "enrichedAttributes": [
    {
      "attributeName": "dayOfWeek",
      "attributeType": "string",
      "coverage": 100,
      "uniqueValues": 7,
      "description": "Día de la semana para cada evento"
    },
    {
      "attributeName": "businessHours",
      "attributeType": "boolean",
      "coverage": 100,
      "description": "Si el evento ocurrió durante horas laborales"
    },
    {
      "attributeName": "cycleTime",
      "attributeType": "duration",
      "coverage": 99.8,
      "averageValue": "4.2 horas",
      "description": "Tiempo desde el inicio hasta la finalización del caso"
    }
  ],
  "dataQuality": {
    "completeness": 99.9,
    "accuracy": 98.5,
    "consistency": 99.2,
    "validity": 97.8,
    "issues": [
      {
        "type": "Marca de tiempo faltante",
        "count": 2,
        "severity": "Alta"
      },
      {
        "type": "Duración inválida",
        "count": 15,
        "severity": "Media"
      }
    ]
  },
  "downloadUrls": {
    "enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
    "summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
    "dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
  }
}

Listar Ejecuciones de Pipeline

GET /api/{tenantId}/{projectId}/enrichment/executions

Recupera una lista de todas las ejecuciones de pipelines con opciones de filtrado y paginación. Útil para monitorear historial y rendimiento de ejecuciones.

Parámetros de Consulta

Parámetro Tipo Descripción
pipelineId GUID Filtrar por pipeline específico
status string Filtrar por estado: Queued, Running, Completed, Failed, Cancelled
dateFrom datetime Filtrar ejecuciones a partir de esta fecha
dateTo datetime Filtrar ejecuciones hasta esta fecha
page integer Número de página para paginación (por defecto: 1)
pageSize integer Número de elementos por página (por defecto: 20, máximo: 100)

Respuesta

{
  "executions": [
    {
      "executionId": "990e8400-e29b-41d4-a716-446655440000",
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Enriquecimiento de Datos de Minería de Procesos",
      "executionName": "Análisis del Proceso Mensual",
      "status": "Completed",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "dateCompleted": "2024-01-20T10:48:00Z",
      "duration": "18 minutos",
      "recordsProcessed": 15418,
      "priority": "Normal",
      "submittedBy": "user123"
    }
  ],
  "totalCount": 47,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Cancelar Ejecución

DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Cancela una ejecución de pipeline en ejecución o en cola. Las etapas completadas se preservarán, pero la ejecución se detendrá en la etapa actual.

Cuerpo de la Solicitud (Opcional)

{
  "reason": "Cancelación solicitada por el usuario",
  "preservePartialResults": true
}

Códigos de Respuesta

  • 200 OK - Ejecución cancelada exitosamente
  • 404 Not Found - Ejecución no encontrada
  • 409 Conflict - Ejecución ya completada o no puede ser cancelada

Reiniciar Ejecución Fallida

POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart

Reinicia una ejecución de pipeline fallida desde el punto de fallo. Las etapas previamente completadas serán omitidas a menos que se solicite explícitamente reejecutarlas.

Cuerpo de la Solicitud

{
  "restartFromStage": "stage-003",
  "rerunCompletedStages": false,
  "updateParameters": {
    "retryFailedRecords": true,
    "increaseTimeout": true
  }
}

Respuesta

Retorna 200 OK con un nuevo objeto de ejecución que contiene ID de ejecución actualizado y estado.

Ejemplo: Flujo Completo de Ejecución

Este ejemplo muestra cómo ejecutar un pipeline y monitorear su progreso:

// 1. Ejecutar pipeline
const executeEnrichment = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      executionName: 'Análisis del Viaje del Cliente',
      executionDescription: 'Enriqueciendo datos del cliente con métricas del viaje',
      parameters: {
        timeRange: {
          startDate: '2024-01-01',
          endDate: '2024-01-31'
        },
        outputOptions: {
          includeRawData: true,
          generateSummary: true,
          exportFormat: 'CSV'
        }
      },
      priority: 'High',
      notifyOnCompletion: true
    })
  });

  return await response.json();
};

// 2. Monitorear progreso de ejecución
const monitorExecution = async (executionId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const execution = await response.json();
    console.log(`Estado: ${execution.status}, Progreso: ${execution.progress}%`);

    if (execution.status === 'Running' || execution.status === 'Queued') {
      // Verificar nuevamente en 30 segundos
      setTimeout(() => checkStatus(), 30000);
    } else if (execution.status === 'Completed') {
      console.log('¡Ejecución completada exitosamente!');
      await getResults(executionId);
    } else if (execution.status === 'Failed') {
      console.log('Ejecución fallida:', execution.error);
    }
  };

  await checkStatus();
};

// 3. Obtener resultados cuando esté completado
const getResults = async (executionId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Resumen de Enriquecimiento:', results.summary);
  console.log('URLs de Descarga:', results.downloadUrls);

  return results;
};

// Ejecutar flujo
executeEnrichment()
  .then(execution => {
    console.log(`Ejecución iniciada: ${execution.executionId}`);
    return monitorExecution(execution.executionId);
  })
  .catch(error => console.error('Ejecución fallida:', error));

Ejemplo en Python

import requests
import time
import json
from datetime import datetime, timedelta

class PipelineExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
        """Ejecutar un pipeline de enriquecimiento"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
        payload = {
            'datasetId': dataset_id,
            'executionName': execution_name,
            'parameters': parameters or {},
            'priority': priority,
            'notifyOnCompletion': True
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_execution_status(self, execution_id):
        """Obtener estado actual de la ejecución"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
        """Esperar a que la ejecución termine consultando periódicamente el estado"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            status = self.get_execution_status(execution_id)
            print(f"Ejecución {execution_id}: {status['status']} ({status.get('progress', 0)}%)")

            if status['status'] in ['Completed', 'Failed', 'Cancelled']:
                return status

            time.sleep(poll_interval)

        raise TimeoutError(f"La ejecución {execution_id} no terminó en {timeout} segundos")

    def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
        """Obtener resultados de la ejecución"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
        params = {
            'format': format_type,
            'includeRawData': include_raw_data
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_execution(self, execution_id, reason="Cancelación por usuario"):
        """Cancelar una ejecución en curso"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
        """Listar ejecuciones de pipeline con filtro"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
        params = {'page': page, 'pageSize': page_size}

        if pipeline_id:
            params['pipelineId'] = pipeline_id
        if status:
            params['status'] = status
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

# Ejemplo de uso
manager = PipelineExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Ejecutar pipeline con parámetros personalizados
execution_params = {
    'timeRange': {
        'startDate': '2024-01-01',
        'endDate': '2024-01-31'
    },
    'filterCriteria': {
        'includeWeekends': False,
        'minCaseDuration': '1h'
    },
    'outputOptions': {
        'includeRawData': True,
        'generateSummary': True,
        'exportFormat': 'CSV'
    }
}

try:
    # Iniciar ejecución
    execution = manager.execute_pipeline(
        'pipeline-guid',
        'dataset-guid',
        'Análisis del Proceso Mensual',
        execution_params,
        'High'
    )

    print(f"Ejecución iniciada: {execution['executionId']}")
    print(f"Duración estimada: {execution['estimatedDuration']}")

    # Esperar a que termine
    final_status = manager.wait_for_completion(execution['executionId'])

    if final_status['status'] == 'Completed':
        # Obtener resultados
        results = manager.get_execution_results(execution['executionId'])
        print(f"¡Enriquecimiento completado exitosamente!")
        print(f"Registros originales: {results['summary']['originalRecords']}")
        print(f"Registros enriquecidos: {results['summary']['enrichedRecords']}")
        print(f"Puntuación de calidad de datos: {results['summary']['dataQualityScore']}")
        print(f"Descargar datos enriquecidos: {results['downloadUrls']['enrichedDataset']}")
    else:
        print(f"Ejecución fallida con estado: {final_status['status']}")

except Exception as e:
    print(f"Error ejecutando pipeline: {e}")