Pipelines de Enriquecimiento

Construya Flujos de Trabajo de Enriquecimiento de Datos

Cree y gestione pipelines de enriquecimiento para transformar y mejorar sus conjuntos de datos de minería de procesos.

Obtener Detalles del Pipeline

GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Recupera información completa sobre un pipeline de enriquecimiento específico, incluyendo sus etapas, configuración y metadatos de ejecución.

Parámetros

Parámetro Tipo Ubicación Descripción
tenantId GUID Path El identificador del tenant
projectId GUID Path El identificador del proyecto
pipelineId GUID Path El identificador del pipeline

Respuesta

{
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "pipelineName": "Process Mining Data Enrichment",
  "pipelineDescription": "Enriches event logs with additional attributes and calculations",
  "status": "Active",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "stageType": "Validation",
      "order": 1,
      "configuration": {
        "validateCaseId": true,
        "validateTimestamps": true,
        "requireActivityNames": true
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "stageType": "TimeCalculation",
      "order": 2,
      "configuration": {
        "addDayOfWeek": true,
        "addBusinessHours": true,
        "timezoneId": "UTC"
      }
    }
  ],
  "triggers": {
    "automatic": true,
    "schedule": "0 2 * * *",
    "onDataUpdate": true
  },
  "dateCreated": "2024-01-15T10:30:00Z",
  "dateModified": "2024-01-20T14:45:00Z",
  "createdBy": "user123",
  "lastExecutionDate": "2024-01-20T02:00:00Z",
  "lastExecutionStatus": "Success",
  "executionCount": 45
}

Listar Todos los Pipelines

GET /api/{tenantId}/{projectId}/enrichment/pipelines

Recupera una lista de todos los pipelines de enriquecimiento en el proyecto con metadatos básicos e información de estado.

Parámetros Query

Parámetro Tipo Descripción
status string Filtrar por estado del pipeline: Active, Inactive, Failed
page integer Número de página para paginación (por defecto: 1)
pageSize integer Número de elementos por página (por defecto: 20, máximo: 100)

Respuesta

{
  "pipelines": [
    {
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "status": "Active",
      "stageCount": 5,
      "lastExecutionDate": "2024-01-20T02:00:00Z",
      "lastExecutionStatus": "Success",
      "dateCreated": "2024-01-15T10:30:00Z"
    }
  ],
  "totalCount": 12,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": false
}

Crear Nuevo Pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline

Crea un nuevo pipeline de enriquecimiento con etapas y configuración especificadas. El pipeline puede configurarse para ejecutarse automáticamente o manualmente.

Cuerpo de la Solicitud

{
  "pipelineName": "Customer Journey Enrichment",
  "pipelineDescription": "Enriches customer journey data with demographics and behavior patterns",
  "stages": [
    {
      "stageName": "Customer Data Lookup",
      "stageType": "DataLookup",
      "order": 1,
      "configuration": {
        "lookupTable": "customer_demographics",
        "joinKey": "customerId",
        "selectFields": ["age", "segment", "region"]
      }
    },
    {
      "stageName": "Journey Metrics",
      "stageType": "Calculation",
      "order": 2,
      "configuration": {
        "calculations": [
          {
            "fieldName": "journeyDuration",
            "formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
            "groupBy": "caseId"
          },
          {
            "fieldName": "touchpointCount",
            "formula": "COUNT(*)",
            "groupBy": "caseId"
          }
        ]
      }
    }
  ],
  "triggers": {
    "automatic": false,
    "schedule": null,
    "onDataUpdate": true
  }
}

Respuesta

Devuelve 201 Created con el objeto completo del pipeline incluyendo IDs generados y marcas de tiempo.

Actualizar Pipeline

PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Actualiza la configuración, etapas o disparadores de un pipeline existente. Los cambios entran en vigor en la siguiente ejecución.

Cuerpo de la Solicitud

{
  "pipelineName": "Updated Customer Journey Enrichment",
  "pipelineDescription": "Enhanced customer journey data enrichment with ML insights",
  "status": "Active",
  "triggers": {
    "automatic": true,
    "schedule": "0 3 * * *",
    "onDataUpdate": true
  }
}

Respuesta

Devuelve el objeto actualizado del pipeline con la misma estructura que el endpoint GET.

Eliminar Pipeline

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Elimina permanentemente un pipeline y todo su historial de ejecuciones. Esta operación no puede deshacerse y detendrá cualquier ejecución en curso.

Códigos de Respuesta

  • 204 No Content - Pipeline eliminado exitosamente
  • 404 Not Found - Pipeline no encontrado o acceso denegado
  • 409 Conflict - El pipeline se está ejecutando y no puede eliminarse

Agregar Etapa al Pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage

Agrega una nueva etapa de procesamiento a un pipeline existente. La etapa será insertada en la posición de orden especificada.

Cuerpo de la Solicitud

{
  "stageName": "Process Performance Metrics",
  "stageType": "PerformanceCalculation",
  "order": 3,
  "configuration": {
    "metrics": [
      {
        "name": "cycleTime",
        "calculation": "CASE_DURATION",
        "unit": "hours"
      },
      {
        "name": "waitTime",
        "calculation": "ACTIVITY_WAITING_TIME",
        "unit": "hours"
      }
    ],
    "aggregations": ["AVG", "MAX", "MIN", "P95"]
  }
}

Respuesta

Devuelve 201 Created con el objeto completo de la etapa incluyendo el ID generado.

Eliminar Etapa del Pipeline

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}

Elimina una etapa específica del pipeline. Las etapas subsecuentes serán reordenadas automáticamente.

Códigos de Respuesta

  • 204 No Content - Etapa eliminada exitosamente
  • 404 Not Found - Etapa no encontrada en el pipeline
  • 409 Conflict - No se puede eliminar la etapa mientras el pipeline se está ejecutando

Ejemplo: Flujo Completo del Pipeline

Este ejemplo demuestra cómo crear y gestionar un pipeline de enriquecimiento:

// 1. Crear un nuevo pipeline de enriquecimiento
const createPipeline = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      pipelineName: 'Order Processing Enrichment',
      pipelineDescription: 'Enriches order data with fulfillment metrics',
      stages: [
        {
          stageName: 'Order Validation',
          stageType: 'Validation',
          order: 1,
          configuration: {
            validateOrderId: true,
            validateCustomerId: true,
            validateAmounts: true
          }
        },
        {
          stageName: 'Fulfillment Time Calculation',
          stageType: 'TimeCalculation',
          order: 2,
          configuration: {
            startActivity: 'Order Received',
            endActivity: 'Order Shipped',
            outputField: 'fulfillmentTime'
          }
        }
      ],
      triggers: {
        automatic: true,
        onDataUpdate: true
      }
    })
  });

  return await response.json();
};

// 2. Agregar una nueva etapa al pipeline existente
const addStage = async (pipelineId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}/stage`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      stageName: 'Customer Segmentation',
      stageType: 'Classification',
      order: 3,
      configuration: {
        segmentationRules: [
          {
            segment: 'VIP',
            condition: 'orderValue > 1000'
          },
          {
            segment: 'Regular',
            condition: 'orderValue <= 1000'
          }
        ]
      }
    })
  });

  return await response.json();
};

// 3. Obtener estado del pipeline
const getPipelineStatus = async (pipelineId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  return await response.json();
};

Ejemplo en Python

import requests
import json
from datetime import datetime

class EnrichmentPipelineManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def create_pipeline(self, name, description, stages, triggers=None):
        """Crear un nuevo pipeline de enriquecimiento"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline"
        payload = {
            'pipelineName': name,
            'pipelineDescription': description,
            'stages': stages,
            'triggers': triggers or {'automatic': False, 'onDataUpdate': True}
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_pipeline(self, pipeline_id):
        """Obtener detalles del pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_pipelines(self, status=None, page=1, page_size=20):
        """Listar todos los pipelines con filtro opcional"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipelines"
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
        """Agregar una nueva etapa a un pipeline existente"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/stage"
        payload = {
            'stageName': stage_name,
            'stageType': stage_type,
            'order': order,
            'configuration': configuration
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
        """Actualizar configuración del pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        payload = {}
        if name:
            payload['pipelineName'] = name
        if description:
            payload['pipelineDescription'] = description
        if status:
            payload['status'] = status
        if triggers:
            payload['triggers'] = triggers

        response = requests.put(url, json=payload, headers=self.headers)
        return response.json()

    def delete_pipeline(self, pipeline_id):
        """Eliminar un pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.delete(url, headers=self.headers)
        return response.status_code == 204

# Ejemplo de uso
manager = EnrichmentPipelineManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Crear un pipeline de enriquecimiento completo
stages = [
    {
        'stageName': 'Data Quality Check',
        'stageType': 'Validation',
        'order': 1,
        'configuration': {
            'checkDuplicates': True,
            'validateTimestamps': True,
            'checkMissingValues': True
        }
    },
    {
        'stageName': 'Process Mining Metrics',
        'stageType': 'ProcessCalculation',
        'order': 2,
        'configuration': {
            'calculateCycleTime': True,
            'calculateWaitingTime': True,
            'calculateResourceUtilization': True,
            'detectBottlenecks': True
        }
    },
    {
        'stageName': 'Anomaly Detection',
        'stageType': 'AnomalyDetection',
        'order': 3,
        'configuration': {
            'algorithm': 'isolation_forest',
            'threshold': 0.1,
            'features': ['duration', 'cost', 'resourceCount']
        }
    }
]

pipeline = manager.create_pipeline(
    'Comprehensive Process Analysis',
    'End-to-end process analysis with anomaly detection',
    stages,
    {'automatic': True, 'schedule': '0 1 * * *', 'onDataUpdate': True}
)

print(f"Pipeline creado: {pipeline['pipelineId']}")

# Listar todos los pipelines activos
active_pipelines = manager.list_pipelines(status='Active')
print(f"Se encontraron {active_pipelines['totalCount']} pipelines activos")