Queue

Manage Execution Queue

View and manage the job execution queue, set priorities, and control job scheduling.

Get Queue Status

GET /api/{tenantId}/{projectId}/execution/queue

Retrieves the current status of the execution queue including queued jobs, their priorities, and estimated processing times.

Parameters

Parameter   Type   Location   Description
tenantId    GUID   Path       The tenant identifier
projectId   GUID   Path       The project identifier

Query Parameters

Parameter          Type      Description
priority           string    Filter by priority: Critical, High, Normal, Low
jobType            string    Filter by job type: ProcessMining, DataEnrichment, Notebook, Analysis
includeEstimates   boolean   Include detailed timing estimates (default: true)

Response

{
  "queueStatus": "Active",
  "timestamp": "2024-01-20T10:45:00Z",
  "summary": {
    "totalQueuedJobs": 23,
    "criticalPriorityJobs": 2,
    "highPriorityJobs": 7,
    "normalPriorityJobs": 12,
    "lowPriorityJobs": 2,
    "averageWaitTime": "8.5 minutes",
    "estimatedProcessingTime": "47 minutes"
  },
  "processingCapacity": {
    "activeWorkers": 4,
    "totalWorkers": 6,
    "currentLoad": 67,
    "maxConcurrentJobs": 8,
    "currentlyRunning": 3
  },
  "queuedJobs": [
    {
      "jobId": "ff0e8400-e29b-41d4-a716-446655440000",
      "jobName": "Customer Analytics Pipeline",
      "jobType": "ProcessMining",
      "priority": "Critical",
      "queuePosition": 1,
      "estimatedStartTime": "2024-01-20T10:47:00Z",
      "estimatedDuration": "12-15 minutes",
      "submittedBy": "user456",
      "dateSubmitted": "2024-01-20T10:44:00Z",
      "resourceRequirements": {
        "cpuUnits": 2,
        "memoryGB": 4,
        "estimatedDiskUsage": "1.2 GB"
      }
    },
    {
      "jobId": "00fe8400-e29b-41d4-a716-446655440000",
      "jobName": "Daily Sales Analysis",
      "jobType": "DataEnrichment",
      "priority": "High",
      "queuePosition": 2,
      "estimatedStartTime": "2024-01-20T11:02:00Z",
      "estimatedDuration": "8-10 minutes",
      "submittedBy": "system",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "resourceRequirements": {
        "cpuUnits": 1,
        "memoryGB": 2,
        "estimatedDiskUsage": "500 MB"
      }
    }
  ],
  "performanceMetrics": {
    "averageJobDuration": "16.3 minutes",
    "throughputLastHour": 12,
    "queueTrends": {
      "currentHourSubmissions": 8,
      "peakHourToday": "09:00-10:00",
      "averageQueueSize": 15.7
    }
  }
}
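
Several fields in this response (averageWaitTime, estimatedProcessingTime, estimatedDuration) are human-readable strings rather than numbers. The following is a minimal sketch of how a client might turn them into numeric minutes. The helper name `parse_minutes` is hypothetical, and it only handles the two formats that appear in the sample above ("8.5 minutes" and "12-15 minutes"); other units or formats are assumed not to occur and raise an error.

```python
import re

# Matches "8.5 minutes" or "12-15 minutes". Other units are not handled (assumption).
_DURATION_RE = re.compile(
    r"^\s*(\d+(?:\.\d+)?)(?:\s*-\s*(\d+(?:\.\d+)?))?\s*minutes?\s*$"
)


def parse_minutes(text):
    """Return (low, high) in minutes for a duration string from the queue response."""
    match = _DURATION_RE.match(text)
    if match is None:
        raise ValueError(f"Unrecognized duration format: {text!r}")
    low = float(match.group(1))
    # A single value such as "8.5 minutes" yields an identical low and high bound.
    high = float(match.group(2)) if match.group(2) else low
    return low, high
```

Returning a (low, high) pair keeps ranges such as estimatedDuration intact, so a caller can plan against the upper bound.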

Get Jobs by Priority

GET /api/{tenantId}/{projectId}/execution/queue/priority/{priority}

Retrieves queued jobs at a specific priority level, with detailed position and timing information.

Parameters

Parameter   Type     Location   Description
priority    string   Path       Priority level: Critical, High, Normal, Low

Response

{
  "priority": "High",
  "jobCount": 7,
  "averageWaitTime": "6.2 minutes",
  "estimatedProcessingTime": "31 minutes",
  "jobs": [
    {
      "jobId": "00fe8400-e29b-41d4-a716-446655440000",
      "jobName": "Daily Sales Analysis",
      "jobType": "DataEnrichment",
      "queuePosition": 2,
      "overallQueuePosition": 3,
      "estimatedStartTime": "2024-01-20T11:02:00Z",
      "estimatedCompletion": "2024-01-20T11:12:00Z",
      "submittedBy": "system",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "waitTime": "15 minutes",
      "dependencies": [],
      "resourceRequirements": {
        "cpuUnits": 1,
        "memoryGB": 2,
        "estimatedDiskUsage": "500 MB"
      }
    }
  ]
}
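
The timestamps in each job entry can be combined to estimate how long a job will have waited by the time it starts. The sketch below computes that from dateSubmitted and estimatedStartTime and ranks jobs by it. The helper names (`parse_utc`, `expected_wait`, `longest_waiting`) are hypothetical, and the code assumes timestamps always use the ISO 8601 form with a trailing "Z" shown in the sample.

```python
from datetime import datetime, timedelta


def parse_utc(timestamp):
    """Parse an ISO 8601 timestamp with a trailing 'Z' into an aware datetime."""
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))


def expected_wait(job):
    """Time between submission and the estimated start of a queued job."""
    return parse_utc(job["estimatedStartTime"]) - parse_utc(job["dateSubmitted"])


def longest_waiting(jobs, limit=3):
    """Return up to `limit` jobs, ordered by longest expected wait first."""
    return sorted(jobs, key=expected_wait, reverse=True)[:limit]
```

For the sample job above, `expected_wait` gives 32 minutes (submitted at 10:30, estimated start at 11:02).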

Change Job Priority

PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/priority

Updates the priority of a queued job, which may change its position in the queue and estimated start time.

Request Body

{
  "newPriority": "Critical",
  "reason": "Business critical analysis required urgently",
  "notifyUser": true
}

Response

{
  "jobId": "00fe8400-e29b-41d4-a716-446655440000",
  "previousPriority": "High",
  "newPriority": "Critical",
  "previousQueuePosition": 3,
  "newQueuePosition": 1,
  "previousEstimatedStart": "2024-01-20T11:02:00Z",
  "newEstimatedStart": "2024-01-20T10:47:00Z",
  "timeSaved": "15 minutes",
  "updatedBy": "user123",
  "updateTime": "2024-01-20T10:46:00Z"
}
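
A client may want to check a priority change before or after sending it. The sketch below shows two small checks: whether a change is an escalation, and how many minutes the new estimated start saves according to the response. The names `PRIORITY_ORDER`, `is_escalation`, and `minutes_saved` are hypothetical, and the ordering Low < Normal < High < Critical is an assumption based on the priority names.

```python
from datetime import datetime

# Assumed ordering of the four documented priority values, lowest first.
PRIORITY_ORDER = ["Low", "Normal", "High", "Critical"]


def is_escalation(previous, new):
    """True if `new` ranks above `previous` in the assumed priority ordering."""
    return PRIORITY_ORDER.index(new) > PRIORITY_ORDER.index(previous)


def minutes_saved(result):
    """Minutes between the previous and new estimated start in a priority-change response."""
    def parse(ts):
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))

    delta = parse(result["previousEstimatedStart"]) - parse(result["newEstimatedStart"])
    return delta.total_seconds() / 60
```

Applied to the sample response, `minutes_saved` returns 15.0, which matches the timeSaved field.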

Move Job Position

PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/position

Manually adjusts a job's position within its priority tier. Position changes are limited to the same priority level.

Request Body

{
  "newPosition": 1,
  "reason": "Dependencies resolved, can execute earlier",
  "respectPriorityBoundaries": true
}

Response

{
  "jobId": "00fe8400-e29b-41d4-a716-446655440000",
  "priority": "High",
  "previousPosition": 3,
  "newPosition": 1,
  "previousEstimatedStart": "2024-01-20T11:02:00Z",
  "newEstimatedStart": "2024-01-20T10:55:00Z",
  "affectedJobs": [
    {
      "jobId": "11fe8400-e29b-41d4-a716-446655440000",
      "newPosition": 2,
      "newEstimatedStart": "2024-01-20T11:05:00Z"
    }
  ],
  "updateTime": "2024-01-20T10:46:00Z"
}
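
To see which jobs a move would displace, the reordering can be modeled locally before calling the endpoint. This is only an illustration of moving one item within an ordered tier, not the server's actual algorithm; the function names are hypothetical and positions are assumed to be 1-based, as in the sample.

```python
def move_within_tier(job_ids, job_id, new_position):
    """Return a new ordering of one priority tier with job_id at new_position (1-based)."""
    if job_id not in job_ids:
        raise ValueError(f"{job_id!r} is not in this tier")
    if not 1 <= new_position <= len(job_ids):
        raise ValueError(f"Position must be between 1 and {len(job_ids)}")
    reordered = [j for j in job_ids if j != job_id]
    reordered.insert(new_position - 1, job_id)
    return reordered


def changed_positions(before, after):
    """Map each job whose position changed to its new 1-based position."""
    old = {job: pos for pos, job in enumerate(before, start=1)}
    return {job: pos for pos, job in enumerate(after, start=1) if old[job] != pos}
```

Moving the third job of a three-job tier to position 1 shifts the other two down by one, which is the kind of change reported in affectedJobs.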

Control Queue Processing

POST /api/{tenantId}/{projectId}/execution/queue/control

Pauses or resumes queue processing for maintenance or emergency situations. Running jobs continue but no new jobs will start when paused.

Request Body

{
  "action": "pause",
  "reason": "System maintenance window",
  "duration": 30,
  "allowRunningJobsToComplete": true,
  "notifyUsers": true,
  "scheduledResume": "2024-01-20T12:00:00Z"
}

Response

{
  "action": "pause",
  "status": "Paused",
  "pausedAt": "2024-01-20T10:47:00Z",
  "scheduledResume": "2024-01-20T12:00:00Z",
  "affectedJobs": 23,
  "runningJobsCount": 3,
  "estimatedDelayMinutes": 30,
  "reason": "System maintenance window",
  "pausedBy": "admin123"
}
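
The documentation does not state the unit of duration or how it interacts with scheduledResume. The sketch below assumes duration is in minutes (suggested by the estimatedDelayMinutes field, but not confirmed) and derives scheduledResume from it so that the two values agree. The helper name `build_pause_request` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def build_pause_request(reason, duration_minutes, now=None):
    """Build a pause request body whose scheduledResume matches the duration.

    Assumption: `duration` is expressed in minutes.
    """
    now = now or datetime.now(timezone.utc)
    resume_at = (now + timedelta(minutes=duration_minutes)).astimezone(timezone.utc)
    return {
        "action": "pause",
        "reason": reason,
        "duration": duration_minutes,
        "allowRunningJobsToComplete": True,
        "notifyUsers": True,
        # Same UTC "Z" format as the timestamps in the sample request.
        "scheduledResume": resume_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
```

Passing `now` explicitly keeps the function deterministic, which helps when testing maintenance scripts.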

Get Queue History

GET /api/{tenantId}/{projectId}/execution/queue/history

Retrieves historical queue performance data and metrics for analysis and optimization purposes.

Query Parameters

Parameter     Type       Description
dateFrom      datetime   Start date for historical data
dateTo        datetime   End date for historical data
aggregation   string     Data aggregation level: hour, day, week (default: hour)
metrics       string     Comma-separated metrics: queue_size, wait_time, throughput, efficiency

Response

{
  "period": {
    "startDate": "2024-01-19T00:00:00Z",
    "endDate": "2024-01-20T10:47:00Z",
    "aggregation": "hour"
  },
  "summary": {
    "totalJobsProcessed": 847,
    "averageQueueSize": 12.3,
    "averageWaitTime": "7.8 minutes",
    "peakQueueSize": 45,
    "peakWaitTime": "23 minutes",
    "throughputPerHour": 24.8,
    "efficiency": 87.2
  },
  "hourlyData": [
    {
      "timestamp": "2024-01-20T09:00:00Z",
      "queueSize": {
        "average": 18,
        "peak": 25,
        "minimum": 8
      },
      "waitTime": {
        "average": "9.5 minutes",
        "maximum": "18 minutes",
        "minimum": "2 minutes"
      },
      "throughput": {
        "jobsCompleted": 28,
        "jobsSubmitted": 31,
        "efficiency": 89.3
      },
      "priorityDistribution": {
        "critical": 2,
        "high": 8,
        "normal": 14,
        "low": 1
      }
    }
  ],
  "trends": {
    "queueSizeGrowth": -2.3,
    "waitTimeImprovement": 5.7,
    "throughputIncrease": 12.1,
    "efficiencyChange": 3.4
  },
  "bottlenecks": [
    {
      "timeframe": "2024-01-20T08:30:00Z - 2024-01-20T09:15:00Z",
      "issue": "High memory usage jobs accumulated",
      "impact": "15 minute delay",
      "resolution": "Additional worker allocated"
    }
  ]
}
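
One simple use of hourlyData is to find the periods in which the queue grew, that is, where more jobs were submitted than completed. The sketch below computes that difference per entry. The function names are hypothetical, and the field layout is taken from the sample response above.

```python
def backlog_growth(hourly_data):
    """Return (timestamp, jobsSubmitted - jobsCompleted) for each history entry."""
    return [
        (
            entry["timestamp"],
            entry["throughput"]["jobsSubmitted"] - entry["throughput"]["jobsCompleted"],
        )
        for entry in hourly_data
    ]


def worst_hour(hourly_data):
    """Return the (timestamp, growth) pair with the largest backlog growth, or None."""
    growth = backlog_growth(hourly_data)
    if not growth:
        return None
    return max(growth, key=lambda pair: pair[1])
```

For the sample entry at 09:00, 31 jobs were submitted and 28 completed, so the backlog grew by 3.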

Cancel User's Queued Jobs

DELETE /api/{tenantId}/{projectId}/execution/queue/user/{userId}

Cancels all queued jobs submitted by a specific user. Jobs by that user that are already running continue to completion.

Request Body (Optional)

{
  "reason": "User account deactivated",
  "notifyUser": false,
  "cancelJobTypes": ["ProcessMining", "DataEnrichment"],
  "excludeJobIds": ["important-job-id-1", "important-job-id-2"]
}

Response

{
  "userId": "user123",
  "cancelledJobsCount": 5,
  "preservedJobsCount": 2,
  "cancelledJobs": [
    {
      "jobId": "job1-guid",
      "jobName": "Weekly Analysis",
      "priority": "Normal",
      "queuePosition": 8
    }
  ],
  "preservedJobs": [
    {
      "jobId": "important-job-id-1",
      "jobName": "Critical Business Report",
      "reason": "Explicitly excluded"
    }
  ],
  "cancelledAt": "2024-01-20T10:47:00Z",
  "cancelledBy": "admin123"
}
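
Because this operation is destructive, it can help to preview its effect against the current queue first. The sketch below filters the queuedJobs list from the queue status response. It assumes that cancelJobTypes restricts cancellation to the listed job types and that excludeJobIds always wins; neither behavior is spelled out above, so treat this as an approximation. The function name `preview_cancellation` is hypothetical.

```python
def preview_cancellation(queued_jobs, user_id, cancel_job_types=None, exclude_job_ids=None):
    """Split a user's queued jobs into (to_cancel, preserved) lists of job IDs.

    Assumptions: cancel_job_types limits cancellation to those types (None means
    all types), and any job in exclude_job_ids is preserved.
    """
    excluded = set(exclude_job_ids or [])
    to_cancel, preserved = [], []
    for job in queued_jobs:
        if job["submittedBy"] != user_id:
            continue  # Jobs from other users are never touched.
        if job["jobId"] in excluded:
            preserved.append(job["jobId"])
        elif cancel_job_types is None or job["jobType"] in cancel_job_types:
            to_cancel.append(job["jobId"])
        else:
            preserved.append(job["jobId"])
    return to_cancel, preserved
```

Comparing the preview with cancelledJobs and preservedJobs in the actual response is a quick way to confirm the request did what was intended.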

Get Queue Predictions

GET /api/{tenantId}/{projectId}/execution/queue/predictions

Provides AI-powered predictions for queue behavior, optimal submission times, and resource allocation recommendations.

Query Parameters

Parameter                Type      Description
horizon                  integer   Prediction horizon in hours (1-24, default: 4)
jobType                  string    Predict for specific job type
includeRecommendations   boolean   Include optimization recommendations (default: true)

Response

{
  "predictionTime": "2024-01-20T10:47:00Z",
  "horizon": 4,
  "predictions": {
    "queueSizeProjection": [
      {
        "time": "2024-01-20T11:00:00Z",
        "expectedQueueSize": 18,
        "confidence": 0.87
      },
      {
        "time": "2024-01-20T12:00:00Z",
        "expectedQueueSize": 12,
        "confidence": 0.82
      }
    ],
    "waitTimeProjection": [
      {
        "time": "2024-01-20T11:00:00Z",
        "averageWaitTime": "6.5 minutes",
        "confidence": 0.85
      }
    ],
    "resourceUtilization": [
      {
        "time": "2024-01-20T11:00:00Z",
        "cpuUtilization": 78,
        "memoryUtilization": 65,
        "efficiency": 89.2
      }
    ]
  },
  "recommendations": {
    "optimalSubmissionTimes": [
      {
        "timeWindow": "2024-01-20T13:00:00Z - 2024-01-20T15:00:00Z",
        "expectedWaitTime": "3-5 minutes",
        "reason": "Low queue activity period"
      }
    ],
    "resourceOptimization": [
      {
        "recommendation": "Add 1 additional worker node",
        "expectedImprovement": "25% reduction in wait times",
        "cost": "Low",
        "priority": "Medium"
      }
    ],
    "jobScheduling": [
      {
        "jobType": "ProcessMining",
        "recommendation": "Schedule during off-peak hours (14:00-16:00)",
        "reason": "Memory-intensive jobs perform better with less contention"
      }
    ]
  },
  "modelInfo": {
    "modelVersion": "2.1.3",
    "lastTrained": "2024-01-19T02:00:00Z",
    "accuracy": 0.84,
    "dataPoints": 10080
  }
}
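
Each projection carries a confidence value, so a client can ignore low-confidence points when choosing a submission time. The sketch below picks the projection with the smallest expected queue size among those at or above a confidence threshold. The function name and the 0.8 default threshold are illustrative choices, not part of the API.

```python
def quietest_projection(predictions, min_confidence=0.8):
    """Return the queue-size projection with the fewest expected jobs.

    Only projections with confidence >= min_confidence are considered;
    returns None if none qualify.
    """
    candidates = [
        point
        for point in predictions["predictions"]["queueSizeProjection"]
        if point["confidence"] >= min_confidence
    ]
    if not candidates:
        return None
    return min(candidates, key=lambda point: point["expectedQueueSize"])
```

With the sample response and the default threshold, this selects the 12:00 projection (12 jobs, confidence 0.82); raising the threshold to 0.85 selects the 11:00 projection instead.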

Example: Queue Management Workflow

This example demonstrates monitoring and managing the job queue:

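// Assumes `token` holds a valid bearer token, and that {tenantId} and {projectId}
// in the URLs below are replaced with your own identifiers.

// 1. Get current queue status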
const getQueueStatus = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue?includeEstimates=true', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  if (!response.ok) throw new Error(`Queue status request failed: ${response.status}`);
  const queue = await response.json();
  console.log(`Queue Status: ${queue.queueStatus}`);
  console.log(`Total jobs: ${queue.summary.totalQueuedJobs}`);
  console.log(`Average wait time: ${queue.summary.averageWaitTime}`);

  return queue;
};

// 2. Change job priority if needed
const updateJobPriority = async (jobId, newPriority, reason) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/queue/job/${jobId}/priority`, {
    method: 'PUT',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      newPriority: newPriority,
      reason: reason,
      notifyUser: true
    })
  });

  if (!response.ok) throw new Error(`Priority update failed: ${response.status}`);
  const result = await response.json();
  console.log(`Job ${jobId} priority changed from ${result.previousPriority} to ${result.newPriority}`);
  console.log(`New position: ${result.newQueuePosition} (was ${result.previousQueuePosition})`);
  console.log(`Time saved: ${result.timeSaved}`);

  return result;
};

// 3. Get queue predictions for optimization
const getQueuePredictions = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/predictions?horizon=4&includeRecommendations=true', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  if (!response.ok) throw new Error(`Predictions request failed: ${response.status}`);
  const predictions = await response.json();
  console.log('Queue Predictions:');

  predictions.predictions.queueSizeProjection.forEach(prediction => {
    console.log(`  ${prediction.time}: ${prediction.expectedQueueSize} jobs (${Math.round(prediction.confidence * 100)}% confidence)`);
  });

  console.log('Recommendations:');
  predictions.recommendations.optimalSubmissionTimes.forEach(rec => {
    console.log(`  Submit during: ${rec.timeWindow} (${rec.expectedWaitTime} wait)`);
  });

  return predictions;
};

// 4. Monitor queue for specific job
const monitorJobInQueue = async (jobId) => {
  const checkQueue = async () => {
    const queue = await getQueueStatus();
    const job = queue.queuedJobs.find(j => j.jobId === jobId);

    if (job) {
      console.log(`Job ${jobId} is at position ${job.queuePosition}`);
      console.log(`Estimated start: ${job.estimatedStartTime}`);
      console.log(`Estimated duration: ${job.estimatedDuration}`);

      // Check again in 2 minutes
      setTimeout(() => checkQueue(), 120000);
    } else {
      console.log(`Job ${jobId} is no longer in queue (likely started or cancelled)`);
    }
  };

  await checkQueue();
};

// 5. Emergency queue management
const pauseQueue = async (reason, duration) => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/control', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      action: 'pause',
      reason: reason,
      duration: duration,
      allowRunningJobsToComplete: true,
      notifyUsers: true
    })
  });

  if (!response.ok) throw new Error(`Queue control request failed: ${response.status}`);
  const result = await response.json();
  console.log(`Queue paused: ${result.status}`);
  console.log(`${result.affectedJobs} jobs affected`);
  console.log(`Estimated delay: ${result.estimatedDelayMinutes} minutes`);

  return result;
};

// Execute queue management workflow
getQueueStatus()
  .then(queue => {
    console.log('Current queue status retrieved');

    // Check if queue is getting long
    if (queue.summary.totalQueuedJobs > 30) {
      console.log('Queue is getting long, checking predictions...');
      return getQueuePredictions();
    }

    return null;
  })
  .then(predictions => {
    if (predictions) {
      console.log('Queue predictions retrieved');

      // If predictions show continued growth, consider resource optimization
      const futureQueueSize = predictions.predictions.queueSizeProjection[predictions.predictions.queueSizeProjection.length - 1];
      if (futureQueueSize.expectedQueueSize > 25) {
        console.log('Consider implementing resource optimization recommendations');
        predictions.recommendations.resourceOptimization.forEach(rec => {
          console.log(`- ${rec.recommendation}: ${rec.expectedImprovement}`);
        });
      }
    }
  })
  .catch(error => console.error('Queue management failed:', error));

Python Example

import requests
import time
from datetime import datetime, timedelta

class QueueManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def get_queue_status(self, priority=None, job_type=None, include_estimates=True):
        """Get current queue status"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue"
        params = {'includeEstimates': str(include_estimates).lower()}

        if priority:
            params['priority'] = priority
        if job_type:
            params['jobType'] = job_type

        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()  # Surface HTTP errors instead of parsing an error body
        return response.json()

    def get_jobs_by_priority(self, priority):
        """Get jobs filtered by priority level"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/priority/{priority}"
        response = requests.get(url, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def change_job_priority(self, job_id, new_priority, reason, notify_user=True):
        """Change priority of a queued job"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/priority"
        payload = {
            'newPriority': new_priority,
            'reason': reason,
            'notifyUser': notify_user
        }
        response = requests.put(url, json=payload, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def move_job_position(self, job_id, new_position, reason):
        """Move job to new position within its priority tier"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/position"
        payload = {
            'newPosition': new_position,
            'reason': reason,
            'respectPriorityBoundaries': True
        }
        response = requests.put(url, json=payload, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def control_queue(self, action, reason, duration=None, scheduled_resume=None):
        """Pause or resume queue processing"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/control"
        payload = {
            'action': action,
            'reason': reason,
            'allowRunningJobsToComplete': True,
            'notifyUsers': True
        }

        if duration:
            payload['duration'] = duration
        if scheduled_resume:
            payload['scheduledResume'] = scheduled_resume.isoformat()

        response = requests.post(url, json=payload, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def get_queue_history(self, date_from, date_to, aggregation='hour', metrics=None):
        """Get historical queue performance data"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/history"
        params = {
            'dateFrom': date_from.isoformat(),
            'dateTo': date_to.isoformat(),
            'aggregation': aggregation
        }

        if metrics:
            params['metrics'] = ','.join(metrics)

        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def cancel_user_jobs(self, user_id, reason, job_types=None, exclude_job_ids=None):
        """Cancel all queued jobs for a specific user"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/user/{user_id}"
        payload = {
            'reason': reason,
            'notifyUser': False
        }

        if job_types:
            payload['cancelJobTypes'] = job_types
        if exclude_job_ids:
            payload['excludeJobIds'] = exclude_job_ids

        response = requests.delete(url, json=payload, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def get_queue_predictions(self, horizon=4, job_type=None, include_recommendations=True):
        """Get AI-powered queue predictions"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/predictions"
        params = {
            'horizon': horizon,
            'includeRecommendations': str(include_recommendations).lower()
        }

        if job_type:
            params['jobType'] = job_type

        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def monitor_queue_health(self, alert_threshold=30, check_interval=300):
        """Continuously monitor queue health and alert on issues"""
        while True:
            try:
                queue_status = self.get_queue_status()
                total_jobs = queue_status['summary']['totalQueuedJobs']
                avg_wait = queue_status['summary']['averageWaitTime']

                print(f"Queue Health Check: {total_jobs} jobs, avg wait: {avg_wait}")

                if total_jobs > alert_threshold:
                    print(f"ALERT: Queue size ({total_jobs}) exceeds threshold ({alert_threshold})")

                    # Get predictions to understand if this will improve
                    predictions = self.get_queue_predictions()
                    future_size = predictions['predictions']['queueSizeProjection'][-1]['expectedQueueSize']

                    if future_size > total_jobs:
                        print("WARNING: Queue expected to grow further")
                        print("Resource optimization recommendations:")
                        for rec in predictions['recommendations']['resourceOptimization']:
                            print(f"  - {rec['recommendation']}: {rec['expectedImprovement']}")

                time.sleep(check_interval)

            except Exception as e:
                print(f"Queue monitoring error: {e}")
                time.sleep(60)

# Usage example
manager = QueueManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Get comprehensive queue status
    queue_status = manager.get_queue_status(include_estimates=True)
    print(f"Queue Status: {queue_status['queueStatus']}")
    print(f"Total jobs in queue: {queue_status['summary']['totalQueuedJobs']}")
    print(f"Average wait time: {queue_status['summary']['averageWaitTime']}")
    print(f"Processing capacity: {queue_status['processingCapacity']['currentLoad']}%")

    # Check high priority jobs specifically
    high_priority_jobs = manager.get_jobs_by_priority('High')
    print(f"High priority jobs: {high_priority_jobs['jobCount']}")

    # Get queue predictions for the next 4 hours
    predictions = manager.get_queue_predictions(horizon=4)
    print("Queue predictions:")
    for pred in predictions['predictions']['queueSizeProjection']:
        confidence_pct = round(pred['confidence'] * 100)
        print(f"  {pred['time']}: {pred['expectedQueueSize']} jobs ({confidence_pct}% confidence)")

    # Check recommendations
    if predictions['recommendations']['optimalSubmissionTimes']:
        print("Optimal submission times:")
        for rec in predictions['recommendations']['optimalSubmissionTimes']:
            print(f"  {rec['timeWindow']}: {rec['expectedWaitTime']} wait time")

    # Example: Elevate a job priority if needed
    if queue_status['summary']['totalQueuedJobs'] > 20:
        # Find a normal priority job to elevate
        normal_jobs = [job for job in queue_status['queuedJobs'] if job['priority'] == 'Normal']
        if normal_jobs:
            job_to_elevate = normal_jobs[0]
            result = manager.change_job_priority(
                job_to_elevate['jobId'],
                'High',
                'Queue congestion - elevating business critical job'
            )
            print(f"Elevated job {job_to_elevate['jobName']} to High priority")
            print(f"New position: {result['newQueuePosition']} (was {result['previousQueuePosition']})")

    # Get queue history for analysis (timezone-aware timestamps avoid ambiguity)
    now = datetime.now().astimezone()
    history = manager.get_queue_history(
        now - timedelta(hours=24),
        now,
        'hour',
        ['queue_size', 'wait_time', 'throughput']
    )

    print(f"24h summary: {history['summary']['totalJobsProcessed']} jobs processed")
    print(f"Peak queue size: {history['summary']['peakQueueSize']}")
    print(f"Average throughput: {history['summary']['throughputPerHour']} jobs/hour")

    # If there are bottlenecks, report them
    if history['bottlenecks']:
        print("Recent bottlenecks:")
        for bottleneck in history['bottlenecks']:
            print(f"  {bottleneck['timeframe']}: {bottleneck['issue']} (Impact: {bottleneck['impact']})")

except Exception as e:
    print(f"Error in queue management: {e}")