Jobwachtrij
Beheer Uitvoeringswachtrij
Bekijk en beheer de uitvoeringswachtrij, stel prioriteiten in en controleer planning van jobs.
Status van Wachtrij Opvragen
GET /api/{tenantId}/{projectId}/execution/queue
Haalt de huidige status van de uitvoeringswachtrij op, inclusief geplaatste jobs, hun prioriteiten en geschatte verwerkingstijden.
Parameters
| Parameter | Type | Locatie | Beschrijving |
|---|---|---|---|
tenantId |
GUID | Pad | De tenant identificatie |
projectId |
GUID | Pad | De projectidentificatie |
Query Parameters
| Parameter | Type | Beschrijving |
|---|---|---|
priority |
string | Filter op prioriteit: Critical, High, Normal, Low |
jobType |
string | Filter op jobtype: ProcessMining, DataEnrichment, Notebook, Analysis |
includeEstimates |
boolean | Gedetailleerde tijdschattingen opnemen (standaard: true) |
Response
{
"queueStatus": "Active",
"timestamp": "2024-01-20T10:45:00Z",
"summary": {
"totalQueuedJobs": 23,
"criticalPriorityJobs": 2,
"highPriorityJobs": 7,
"normalPriorityJobs": 12,
"lowPriorityJobs": 2,
"averageWaitTime": "8.5 minutes",
"estimatedProcessingTime": "47 minutes"
},
"processingCapacity": {
"activeWorkers": 4,
"totalWorkers": 6,
"currentLoad": 67,
"maxConcurrentJobs": 8,
"currentlyRunning": 3
},
"queuedJobs": [
{
"jobId": "ff0e8400-e29b-41d4-a716-446655440000",
"jobName": "Customer Analytics Pipeline",
"jobType": "ProcessMining",
"priority": "Critical",
"queuePosition": 1,
"estimatedStartTime": "2024-01-20T10:47:00Z",
"estimatedDuration": "12-15 minutes",
"submittedBy": "user456",
"dateSubmitted": "2024-01-20T10:44:00Z",
"resourceRequirements": {
"cpuUnits": 2,
"memoryGB": 4,
"estimatedDiskUsage": "1.2 GB"
}
},
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"priority": "High",
"queuePosition": 2,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedDuration": "8-10 minutes",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
],
"performanceMetrics": {
"averageJobDuration": "16.3 minutes",
"throughputLastHour": 12,
"queueTrends": {
"currentHourSubmissions": 8,
"peakHourToday": "09:00-10:00",
"averageQueueSize": 15.7
}
}
}
Jobs Op Prioriteit Opvragen
GET /api/{tenantId}/{projectId}/execution/queue/priority/{priority}
Haalt jobs in de wachtrij op, gefilterd op een specifieke prioriteitsniveau, met gedetailleerde positie- en tijdsinformatie.
Parameters
| Parameter | Type | Locatie | Beschrijving |
|---|---|---|---|
priority |
string | Pad | Prioriteitsniveau: Critical, High, Normal, Low |
Response
{
"priority": "High",
"jobCount": 7,
"averageWaitTime": "6.2 minutes",
"estimatedProcessingTime": "31 minutes",
"jobs": [
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"queuePosition": 2,
"overallQueuePosition": 3,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedCompletion": "2024-01-20T11:12:00Z",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"waitTime": "15 minutes",
"dependencies": [],
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
]
}
Prioriteit Van Job Wijzigen
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/priority
Wijzigt de prioriteit van een geplaatste job, wat de positie in de wachtrij en de geschatte starttijd kan veranderen.
Request Body
{
"newPriority": "Critical",
"reason": "Business critical analysis required urgently",
"notifyUser": true
}
Response
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"previousPriority": "High",
"newPriority": "Critical",
"previousQueuePosition": 3,
"newQueuePosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:47:00Z",
"timeSaved": "15 minutes",
"updatedBy": "user123",
"updateTime": "2024-01-20T10:46:00Z"
}
Positie Van Job Verplaatsen
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/position
Past handmatig de positie van een job aan binnen hetzelfde prioriteitsniveau. Positiewijzigingen zijn beperkt tot die zelfde prioriteitslaag.
Request Body
{
"newPosition": 1,
"reason": "Dependencies resolved, can execute earlier",
"respectPriorityBoundaries": true
}
Response
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"priority": "High",
"previousPosition": 3,
"newPosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:55:00Z",
"affectedJobs": [
{
"jobId": "11fe8400-e29b-41d4-a716-446655440000",
"newPosition": 2,
"newEstimatedStart": "2024-01-20T11:05:00Z"
}
],
"updateTime": "2024-01-20T10:46:00Z"
}
Wachtrijverwerking Controleren
POST /api/{tenantId}/{projectId}/execution/queue/control
Pauzeert of hervat de verwerking van de wachtrij voor onderhoud of noodsituaties. Lopende jobs worden voortgezet, maar nieuwe jobs starten niet tijdens pauze.
Request Body
{
"action": "pause",
"reason": "System maintenance window",
"duration": 30,
"allowRunningJobsToComplete": true,
"notifyUsers": true,
"scheduledResume": "2024-01-20T12:00:00Z"
}
Response
{
"action": "pause",
"status": "Paused",
"pausedAt": "2024-01-20T10:47:00Z",
"scheduledResume": "2024-01-20T12:00:00Z",
"affectedJobs": 23,
"runningJobsCount": 3,
"estimatedDelayMinutes": 30,
"reason": "System maintenance window",
"pausedBy": "admin123"
}
Historie Van Wachtrij Opvragen
GET /api/{tenantId}/{projectId}/execution/queue/history
Haalt historische gegevens en prestatie-indicatoren van de wachtrij op voor analyse en optimalisatie.
Query Parameters
| Parameter | Type | Beschrijving |
|---|---|---|
dateFrom |
datetime | Startdatum voor historische gegevens |
dateTo |
datetime | Einddatum voor historische gegevens |
aggregation |
string | Aggregatieniveau van data: hour, day, week (standaard: hour) |
metrics |
string | Komma-gescheiden metrics: queue_size, wait_time, throughput, efficiency |
Response
{
"period": {
"startDate": "2024-01-19T00:00:00Z",
"endDate": "2024-01-20T10:47:00Z",
"aggregation": "hour"
},
"summary": {
"totalJobsProcessed": 847,
"averageQueueSize": 12.3,
"averageWaitTime": "7.8 minutes",
"peakQueueSize": 45,
"peakWaitTime": "23 minutes",
"throughputPerHour": 24.8,
"efficiency": 87.2
},
"hourlyData": [
{
"timestamp": "2024-01-20T09:00:00Z",
"queueSize": {
"average": 18,
"peak": 25,
"minimum": 8
},
"waitTime": {
"average": "9.5 minutes",
"maximum": "18 minutes",
"minimum": "2 minutes"
},
"throughput": {
"jobsCompleted": 28,
"jobsSubmitted": 31,
"efficiency": 89.3
},
"priorityDistribution": {
"critical": 2,
"high": 8,
"normal": 14,
"low": 1
}
}
],
"trends": {
"queueSizeGrowth": -2.3,
"waitTimeImprovement": 5.7,
"throughputIncrease": 12.1,
"efficiencyChange": 3.4
},
"bottlenecks": [
{
"timeframe": "2024-01-20T08:30:00Z - 2024-01-20T09:15:00Z",
"issue": "High memory usage jobs accumulated",
"impact": "15 minute delay",
"resolution": "Additional worker allocated"
}
]
}
Geplande Jobs Van Gebruiker Annuleren
DELETE /api/{tenantId}/{projectId}/execution/queue/user/{userId}
Annuleert alle geplaatste jobs van een specifieke gebruiker. Lopende jobs van die gebruiker worden voltooid.
Request Body (Optioneel)
{
"reason": "User account deactivated",
"notifyUser": false,
"cancelJobTypes": ["ProcessMining", "DataEnrichment"],
"excludeJobIds": ["important-job-id-1", "important-job-id-2"]
}
Response
{
"userId": "user123",
"cancelledJobsCount": 5,
"preservedJobsCount": 2,
"cancelledJobs": [
{
"jobId": "job1-guid",
"jobName": "Weekly Analysis",
"priority": "Normal",
"queuePosition": 8
}
],
"preservedJobs": [
{
"jobId": "important-job-id-1",
"jobName": "Critical Business Report",
"reason": "Explicitly excluded"
}
],
"cancelledAt": "2024-01-20T10:47:00Z",
"cancelledBy": "admin123"
}
Wachtrijvoorspellingen Opvragen
GET /api/{tenantId}/{projectId}/execution/queue/predictions
Biedt AI-gestuurde voorspellingen over wachtrijgedrag, optimale indieningstijden en aanbevelingen voor resourceallocatie.
Query Parameters
| Parameter | Type | Beschrijving |
|---|---|---|
horizon |
integer | Voorspellingshorizon in uren (1-24, standaard: 4) |
jobType |
string | Voorspellingen voor specifiek jobtype |
includeRecommendations |
boolean | Optimalisatie-aanbevelingen opnemen (standaard: true) |
Response
{
"predictionTime": "2024-01-20T10:47:00Z",
"horizon": 4,
"predictions": {
"queueSizeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"expectedQueueSize": 18,
"confidence": 0.87
},
{
"time": "2024-01-20T12:00:00Z",
"expectedQueueSize": 12,
"confidence": 0.82
}
],
"waitTimeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"averageWaitTime": "6.5 minutes",
"confidence": 0.85
}
],
"resourceUtilization": [
{
"time": "2024-01-20T11:00:00Z",
"cpuUtilization": 78,
"memoryUtilization": 65,
"efficiency": 89.2
}
]
},
"recommendations": {
"optimalSubmissionTimes": [
{
"timeWindow": "2024-01-20T13:00:00Z - 2024-01-20T15:00:00Z",
"expectedWaitTime": "3-5 minutes",
"reason": "Low queue activity period"
}
],
"resourceOptimization": [
{
"recommendation": "Add 1 additional worker node",
"expectedImprovement": "25% reduction in wait times",
"cost": "Low",
"priority": "Medium"
}
],
"jobScheduling": [
{
"jobType": "ProcessMining",
"recommendation": "Schedule during off-peak hours (14:00-16:00)",
"reason": "Memory-intensive jobs perform better with less contention"
}
]
},
"modelInfo": {
"modelVersion": "2.1.3",
"lastTrained": "2024-01-19T02:00:00Z",
"accuracy": 0.84,
"dataPoints": 10080
}
}
Voorbeeld: Workflow Voor Wachtrijbeheer
Dit voorbeeld toont het monitoren en beheren van de jobwachtrij:
// 1. Huidige status van wachtrij opvragen
const getQueueStatus = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue?includeEstimates=true', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const queue = await response.json();
console.log(`Status Wachtrij: ${queue.queueStatus}`);
console.log(`Totaal aantal jobs: ${queue.summary.totalQueuedJobs}`);
console.log(`Gemiddelde wachttijd: ${queue.summary.averageWaitTime}`);
return queue;
};
// 2. Indien nodig de prioriteit van een job aanpassen
const updateJobPriority = async (jobId, newPriority, reason) => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/queue/job/${jobId}/priority`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
newPriority: newPriority,
reason: reason,
notifyUser: true
})
});
const result = await response.json();
console.log(`Prioriteit job ${jobId} gewijzigd van ${result.previousPriority} naar ${result.newPriority}`);
console.log(`Nieuwe positie: ${result.newQueuePosition} (was ${result.previousQueuePosition})`);
console.log(`Bespaarde tijd: ${result.timeSaved}`);
return result;
};
// 3. Wachtrijvoorspellingen ophalen voor optimalisatie
const getQueuePredictions = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/predictions?horizon=4&includeRecommendations=true', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const predictions = await response.json();
console.log('Wachtrij Voorspellingen:');
predictions.predictions.queueSizeProjection.forEach(prediction => {
console.log(` ${prediction.time}: ${prediction.expectedQueueSize} jobs (${Math.round(prediction.confidence * 100)}% betrouwbaarheid)`);
});
console.log('Aanbevelingen:');
predictions.recommendations.optimalSubmissionTimes.forEach(rec => {
console.log(` Indienen tijdens: ${rec.timeWindow} (${rec.expectedWaitTime} wachttijd)`);
});
return predictions;
};
// 4. Wachtrij voor specifieke job monitoren
const monitorJobInQueue = async (jobId) => {
const checkQueue = async () => {
const queue = await getQueueStatus();
const job = queue.queuedJobs.find(j => j.jobId === jobId);
if (job) {
console.log(`Job ${jobId} bevindt zich op positie ${job.queuePosition}`);
console.log(`Geschatte start: ${job.estimatedStartTime}`);
console.log(`Geschatte duur: ${job.estimatedDuration}`);
// Over 2 minuten opnieuw controleren
setTimeout(() => checkQueue(), 120000);
} else {
console.log(`Job ${jobId} staat niet meer in de wachtrij (waarschijnlijk gestart of geannuleerd)`);
}
};
await checkQueue();
};
// 5. Noodbeheer van wachtrij
const pauseQueue = async (reason, duration) => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/control', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
action: 'pause',
reason: reason,
duration: duration,
allowRunningJobsToComplete: true,
notifyUsers: true
})
});
const result = await response.json();
console.log(`Wachtrij gepauzeerd: ${result.status}`);
console.log(`${result.affectedJobs} jobs getroffen`);
console.log(`Geschatte vertraging: ${result.estimatedDelayMinutes} minuten`);
return result;
};
// Workflow voor wachtrijbeheer uitvoeren
getQueueStatus()
.then(queue => {
console.log('Huidige wachtrijstatus opgehaald');
// Controleren of de wachtrij te lang wordt
if (queue.summary.totalQueuedJobs > 30) {
console.log('Wachtrij wordt lang, voorspellingsdata opvragen...');
return getQueuePredictions();
}
return null;
})
.then(predictions => {
if (predictions) {
console.log('Wachtrijvoorspellingen ontvangen');
// Indien voorspellingen groei aangeven, resource-optimalisatie overwegen
const futureQueueSize = predictions.predictions.queueSizeProjection[predictions.predictions.queueSizeProjection.length - 1];
if (futureQueueSize.expectedQueueSize > 25) {
console.log('Overweeg aanbevelingen voor resource-optimalisatie te implementeren');
predictions.recommendations.resourceOptimization.forEach(rec => {
console.log(`- ${rec.recommendation}: ${rec.expectedImprovement}`);
});
}
}
})
.catch(error => console.error('Wachtrijbeheer mislukt:', error));
Python Voorbeeld
import requests
import time
import json
from datetime import datetime, timedelta
class QueueManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def get_queue_status(self, priority=None, job_type=None, include_estimates=True):
"""Huidige status van wachtrij ophalen"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue"
params = {'includeEstimates': str(include_estimates).lower()}
if priority:
params['priority'] = priority
if job_type:
params['jobType'] = job_type
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def get_jobs_by_priority(self, priority):
"""Jobs gefilterd op prioriteit ophalen"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/priority/{priority}"
response = requests.get(url, headers=self.headers)
return response.json()
def change_job_priority(self, job_id, new_priority, reason, notify_user=True):
"""Prioriteit van geplaatste job wijzigen"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/priority"
payload = {
'newPriority': new_priority,
'reason': reason,
'notifyUser': notify_user
}
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def move_job_position(self, job_id, new_position, reason):
"""Job verplaatsen naar nieuwe positie binnen eigen prioriteitslaag"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/position"
payload = {
'newPosition': new_position,
'reason': reason,
'respectPriorityBoundaries': True
}
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def control_queue(self, action, reason, duration=None, scheduled_resume=None):
"""Verwerking van wachtrij pauzeren of hervatten"""
url = f"{self.base_url}/api/{self.tenantId}/{self.projectId}/execution/queue/control"
payload = {
'action': action,
'reason': reason,
'allowRunningJobsToComplete': True,
'notifyUsers': True
}
if duration:
payload['duration'] = duration
if scheduled_resume:
payload['scheduledResume'] = scheduled_resume.isoformat()
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_queue_history(self, date_from, date_to, aggregation='hour', metrics=None):
"""Historische prestatiegegevens van wachtrij ophalen"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/history"
params = {
'dateFrom': date_from.isoformat(),
'dateTo': date_to.isoformat(),
'aggregation': aggregation
}
if metrics:
params['metrics'] = ','.join(metrics)
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_user_jobs(self, user_id, reason, job_types=None, exclude_job_ids=None):
"""Alle geplaatste jobs van een specifieke gebruiker annuleren"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/user/{user_id}"
payload = {
'reason': reason,
'notifyUser': False
}
if job_types:
payload['cancelJobTypes'] = job_types
if exclude_job_ids:
payload['excludeJobIds'] = exclude_job_ids
response = requests.delete(url, json=payload, headers=self.headers)
return response.json()
def get_queue_predictions(self, horizon=4, job_type=None, include_recommendations=True):
"""AI-gestuurde wachtrijvoorspellingen ophalen"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/predictions"
params = {
'horizon': horizon,
'includeRecommendations': str(include_recommendations).lower()
}
if job_type:
params['jobType'] = job_type
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def monitor_queue_health(self, alert_threshold=30, check_interval=300):
"""Continu wachtrijmonitoring en waarschuwingen bij problemen"""
while True:
try:
queue_status = self.get_queue_status()
total_jobs = queue_status['summary']['totalQueuedJobs']
avg_wait = queue_status['summary']['averageWaitTime']
print(f"Status wachtrij: {total_jobs} jobs, gemiddelde wachttijd: {avg_wait}")
if total_jobs > alert_threshold:
print(f"ALERT: Wachtrijgrootte ({total_jobs}) overschrijdt drempel ({alert_threshold})")
# Voorspellingen ophalen om te zien of dit verbetert
predictions = self.get_queue_predictions()
future_size = predictions['predictions']['queueSizeProjection'][-1]['expectedQueueSize']
if future_size > total_jobs:
print("WAARSCHUWING: Wachtrij zal naar verwachting verder groeien")
print("Aanbevelingen voor resource-optimalisatie:")
for rec in predictions['recommendations']['resourceOptimization']:
print(f" - {rec['recommendation']}: {rec['expectedImprovement']}")
time.sleep(check_interval)
except Exception as e:
print(f"Fout bij wachtrijmonitoring: {e}")
time.sleep(60)
# Voorbeeld gebruik
manager = QueueManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
try:
# Uitgebreide status van wachtrij opvragen
queue_status = manager.get_queue_status(include_estimates=True)
print(f"Status wachtrij: {queue_status['queueStatus']}")
print(f"Totaal aantal jobs in wachtrij: {queue_status['summary']['totalQueuedJobs']}")
print(f"Gemiddelde wachttijd: {queue_status['summary']['averageWaitTime']}")
print(f"Verwerkingscapaciteit: {queue_status['processingCapacity']['currentLoad']}%")
# Hoge prioriteit jobs specifiek bekijken
high_priority_jobs = manager.get_jobs_by_priority('High')
print(f"Hoge prioriteit jobs: {high_priority_jobs['jobCount']}")
# Wachtrijvoorspellingen voor de komende 4 uur opvragen
predictions = manager.get_queue_predictions(horizon=4)
print("Wachtrijvoorspellingen:")
for pred in predictions['predictions']['queueSizeProjection']:
confidence_pct = round(pred['confidence'] * 100)
print(f" {pred['time']}: {pred['expectedQueueSize']} jobs ({confidence_pct}% betrouwbaarheid)")
# Aanbevelingen bekijken
if predictions['recommendations']['optimalSubmissionTimes']:
print("Optimale indieningstijden:")
for rec in predictions['recommendations']['optimalSubmissionTimes']:
print(f" {rec['timeWindow']}: {rec['expectedWaitTime']} wachttijd")
# Voorbeeld: Prioriteit van een job verhogen indien nodig
if queue_status['summary']['totalQueuedJobs'] > 20:
# Zoek een job met normale prioriteit om te verhogen
normal_jobs = [job for job in queue_status['queuedJobs'] if job['priority'] == 'Normal']
if normal_jobs:
job_to_elevate = normal_jobs[0]
result = manager.change_job_priority(
job_to_elevate['jobId'],
'High',
'Wachtrij congestie - verhogen van business critical job'
)
print(f"Prioriteit verhoogd van job {job_to_elevate['jobName']} naar High")
print(f"Nieuwe positie: {result['newQueuePosition']} (was {result['previousPosition']})")
# Historie van wachtrij ophalen voor analyse
history = manager.get_queue_history(
datetime.now() - timedelta(hours=24),
datetime.now(),
'hour',
['queue_size', 'wait_time', 'throughput']
)
print(f"24-uurs samenvatting: {history['summary']['totalJobsProcessed']} verwerkte jobs")
print(f"Maximale wachtrijgrootte: {history['summary']['peakQueueSize']}")
print(f"Gemiddelde doorvoer: {history['summary']['throughputPerHour']} jobs/uur")
# Indien er knelpunten zijn, deze rapporteren
if history['bottlenecks']:
print("Recente knelpunten:")
for bottleneck in history['bottlenecks']:
print(f" {bottleneck['timeframe']}: {bottleneck['issue']} (Impact: {bottleneck['impact']})")
except Exception as e:
print(f"Fout in wachtrijbeheer: {e}")