Ejecución
API de Ejecución de Trabajos
Administra y monitorea la ejecución de trabajos de minería de procesos, maneja operaciones asincrónicas y realiza el seguimiento del progreso de los trabajos en tiempo real.
Funcionalidades
Cola de Trabajos
Gestiona la cola de trabajos y prioridades.
Seguimiento de Trabajos
Realiza el seguimiento del estado y progreso de los trabajos.
Operaciones Asincrónicas
Maneja operaciones asincrónicas de larga duración.
Obtener Estado del Trabajo
GET /api/{tenantId}/{projectId}/execution/job/{jobId}
Recupera el estado actual y detalles de cualquier trabajo de ejecución, incluyendo información de progreso, métricas de ejecución y estado de finalización.
Parámetros
| Parámetro | Tipo | Ubicación | Descripción |
|---|---|---|---|
tenantId |
GUID | Path | Identificador del tenant |
projectId |
GUID | Path | Identificador del proyecto |
jobId |
GUID | Path | Identificador del trabajo |
Respuesta
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"projectId": "660e8400-e29b-41d4-a716-446655440000",
"jobType": "ProcessMining",
"jobName": "Customer Journey Analysis",
"jobDescription": "Comprehensive analysis of customer touchpoints and behaviors",
"status": "Running",
"priority": "High",
"progress": {
"percentage": 65,
"currentStage": "Data Processing",
"estimatedCompletion": "2024-01-20T11:15:00Z",
"elapsedTime": "8 minutes 32 seconds"
},
"resource": {
"resourceType": "Pipeline",
"resourceId": "770e8400-e29b-41d4-a716-446655440000",
"resourceName": "Customer Analytics Pipeline"
},
"execution": {
"startTime": "2024-01-20T10:30:00Z",
"submittedBy": "user123",
"executionNode": "worker-node-02",
"memoryUsage": "2.1 GB",
"cpuUsage": "45%",
"diskUsage": "890 MB"
},
"metrics": {
"recordsProcessed": 125430,
"totalRecords": 192850,
"errorCount": 3,
"warningCount": 12,
"averageProcessingRate": "1250 records/second"
},
"dateCreated": "2024-01-20T10:28:00Z",
"lastUpdated": "2024-01-20T10:38:45Z"
}
Listar Todos los Trabajos
GET /api/{tenantId}/{projectId}/execution/jobs
Recupera una lista paginada de todos los trabajos de ejecución en el proyecto con opciones de filtrado por estado, tipo de trabajo y rangos de fechas.
Parámetros de Consulta
| Parámetro | Tipo | Descripción |
|---|---|---|
status |
string | Filtrar por estado: Queued, Running, Completed, Failed, Cancelled |
jobType |
string | Filtrar por tipo de trabajo: ProcessMining, DataEnrichment, Notebook, Analysis |
priority |
string | Filtrar por prioridad: Low, Normal, High, Critical |
submittedBy |
string | Filtrar por usuario que envió el trabajo |
dateFrom |
datetime | Filtrar trabajos desde esta fecha |
dateTo |
datetime | Filtrar trabajos hasta esta fecha |
page |
integer | Número de página para paginación (por defecto: 1) |
pageSize |
integer | Número de elementos por página (por defecto: 20, máximo: 100) |
Respuesta
{
"jobs": [
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"jobType": "ProcessMining",
"jobName": "Customer Journey Analysis",
"status": "Running",
"priority": "High",
"progress": 65,
"startTime": "2024-01-20T10:30:00Z",
"estimatedCompletion": "2024-01-20T11:15:00Z",
"submittedBy": "user123",
"resourceName": "Customer Analytics Pipeline"
},
{
"jobId": "dd0e8400-e29b-41d4-a716-446655440000",
"jobType": "DataEnrichment",
"jobName": "Daily Sales Enrichment",
"status": "Completed",
"priority": "Normal",
"progress": 100,
"startTime": "2024-01-20T09:00:00Z",
"endTime": "2024-01-20T09:23:00Z",
"duration": "23 minutes",
"submittedBy": "system",
"resourceName": "Sales Data Pipeline"
}
],
"summary": {
"totalJobs": 156,
"runningJobs": 3,
"queuedJobs": 7,
"completedJobs": 142,
"failedJobs": 4
},
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
Enviar Nuevo Trabajo
POST /api/{tenantId}/{projectId}/execution/job
Envía un nuevo trabajo de ejecución al sistema. El trabajo será encolado y procesado según la prioridad y disponibilidad de recursos.
Cuerpo de la Solicitud
{
"jobName": "Weekly Process Analysis",
"jobDescription": "Automated weekly analysis of process performance",
"jobType": "ProcessMining",
"priority": "Normal",
"resource": {
"resourceType": "Pipeline",
"resourceId": "770e8400-e29b-41d4-a716-446655440000"
},
"parameters": {
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"analysisType": "comprehensive",
"timeWindow": {
"startDate": "2024-01-01",
"endDate": "2024-01-07"
},
"includeAnomalyDetection": true,
"outputFormat": "detailed_report"
},
"scheduling": {
"executeImmediately": true,
"scheduledTime": null,
"timeoutMinutes": 120
},
"notifications": {
"onCompletion": true,
"onFailure": true,
"emailRecipients": ["analyst@company.com"]
}
}
Respuesta
{
"jobId": "ee0e8400-e29b-41d4-a716-446655440000",
"status": "Queued",
"queuePosition": 3,
"estimatedStartTime": "2024-01-20T10:45:00Z",
"estimatedDuration": "45-60 minutes",
"jobName": "Weekly Process Analysis",
"priority": "Normal",
"dateSubmitted": "2024-01-20T10:30:00Z",
"submittedBy": "user123"
}
Cancelar Trabajo
DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}
Cancela un trabajo encolado o en ejecución. Los trabajos completados no pueden ser cancelados. Los trabajos en ejecución se detendrán de forma segura cuando sea posible.
Cuerpo de la Solicitud (Opcional)
{
"reason": "User requested cancellation",
"forceTermination": false,
"preservePartialResults": true
}
Códigos de Respuesta
200 OK- Trabajo cancelado exitosamente404 Not Found- Trabajo no encontrado409 Conflict- El trabajo ya fue completado o no puede ser cancelado
Obtener Resultados del Trabajo
GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results
Recupera los resultados y salidas de un trabajo de ejecución completado, incluyendo artefactos generados, reportes y archivos de datos.
Parámetros de Consulta
| Parámetro | Tipo | Descripción |
|---|---|---|
format |
string | Formato de respuesta: summary, detailed, download (por defecto: summary) |
includeArtifacts |
boolean | Incluir artefactos descargables en la respuesta (por defecto: true) |
outputType |
string | Filtrar por tipo de salida: reports, data, models, visualizations |
Respuesta
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"status": "Completed",
"completionTime": "2024-01-20T11:12:00Z",
"totalDuration": "42 minutes",
"success": true,
"summary": {
"recordsProcessed": 192850,
"outputsGenerated": 7,
"dataQualityScore": 94.2,
"processingEfficiency": 87.5
},
"results": {
"primaryOutput": {
"type": "ProcessMiningReport",
"title": "Customer Journey Analysis Report",
"format": "html",
"size": "2.3 MB",
"downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
},
"additionalOutputs": [
{
"type": "EnrichedDataset",
"title": "Customer Journey Data Enhanced",
"format": "csv",
"recordCount": 192850,
"size": "45.7 MB",
"downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
},
{
"type": "ProcessMap",
"title": "Customer Journey Process Map",
"format": "svg",
"size": "890 KB",
"downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
},
{
"type": "AnalyticsModel",
"title": "Journey Prediction Model",
"format": "pkl",
"accuracy": 0.89,
"size": "12.4 MB",
"downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
}
]
},
"executionMetrics": {
"totalCpuTime": "38.5 minutes",
"peakMemoryUsage": "3.2 GB",
"diskIoOperations": 45672,
"networkDataTransfer": "567 MB"
},
"qualityMetrics": {
"dataValidation": {
"totalRecords": 195000,
"validRecords": 192850,
"duplicatesRemoved": 1890,
"invalidRecords": 260
},
"processingErrors": [],
"warnings": [
{
"type": "DataQuality",
"message": "Some timestamps had to be inferred",
"count": 125
}
]
}
}
Reintentar Trabajo Fallido
POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry
Reintenta la ejecución de un trabajo fallido con modificaciones opcionales en los parámetros. El trabajo será reencolado con la misma o actualizada configuración.
Cuerpo de la Solicitud
{
"retryReason": "Infrastructure issue resolved",
"modifyParameters": true,
"updatedParameters": {
"timeoutMinutes": 180,
"retryFailedRecords": true,
"increaseMemoryLimit": true
},
"priority": "High",
"immediateExecution": false
}
Respuesta
Devuelve 200 OK con un nuevo objeto de trabajo que contiene el ID actualizado y la información del reintento.
Obtener Estado del Sistema
GET /api/{tenantId}/execution/system/status
Recupera el estado actual del sistema a nivel global incluyendo utilización de recursos, salud de la cola y métricas de rendimiento.
Respuesta
{
"systemStatus": "Healthy",
"timestamp": "2024-01-20T10:45:00Z",
"executionNodes": [
{
"nodeId": "worker-node-01",
"status": "Active",
"cpuUsage": 67,
"memoryUsage": 78,
"activeJobs": 2,
"jobCapacity": 4
},
{
"nodeId": "worker-node-02",
"status": "Active",
"cpuUsage": 45,
"memoryUsage": 56,
"activeJobs": 1,
"jobCapacity": 4
}
],
"queueStatistics": {
"totalQueuedJobs": 15,
"highPriorityJobs": 3,
"normalPriorityJobs": 10,
"lowPriorityJobs": 2,
"averageWaitTime": "4.2 minutes",
"estimatedProcessingTime": "23 minutes"
},
"performanceMetrics": {
"jobsCompletedToday": 847,
"averageJobDuration": "18.5 minutes",
"successRate": 97.8,
"throughputPerHour": 35.2
},
"resourceUtilization": {
"totalCpuCapacity": 1600,
"usedCpuCapacity": 896,
"totalMemoryCapacity": "64 GB",
"usedMemoryCapacity": "38.4 GB",
"diskSpaceAvailable": "2.3 TB"
}
}
Ejemplo: Flujo Completo de Gestión de Trabajos
Este ejemplo demuestra cómo enviar un trabajo, monitorear su progreso y obtener los resultados:
// 1. Enviar un nuevo trabajo
const submitJob = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
jobName: 'Customer Behavior Analysis',
jobDescription: 'Weekly analysis of customer interaction patterns',
jobType: 'ProcessMining',
priority: 'High',
resource: {
resourceType: 'Pipeline',
resourceId: '770e8400-e29b-41d4-a716-446655440000'
},
parameters: {
datasetId: '880e8400-e29b-41d4-a716-446655440000',
analysisType: 'comprehensive',
timeWindow: {
startDate: '2024-01-13',
endDate: '2024-01-19'
},
includeAnomalyDetection: true,
outputFormat: 'detailed_report'
},
scheduling: {
executeImmediately: true,
timeoutMinutes: 90
},
notifications: {
onCompletion: true,
onFailure: true,
emailRecipients: ['analyst@company.com']
}
})
});
return await response.json();
};
// 2. Monitorear el progreso del trabajo
const monitorJob = async (jobId) => {
const checkStatus = async () => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const job = await response.json();
console.log(`Trabajo ${jobId}: ${job.status} (${job.progress.percentage}%)`);
console.log(`Etapa actual: ${job.progress.currentStage}`);
console.log(`ETA: ${job.progress.estimatedCompletion}`);
if (job.status === 'Running' || job.status === 'Queued') {
setTimeout(() => checkStatus(), 30000); // Comprobar cada 30 segundos
} else if (job.status === 'Completed') {
console.log('¡Trabajo completado exitosamente!');
await getJobResults(jobId);
} else if (job.status === 'Failed') {
console.log('Trabajo falló:', job.error);
}
};
await checkStatus();
};
// 3. Obtener resultados del trabajo
const getJobResults = async (jobId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const results = await response.json();
console.log('Resultados del Trabajo:', results.summary);
console.log('Salida Principal:', results.results.primaryOutput.downloadUrl);
// Descargar salidas adicionales
for (const output of results.results.additionalOutputs) {
console.log(`Descargar ${output.type}: ${output.downloadUrl}`);
}
return results;
};
// 4. Obtener estado del sistema
const getSystemStatus = async () => {
const response = await fetch('/api/{tenantId}/execution/system/status', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const status = await response.json();
console.log(`Estado del Sistema: ${status.systemStatus}`);
console.log(`Cola: ${status.queueStatistics.totalQueuedJobs} trabajos en espera`);
console.log(`Tiempo promedio de espera: ${status.queueStatistics.averageWaitTime}`);
return status;
};
// Ejecutar el flujo
submitJob()
.then(job => {
console.log(`Trabajo enviado: ${job.jobId}`);
console.log(`Posición en cola: ${job.queuePosition}`);
console.log(`Inicio estimado: ${job.estimatedStartTime}`);
return monitorJob(job.jobId);
})
.catch(error => console.error('Error en el flujo de trabajo:', error));
Ejemplo en Python
import requests
import time
import json
from datetime import datetime, timedelta
class ExecutionManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
"""Enviar un nuevo trabajo de ejecución"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
payload = {
'jobName': job_name,
'jobType': job_type,
'priority': priority,
'resource': {
'resourceType': resource_type,
'resourceId': resource_id
},
'parameters': parameters or {},
'scheduling': {
'executeImmediately': True,
'timeoutMinutes': 120
},
'notifications': {
'onCompletion': True,
'onFailure': True
}
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_job_status(self, job_id):
"""Obtener el estado actual del trabajo"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
"""Listar trabajos con filtros opcionales"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
params = {'page': page, 'pageSize': page_size}
if status:
params['status'] = status
if job_type:
params['jobType'] = job_type
if date_from:
params['dateFrom'] = date_from.isoformat()
if date_to:
params['dateTo'] = date_to.isoformat()
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
"""Esperar a que el trabajo se complete con revisiones periódicas del estado"""
start_time = time.time()
while time.time() - start_time < timeout:
job = self.get_job_status(job_id)
print(f"Trabajo {job_id}: {job['status']} ({job['progress']['percentage']}%)")
print(f" Etapa actual: {job['progress']['currentStage']}")
print(f" Tiempo transcurrido: {job['progress']['elapsedTime']}")
if job['status'] in ['Completed', 'Failed', 'Cancelled']:
return job
time.sleep(poll_interval)
raise TimeoutError(f"El trabajo {job_id} no se completó en {timeout} segundos")
def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
"""Obtener resultados de la ejecución del trabajo"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
params = {
'format': format_type,
'includeArtifacts': str(include_artifacts).lower()
}
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_job(self, job_id, reason="User cancellation", force=False):
"""Cancelar un trabajo en ejecución o en cola"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
payload = {
'reason': reason,
'forceTermination': force,
'preservePartialResults': True
}
response = requests.delete(url, json=payload, headers=self.headers)
return response.status_code == 200
def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
"""Reintentar un trabajo fallido"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
payload = {
'retryReason': reason,
'modifyParameters': modify_params is not None,
'immediateExecution': False
}
if priority:
payload['priority'] = priority
if modify_params:
payload['updatedParameters'] = modify_params
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_system_status(self):
"""Obtener estado de ejecución del sistema a nivel general"""
url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
response = requests.get(url, headers=self.headers)
return response.json()
# Ejemplo de uso
manager = ExecutionManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
try:
# Verificar estado del sistema
system_status = manager.get_system_status()
print(f"Estado del Sistema: {system_status['systemStatus']}")
print(f"Trabajos en cola: {system_status['queueStatistics']['totalQueuedJobs']}")
print(f"Tiempo promedio de espera: {system_status['queueStatistics']['averageWaitTime']}")
# Enviar un trabajo completo de minería de procesos
job_params = {
'datasetId': 'dataset-guid',
'analysisType': 'comprehensive',
'timeWindow': {
'startDate': '2024-01-01',
'endDate': '2024-01-31'
},
'includeAnomalyDetection': True,
'includeProcessVariants': True,
'generateInsights': True,
'outputFormat': 'detailed_report',
'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
'qualityChecks': {
'validateTimestamps': True,
'checkDuplicates': True,
'validateActivities': True
}
}
job = manager.submit_job(
'Monthly Process Analytics',
'ProcessMining',
'Pipeline',
'pipeline-guid',
job_params,
'High'
)
print(f"Trabajo enviado: {job['jobId']}")
print(f"Posición en cola: {job['queuePosition']}")
print(f"Inicio estimado: {job['estimatedStartTime']}")
# Esperar la finalización
final_job = manager.wait_for_completion(job['jobId'])
if final_job['status'] == 'Completed':
# Obtener resultados detallados
results = manager.get_job_results(job['jobId'])
print("¡Trabajo completado exitosamente!")
print(f"Registros procesados: {results['summary']['recordsProcessed']:,}")
print(f"Puntaje de calidad de datos: {results['summary']['dataQualityScore']}")
print(f"Eficiencia de procesamiento: {results['summary']['processingEfficiency']}%")
# Descargar reporte principal
print(f"Descargar reporte: {results['results']['primaryOutput']['downloadUrl']}")
# Listar todas las salidas adicionales
for output in results['results']['additionalOutputs']:
print(f"Descargar {output['type']}: {output['downloadUrl']}")
else:
print(f"El trabajo falló con estado: {final_job['status']}")
if 'error' in final_job:
print(f"Error: {final_job['error']}")
except Exception as e:
print(f"Error en el flujo de ejecución: {e}")