File d'attente des travaux
Gérer la file d'exécution
Afficher et gérer la file d'exécution des travaux, définir les priorités et contrôler la planification des travaux.
Obtenir le statut de la file
GET /api/{tenantId}/{projectId}/execution/queue
Récupère le statut actuel de la file d'exécution, y compris les travaux en file d'attente, leurs priorités et les temps de traitement estimés.
Paramètres
| Paramètre | Type | Emplacement | Description |
|---|---|---|---|
tenantId |
GUID | Chemin | L'identifiant du tenant |
projectId |
GUID | Chemin | L'identifiant du projet |
Paramètres de requête
| Paramètre | Type | Description |
|---|---|---|
priority |
string | Filtrer par priorité : Critique, Haute, Normale, Basse |
jobType |
string | Filtrer par type de travail : ProcessMining, DataEnrichment, Notebook, Analysis |
includeEstimates |
boolean | Inclure des estimations détaillées des temps (par défaut : true) |
Réponse
{
"queueStatus": "Active",
"timestamp": "2024-01-20T10:45:00Z",
"summary": {
"totalQueuedJobs": 23,
"criticalPriorityJobs": 2,
"highPriorityJobs": 7,
"normalPriorityJobs": 12,
"lowPriorityJobs": 2,
"averageWaitTime": "8.5 minutes",
"estimatedProcessingTime": "47 minutes"
},
"processingCapacity": {
"activeWorkers": 4,
"totalWorkers": 6,
"currentLoad": 67,
"maxConcurrentJobs": 8,
"currentlyRunning": 3
},
"queuedJobs": [
{
"jobId": "ff0e8400-e29b-41d4-a716-446655440000",
"jobName": "Customer Analytics Pipeline",
"jobType": "ProcessMining",
"priority": "Critical",
"queuePosition": 1,
"estimatedStartTime": "2024-01-20T10:47:00Z",
"estimatedDuration": "12-15 minutes",
"submittedBy": "user456",
"dateSubmitted": "2024-01-20T10:44:00Z",
"resourceRequirements": {
"cpuUnits": 2,
"memoryGB": 4,
"estimatedDiskUsage": "1.2 GB"
}
},
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"priority": "High",
"queuePosition": 2,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedDuration": "8-10 minutes",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
],
"performanceMetrics": {
"averageJobDuration": "16.3 minutes",
"throughputLastHour": 12,
"queueTrends": {
"currentHourSubmissions": 8,
"peakHourToday": "09:00-10:00",
"averageQueueSize": 15.7
}
}
}
Obtenir les travaux par priorité
GET /api/{tenantId}/{projectId}/execution/queue/priority/{priority}
Récupère les travaux dans la file filtrés par niveau de priorité spécifique avec des informations détaillées sur la position et le temps.
Paramètres
| Paramètre | Type | Emplacement | Description |
|---|---|---|---|
priority |
string | Chemin | Niveau de priorité : Critique, Haute, Normale, Basse |
Réponse
{
"priority": "High",
"jobCount": 7,
"averageWaitTime": "6.2 minutes",
"estimatedProcessingTime": "31 minutes",
"jobs": [
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"queuePosition": 2,
"overallQueuePosition": 3,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedCompletion": "2024-01-20T11:12:00Z",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"waitTime": "15 minutes",
"dependencies": [],
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
]
}
Modifier la priorité d'un travail
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/priority
Met à jour la priorité d'un travail en file d'attente, ce qui peut changer sa position dans la file et son heure de début estimée.
Corps de la requête
{
"newPriority": "Critical",
"reason": "Business critical analysis required urgently",
"notifyUser": true
}
Réponse
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"previousPriority": "High",
"newPriority": "Critical",
"previousQueuePosition": 3,
"newQueuePosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:47:00Z",
"timeSaved": "15 minutes",
"updatedBy": "user123",
"updateTime": "2024-01-20T10:46:00Z"
}
Déplacer la position d'un travail
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/position
Ajuste manuellement la position d'un travail dans son niveau de priorité. Les changements de position sont limités au même niveau de priorité.
Corps de la requête
{
"newPosition": 1,
"reason": "Dependencies resolved, can execute earlier",
"respectPriorityBoundaries": true
}
Réponse
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"priority": "High",
"previousPosition": 3,
"newPosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:55:00Z",
"affectedJobs": [
{
"jobId": "11fe8400-e29b-41d4-a716-446655440000",
"newPosition": 2,
"newEstimatedStart": "2024-01-20T11:05:00Z"
}
],
"updateTime": "2024-01-20T10:46:00Z"
}
Contrôler le traitement de la file
POST /api/{tenantId}/{projectId}/execution/queue/control
Met en pause ou reprend le traitement de la file pour maintenance ou situation d'urgence. Les travaux en cours continuent mais aucun nouveau travail ne commencera lorsque la file est en pause.
Corps de la requête
{
"action": "pause",
"reason": "System maintenance window",
"duration": 30,
"allowRunningJobsToComplete": true,
"notifyUsers": true,
"scheduledResume": "2024-01-20T12:00:00Z"
}
Réponse
{
"action": "pause",
"status": "Paused",
"pausedAt": "2024-01-20T10:47:00Z",
"scheduledResume": "2024-01-20T12:00:00Z",
"affectedJobs": 23,
"runningJobsCount": 3,
"estimatedDelayMinutes": 30,
"reason": "System maintenance window",
"pausedBy": "admin123"
}
Obtenir l'historique de la file
GET /api/{tenantId}/{projectId}/execution/queue/history
Récupère les données et métriques historiques de performance de la file pour analyse et optimisation.
Paramètres de requête
| Paramètre | Type | Description |
|---|---|---|
dateFrom |
datetime | Date de début pour les données historiques |
dateTo |
datetime | Date de fin pour les données historiques |
aggregation |
string | Niveau d'agrégation des données : heure, jour, semaine (par défaut : heure) |
metrics |
string | Métriques séparées par des virgules : queue_size, wait_time, throughput, efficiency |
Réponse
{
"period": {
"startDate": "2024-01-19T00:00:00Z",
"endDate": "2024-01-20T10:47:00Z",
"aggregation": "hour"
},
"summary": {
"totalJobsProcessed": 847,
"averageQueueSize": 12.3,
"averageWaitTime": "7.8 minutes",
"peakQueueSize": 45,
"peakWaitTime": "23 minutes",
"throughputPerHour": 24.8,
"efficiency": 87.2
},
"hourlyData": [
{
"timestamp": "2024-01-20T09:00:00Z",
"queueSize": {
"average": 18,
"peak": 25,
"minimum": 8
},
"waitTime": {
"average": "9.5 minutes",
"maximum": "18 minutes",
"minimum": "2 minutes"
},
"throughput": {
"jobsCompleted": 28,
"jobsSubmitted": 31,
"efficiency": 89.3
},
"priorityDistribution": {
"critical": 2,
"high": 8,
"normal": 14,
"low": 1
}
}
],
"trends": {
"queueSizeGrowth": -2.3,
"waitTimeImprovement": 5.7,
"throughputIncrease": 12.1,
"efficiencyChange": 3.4
},
"bottlenecks": [
{
"timeframe": "2024-01-20T08:30:00Z - 2024-01-20T09:15:00Z",
"issue": "High memory usage jobs accumulated",
"impact": "15 minute delay",
"resolution": "Additional worker allocated"
}
]
}
Annuler les travaux en file d'un utilisateur
DELETE /api/{tenantId}/{projectId}/execution/queue/user/{userId}
Annule tous les travaux en file soumis par un utilisateur spécifique. Les travaux en cours de l'utilisateur continueront jusqu'à leur achèvement.
Corps de la requête (optionnel)
{
"reason": "User account deactivated",
"notifyUser": false,
"cancelJobTypes": ["ProcessMining", "DataEnrichment"],
"excludeJobIds": ["important-job-id-1", "important-job-id-2"]
}
Réponse
{
"userId": "user123",
"cancelledJobsCount": 5,
"preservedJobsCount": 2,
"cancelledJobs": [
{
"jobId": "job1-guid",
"jobName": "Weekly Analysis",
"priority": "Normal",
"queuePosition": 8
}
],
"preservedJobs": [
{
"jobId": "important-job-id-1",
"jobName": "Critical Business Report",
"reason": "Explicitly excluded"
}
],
"cancelledAt": "2024-01-20T10:47:00Z",
"cancelledBy": "admin123"
}
Obtenir les prédictions de la file
GET /api/{tenantId}/{projectId}/execution/queue/predictions
Fournit des prédictions basées sur l'IA pour le comportement de la file, les temps optimaux de soumission et les recommandations d'allocation des ressources.
Paramètres de requête
| Paramètre | Type | Description |
|---|---|---|
horizon |
integer | Horizon de prédiction en heures (1-24, par défaut : 4) |
jobType |
string | Prédire pour un type de travail spécifique |
includeRecommendations |
boolean | Inclure les recommandations d'optimisation (par défaut : true) |
Réponse
{
"predictionTime": "2024-01-20T10:47:00Z",
"horizon": 4,
"predictions": {
"queueSizeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"expectedQueueSize": 18,
"confidence": 0.87
},
{
"time": "2024-01-20T12:00:00Z",
"expectedQueueSize": 12,
"confidence": 0.82
}
],
"waitTimeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"averageWaitTime": "6.5 minutes",
"confidence": 0.85
}
],
"resourceUtilization": [
{
"time": "2024-01-20T11:00:00Z",
"cpuUtilization": 78,
"memoryUtilization": 65,
"efficiency": 89.2
}
]
},
"recommendations": {
"optimalSubmissionTimes": [
{
"timeWindow": "2024-01-20T13:00:00Z - 2024-01-20T15:00:00Z",
"expectedWaitTime": "3-5 minutes",
"reason": "Low queue activity period"
}
],
"resourceOptimization": [
{
"recommendation": "Add 1 additional worker node",
"expectedImprovement": "25% reduction in wait times",
"cost": "Low",
"priority": "Medium"
}
],
"jobScheduling": [
{
"jobType": "ProcessMining",
"recommendation": "Schedule during off-peak hours (14:00-16:00)",
"reason": "Memory-intensive jobs perform better with less contention"
}
]
},
"modelInfo": {
"modelVersion": "2.1.3",
"lastTrained": "2024-01-19T02:00:00Z",
"accuracy": 0.84,
"dataPoints": 10080
}
}
Exemple : Flux de gestion de la file
Cet exemple démontre la surveillance et la gestion de la file des travaux :
// 1. Obtenir le statut actuel de la file
const getQueueStatus = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue?includeEstimates=true', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const queue = await response.json();
console.log(`Statut de la file : ${queue.queueStatus}`);
console.log(`Nombre total de travaux : ${queue.summary.totalQueuedJobs}`);
console.log(`Temps d'attente moyen : ${queue.summary.averageWaitTime}`);
return queue;
};
// 2. Modifier la priorité du travail si nécessaire
const updateJobPriority = async (jobId, newPriority, reason) => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/queue/job/${jobId}/priority`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
newPriority: newPriority,
reason: reason,
notifyUser: true
})
});
const result = await response.json();
console.log(`Priorité du travail ${jobId} modifiée de ${result.previousPriority} à ${result.newPriority}`);
console.log(`Nouvelle position : ${result.newQueuePosition} (était ${result.previousQueuePosition})`);
console.log(`Temps économisé : ${result.timeSaved}`);
return result;
};
// 3. Obtenir les prédictions de la file pour l'optimisation
const getQueuePredictions = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/predictions?horizon=4&includeRecommendations=true', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const predictions = await response.json();
console.log('Prédictions de la file :');
predictions.predictions.queueSizeProjection.forEach(prediction => {
console.log(` ${prediction.time} : ${prediction.expectedQueueSize} travaux (${Math.round(prediction.confidence * 100)}% de confiance)`);
});
console.log('Recommandations :');
predictions.recommendations.optimalSubmissionTimes.forEach(rec => {
console.log(` Soumettre durant : ${rec.timeWindow} (${rec.expectedWaitTime} d'attente)`);
});
return predictions;
};
// 4. Surveiller un travail spécifique dans la file
const monitorJobInQueue = async (jobId) => {
const checkQueue = async () => {
const queue = await getQueueStatus();
const job = queue.queuedJobs.find(j => j.jobId === jobId);
if (job) {
console.log(`Le travail ${jobId} est à la position ${job.queuePosition}`);
console.log(`Début estimé : ${job.estimatedStartTime}`);
console.log(`Durée estimée : ${job.estimatedDuration}`);
// Vérifier à nouveau dans 2 minutes
setTimeout(() => checkQueue(), 120000);
} else {
console.log(`Le travail ${jobId} n'est plus dans la file (probablement démarré ou annulé)`);
}
};
await checkQueue();
};
// 5. Gestion d'urgence de la file
const pauseQueue = async (reason, duration) => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/control', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
action: 'pause',
reason: reason,
duration: duration,
allowRunningJobsToComplete: true,
notifyUsers: true
})
});
const result = await response.json();
console.log(`File mise en pause : ${result.status}`);
console.log(`${result.affectedJobs} travaux affectés`);
console.log(`Délai estimé : ${result.estimatedDelayMinutes} minutes`);
return result;
};
// Exécuter le flux de gestion de la file
getQueueStatus()
.then(queue => {
console.log('Statut actuel de la file récupéré');
// Vérifier si la file devient longue
if (queue.summary.totalQueuedJobs > 30) {
console.log('La file devient longue, vérification des prédictions...');
return getQueuePredictions();
}
return null;
})
.then(predictions => {
if (predictions) {
console.log('Prédictions de la file récupérées');
// Si les prédictions montrent une croissance continue, envisager une optimisation des ressources
const futureQueueSize = predictions.predictions.queueSizeProjection[predictions.predictions.queueSizeProjection.length - 1];
if (futureQueueSize.expectedQueueSize > 25) {
console.log('Envisager la mise en œuvre des recommandations d\'optimisation des ressources');
predictions.recommendations.resourceOptimization.forEach(rec => {
console.log(`- ${rec.recommendation} : ${rec.expectedImprovement}`);
});
}
}
})
.catch(error => console.error('Échec de la gestion de la file :', error));
Exemple en Python
import requests
import time
import json
from datetime import datetime, timedelta
class QueueManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def get_queue_status(self, priority=None, job_type=None, include_estimates=True):
"""Obtenir le statut actuel de la file"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue"
params = {'includeEstimates': str(include_estimates).lower()}
if priority:
params['priority'] = priority
if job_type:
params['jobType'] = job_type
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def get_jobs_by_priority(self, priority):
"""Obtenir les travaux filtrés par niveau de priorité"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/priority/{priority}"
response = requests.get(url, headers=self.headers)
return response.json()
def change_job_priority(self, job_id, new_priority, reason, notify_user=True):
"""Changer la priorité d'un travail en file"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/priority"
payload = {
'newPriority': new_priority,
'reason': reason,
'notifyUser': notify_user
}
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def move_job_position(self, job_id, new_position, reason):
"""Déplacer le travail à une nouvelle position dans son niveau de priorité"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/position"
payload = {
'newPosition': new_position,
'reason': reason,
'respectPriorityBoundaries': True
}
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def control_queue(self, action, reason, duration=None, scheduled_resume=None):
"""Mettre en pause ou reprendre le traitement de la file"""
url = f"{self.base_url}/api/{self.tenantId}/{self.projectId}/execution/queue/control"
payload = {
'action': action,
'reason': reason,
'allowRunningJobsToComplete': True,
'notifyUsers': True
}
if duration:
payload['duration'] = duration
if scheduled_resume:
payload['scheduledResume'] = scheduled_resume.isoformat()
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_queue_history(self, date_from, date_to, aggregation='hour', metrics=None):
"""Obtenir les données historiques de performance de la file"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/history"
params = {
'dateFrom': date_from.isoformat(),
'dateTo': date_to.isoformat(),
'aggregation': aggregation
}
if metrics:
params['metrics'] = ','.join(metrics)
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_user_jobs(self, user_id, reason, job_types=None, exclude_job_ids=None):
"""Annuler tous les travaux en file pour un utilisateur spécifique"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/user/{user_id}"
payload = {
'reason': reason,
'notifyUser': False
}
if job_types:
payload['cancelJobTypes'] = job_types
if exclude_job_ids:
payload['excludeJobIds'] = exclude_job_ids
response = requests.delete(url, json=payload, headers=self.headers)
return response.json()
def get_queue_predictions(self, horizon=4, job_type=None, include_recommendations=True):
"""Obtenir les prédictions de la file basées sur l'IA"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/predictions"
params = {
'horizon': horizon,
'includeRecommendations': str(include_recommendations).lower()
}
if job_type:
params['jobType'] = job_type
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def monitor_queue_health(self, alert_threshold=30, check_interval=300):
"""Surveiller continuellement la santé de la file et alerter en cas de problèmes"""
while True:
try:
queue_status = self.get_queue_status()
total_jobs = queue_status['summary']['totalQueuedJobs']
avg_wait = queue_status['summary']['averageWaitTime']
print(f"Contrôle santé file : {total_jobs} travaux, attente moyenne : {avg_wait}")
if total_jobs > alert_threshold:
print(f"ALERTE : La taille de la file ({total_jobs}) dépasse le seuil ({alert_threshold})")
# Obtenir les prédictions pour comprendre l'évolution attendue
predictions = self.get_queue_predictions()
future_size = predictions['predictions']['queueSizeProjection'][-1]['expectedQueueSize']
if future_size > total_jobs:
print("AVERTISSEMENT : La file devrait encore s'allonger")
print("Recommandations d'optimisation des ressources :")
for rec in predictions['recommendations']['resourceOptimization']:
print(f" - {rec['recommendation']} : {rec['expectedImprovement']}")
time.sleep(check_interval)
except Exception as e:
print(f"Erreur de surveillance de la file : {e}")
time.sleep(60)
# Exemple d'utilisation
manager = QueueManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
try:
# Obtenir un statut complet de la file
queue_status = manager.get_queue_status(include_estimates=True)
print(f"Statut de la file : {queue_status['queueStatus']}")
print(f"Travaux en file : {queue_status['summary']['totalQueuedJobs']}")
print(f"Temps d'attente moyen : {queue_status['summary']['averageWaitTime']}")
print(f"Capacité de traitement : {queue_status['processingCapacity']['currentLoad']}%")
# Vérifier spécifiquement les travaux à haute priorité
high_priority_jobs = manager.get_jobs_by_priority('High')
print(f"Travaux à haute priorité : {high_priority_jobs['jobCount']}")
# Obtenir les prédictions de la file pour les 4 prochaines heures
predictions = manager.get_queue_predictions(horizon=4)
print("Prédictions de la file :")
for pred in predictions['predictions']['queueSizeProjection']:
confidence_pct = round(pred['confidence'] * 100)
print(f" {pred['time']} : {pred['expectedQueueSize']} travaux ({confidence_pct}% de confiance)")
# Vérifier les recommandations
if predictions['recommendations']['optimalSubmissionTimes']:
print("Heures optimales de soumission :")
for rec in predictions['recommendations']['optimalSubmissionTimes']:
print(f" {rec['timeWindow']} : {rec['expectedWaitTime']} temps d'attente")
# Exemple : élever la priorité d'un travail si nécessaire
if queue_status['summary']['totalQueuedJobs'] > 20:
# Trouver un travail de priorité normale à élever
normal_jobs = [job for job in queue_status['queuedJobs'] if job['priority'] == 'Normal']
if normal_jobs:
job_to_elevate = normal_jobs[0]
result = manager.change_job_priority(
job_to_elevate['jobId'],
'High',
'Congestion dans la file - élévation du travail critique'
)
print(f"Travail {job_to_elevate['jobName']} élevé à priorité Haute")
print(f"Nouvelle position : {result['newQueuePosition']} (était {result['previousPosition']})")
# Obtenir l'historique de la file pour analyse
history = manager.get_queue_history(
datetime.now() - timedelta(hours=24),
datetime.now(),
'hour',
['queue_size', 'wait_time', 'throughput']
)
print(f"Résumé 24h : {history['summary']['totalJobsProcessed']} travaux traités")
print(f"Pic de taille de la file : {history['summary']['peakQueueSize']}")
print(f"Débit moyen : {history['summary']['throughputPerHour']} travaux/heure")
# Si des goulets d'étranglement sont présents, les signaler
if history['bottlenecks']:
print("Goulets d'étranglement récents :")
for bottleneck in history['bottlenecks']:
print(f" {bottleneck['timeframe']} : {bottleneck['issue']} (Impact : {bottleneck['impact']})")
except Exception as e:
print(f"Erreur dans la gestion de la file : {e}")