Job-Warteschlange

Verwaltung der Ausführungswarteschlange

Zeigen Sie die Ausführungswarteschlange der Jobs an und verwalten Sie sie, legen Sie Prioritäten fest und steuern Sie die Job-Planung.

Warteschlangenstatus abrufen

GET /api/{tenantId}/{projectId}/execution/queue

Ruht den aktuellen Status der Ausführungswarteschlange ab, einschließlich der wartenden Jobs, ihrer Prioritäten und geschätzter Verarbeitungszeiten.

Parameter

Parameter Typ Ort Beschreibung
tenantId GUID Pfad Die Mandanten-ID
projectId GUID Pfad Die Projekt-ID

Abfrageparameter

Parameter Typ Beschreibung
priority string Nach Priorität filtern: Critical, High, Normal, Low
jobType string Nach Job-Typ filtern: ProcessMining, DataEnrichment, Notebook, Analysis
includeEstimates boolean Detaillierte Zeitabschätzungen einbeziehen (Standard: true)

Antwort

{
  "queueStatus": "Active",
  "timestamp": "2024-01-20T10:45:00Z",
  "summary": {
    "totalQueuedJobs": 23,
    "criticalPriorityJobs": 2,
    "highPriorityJobs": 7,
    "normalPriorityJobs": 12,
    "lowPriorityJobs": 2,
    "averageWaitTime": "8.5 minutes",
    "estimatedProcessingTime": "47 minutes"
  },
  "processingCapacity": {
    "activeWorkers": 4,
    "totalWorkers": 6,
    "currentLoad": 67,
    "maxConcurrentJobs": 8,
    "currentlyRunning": 3
  },
  "queuedJobs": [
    {
      "jobId": "ff0e8400-e29b-41d4-a716-446655440000",
      "jobName": "Customer Analytics Pipeline",
      "jobType": "ProcessMining",
      "priority": "Critical",
      "queuePosition": 1,
      "estimatedStartTime": "2024-01-20T10:47:00Z",
      "estimatedDuration": "12-15 minutes",
      "submittedBy": "user456",
      "dateSubmitted": "2024-01-20T10:44:00Z",
      "resourceRequirements": {
        "cpuUnits": 2,
        "memoryGB": 4,
        "estimatedDiskUsage": "1.2 GB"
      }
    },
    {
      "jobId": "00fe8400-e29b-41d4-a716-446655440000",
      "jobName": "Daily Sales Analysis",
      "jobType": "DataEnrichment",
      "priority": "High",
      "queuePosition": 2,
      "estimatedStartTime": "2024-01-20T11:02:00Z",
      "estimatedDuration": "8-10 minutes",
      "submittedBy": "system",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "resourceRequirements": {
        "cpuUnits": 1,
        "memoryGB": 2,
        "estimatedDiskUsage": "500 MB"
      }
    }
  ],
  "performanceMetrics": {
    "averageJobDuration": "16.3 minutes",
    "throughputLastHour": 12,
    "queueTrends": {
      "currentHourSubmissions": 8,
      "peakHourToday": "09:00-10:00",
      "averageQueueSize": 15.7
    }
  }
}

Jobs nach Priorität abrufen

GET /api/{tenantId}/{projectId}/execution/queue/priority/{priority}

Ruft Jobs in der Warteschlange ab, gefiltert nach einem bestimmten Prioritätsniveau mit detaillierten Positions- und Zeitinformationen.

Parameter

Parameter Typ Ort Beschreibung
priority string Pfad Prioritätsniveau: Critical, High, Normal, Low

Antwort

{
  "priority": "High",
  "jobCount": 7,
  "averageWaitTime": "6.2 minutes",
  "estimatedProcessingTime": "31 minutes",
  "jobs": [
    {
      "jobId": "00fe8400-e29b-41d4-a716-446655440000",
      "jobName": "Daily Sales Analysis",
      "jobType": "DataEnrichment",
      "queuePosition": 2,
      "overallQueuePosition": 3,
      "estimatedStartTime": "2024-01-20T11:02:00Z",
      "estimatedCompletion": "2024-01-20T11:12:00Z",
      "submittedBy": "system",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "waitTime": "15 minutes",
      "dependencies": [],
      "resourceRequirements": {
        "cpuUnits": 1,
        "memoryGB": 2,
        "estimatedDiskUsage": "500 MB"
      }
    }
  ]
}

Job-Priorität ändern

PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/priority

Aktualisiert die Priorität eines wartenden Jobs, was seine Position in der Warteschlange und die geschätzte Startzeit ändern kann.

Anfrageinhalt

{
  "newPriority": "Critical",
  "reason": "Business critical analysis required urgently",
  "notifyUser": true
}

Antwort

{
  "jobId": "00fe8400-e29b-41d4-a716-446655440000",
  "previousPriority": "High",
  "newPriority": "Critical",
  "previousQueuePosition": 3,
  "newQueuePosition": 1,
  "previousEstimatedStart": "2024-01-20T11:02:00Z",
  "newEstimatedStart": "2024-01-20T10:47:00Z",
  "timeSaved": "15 minutes",
  "updatedBy": "user123",
  "updateTime": "2024-01-20T10:46:00Z"
}

Job-Position verschieben

PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/position

Verschiebt manuell die Position eines Jobs innerhalb seiner Prioritätsstufe. Positionsänderungen sind auf das gleiche Prioritätsniveau beschränkt.

Anfrageinhalt

{
  "newPosition": 1,
  "reason": "Dependencies resolved, can execute earlier",
  "respectPriorityBoundaries": true
}

Antwort

{
  "jobId": "00fe8400-e29b-41d4-a716-446655440000",
  "priority": "High",
  "previousPosition": 3,
  "newPosition": 1,
  "previousEstimatedStart": "2024-01-20T11:02:00Z",
  "newEstimatedStart": "2024-01-20T10:55:00Z",
  "affectedJobs": [
    {
      "jobId": "11fe8400-e29b-41d4-a716-446655440000",
      "newPosition": 2,
      "newEstimatedStart": "2024-01-20T11:05:00Z"
    }
  ],
  "updateTime": "2024-01-20T10:46:00Z"
}

Warteschlangenverarbeitung steuern

POST /api/{tenantId}/{projectId}/execution/queue/control

Pausiert oder setzt die Verarbeitung der Warteschlange für Wartungs- oder Notfallsituationen aus. Laufende Jobs werden beendet, aber keine neuen Jobs werden gestartet, wenn pausiert ist.

Anfrageinhalt

{
  "action": "pause",
  "reason": "System maintenance window",
  "duration": 30,
  "allowRunningJobsToComplete": true,
  "notifyUsers": true,
  "scheduledResume": "2024-01-20T12:00:00Z"
}

Antwort

{
  "action": "pause",
  "status": "Paused",
  "pausedAt": "2024-01-20T10:47:00Z",
  "scheduledResume": "2024-01-20T12:00:00Z",
  "affectedJobs": 23,
  "runningJobsCount": 3,
  "estimatedDelayMinutes": 30,
  "reason": "System maintenance window",
  "pausedBy": "admin123"
}

Warteschlangenhistorie abrufen

GET /api/{tenantId}/{projectId}/execution/queue/history

Ruht historische Leistungsdaten und Metriken der Warteschlange für Analyse- und Optimierungszwecke ab.

Abfrageparameter

Parameter Typ Beschreibung
dateFrom datetime Startdatum für historische Daten
dateTo datetime Enddatum für historische Daten
aggregation string Datenaggregationsniveau: hour, day, week (Standard: hour)
metrics string Kommagetrennte Metriken: queue_size, wait_time, throughput, efficiency

Antwort

{
  "period": {
    "startDate": "2024-01-19T00:00:00Z",
    "endDate": "2024-01-20T10:47:00Z",
    "aggregation": "hour"
  },
  "summary": {
    "totalJobsProcessed": 847,
    "averageQueueSize": 12.3,
    "averageWaitTime": "7.8 minutes",
    "peakQueueSize": 45,
    "peakWaitTime": "23 minutes",
    "throughputPerHour": 24.8,
    "efficiency": 87.2
  },
  "hourlyData": [
    {
      "timestamp": "2024-01-20T09:00:00Z",
      "queueSize": {
        "average": 18,
        "peak": 25,
        "minimum": 8
      },
      "waitTime": {
        "average": "9.5 minutes",
        "maximum": "18 minutes",
        "minimum": "2 minutes"
      },
      "throughput": {
        "jobsCompleted": 28,
        "jobsSubmitted": 31,
        "efficiency": 89.3
      },
      "priorityDistribution": {
        "critical": 2,
        "high": 8,
        "normal": 14,
        "low": 1
      }
    }
  ],
  "trends": {
    "queueSizeGrowth": -2.3,
    "waitTimeImprovement": 5.7,
    "throughputIncrease": 12.1,
    "efficiencyChange": 3.4
  },
  "bottlenecks": [
    {
      "timeframe": "2024-01-20T08:30:00Z - 2024-01-20T09:15:00Z",
      "issue": "High memory usage jobs accumulated",
      "impact": "15 minute delay",
      "resolution": "Additional worker allocated"
    }
  ]
}

Wartende Jobs eines Benutzers stornieren

DELETE /api/{tenantId}/{projectId}/execution/queue/user/{userId}

Storniert alle wartenden Jobs, die von einem bestimmten Benutzer eingereicht wurden. Laufende Jobs des Benutzers werden fortgesetzt.

Anfrageinhalt (Optional)

{
  "reason": "User account deactivated",
  "notifyUser": false,
  "cancelJobTypes": ["ProcessMining", "DataEnrichment"],
  "excludeJobIds": ["important-job-id-1", "important-job-id-2"]
}

Antwort

{
  "userId": "user123",
  "cancelledJobsCount": 5,
  "preservedJobsCount": 2,
  "cancelledJobs": [
    {
      "jobId": "job1-guid",
      "jobName": "Weekly Analysis",
      "priority": "Normal",
      "queuePosition": 8
    }
  ],
  "preservedJobs": [
    {
      "jobId": "important-job-id-1",
      "jobName": "Critical Business Report",
      "reason": "Explicitly excluded"
    }
  ],
  "cancelledAt": "2024-01-20T10:47:00Z",
  "cancelledBy": "admin123"
}

Warteschlangenvorhersagen abrufen

GET /api/{tenantId}/{projectId}/execution/queue/predictions

Bietet KI-basierte Vorhersagen zum Warteschlangenverhalten, optimale Einreichungszeiten und Empfehlungen zur Ressourcenzuteilung.

Abfrageparameter

Parameter Typ Beschreibung
horizon integer Vorhersagehorizont in Stunden (1-24, Standard: 4)
jobType string Für spezifischen Job-Typ vorhersagen
includeRecommendations boolean Optimierungsempfehlungen einbeziehen (Standard: true)

Antwort

{
  "predictionTime": "2024-01-20T10:47:00Z",
  "horizon": 4,
  "predictions": {
    "queueSizeProjection": [
      {
        "time": "2024-01-20T11:00:00Z",
        "expectedQueueSize": 18,
        "confidence": 0.87
      },
      {
        "time": "2024-01-20T12:00:00Z",
        "expectedQueueSize": 12,
        "confidence": 0.82
      }
    ],
    "waitTimeProjection": [
      {
        "time": "2024-01-20T11:00:00Z",
        "averageWaitTime": "6.5 minutes",
        "confidence": 0.85
      }
    ],
    "resourceUtilization": [
      {
        "time": "2024-01-20T11:00:00Z",
        "cpuUtilization": 78,
        "memoryUtilization": 65,
        "efficiency": 89.2
      }
    ]
  },
  "recommendations": {
    "optimalSubmissionTimes": [
      {
        "timeWindow": "2024-01-20T13:00:00Z - 2024-01-20T15:00:00Z",
        "expectedWaitTime": "3-5 minutes",
        "reason": "Low queue activity period"
      }
    ],
    "resourceOptimization": [
      {
        "recommendation": "Add 1 additional worker node",
        "expectedImprovement": "25% reduction in wait times",
        "cost": "Low",
        "priority": "Medium"
      }
    ],
    "jobScheduling": [
      {
        "jobType": "ProcessMining",
        "recommendation": "Schedule during off-peak hours (14:00-16:00)",
        "reason": "Memory-intensive jobs perform better with less contention"
      }
    ]
  },
  "modelInfo": {
    "modelVersion": "2.1.3",
    "lastTrained": "2024-01-19T02:00:00Z",
    "accuracy": 0.84,
    "dataPoints": 10080
  }
}

Beispiel: Workflow zur Warteschlangenverwaltung

Dieses Beispiel zeigt, wie die Job-Warteschlange überwacht und verwaltet wird:

// 1. Aktuellen Warteschlangenstatus abrufen
const getQueueStatus = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue?includeEstimates=true', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const queue = await response.json();
  console.log(`Warteschlangenstatus: ${queue.queueStatus}`);
  console.log(`Gesamtanzahl Jobs: ${queue.summary.totalQueuedJobs}`);
  console.log(`Durchschnittliche Wartezeit: ${queue.summary.averageWaitTime}`);

  return queue;
};

// 2. Priorität des Jobs falls nötig ändern
const updateJobPriority = async (jobId, newPriority, reason) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/queue/job/${jobId}/priority`, {
    method: 'PUT',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      newPriority: newPriority,
      reason: reason,
      notifyUser: true
    })
  });

  const result = await response.json();
  console.log(`Job ${jobId} Priorität geändert von ${result.previousPriority} zu ${result.newPriority}`);
  console.log(`Neue Position: ${result.newQueuePosition} (vorher ${result.previousQueuePosition})`);
  console.log(`Gesparte Zeit: ${result.timeSaved}`);

  return result;
};

// 3. Warteschlangenvorhersagen für Optimierung abrufen
const getQueuePredictions = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/predictions?horizon=4&includeRecommendations=true', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const predictions = await response.json();
  console.log('Warteschlangenvorhersagen:');

  predictions.predictions.queueSizeProjection.forEach(prediction => {
    console.log(`  ${prediction.time}: ${prediction.expectedQueueSize} Jobs (${Math.round(prediction.confidence * 100)}% Sicherheit)`);
  });

  console.log('Empfehlungen:');
  predictions.recommendations.optimalSubmissionTimes.forEach(rec => {
    console.log(`  Einreichen während: ${rec.timeWindow} (${rec.expectedWaitTime} Wartezeit)`);
  });

  return predictions;
};

// 4. Ein bestimmtes Job in der Warteschlange überwachen
const monitorJobInQueue = async (jobId) => {
  const checkQueue = async () => {
    const queue = await getQueueStatus();
    const job = queue.queuedJobs.find(j => j.jobId === jobId);

    if (job) {
      console.log(`Job ${jobId} befindet sich auf Position ${job.queuePosition}`);
      console.log(`Geschätzter Start: ${job.estimatedStartTime}`);
      console.log(`Geschätzte Dauer: ${job.estimatedDuration}`);

      // Nach 2 Minuten erneut prüfen
      setTimeout(() => checkQueue(), 120000);
    } else {
      console.log(`Job ${jobId} ist nicht mehr in der Warteschlange (wahrscheinlich gestartet oder storniert)`);
    }
  };

  await checkQueue();
};

// 5. Notfall-Warteschlangenverwaltung
const pauseQueue = async (reason, duration) => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/control', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      action: 'pause',
      reason: reason,
      duration: duration,
      allowRunningJobsToComplete: true,
      notifyUsers: true
    })
  });

  const result = await response.json();
  console.log(`Warteschlange pausiert: ${result.status}`);
  console.log(`${result.affectedJobs} Jobs betroffen`);
  console.log(`Geschätzte Verzögerung: ${result.estimatedDelayMinutes} Minuten`);

  return result;
};

// Workflow zur Warteschlangenverwaltung ausführen
getQueueStatus()
  .then(queue => {
    console.log('Aktueller Warteschlangenstatus abgerufen');

    // Prüfen, ob Warteschlange lang wird
    if (queue.summary.totalQueuedJobs > 30) {
      console.log('Warteschlange wird lang, prüfe Vorhersagen...');
      return getQueuePredictions();
    }

    return null;
  })
  .then(predictions => {
    if (predictions) {
      console.log('Warteschlangenvorhersagen abgerufen');

      // Wenn Vorhersagen weiteres Wachstum zeigen, Ressourcenoptimierung erwägen
      const futureQueueSize = predictions.predictions.queueSizeProjection[predictions.predictions.queueSizeProjection.length - 1];
      if (futureQueueSize.expectedQueueSize > 25) {
        console.log('Ressourcenoptimierungsempfehlungen in Betracht ziehen');
        predictions.recommendations.resourceOptimization.forEach(rec => {
          console.log(`- ${rec.recommendation}: ${rec.expectedImprovement}`);
        });
      }
    }
  })
  .catch(error => console.error('Warteschlangenverwaltung fehlgeschlagen:', error));

Python-Beispiel

import requests
import time
import json
from datetime import datetime, timedelta

class QueueManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def get_queue_status(self, priority=None, job_type=None, include_estimates=True):
        """Aktuellen Warteschlangenstatus abrufen"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue"
        params = {'includeEstimates': str(include_estimates).lower()}

        if priority:
            params['priority'] = priority
        if job_type:
            params['jobType'] = job_type

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def get_jobs_by_priority(self, priority):
        """Jobs nach Prioritätsstufe filtern"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/priority/{priority}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def change_job_priority(self, job_id, new_priority, reason, notify_user=True):
        """Priorität eines wartenden Jobs ändern"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/priority"
        payload = {
            'newPriority': new_priority,
            'reason': reason,
            'notifyUser': notify_user
        }
        response = requests.put(url, json=payload, headers=self.headers)
        return response.json()

    def move_job_position(self, job_id, new_position, reason):
        """Job innerhalb seiner Prioritätsstufe neu positionieren"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/position"
        payload = {
            'newPosition': new_position,
            'reason': reason,
            'respectPriorityBoundaries': True
        }
        response = requests.put(url, json=payload, headers=self.headers)
        return response.json()

    def control_queue(self, action, reason, duration=None, scheduled_resume=None):
        """Warteschlangenverarbeitung pausieren oder fortsetzen"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/control"
        payload = {
            'action': action,
            'reason': reason,
            'allowRunningJobsToComplete': True,
            'notifyUsers': True
        }

        if duration:
            payload['duration'] = duration
        if scheduled_resume:
            payload['scheduledResume'] = scheduled_resume.isoformat()

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_queue_history(self, date_from, date_to, aggregation='hour', metrics=None):
        """Historische Leistungsdaten der Warteschlange abrufen"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/history"
        params = {
            'dateFrom': date_from.isoformat(),
            'dateTo': date_to.isoformat(),
            'aggregation': aggregation
        }

        if metrics:
            params['metrics'] = ','.join(metrics)

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_user_jobs(self, user_id, reason, job_types=None, exclude_job_ids=None):
        """Alle wartenden Jobs eines bestimmten Benutzers stornieren"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/user/{user_id}"
        payload = {
            'reason': reason,
            'notifyUser': False
        }

        if job_types:
            payload['cancelJobTypes'] = job_types
        if exclude_job_ids:
            payload['excludeJobIds'] = exclude_job_ids

        response = requests.delete(url, json=payload, headers=self.headers)
        return response.json()

    def get_queue_predictions(self, horizon=4, job_type=None, include_recommendations=True):
        """KI-basierte Warteschlangenvorhersagen abrufen"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/predictions"
        params = {
            'horizon': horizon,
            'includeRecommendations': str(include_recommendations).lower()
        }

        if job_type:
            params['jobType'] = job_type

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def monitor_queue_health(self, alert_threshold=30, check_interval=300):
        """Kontinuierliche Überwachung der Warteschlangengesundheit und Benachrichtigung bei Problemen"""
        while True:
            try:
                queue_status = self.get_queue_status()
                total_jobs = queue_status['summary']['totalQueuedJobs']
                avg_wait = queue_status['summary']['averageWaitTime']

                print(f"Warteschlangengesundheits-Check: {total_jobs} Jobs, Durchschnittliche Wartezeit: {avg_wait}")

                if total_jobs > alert_threshold:
                    print(f"ALARM: Warteschlangengröße ({total_jobs}) überschreitet Schwelle ({alert_threshold})")

                    # Vorhersagen abrufen, um zu prüfen ob sich Situation verbessert
                    predictions = self.get_queue_predictions()
                    future_size = predictions['predictions']['queueSizeProjection'][-1]['expectedQueueSize']

                    if future_size > total_jobs:
                        print("WARNUNG: Warteschlange wird voraussichtlich weiter wachsen")
                        print("Empfehlungen zur Ressourcenoptimierung:")
                        for rec in predictions['recommendations']['resourceOptimization']:
                            print(f"  - {rec['recommendation']}: {rec['expectedImprovement']}")

                time.sleep(check_interval)

            except Exception as e:
                print(f"Fehler bei der Warteschlangenüberwachung: {e}")
                time.sleep(60)

# Beispiel für Nutzung
manager = QueueManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Umfangreichen Warteschlangenstatus abrufen
    queue_status = manager.get_queue_status(include_estimates=True)
    print(f"Warteschlangenstatus: {queue_status['queueStatus']}")
    print(f"Gesamtanzahl wartender Jobs: {queue_status['summary']['totalQueuedJobs']}")
    print(f"Durchschnittliche Wartezeit: {queue_status['summary']['averageWaitTime']}")
    print(f"Verarbeitungskapazität: {queue_status['processingCapacity']['currentLoad']}%")

    # High-Priority-Jobs speziell abrufen
    high_priority_jobs = manager.get_jobs_by_priority('High')
    print(f"Anzahl Jobs mit hoher Priorität: {high_priority_jobs['jobCount']}")

    # Warteschlangenvorhersagen für die nächsten 4 Stunden abrufen
    predictions = manager.get_queue_predictions(horizon=4)
    print("Warteschlangenvorhersagen:")
    for pred in predictions['predictions']['queueSizeProjection']:
        confidence_pct = round(pred['confidence'] * 100)
        print(f"  {pred['time']}: {pred['expectedQueueSize']} Jobs ({confidence_pct}% Sicherheit)")

    # Empfehlungen prüfen
    if predictions['recommendations']['optimalSubmissionTimes']:
        print("Optimale Einreichungszeiten:")
        for rec in predictions['recommendations']['optimalSubmissionTimes']:
            print(f"  {rec['timeWindow']}: {rec['expectedWaitTime']} Wartezeit")

    # Beispiel: Job-Priorität bei Bedarf erhöhen
    if queue_status['summary']['totalQueuedJobs'] > 20:
        # Einen Job mit normaler Priorität finden, der erhöht werden kann
        normal_jobs = [job for job in queue_status['queuedJobs'] if job['priority'] == 'Normal']
        if normal_jobs:
            job_to_elevate = normal_jobs[0]
            result = manager.change_job_priority(
                job_to_elevate['jobId'],
                'High',
                'Warteschlangenstau - Erhöhung für geschäftskritischen Job'
            )
            print(f"Job {job_to_elevate['jobName']} auf hohe Priorität gesetzt")
            print(f"Neue Position: {result['newQueuePosition']} (vorher {result['previousPosition']})")

    # Warteschlangenhistorie für Analyse abrufen
    history = manager.get_queue_history(
        datetime.now() - timedelta(hours=24),
        datetime.now(),
        'hour',
        ['queue_size', 'wait_time', 'throughput']
    )

    print(f"24h Zusammenfassung: {history['summary']['totalJobsProcessed']} verarbeitete Jobs")
    print(f"Höchste Warteschlangengröße: {history['summary']['peakQueueSize']}")
    print(f"Durchschnittlicher Durchsatz: {history['summary']['throughputPerHour']} Jobs/Stunde")

    # Wenn Engpässe vorliegen, diese melden
    if history['bottlenecks']:
        print("Aktuelle Engpässe:")
        for bottleneck in history['bottlenecks']:
            print(f"  {bottleneck['timeframe']}: {bottleneck['issue']} (Auswirkung: {bottleneck['impact']})")

except Exception as e:
    print(f"Fehler bei der Warteschlangenverwaltung: {e}")