Cola de Trabajo
Gestionar la Cola de Ejecución
Visualice y gestione la cola de ejecución de trabajos, establezca prioridades y controle la programación de trabajos.
Obtener Estado de la Cola
GET /api/{tenantId}/{projectId}/execution/queue
Recupera el estado actual de la cola de ejecución, incluyendo los trabajos en cola, sus prioridades y tiempos estimados de procesamiento.
Parámetros
| Parámetro | Tipo | Ubicación | Descripción |
|---|---|---|---|
tenantId |
GUID | Ruta | El identificador del tenant |
projectId |
GUID | Ruta | El identificador del proyecto |
Parámetros de Consulta
| Parámetro | Tipo | Descripción |
|---|---|---|
priority |
string | Filtrar por prioridad: Critical, High, Normal, Low |
jobType |
string | Filtrar por tipo de trabajo: ProcessMining, DataEnrichment, Notebook, Analysis |
includeEstimates |
boolean | Incluir estimaciones detalladas de tiempo (por defecto: true) |
Respuesta
{
"queueStatus": "Active",
"timestamp": "2024-01-20T10:45:00Z",
"summary": {
"totalQueuedJobs": 23,
"criticalPriorityJobs": 2,
"highPriorityJobs": 7,
"normalPriorityJobs": 12,
"lowPriorityJobs": 2,
"averageWaitTime": "8.5 minutes",
"estimatedProcessingTime": "47 minutes"
},
"processingCapacity": {
"activeWorkers": 4,
"totalWorkers": 6,
"currentLoad": 67,
"maxConcurrentJobs": 8,
"currentlyRunning": 3
},
"queuedJobs": [
{
"jobId": "ff0e8400-e29b-41d4-a716-446655440000",
"jobName": "Customer Analytics Pipeline",
"jobType": "ProcessMining",
"priority": "Critical",
"queuePosition": 1,
"estimatedStartTime": "2024-01-20T10:47:00Z",
"estimatedDuration": "12-15 minutes",
"submittedBy": "user456",
"dateSubmitted": "2024-01-20T10:44:00Z",
"resourceRequirements": {
"cpuUnits": 2,
"memoryGB": 4,
"estimatedDiskUsage": "1.2 GB"
}
},
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"priority": "High",
"queuePosition": 2,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedDuration": "8-10 minutes",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
],
"performanceMetrics": {
"averageJobDuration": "16.3 minutes",
"throughputLastHour": 12,
"queueTrends": {
"currentHourSubmissions": 8,
"peakHourToday": "09:00-10:00",
"averageQueueSize": 15.7
}
}
}
Obtener Trabajos por Prioridad
GET /api/{tenantId}/{projectId}/execution/queue/priority/{priority}
Recupera los trabajos en la cola filtrados por un nivel específico de prioridad con información detallada sobre posición y tiempos.
Parámetros
| Parámetro | Tipo | Ubicación | Descripción |
|---|---|---|---|
priority |
string | Ruta | Nivel de prioridad: Critical, High, Normal, Low |
Respuesta
{
"priority": "High",
"jobCount": 7,
"averageWaitTime": "6.2 minutes",
"estimatedProcessingTime": "31 minutes",
"jobs": [
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"queuePosition": 2,
"overallQueuePosition": 3,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedCompletion": "2024-01-20T11:12:00Z",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"waitTime": "15 minutes",
"dependencies": [],
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
]
}
Cambiar Prioridad de Trabajo
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/priority
Actualiza la prioridad de un trabajo en cola, lo que puede cambiar su posición en la cola y su hora estimada de inicio.
Cuerpo de Solicitud
{
"newPriority": "Critical",
"reason": "Business critical analysis required urgently",
"notifyUser": true
}
Respuesta
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"previousPriority": "High",
"newPriority": "Critical",
"previousQueuePosition": 3,
"newQueuePosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:47:00Z",
"timeSaved": "15 minutes",
"updatedBy": "user123",
"updateTime": "2024-01-20T10:46:00Z"
}
Mover Posición de Trabajo
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/position
Ajusta manualmente la posición de un trabajo dentro de su nivel de prioridad. Los cambios de posición están limitados al mismo nivel de prioridad.
Cuerpo de Solicitud
{
"newPosition": 1,
"reason": "Dependencies resolved, can execute earlier",
"respectPriorityBoundaries": true
}
Respuesta
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"priority": "High",
"previousPosition": 3,
"newPosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:55:00Z",
"affectedJobs": [
{
"jobId": "11fe8400-e29b-41d4-a716-446655440000",
"newPosition": 2,
"newEstimatedStart": "2024-01-20T11:05:00Z"
}
],
"updateTime": "2024-01-20T10:46:00Z"
}
Controlar Procesamiento de Cola
POST /api/{tenantId}/{projectId}/execution/queue/control
Pausa o reanuda el procesamiento de la cola para mantenimiento o situaciones de emergencia. Los trabajos en ejecución continúan, pero no se iniciarán trabajos nuevos mientras esté pausado.
Cuerpo de Solicitud
{
"action": "pause",
"reason": "System maintenance window",
"duration": 30,
"allowRunningJobsToComplete": true,
"notifyUsers": true,
"scheduledResume": "2024-01-20T12:00:00Z"
}
Respuesta
{
"action": "pause",
"status": "Paused",
"pausedAt": "2024-01-20T10:47:00Z",
"scheduledResume": "2024-01-20T12:00:00Z",
"affectedJobs": 23,
"runningJobsCount": 3,
"estimatedDelayMinutes": 30,
"reason": "System maintenance window",
"pausedBy": "admin123"
}
Obtener Historial de la Cola
GET /api/{tenantId}/{projectId}/execution/queue/history
Recupera datos y métricas históricas del rendimiento de la cola para análisis y optimización.
Parámetros de Consulta
| Parámetro | Tipo | Descripción |
|---|---|---|
dateFrom |
datetime | Fecha de inicio para los datos históricos |
dateTo |
datetime | Fecha de fin para los datos históricos |
aggregation |
string | Nivel de agregación de datos: hour, day, week (por defecto: hour) |
metrics |
string | Métricas separadas por comas: queue_size, wait_time, throughput, efficiency |
Respuesta
{
"period": {
"startDate": "2024-01-19T00:00:00Z",
"endDate": "2024-01-20T10:47:00Z",
"aggregation": "hour"
},
"summary": {
"totalJobsProcessed": 847,
"averageQueueSize": 12.3,
"averageWaitTime": "7.8 minutes",
"peakQueueSize": 45,
"peakWaitTime": "23 minutes",
"throughputPerHour": 24.8,
"efficiency": 87.2
},
"hourlyData": [
{
"timestamp": "2024-01-20T09:00:00Z",
"queueSize": {
"average": 18,
"peak": 25,
"minimum": 8
},
"waitTime": {
"average": "9.5 minutes",
"maximum": "18 minutes",
"minimum": "2 minutes"
},
"throughput": {
"jobsCompleted": 28,
"jobsSubmitted": 31,
"efficiency": 89.3
},
"priorityDistribution": {
"critical": 2,
"high": 8,
"normal": 14,
"low": 1
}
}
],
"trends": {
"queueSizeGrowth": -2.3,
"waitTimeImprovement": 5.7,
"throughputIncrease": 12.1,
"efficiencyChange": 3.4
},
"bottlenecks": [
{
"timeframe": "2024-01-20T08:30:00Z - 2024-01-20T09:15:00Z",
"issue": "High memory usage jobs accumulated",
"impact": "15 minute delay",
"resolution": "Additional worker allocated"
}
]
}
Cancelar Trabajos en Cola de un Usuario
DELETE /api/{tenantId}/{projectId}/execution/queue/user/{userId}
Cancela todos los trabajos en cola enviados por un usuario específico. Los trabajos en ejecución por el usuario continuarán hasta su finalización.
Cuerpo de Solicitud (Opcional)
{
"reason": "User account deactivated",
"notifyUser": false,
"cancelJobTypes": ["ProcessMining", "DataEnrichment"],
"excludeJobIds": ["important-job-id-1", "important-job-id-2"]
}
Respuesta
{
"userId": "user123",
"cancelledJobsCount": 5,
"preservedJobsCount": 2,
"cancelledJobs": [
{
"jobId": "job1-guid",
"jobName": "Weekly Analysis",
"priority": "Normal",
"queuePosition": 8
}
],
"preservedJobs": [
{
"jobId": "important-job-id-1",
"jobName": "Critical Business Report",
"reason": "Explicitly excluded"
}
],
"cancelledAt": "2024-01-20T10:47:00Z",
"cancelledBy": "admin123"
}
Obtener Predicciones de la Cola
GET /api/{tenantId}/{projectId}/execution/queue/predictions
Proporciona predicciones impulsadas por IA sobre el comportamiento de la cola, tiempos óptimos de envío y recomendaciones de asignación de recursos.
Parámetros de Consulta
| Parámetro | Tipo | Descripción |
|---|---|---|
horizon |
integer | Horizonte de predicción en horas (1-24, por defecto: 4) |
jobType |
string | Predecir para un tipo específico de trabajo |
includeRecommendations |
boolean | Incluir recomendaciones de optimización (por defecto: true) |
Respuesta
{
"predictionTime": "2024-01-20T10:47:00Z",
"horizon": 4,
"predictions": {
"queueSizeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"expectedQueueSize": 18,
"confidence": 0.87
},
{
"time": "2024-01-20T12:00:00Z",
"expectedQueueSize": 12,
"confidence": 0.82
}
],
"waitTimeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"averageWaitTime": "6.5 minutes",
"confidence": 0.85
}
],
"resourceUtilization": [
{
"time": "2024-01-20T11:00:00Z",
"cpuUtilization": 78,
"memoryUtilization": 65,
"efficiency": 89.2
}
]
},
"recommendations": {
"optimalSubmissionTimes": [
{
"timeWindow": "2024-01-20T13:00:00Z - 2024-01-20T15:00:00Z",
"expectedWaitTime": "3-5 minutes",
"reason": "Low queue activity period"
}
],
"resourceOptimization": [
{
"recommendation": "Add 1 additional worker node",
"expectedImprovement": "25% reduction in wait times",
"cost": "Low",
"priority": "Medium"
}
],
"jobScheduling": [
{
"jobType": "ProcessMining",
"recommendation": "Schedule during off-peak hours (14:00-16:00)",
"reason": "Memory-intensive jobs perform better with less contention"
}
]
},
"modelInfo": {
"modelVersion": "2.1.3",
"lastTrained": "2024-01-19T02:00:00Z",
"accuracy": 0.84,
"dataPoints": 10080
}
}
Ejemplo: Flujo de Trabajo de Gestión de Cola
Este ejemplo demuestra cómo monitorear y gestionar la cola de trabajos:
// 1. Obtener estado actual de la cola
const getQueueStatus = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue?includeEstimates=true', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const queue = await response.json();
console.log(`Estado de la cola: ${queue.queueStatus}`);
console.log(`Total de trabajos: ${queue.summary.totalQueuedJobs}`);
console.log(`Tiempo promedio de espera: ${queue.summary.averageWaitTime}`);
return queue;
};
// 2. Cambiar prioridad de trabajo si es necesario
const updateJobPriority = async (jobId, newPriority, reason) => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/queue/job/${jobId}/priority`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
newPriority: newPriority,
reason: reason,
notifyUser: true
})
});
const result = await response.json();
console.log(`Prioridad del trabajo ${jobId} cambiada de ${result.previousPriority} a ${result.newPriority}`);
console.log(`Nueva posición: ${result.newQueuePosition} (antes ${result.previousQueuePosition})`);
console.log(`Tiempo ahorrado: ${result.timeSaved}`);
return result;
};
// 3. Obtener predicciones de la cola para optimización
const getQueuePredictions = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/predictions?horizon=4&includeRecommendations=true', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const predictions = await response.json();
console.log('Predicciones de la cola:');
predictions.predictions.queueSizeProjection.forEach(prediction => {
console.log(` ${prediction.time}: ${prediction.expectedQueueSize} trabajos (${Math.round(prediction.confidence * 100)}% de confianza)`);
});
console.log('Recomendaciones:');
predictions.recommendations.optimalSubmissionTimes.forEach(rec => {
console.log(` Enviar durante: ${rec.timeWindow} (${rec.expectedWaitTime} de espera)`);
});
return predictions;
};
// 4. Monitorear trabajo específico en cola
const monitorJobInQueue = async (jobId) => {
const checkQueue = async () => {
const queue = await getQueueStatus();
const job = queue.queuedJobs.find(j => j.jobId === jobId);
if (job) {
console.log(`El trabajo ${jobId} está en la posición ${job.queuePosition}`);
console.log(`Inicio estimado: ${job.estimatedStartTime}`);
console.log(`Duración estimada: ${job.estimatedDuration}`);
// Verificar nuevamente en 2 minutos
setTimeout(() => checkQueue(), 120000);
} else {
console.log(`El trabajo ${jobId} ya no está en la cola (probablemente inició o fue cancelado)`);
}
};
await checkQueue();
};
// 5. Gestión de cola de emergencia
const pauseQueue = async (reason, duration) => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/control', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
action: 'pause',
reason: reason,
duration: duration,
allowRunningJobsToComplete: true,
notifyUsers: true
})
});
const result = await response.json();
console.log(`Cola pausada: ${result.status}`);
console.log(`${result.affectedJobs} trabajos afectados`);
console.log(`Retraso estimado: ${result.estimatedDelayMinutes} minutos`);
return result;
};
// Ejecutar flujo de gestión de cola
getQueueStatus()
.then(queue => {
console.log('Estado actual de la cola recuperado');
// Verificar si la cola está creciendo
if (queue.summary.totalQueuedJobs > 30) {
console.log('La cola está creciendo, consultando predicciones...');
return getQueuePredictions();
}
return null;
})
.then(predictions => {
if (predictions) {
console.log('Predicciones de la cola recuperadas');
// Si las predicciones muestran crecimiento continuo, considerar optimización de recursos
const futureQueueSize = predictions.predictions.queueSizeProjection[predictions.predictions.queueSizeProjection.length - 1];
if (futureQueueSize.expectedQueueSize > 25) {
console.log('Considerar implementar recomendaciones de optimización de recursos');
predictions.recommendations.resourceOptimization.forEach(rec => {
console.log(`- ${rec.recommendation}: ${rec.expectedImprovement}`);
});
}
}
})
.catch(error => console.error('Fallo en la gestión de la cola:', error));
Ejemplo en Python
import requests
import time
import json
from datetime import datetime, timedelta
class QueueManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def get_queue_status(self, priority=None, job_type=None, include_estimates=True):
"""Obtener estado actual de la cola"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue"
params = {'includeEstimates': str(include_estimates).lower()}
if priority:
params['priority'] = priority
if job_type:
params['jobType'] = job_type
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def get_jobs_by_priority(self, priority):
"""Obtener trabajos filtrados por nivel de prioridad"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/priority/{priority}"
response = requests.get(url, headers=self.headers)
return response.json()
def change_job_priority(self, job_id, new_priority, reason, notify_user=True):
"""Cambiar prioridad de un trabajo en cola"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/priority"
payload = {
'newPriority': new_priority,
'reason': reason,
'notifyUser': notify_user
}
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def move_job_position(self, job_id, new_position, reason):
"""Mover trabajo a nueva posición dentro de su nivel de prioridad"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/position"
payload = {
'newPosition': new_position,
'reason': reason,
'respectPriorityBoundaries': True
}
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def control_queue(self, action, reason, duration=None, scheduled_resume=None):
"""Pausar o reanudar el procesamiento de la cola"""
url = f"{self.base_url}/api/{self.tenantId}/{self.project_id}/execution/queue/control"
payload = {
'action': action,
'reason': reason,
'allowRunningJobsToComplete': True,
'notifyUsers': True
}
if duration:
payload['duration'] = duration
if scheduled_resume:
payload['scheduledResume'] = scheduled_resume.isoformat()
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_queue_history(self, date_from, date_to, aggregation='hour', metrics=None):
"""Obtener datos históricos de rendimiento de la cola"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/history"
params = {
'dateFrom': date_from.isoformat(),
'dateTo': date_to.isoformat(),
'aggregation': aggregation
}
if metrics:
params['metrics'] = ','.join(metrics)
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_user_jobs(self, user_id, reason, job_types=None, exclude_job_ids=None):
"""Cancelar todos los trabajos en cola para un usuario específico"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/user/{user_id}"
payload = {
'reason': reason,
'notifyUser': False
}
if job_types:
payload['cancelJobTypes'] = job_types
if exclude_job_ids:
payload['excludeJobIds'] = exclude_job_ids
response = requests.delete(url, json=payload, headers=self.headers)
return response.json()
def get_queue_predictions(self, horizon=4, job_type=None, include_recommendations=True):
"""Obtener predicciones de la cola impulsadas por IA"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/predictions"
params = {
'horizon': horizon,
'includeRecommendations': str(include_recommendations).lower()
}
if job_type:
params['jobType'] = job_type
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def monitor_queue_health(self, alert_threshold=30, check_interval=300):
"""Monitorear continuamente la salud de la cola y alertar sobre problemas"""
while True:
try:
queue_status = self.get_queue_status()
total_jobs = queue_status['summary']['totalQueuedJobs']
avg_wait = queue_status['summary']['averageWaitTime']
print(f"Revisión de salud de la cola: {total_jobs} trabajos, tiempo promedio de espera: {avg_wait}")
if total_jobs > alert_threshold:
print(f"ALERTA: El tamaño de la cola ({total_jobs}) supera el umbral ({alert_threshold})")
# Obtener predicciones para ver si esto mejorará
predictions = self.get_queue_predictions()
future_size = predictions['predictions']['queueSizeProjection'][-1]['expectedQueueSize']
if future_size > total_jobs:
print("ADVERTENCIA: Se espera que la cola crezca aún más")
print("Recomendaciones de optimización de recursos:")
for rec in predictions['recommendations']['resourceOptimization']:
print(f" - {rec['recommendation']}: {rec['expectedImprovement']}")
time.sleep(check_interval)
except Exception as e:
print(f"Error en monitoreo de cola: {e}")
time.sleep(60)
# Ejemplo de uso
manager = QueueManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
try:
# Obtener estado completo de la cola
queue_status = manager.get_queue_status(include_estimates=True)
print(f"Estado de la cola: {queue_status['queueStatus']}")
print(f"Total de trabajos en cola: {queue_status['summary']['totalQueuedJobs']}")
print(f"Tiempo promedio de espera: {queue_status['summary']['averageWaitTime']}")
print(f"Capacidad de procesamiento: {queue_status['processingCapacity']['currentLoad']}%")
# Verificar específicamente los trabajos de alta prioridad
high_priority_jobs = manager.get_jobs_by_priority('High')
print(f"Trabajos de alta prioridad: {high_priority_jobs['jobCount']}")
# Obtener predicciones de la cola para las próximas 4 horas
predictions = manager.get_queue_predictions(horizon=4)
print("Predicciones de la cola:")
for pred in predictions['predictions']['queueSizeProjection']:
confidence_pct = round(pred['confidence'] * 100)
print(f" {pred['time']}: {pred['expectedQueueSize']} trabajos ({confidence_pct}% de confianza)")
# Consultar recomendaciones
if predictions['recommendations']['optimalSubmissionTimes']:
print("Tiempos óptimos para enviar trabajos:")
for rec in predictions['recommendations']['optimalSubmissionTimes']:
print(f" {rec['timeWindow']}: {rec['expectedWaitTime']} de espera")
# Ejemplo: Elevar la prioridad de un trabajo si es necesario
if queue_status['summary']['totalQueuedJobs'] > 20:
# Buscar trabajos de prioridad normal para elevar
normal_jobs = [job for job in queue_status['queuedJobs'] if job['priority'] == 'Normal']
if normal_jobs:
job_to_elevate = normal_jobs[0]
result = manager.change_job_priority(
job_to_elevate['jobId'],
'High',
'Congestión en la cola - elevando trabajo crítico para el negocio'
)
print(f"Trabajo {job_to_elevate['jobName']} elevado a prioridad Alta")
print(f"Nueva posición: {result['newQueuePosition']} (antes {result['previousPosition']})")
# Obtener historial de la cola para análisis
history = manager.get_queue_history(
datetime.now() - timedelta(hours=24),
datetime.now(),
'hour',
['queue_size', 'wait_time', 'throughput']
)
print(f"Resumen de 24h: {history['summary']['totalJobsProcessed']} trabajos procesados")
print(f"Pico del tamaño de la cola: {history['summary']['peakQueueSize']}")
print(f"Productividad promedio: {history['summary']['throughputPerHour']} trabajos/hora")
# Si hay cuellos de botella, reportarlos
if history['bottlenecks']:
print("Cuellos de botella recientes:")
for bottleneck in history['bottlenecks']:
print(f" {bottleneck['timeframe']}: {bottleneck['issue']} (Impacto: {bottleneck['impact']})")
except Exception as e:
print(f"Error en gestión de la cola: {e}")