Ejecución

API de Ejecución de Trabajos

Administra y monitorea la ejecución de trabajos de minería de procesos, maneja operaciones asincrónicas y realiza el seguimiento del progreso de los trabajos en tiempo real.

Funcionalidades

Cola de Trabajos

Gestiona la cola de trabajos y prioridades.

Ver Cola

Seguimiento de Trabajos

Realiza el seguimiento del estado y progreso de los trabajos.

Seguir Trabajos

Operaciones Asincrónicas

Maneja operaciones asincrónicas de larga duración.

Operaciones Asincrónicas

Obtener Estado del Trabajo

GET /api/{tenantId}/{projectId}/execution/job/{jobId}

Recupera el estado actual y detalles de cualquier trabajo de ejecución, incluyendo información de progreso, métricas de ejecución y estado de finalización.

Parámetros

Parámetro Tipo Ubicación Descripción
tenantId GUID Path Identificador del tenant
projectId GUID Path Identificador del proyecto
jobId GUID Path Identificador del trabajo

Respuesta

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "Customer Journey Analysis",
  "jobDescription": "Comprehensive analysis of customer touchpoints and behaviors",
  "status": "Running",
  "priority": "High",
  "progress": {
    "percentage": 65,
    "currentStage": "Data Processing",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8 minutes 32 seconds"
  },
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "Customer Analytics Pipeline"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 records/second"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}

Listar Todos los Trabajos

GET /api/{tenantId}/{projectId}/execution/jobs

Recupera una lista paginada de todos los trabajos de ejecución en el proyecto con opciones de filtrado por estado, tipo de trabajo y rangos de fechas.

Parámetros de Consulta

Parámetro Tipo Descripción
status string Filtrar por estado: Queued, Running, Completed, Failed, Cancelled
jobType string Filtrar por tipo de trabajo: ProcessMining, DataEnrichment, Notebook, Analysis
priority string Filtrar por prioridad: Low, Normal, High, Critical
submittedBy string Filtrar por usuario que envió el trabajo
dateFrom datetime Filtrar trabajos desde esta fecha
dateTo datetime Filtrar trabajos hasta esta fecha
page integer Número de página para paginación (por defecto: 1)
pageSize integer Número de elementos por página (por defecto: 20, máximo: 100)

Respuesta

{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "Customer Journey Analysis",
      "status": "Running",
      "priority": "High",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "Customer Analytics Pipeline"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "Daily Sales Enrichment",
      "status": "Completed",
      "priority": "Normal",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 minutes",
      "submittedBy": "system",
      "resourceName": "Sales Data Pipeline"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Enviar Nuevo Trabajo

POST /api/{tenantId}/{projectId}/execution/job

Envía un nuevo trabajo de ejecución al sistema. El trabajo será encolado y procesado según la prioridad y disponibilidad de recursos.

Cuerpo de la Solicitud

{
  "jobName": "Weekly Process Analysis",
  "jobDescription": "Automated weekly analysis of process performance",
  "jobType": "ProcessMining",
  "priority": "Normal",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}

Respuesta

{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 minutes",
  "jobName": "Weekly Process Analysis",
  "priority": "Normal",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}

Cancelar Trabajo

DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}

Cancela un trabajo encolado o en ejecución. Los trabajos completados no pueden ser cancelados. Los trabajos en ejecución se detendrán de forma segura cuando sea posible.

Cuerpo de la Solicitud (Opcional)

{
  "reason": "User requested cancellation",
  "forceTermination": false,
  "preservePartialResults": true
}

Códigos de Respuesta

  • 200 OK - Trabajo cancelado exitosamente
  • 404 Not Found - Trabajo no encontrado
  • 409 Conflict - El trabajo ya fue completado o no puede ser cancelado

Obtener Resultados del Trabajo

GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results

Recupera los resultados y salidas de un trabajo de ejecución completado, incluyendo artefactos generados, reportes y archivos de datos.

Parámetros de Consulta

Parámetro Tipo Descripción
format string Formato de respuesta: summary, detailed, download (por defecto: summary)
includeArtifacts boolean Incluir artefactos descargables en la respuesta (por defecto: true)
outputType string Filtrar por tipo de salida: reports, data, models, visualizations

Respuesta

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 minutes",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "Customer Journey Analysis Report",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "Customer Journey Data Enhanced",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "Customer Journey Process Map",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "Journey Prediction Model",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 minutes",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "Some timestamps had to be inferred",
        "count": 125
      }
    ]
  }
}

Reintentar Trabajo Fallido

POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry

Reintenta la ejecución de un trabajo fallido con modificaciones opcionales en los parámetros. El trabajo será reencolado con la misma o actualizada configuración.

Cuerpo de la Solicitud

{
  "retryReason": "Infrastructure issue resolved",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "High",
  "immediateExecution": false
}

Respuesta

Devuelve 200 OK con un nuevo objeto de trabajo que contiene el ID actualizado y la información del reintento.

Obtener Estado del Sistema

GET /api/{tenantId}/execution/system/status

Recupera el estado actual del sistema a nivel global incluyendo utilización de recursos, salud de la cola y métricas de rendimiento.

Respuesta

{
  "systemStatus": "Healthy",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "Active",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "Active",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 minutes",
    "estimatedProcessingTime": "23 minutes"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 minutes",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}

Ejemplo: Flujo Completo de Gestión de Trabajos

Este ejemplo demuestra cómo enviar un trabajo, monitorear su progreso y obtener los resultados:

// 1. Enviar un nuevo trabajo
const submitJob = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: 'Customer Behavior Analysis',
      jobDescription: 'Weekly analysis of customer interaction patterns',
      jobType: 'ProcessMining',
      priority: 'High',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });

  return await response.json();
};

// 2. Monitorear el progreso del trabajo
const monitorJob = async (jobId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const job = await response.json();
    console.log(`Trabajo ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`Etapa actual: ${job.progress.currentStage}`);
    console.log(`ETA: ${job.progress.estimatedCompletion}`);

    if (job.status === 'Running' || job.status === 'Queued') {
      setTimeout(() => checkStatus(), 30000); // Comprobar cada 30 segundos
    } else if (job.status === 'Completed') {
      console.log('¡Trabajo completado exitosamente!');
      await getJobResults(jobId);
    } else if (job.status === 'Failed') {
      console.log('Trabajo falló:', job.error);
    }
  };

  await checkStatus();
};

// 3. Obtener resultados del trabajo
const getJobResults = async (jobId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Resultados del Trabajo:', results.summary);
  console.log('Salida Principal:', results.results.primaryOutput.downloadUrl);

  // Descargar salidas adicionales
  for (const output of results.results.additionalOutputs) {
    console.log(`Descargar ${output.type}: ${output.downloadUrl}`);
  }

  return results;
};

// 4. Obtener estado del sistema
const getSystemStatus = async () => {
  const response = await fetch('/api/{tenantId}/execution/system/status', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const status = await response.json();
  console.log(`Estado del Sistema: ${status.systemStatus}`);
  console.log(`Cola: ${status.queueStatistics.totalQueuedJobs} trabajos en espera`);
  console.log(`Tiempo promedio de espera: ${status.queueStatistics.averageWaitTime}`);

  return status;
};

// Ejecutar el flujo
submitJob()
  .then(job => {
    console.log(`Trabajo enviado: ${job.jobId}`);
    console.log(`Posición en cola: ${job.queuePosition}`);
    console.log(`Inicio estimado: ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('Error en el flujo de trabajo:', error));

Ejemplo en Python

import requests
import time
import json
from datetime import datetime, timedelta

class ExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """Enviar un nuevo trabajo de ejecución"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_job_status(self, job_id):
        """Obtener el estado actual del trabajo"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """Listar trabajos con filtros opcionales"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}

        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """Esperar a que el trabajo se complete con revisiones periódicas del estado"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"Trabajo {job_id}: {job['status']} ({job['progress']['percentage']}%)")
            print(f"  Etapa actual: {job['progress']['currentStage']}")
            print(f"  Tiempo transcurrido: {job['progress']['elapsedTime']}")

            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job

            time.sleep(poll_interval)

        raise TimeoutError(f"El trabajo {job_id} no se completó en {timeout} segundos")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """Obtener resultados de la ejecución del trabajo"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_job(self, job_id, reason="User cancellation", force=False):
        """Cancelar un trabajo en ejecución o en cola"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
        """Reintentar un trabajo fallido"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }

        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_system_status(self):
        """Obtener estado de ejecución del sistema a nivel general"""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        return response.json()

# Ejemplo de uso
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Verificar estado del sistema
    system_status = manager.get_system_status()
    print(f"Estado del Sistema: {system_status['systemStatus']}")
    print(f"Trabajos en cola: {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"Tiempo promedio de espera: {system_status['queueStatistics']['averageWaitTime']}")

    # Enviar un trabajo completo de minería de procesos
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }

    job = manager.submit_job(
        'Monthly Process Analytics',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        'High'
    )

    print(f"Trabajo enviado: {job['jobId']}")
    print(f"Posición en cola: {job['queuePosition']}")
    print(f"Inicio estimado: {job['estimatedStartTime']}")

    # Esperar la finalización
    final_job = manager.wait_for_completion(job['jobId'])

    if final_job['status'] == 'Completed':
        # Obtener resultados detallados
        results = manager.get_job_results(job['jobId'])

        print("¡Trabajo completado exitosamente!")
        print(f"Registros procesados: {results['summary']['recordsProcessed']:,}")
        print(f"Puntaje de calidad de datos: {results['summary']['dataQualityScore']}")
        print(f"Eficiencia de procesamiento: {results['summary']['processingEfficiency']}%")

        # Descargar reporte principal
        print(f"Descargar reporte: {results['results']['primaryOutput']['downloadUrl']}")

        # Listar todas las salidas adicionales
        for output in results['results']['additionalOutputs']:
            print(f"Descargar {output['type']}: {output['downloadUrl']}")

    else:
        print(f"El trabajo falló con estado: {final_job['status']}")
        if 'error' in final_job:
            print(f"Error: {final_job['error']}")

except Exception as e:
    print(f"Error en el flujo de ejecución: {e}")