Manage Execution Queue
View and manage the job execution queue, set priorities, and control job scheduling.
Get Queue Status
GET /api/{tenantId}/{projectId}/execution/queue
Retrieves the current status of the execution queue including queued jobs, their priorities, and estimated processing times.
Parameters
| Parameter | Type | Location | Description |
|---|---|---|---|
| tenantId | GUID | Path | The tenant identifier |
| projectId | GUID | Path | The project identifier |
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| priority | string | Filter by priority: Critical, High, Normal, Low |
| jobType | string | Filter by job type: ProcessMining, DataEnrichment, Notebook, Analysis |
| includeEstimates | boolean | Include detailed timing estimates (default: true) |
Response
{
"queueStatus": "Active",
"timestamp": "2024-01-20T10:45:00Z",
"summary": {
"totalQueuedJobs": 23,
"criticalPriorityJobs": 2,
"highPriorityJobs": 7,
"normalPriorityJobs": 12,
"lowPriorityJobs": 2,
"averageWaitTime": "8.5 minutes",
"estimatedProcessingTime": "47 minutes"
},
"processingCapacity": {
"activeWorkers": 4,
"totalWorkers": 6,
"currentLoad": 67,
"maxConcurrentJobs": 8,
"currentlyRunning": 3
},
"queuedJobs": [
{
"jobId": "ff0e8400-e29b-41d4-a716-446655440000",
"jobName": "Customer Analytics Pipeline",
"jobType": "ProcessMining",
"priority": "Critical",
"queuePosition": 1,
"estimatedStartTime": "2024-01-20T10:47:00Z",
"estimatedDuration": "12-15 minutes",
"submittedBy": "user456",
"dateSubmitted": "2024-01-20T10:44:00Z",
"resourceRequirements": {
"cpuUnits": 2,
"memoryGB": 4,
"estimatedDiskUsage": "1.2 GB"
}
},
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"priority": "High",
"queuePosition": 2,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedDuration": "8-10 minutes",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
],
"performanceMetrics": {
"averageJobDuration": "16.3 minutes",
"throughputLastHour": 12,
"queueTrends": {
"currentHourSubmissions": 8,
"peakHourToday": "09:00-10:00",
"averageQueueSize": 15.7
}
}
}
Get Jobs by Priority
GET /api/{tenantId}/{projectId}/execution/queue/priority/{priority}
Retrieves jobs in the queue filtered by specific priority level with detailed position and timing information.
Parameters
| Parameter | Type | Location | Description |
|---|---|---|---|
| priority | string | Path | Priority level: Critical, High, Normal, Low |
Response
{
"priority": "High",
"jobCount": 7,
"averageWaitTime": "6.2 minutes",
"estimatedProcessingTime": "31 minutes",
"jobs": [
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"queuePosition": 2,
"overallQueuePosition": 3,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedCompletion": "2024-01-20T11:12:00Z",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"waitTime": "15 minutes",
"dependencies": [],
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
]
}
Change Job Priority
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/priority
Updates the priority of a queued job, which may change its position in the queue and estimated start time.
Request Body
{
"newPriority": "Critical",
"reason": "Business critical analysis required urgently",
"notifyUser": true
}
Response
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"previousPriority": "High",
"newPriority": "Critical",
"previousQueuePosition": 3,
"newQueuePosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:47:00Z",
"timeSaved": "15 minutes",
"updatedBy": "user123",
"updateTime": "2024-01-20T10:46:00Z"
}
Move Job Position
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/position
Manually adjusts a job's position within its priority tier. Position changes are limited to the same priority level.
Request Body
{
"newPosition": 1,
"reason": "Dependencies resolved, can execute earlier",
"respectPriorityBoundaries": true
}
Response
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"priority": "High",
"previousPosition": 3,
"newPosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:55:00Z",
"affectedJobs": [
{
"jobId": "11fe8400-e29b-41d4-a716-446655440000",
"newPosition": 2,
"newEstimatedStart": "2024-01-20T11:05:00Z"
}
],
"updateTime": "2024-01-20T10:46:00Z"
}
Control Queue Processing
POST /api/{tenantId}/{projectId}/execution/queue/control
Pauses or resumes queue processing for maintenance or emergency situations. Running jobs continue but no new jobs will start when paused.
Request Body
{
"action": "pause",
"reason": "System maintenance window",
"duration": 30,
"allowRunningJobsToComplete": true,
"notifyUsers": true,
"scheduledResume": "2024-01-20T12:00:00Z"
}
Response
{
"action": "pause",
"status": "Paused",
"pausedAt": "2024-01-20T10:47:00Z",
"scheduledResume": "2024-01-20T12:00:00Z",
"affectedJobs": 23,
"runningJobsCount": 3,
"estimatedDelayMinutes": 30,
"reason": "System maintenance window",
"pausedBy": "admin123"
}
Get Queue History
GET /api/{tenantId}/{projectId}/execution/queue/history
Retrieves historical queue performance data and metrics for analysis and optimization purposes.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| dateFrom | datetime | Start date for historical data |
| dateTo | datetime | End date for historical data |
| aggregation | string | Data aggregation level: hour, day, week (default: hour) |
| metrics | string | Comma-separated metrics: queue_size, wait_time, throughput, efficiency |
Response
{
"period": {
"startDate": "2024-01-19T00:00:00Z",
"endDate": "2024-01-20T10:47:00Z",
"aggregation": "hour"
},
"summary": {
"totalJobsProcessed": 847,
"averageQueueSize": 12.3,
"averageWaitTime": "7.8 minutes",
"peakQueueSize": 45,
"peakWaitTime": "23 minutes",
"throughputPerHour": 24.8,
"efficiency": 87.2
},
"hourlyData": [
{
"timestamp": "2024-01-20T09:00:00Z",
"queueSize": {
"average": 18,
"peak": 25,
"minimum": 8
},
"waitTime": {
"average": "9.5 minutes",
"maximum": "18 minutes",
"minimum": "2 minutes"
},
"throughput": {
"jobsCompleted": 28,
"jobsSubmitted": 31,
"efficiency": 89.3
},
"priorityDistribution": {
"critical": 2,
"high": 8,
"normal": 14,
"low": 1
}
}
],
"trends": {
"queueSizeGrowth": -2.3,
"waitTimeImprovement": 5.7,
"throughputIncrease": 12.1,
"efficiencyChange": 3.4
},
"bottlenecks": [
{
"timeframe": "2024-01-20T08:30:00Z - 2024-01-20T09:15:00Z",
"issue": "High memory usage jobs accumulated",
"impact": "15 minute delay",
"resolution": "Additional worker allocated"
}
]
}
Cancel User's Queued Jobs
DELETE /api/{tenantId}/{projectId}/execution/queue/user/{userId}
Cancels all queued jobs submitted by a specific user. Running jobs by the user will continue to completion.
Request Body (Optional)
{
"reason": "User account deactivated",
"notifyUser": false,
"cancelJobTypes": ["ProcessMining", "DataEnrichment"],
"excludeJobIds": ["important-job-id-1", "important-job-id-2"]
}
Response
{
"userId": "user123",
"cancelledJobsCount": 5,
"preservedJobsCount": 2,
"cancelledJobs": [
{
"jobId": "job1-guid",
"jobName": "Weekly Analysis",
"priority": "Normal",
"queuePosition": 8
}
],
"preservedJobs": [
{
"jobId": "important-job-id-1",
"jobName": "Critical Business Report",
"reason": "Explicitly excluded"
}
],
"cancelledAt": "2024-01-20T10:47:00Z",
"cancelledBy": "admin123"
}
Get Queue Predictions
GET /api/{tenantId}/{projectId}/execution/queue/predictions
Provides AI-powered predictions for queue behavior, optimal submission times, and resource allocation recommendations.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| horizon | integer | Prediction horizon in hours (1-24, default: 4) |
| jobType | string | Predict for specific job type |
| includeRecommendations | boolean | Include optimization recommendations (default: true) |
Response
{
"predictionTime": "2024-01-20T10:47:00Z",
"horizon": 4,
"predictions": {
"queueSizeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"expectedQueueSize": 18,
"confidence": 0.87
},
{
"time": "2024-01-20T12:00:00Z",
"expectedQueueSize": 12,
"confidence": 0.82
}
],
"waitTimeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"averageWaitTime": "6.5 minutes",
"confidence": 0.85
}
],
"resourceUtilization": [
{
"time": "2024-01-20T11:00:00Z",
"cpuUtilization": 78,
"memoryUtilization": 65,
"efficiency": 89.2
}
]
},
"recommendations": {
"optimalSubmissionTimes": [
{
"timeWindow": "2024-01-20T13:00:00Z - 2024-01-20T15:00:00Z",
"expectedWaitTime": "3-5 minutes",
"reason": "Low queue activity period"
}
],
"resourceOptimization": [
{
"recommendation": "Add 1 additional worker node",
"expectedImprovement": "25% reduction in wait times",
"cost": "Low",
"priority": "Medium"
}
],
"jobScheduling": [
{
"jobType": "ProcessMining",
"recommendation": "Schedule during off-peak hours (14:00-16:00)",
"reason": "Memory-intensive jobs perform better with less contention"
}
]
},
"modelInfo": {
"modelVersion": "2.1.3",
"lastTrained": "2024-01-19T02:00:00Z",
"accuracy": 0.84,
"dataPoints": 10080
}
}
Example: Queue Management Workflow
This example demonstrates monitoring and managing the job queue:
// 1. Get current queue status
/**
 * Fetch the current queue status (with timing estimates) and log a short summary.
 *
 * NOTE: `{tenantId}` and `{projectId}` in the URL are literal placeholders;
 * replace them with real identifiers before running. `token` must be in scope.
 *
 * @returns {Promise<object>} The parsed queue status payload.
 * @throws {Error} When the server answers with a non-2xx status.
 */
const getQueueStatus = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue?includeEstimates=true', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });
  // fetch() only rejects on network failure; HTTP errors must be checked
  // explicitly, otherwise an error body is parsed as if it were queue data.
  if (!response.ok) {
    throw new Error(`Failed to get queue status: ${response.status} ${response.statusText}`);
  }
  const queue = await response.json();
  console.log(`Queue Status: ${queue.queueStatus}`);
  console.log(`Total jobs: ${queue.summary.totalQueuedJobs}`);
  console.log(`Average wait time: ${queue.summary.averageWaitTime}`);
  return queue;
};
// 2. Change job priority if needed
/**
 * Change the priority of a queued job and log the resulting queue movement.
 *
 * NOTE: `{tenantId}` and `{projectId}` in the URL are literal placeholders;
 * replace them with real identifiers before running. `token` must be in scope.
 *
 * @param {string} jobId - Identifier of the queued job.
 * @param {string} newPriority - Target priority sent as `newPriority`.
 * @param {string} reason - Free-text justification sent with the request.
 * @returns {Promise<object>} The parsed priority-change response.
 * @throws {Error} When the server answers with a non-2xx status.
 */
const updateJobPriority = async (jobId, newPriority, reason) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/queue/job/${jobId}/priority`, {
    method: 'PUT',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      newPriority: newPriority,
      reason: reason,
      notifyUser: true
    })
  });
  // fetch() resolves on HTTP errors too, so fail loudly instead of logging
  // "undefined" fields read from an error body.
  if (!response.ok) {
    throw new Error(`Failed to change priority of job ${jobId}: ${response.status} ${response.statusText}`);
  }
  const result = await response.json();
  console.log(`Job ${jobId} priority changed from ${result.previousPriority} to ${result.newPriority}`);
  console.log(`New position: ${result.newQueuePosition} (was ${result.previousQueuePosition})`);
  console.log(`Time saved: ${result.timeSaved}`);
  return result;
};
// 3. Get queue predictions for optimization
/**
 * Fetch 4-hour queue predictions with recommendations and log them.
 *
 * NOTE: `{tenantId}` and `{projectId}` in the URL are literal placeholders;
 * replace them with real identifiers before running. `token` must be in scope.
 *
 * @returns {Promise<object>} The parsed predictions payload.
 * @throws {Error} When the server answers with a non-2xx status.
 */
const getQueuePredictions = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/predictions?horizon=4&includeRecommendations=true', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });
  // fetch() resolves on HTTP errors too; without this check the forEach calls
  // below would fail with an unrelated TypeError on an error body.
  if (!response.ok) {
    throw new Error(`Failed to get queue predictions: ${response.status} ${response.statusText}`);
  }
  const predictions = await response.json();
  console.log('Queue Predictions:');
  predictions.predictions.queueSizeProjection.forEach(prediction => {
    console.log(` ${prediction.time}: ${prediction.expectedQueueSize} jobs (${Math.round(prediction.confidence * 100)}% confidence)`);
  });
  console.log('Recommendations:');
  predictions.recommendations.optimalSubmissionTimes.forEach(rec => {
    console.log(` Submit during: ${rec.timeWindow} (${rec.expectedWaitTime} wait)`);
  });
  return predictions;
};
// 4. Monitor queue for specific job
/**
 * Poll the queue every 2 minutes and log a job's position until it leaves the queue.
 *
 * The returned promise settles after the FIRST check only; later checks run
 * from timers, and their failures are logged rather than propagated.
 *
 * @param {string} jobId - Identifier of the job to look for in `queuedJobs`.
 * @returns {Promise<void>}
 */
const monitorJobInQueue = async (jobId) => {
  const checkQueue = async () => {
    const queue = await getQueueStatus();
    const job = queue.queuedJobs.find(j => j.jobId === jobId);
    if (job) {
      console.log(`Job ${jobId} is at position ${job.queuePosition}`);
      console.log(`Estimated start: ${job.estimatedStartTime}`);
      console.log(`Estimated duration: ${job.estimatedDuration}`);
      // Check again in 2 minutes. The timer callback is not awaited by anyone,
      // so handle rejections here to avoid an unhandled promise rejection.
      setTimeout(() => {
        checkQueue().catch(error => console.error(`Monitoring job ${jobId} failed:`, error));
      }, 120000);
    } else {
      console.log(`Job ${jobId} is no longer in queue (likely started or cancelled)`);
    }
  };
  await checkQueue();
};
// 5. Emergency queue management
/**
 * Pause queue processing and log how many jobs are affected.
 *
 * NOTE: `{tenantId}` and `{projectId}` in the URL are literal placeholders;
 * replace them with real identifiers before running. `token` must be in scope.
 *
 * @param {string} reason - Free-text justification sent with the request.
 * @param {number} duration - Value sent as the `duration` field of the request.
 * @returns {Promise<object>} The parsed control response.
 * @throws {Error} When the server answers with a non-2xx status.
 */
const pauseQueue = async (reason, duration) => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/control', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      action: 'pause',
      reason: reason,
      duration: duration,
      allowRunningJobsToComplete: true,
      notifyUsers: true
    })
  });
  // A rejected pause must not be reported as "Queue paused: undefined".
  if (!response.ok) {
    throw new Error(`Failed to pause queue: ${response.status} ${response.statusText}`);
  }
  const result = await response.json();
  console.log(`Queue paused: ${result.status}`);
  console.log(`${result.affectedJobs} jobs affected`);
  console.log(`Estimated delay: ${result.estimatedDelayMinutes} minutes`);
  return result;
};
// Execute queue management workflow
(async () => {
  try {
    const queue = await getQueueStatus();
    console.log('Current queue status retrieved');

    // Predictions are only fetched once the queue is getting long.
    let predictions = null;
    if (queue.summary.totalQueuedJobs > 30) {
      console.log('Queue is getting long, checking predictions...');
      predictions = await getQueuePredictions();
    }
    if (!predictions) {
      return;
    }

    console.log('Queue predictions retrieved');
    // The last projection entry is the furthest point in the horizon; if it
    // still shows a large queue, surface the resource recommendations.
    const projection = predictions.predictions.queueSizeProjection;
    const lastProjection = projection[projection.length - 1];
    if (lastProjection.expectedQueueSize > 25) {
      console.log('Consider implementing resource optimization recommendations');
      for (const rec of predictions.recommendations.resourceOptimization) {
        console.log(`- ${rec.recommendation}: ${rec.expectedImprovement}`);
      }
    }
  } catch (error) {
    console.error('Queue management failed:', error);
  }
})();
Python Example
import requests
import time
import json
from datetime import datetime, timedelta
class QueueManager:
    """Small client for the execution-queue endpoints of one tenant/project.

    Every request method returns the decoded JSON body and raises
    ``requests.HTTPError`` when the server answers with a 4xx/5xx status.
    """

    def __init__(self, base_url, tenant_id, project_id, token, timeout=30):
        """Store connection settings and build the shared request headers.

        Args:
            base_url: Scheme and host of the instance; a trailing slash is tolerated.
            tenant_id: Tenant identifier placed in every URL path.
            project_id: Project identifier placed in every URL path.
            token: Bearer token sent in the ``Authorization`` header.
            timeout: Seconds passed to ``requests`` as the per-request timeout.
        """
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.timeout = timeout
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def _url(self, suffix=''):
        """Return the queue endpoint URL, optionally extended by ``suffix``."""
        # Strip a trailing slash so 'https://host/' does not yield '//api/...'.
        root = (
            f"{self.base_url.rstrip('/')}/api/{self.tenant_id}/"
            f"{self.project_id}/execution/queue"
        )
        return f"{root}/{suffix}" if suffix else root

    def _request(self, method, suffix='', **kwargs):
        """Send one request and return its JSON body, raising on HTTP errors."""
        response = requests.request(
            method,
            self._url(suffix),
            headers=self.headers,
            # Without a timeout requests can block forever on a stalled server.
            timeout=self.timeout,
            **kwargs,
        )
        # Fail here rather than handing an error payload back as queue data.
        response.raise_for_status()
        return response.json()

    def get_queue_status(self, priority=None, job_type=None, include_estimates=True):
        """Get current queue status, optionally filtered by priority and job type."""
        # Booleans are sent as lowercase 'true'/'false' strings.
        params = {'includeEstimates': str(include_estimates).lower()}
        if priority:
            params['priority'] = priority
        if job_type:
            params['jobType'] = job_type
        return self._request('GET', params=params)

    def get_jobs_by_priority(self, priority):
        """Get jobs filtered by priority level."""
        return self._request('GET', f"priority/{priority}")

    def change_job_priority(self, job_id, new_priority, reason, notify_user=True):
        """Change priority of a queued job."""
        payload = {
            'newPriority': new_priority,
            'reason': reason,
            'notifyUser': notify_user
        }
        return self._request('PUT', f"job/{job_id}/priority", json=payload)

    def move_job_position(self, job_id, new_position, reason):
        """Move job to new position within its priority tier."""
        payload = {
            'newPosition': new_position,
            'reason': reason,
            'respectPriorityBoundaries': True
        }
        return self._request('PUT', f"job/{job_id}/position", json=payload)

    def control_queue(self, action, reason, duration=None, scheduled_resume=None):
        """Pause or resume queue processing.

        Args:
            action: Value sent as the ``action`` field (e.g. ``'pause'``).
            reason: Free-text justification.
            duration: Optional duration; sent whenever it is not ``None``.
            scheduled_resume: Optional datetime, serialized with ``isoformat()``.
        """
        payload = {
            'action': action,
            'reason': reason,
            'allowRunningJobsToComplete': True,
            'notifyUsers': True
        }
        # Compare against None so an explicit duration of 0 is still sent.
        if duration is not None:
            payload['duration'] = duration
        if scheduled_resume:
            payload['scheduledResume'] = scheduled_resume.isoformat()
        return self._request('POST', 'control', json=payload)

    def get_queue_history(self, date_from, date_to, aggregation='hour', metrics=None):
        """Get historical queue performance data.

        Args:
            date_from: Start datetime, serialized with ``isoformat()``.
            date_to: End datetime, serialized with ``isoformat()``.
            aggregation: Aggregation level sent as-is.
            metrics: Optional iterable of metric names, joined with commas.
        """
        params = {
            'dateFrom': date_from.isoformat(),
            'dateTo': date_to.isoformat(),
            'aggregation': aggregation
        }
        if metrics:
            params['metrics'] = ','.join(metrics)
        return self._request('GET', 'history', params=params)

    def cancel_user_jobs(self, user_id, reason, job_types=None, exclude_job_ids=None):
        """Cancel all queued jobs for a specific user."""
        payload = {
            'reason': reason,
            'notifyUser': False
        }
        if job_types:
            payload['cancelJobTypes'] = job_types
        if exclude_job_ids:
            payload['excludeJobIds'] = exclude_job_ids
        return self._request('DELETE', f"user/{user_id}", json=payload)

    def get_queue_predictions(self, horizon=4, job_type=None, include_recommendations=True):
        """Get AI-powered queue predictions."""
        params = {
            'horizon': horizon,
            'includeRecommendations': str(include_recommendations).lower()
        }
        if job_type:
            params['jobType'] = job_type
        return self._request('GET', 'predictions', params=params)

    def monitor_queue_health(self, alert_threshold=30, check_interval=300):
        """Continuously monitor queue health and alert on issues.

        Runs until interrupted. After each successful check it sleeps
        ``check_interval`` seconds; after a failed check it prints the error
        and retries in 60 seconds.
        """
        while True:
            try:
                queue_status = self.get_queue_status()
                summary = queue_status['summary']
                total_jobs = summary['totalQueuedJobs']
                avg_wait = summary['averageWaitTime']
                print(f"Queue Health Check: {total_jobs} jobs, avg wait: {avg_wait}")
                if total_jobs > alert_threshold:
                    print(f"ALERT: Queue size ({total_jobs}) exceeds threshold ({alert_threshold})")
                    # Get predictions to understand if this will improve
                    predictions = self.get_queue_predictions()
                    future_size = predictions['predictions']['queueSizeProjection'][-1]['expectedQueueSize']
                    if future_size > total_jobs:
                        print("WARNING: Queue expected to grow further")
                        print("Resource optimization recommendations:")
                        for rec in predictions['recommendations']['resourceOptimization']:
                            print(f" - {rec['recommendation']}: {rec['expectedImprovement']}")
                time.sleep(check_interval)
            except Exception as e:
                # Deliberately broad: one failed check must not stop the monitor.
                print(f"Queue monitoring error: {e}")
                time.sleep(60)
# Usage example: inspect the queue, optionally elevate one job, then review
# the last 24 hours of history. Replace the placeholder arguments first.
manager = QueueManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)
try:
    # Get comprehensive queue status
    queue_status = manager.get_queue_status(include_estimates=True)
    print(f"Queue Status: {queue_status['queueStatus']}")
    print(f"Total jobs in queue: {queue_status['summary']['totalQueuedJobs']}")
    print(f"Average wait time: {queue_status['summary']['averageWaitTime']}")
    print(f"Processing capacity: {queue_status['processingCapacity']['currentLoad']}%")

    # Check high priority jobs specifically
    high_priority_jobs = manager.get_jobs_by_priority('High')
    print(f"High priority jobs: {high_priority_jobs['jobCount']}")

    # Get queue predictions for the next 4 hours
    predictions = manager.get_queue_predictions(horizon=4)
    print("Queue predictions:")
    for pred in predictions['predictions']['queueSizeProjection']:
        confidence_pct = round(pred['confidence'] * 100)
        print(f" {pred['time']}: {pred['expectedQueueSize']} jobs ({confidence_pct}% confidence)")

    # Check recommendations
    if predictions['recommendations']['optimalSubmissionTimes']:
        print("Optimal submission times:")
        for rec in predictions['recommendations']['optimalSubmissionTimes']:
            print(f" {rec['timeWindow']}: {rec['expectedWaitTime']} wait time")

    # Example: Elevate a job priority if needed
    if queue_status['summary']['totalQueuedJobs'] > 20:
        # Find a normal priority job to elevate
        normal_jobs = [job for job in queue_status['queuedJobs'] if job['priority'] == 'Normal']
        if normal_jobs:
            job_to_elevate = normal_jobs[0]
            result = manager.change_job_priority(
                job_to_elevate['jobId'],
                'High',
                'Queue congestion - elevating business critical job'
            )
            print(f"Elevated job {job_to_elevate['jobName']} to High priority")
            # The change-priority response names this field
            # 'previousQueuePosition'; 'previousPosition' raises KeyError.
            print(f"New position: {result['newQueuePosition']} (was {result['previousQueuePosition']})")

    # Get queue history for analysis; take "now" once so the window is
    # exactly 24 hours wide.
    now = datetime.now()
    history = manager.get_queue_history(
        now - timedelta(hours=24),
        now,
        'hour',
        ['queue_size', 'wait_time', 'throughput']
    )
    print(f"24h summary: {history['summary']['totalJobsProcessed']} jobs processed")
    print(f"Peak queue size: {history['summary']['peakQueueSize']}")
    print(f"Average throughput: {history['summary']['throughputPerHour']} jobs/hour")

    # If there are bottlenecks, report them
    if history['bottlenecks']:
        print("Recent bottlenecks:")
        for bottleneck in history['bottlenecks']:
            print(f" {bottleneck['timeframe']}: {bottleneck['issue']} (Impact: {bottleneck['impact']})")
except Exception as e:
    print(f"Error in queue management: {e}")