Job Queue

Execution Queue Management

View and manage the job execution queue, set job priorities, and control job scheduling.

Get Queue Status

GET /api/{tenantId}/{projectId}/execution/queue

Retrieves the current state of the execution queue, including the queued jobs, their priorities, and estimated processing times.

Parameters

Parameter   Type   Location   Description
tenantId    GUID   Path       Tenant identifier
projectId   GUID   Path       Project identifier

Query Parameters

Parameter          Type      Description
priority           string    Filter by priority: Critical, High, Normal, Low
jobType            string    Filter by job type: ProcessMining, DataEnrichment, Notebook, Analysis
includeEstimates   boolean   Include detailed timing estimates (default: true)

Response

{
  "queueStatus": "Active",
  "timestamp": "2024-01-20T10:45:00Z",
  "summary": {
    "totalQueuedJobs": 23,
    "criticalPriorityJobs": 2,
    "highPriorityJobs": 7,
    "normalPriorityJobs": 12,
    "lowPriorityJobs": 2,
    "averageWaitTime": "8.5 minutes",
    "estimatedProcessingTime": "47 minutes"
  },
  "processingCapacity": {
    "activeWorkers": 4,
    "totalWorkers": 6,
    "currentLoad": 67,
    "maxConcurrentJobs": 8,
    "currentlyRunning": 3
  },
  "queuedJobs": [
    {
      "jobId": "ff0e8400-e29b-41d4-a716-446655440000",
      "jobName": "Customer Analytics Pipeline",
      "jobType": "ProcessMining",
      "priority": "Critical",
      "queuePosition": 1,
      "estimatedStartTime": "2024-01-20T10:47:00Z",
      "estimatedDuration": "12-15 minutes",
      "submittedBy": "user456",
      "dateSubmitted": "2024-01-20T10:44:00Z",
      "resourceRequirements": {
        "cpuUnits": 2,
        "memoryGB": 4,
        "estimatedDiskUsage": "1.2 GB"
      }
    },
    {
      "jobId": "00fe8400-e29b-41d4-a716-446655440000",
      "jobName": "Daily Sales Analysis",
      "jobType": "DataEnrichment",
      "priority": "High",
      "queuePosition": 2,
      "estimatedStartTime": "2024-01-20T11:02:00Z",
      "estimatedDuration": "8-10 minutes",
      "submittedBy": "system",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "resourceRequirements": {
        "cpuUnits": 1,
        "memoryGB": 2,
        "estimatedDiskUsage": "500 MB"
      }
    }
  ],
  "performanceMetrics": {
    "averageJobDuration": "16.3 minutes",
    "throughputLastHour": 12,
    "queueTrends": {
      "currentHourSubmissions": 8,
      "peakHourToday": "09:00-10:00",
      "averageQueueSize": 15.7
    }
  }
}
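
As a rough illustration of how a client might read this response, the sketch below derives a few headline numbers from it. The helper name `summarize_queue` is hypothetical, and treating `maxConcurrentJobs - currentlyRunning` as the number of free execution slots is an assumption rather than documented behavior.

```python
from typing import Any


def summarize_queue(status: dict[str, Any]) -> dict[str, Any]:
    """Derive headline numbers from a queue status response (hypothetical helper)."""
    summary = status["summary"]
    capacity = status["processingCapacity"]
    per_priority = {
        "Critical": summary["criticalPriorityJobs"],
        "High": summary["highPriorityJobs"],
        "Normal": summary["normalPriorityJobs"],
        "Low": summary["lowPriorityJobs"],
    }
    return {
        "total": summary["totalQueuedJobs"],
        "per_priority": per_priority,
        # True when the per-priority counts add up to the reported total.
        "counts_consistent": sum(per_priority.values()) == summary["totalQueuedJobs"],
        # Assumption: free slots = concurrency limit minus jobs currently running.
        "free_slots": capacity["maxConcurrentJobs"] - capacity["currentlyRunning"],
    }


# Values copied from the sample response above, trimmed to the fields used.
sample_status = {
    "summary": {
        "totalQueuedJobs": 23,
        "criticalPriorityJobs": 2,
        "highPriorityJobs": 7,
        "normalPriorityJobs": 12,
        "lowPriorityJobs": 2,
    },
    "processingCapacity": {"maxConcurrentJobs": 8, "currentlyRunning": 3},
}

report = summarize_queue(sample_status)
print(report["total"], report["counts_consistent"], report["free_slots"])
```

A check such as `counts_consistent` is cheap to run on every poll and can flag a response that was truncated or filtered unexpectedly.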

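Get Jobs by Priority

GET /api/{tenantId}/{projectId}/execution/queue/priority/{priority}

Retrieves the queued jobs at a specific priority level, with detailed position and timing information for each job.

Parameters

Parameter   Type     Location   Description
priority    string   Path       Priority level: Critical, High, Normal, Low

Response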

{
  "priority": "High",
  "jobCount": 7,
  "averageWaitTime": "6.2 minutes",
  "estimatedProcessingTime": "31 minutes",
  "jobs": [
    {
      "jobId": "00fe8400-e29b-41d4-a716-446655440000",
      "jobName": "Daily Sales Analysis",
      "jobType": "DataEnrichment",
      "queuePosition": 2,
      "overallQueuePosition": 3,
      "estimatedStartTime": "2024-01-20T11:02:00Z",
      "estimatedCompletion": "2024-01-20T11:12:00Z",
      "submittedBy": "system",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "waitTime": "15 minutes",
      "dependencies": [],
      "resourceRequirements": {
        "cpuUnits": 1,
        "memoryGB": 2,
        "estimatedDiskUsage": "500 MB"
      }
    }
  ]
}
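
The timing fields in this response are ISO 8601 UTC timestamps, so wait and run times can be derived on the client. The following is a minimal sketch, assuming the timestamps always end in `Z`. The helper names `parse_utc` and `minutes_between` are illustrative, and the reference time is the `timestamp` from the queue status sample.

```python
from datetime import datetime


def parse_utc(ts: str) -> datetime:
    """Parse a timestamp such as 2024-01-20T10:30:00Z into an aware datetime."""
    # Replacing the Z suffix keeps this working on Python versions before 3.11.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def minutes_between(start: str, end: str) -> float:
    """Return the number of minutes from start to end."""
    return (parse_utc(end) - parse_utc(start)).total_seconds() / 60


# Fields copied from the sample job above.
job = {
    "dateSubmitted": "2024-01-20T10:30:00Z",
    "estimatedStartTime": "2024-01-20T11:02:00Z",
    "estimatedCompletion": "2024-01-20T11:12:00Z",
}
now = "2024-01-20T10:45:00Z"  # reference time, taken from the status sample

waited = minutes_between(job["dateSubmitted"], now)
until_start = minutes_between(now, job["estimatedStartTime"])
run_time = minutes_between(job["estimatedStartTime"], job["estimatedCompletion"])
print(waited, until_start, run_time)
```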

Change Job Priority

PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/priority

Updates the priority of a queued job. This may change the job's position in the queue and its estimated start time.

Request Body

{
  "newPriority": "Critical",
  "reason": "Business critical analysis required urgently",
  "notifyUser": true
}

Response

{
  "jobId": "00fe8400-e29b-41d4-a716-446655440000",
  "previousPriority": "High",
  "newPriority": "Critical",
  "previousQueuePosition": 3,
  "newQueuePosition": 1,
  "previousEstimatedStart": "2024-01-20T11:02:00Z",
  "newEstimatedStart": "2024-01-20T10:47:00Z",
  "timeSaved": "15 minutes",
  "updatedBy": "user123",
  "updateTime": "2024-01-20T10:46:00Z"
}
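
A small model makes it easier to see why a priority change can move a job from position 3 to position 1. The sketch below assumes the queue is ordered by priority tier first and submission time second. The actual scheduling rule is not stated in this reference, so the ordering and the helper names are assumptions.

```python
# Assumed ranking: a lower number is served earlier.
PRIORITY_RANK = {"Critical": 0, "High": 1, "Normal": 2, "Low": 3}


def order_queue(jobs: list[dict]) -> list[dict]:
    """Sort by priority tier, then by submission time (assumed ordering rule)."""
    # ISO 8601 UTC strings in the same format sort chronologically as text.
    return sorted(jobs, key=lambda j: (PRIORITY_RANK[j["priority"]], j["dateSubmitted"]))


def position_of(jobs: list[dict], job_id: str) -> int:
    """Return the 1-based queue position of job_id under the assumed ordering."""
    ordered = order_queue(jobs)
    return next(i for i, j in enumerate(ordered, start=1) if j["jobId"] == job_id)


# Hypothetical three-job queue used only for this illustration.
jobs = [
    {"jobId": "a", "priority": "Critical", "dateSubmitted": "2024-01-20T10:44:00Z"},
    {"jobId": "b", "priority": "High", "dateSubmitted": "2024-01-20T10:30:00Z"},
    {"jobId": "c", "priority": "High", "dateSubmitted": "2024-01-20T10:20:00Z"},
]

before = position_of(jobs, "b")
jobs[1]["priority"] = "Critical"  # simulate the priority change for job "b"
after = position_of(jobs, "b")
print(before, after)
```

Under this model, job `b` overtakes job `a` after the change because it was submitted earlier. A real scheduler may break ties differently.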

Move Job Position

PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/position

Manually adjusts a job's position within its priority tier. Position changes are limited to jobs at the same priority level.

Request Body

{
  "newPosition": 1,
  "reason": "Dependencies resolved, can execute earlier",
  "respectPriorityBoundaries": true
}

Response

{
  "jobId": "00fe8400-e29b-41d4-a716-446655440000",
  "priority": "High",
  "previousPosition": 3,
  "newPosition": 1,
  "previousEstimatedStart": "2024-01-20T11:02:00Z",
  "newEstimatedStart": "2024-01-20T10:55:00Z",
  "affectedJobs": [
    {
      "jobId": "11fe8400-e29b-41d4-a716-446655440000",
      "newPosition": 2,
      "newEstimatedStart": "2024-01-20T11:05:00Z"
    }
  ],
  "updateTime": "2024-01-20T10:46:00Z"
}
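
The relationship between `newPosition` and `affectedJobs` can be sketched as a list reorder within a single priority tier. This is an illustrative model only. The function name `move_within_tier` and the use of plain job IDs are assumptions, and the server may report affected jobs differently.

```python
def move_within_tier(tier: list[str], job_id: str, new_position: int) -> dict[str, int]:
    """Move job_id to the 1-based new_position inside one priority tier.

    Reorders `tier` in place and returns the new positions of every job
    whose position changed, including the moved job itself.
    """
    if job_id not in tier:
        raise ValueError("job is not in this priority tier")
    if not 1 <= new_position <= len(tier):
        raise ValueError("newPosition is outside this priority tier")

    before = {jid: pos for pos, jid in enumerate(tier, start=1)}
    reordered = [jid for jid in tier if jid != job_id]
    reordered.insert(new_position - 1, job_id)
    after = {jid: pos for pos, jid in enumerate(reordered, start=1)}

    tier[:] = reordered
    return {jid: pos for jid, pos in after.items() if before[jid] != pos}


# Hypothetical tier of three High-priority jobs.
high_tier = ["job-a", "job-b", "job-c"]
changed = move_within_tier(high_tier, "job-c", 1)
print(high_tier, changed)
```

Rejecting positions outside the tier mirrors the rule that a move cannot cross a priority boundary.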

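Control Queue Processing

POST /api/{tenantId}/{projectId}/execution/queue/control

Pauses or resumes queue processing for maintenance or emergencies. Jobs that are already running continue, but no new jobs start while the queue is paused.

Request Body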

{
  "action": "pause",
  "reason": "System maintenance window",
  "duration": 30,
  "allowRunningJobsToComplete": true,
  "notifyUsers": true,
  "scheduledResume": "2024-01-20T12:00:00Z"
}

Response

{
  "action": "pause",
  "status": "Paused",
  "pausedAt": "2024-01-20T10:47:00Z",
  "scheduledResume": "2024-01-20T12:00:00Z",
  "affectedJobs": 23,
  "runningJobsCount": 3,
  "estimatedDelayMinutes": 30,
  "reason": "System maintenance window",
  "pausedBy": "admin123"
}
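
A pause request can be assembled on the client before it is sent. The sketch below assumes that `duration` is expressed in minutes (the response reports `estimatedDelayMinutes`, but the unit of `duration` is not stated here) and derives `scheduledResume` from it. The function name `build_pause_request` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def build_pause_request(reason: str, duration_minutes: int, now: datetime) -> dict:
    """Build a pause payload whose scheduledResume is now + duration (assumed minutes)."""
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    if duration_minutes <= 0:
        raise ValueError("duration_minutes must be positive")

    resume_at = (now + timedelta(minutes=duration_minutes)).astimezone(timezone.utc)
    return {
        "action": "pause",
        "reason": reason,
        "duration": duration_minutes,
        "allowRunningJobsToComplete": True,
        "notifyUsers": True,
        # Same UTC timestamp format as the samples in this reference.
        "scheduledResume": resume_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }


pause_request = build_pause_request(
    "System maintenance window",
    30,
    datetime(2024, 1, 20, 10, 47, tzinfo=timezone.utc),
)
print(pause_request["scheduledResume"])
```

Deriving the resume time from the duration keeps the two fields from contradicting each other in the same request.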

Get Queue History

GET /api/{tenantId}/{projectId}/execution/queue/history

Retrieves historical queue performance data and metrics for analysis and optimization.

Query Parameters

Parameter     Type       Description
dateFrom      datetime   Start of the history range
dateTo        datetime   End of the history range
aggregation   string     Aggregation level: hour, day, week (default: hour)
metrics       string     Comma-separated metrics: queue_size, wait_time, throughput, efficiency

Response

{
  "period": {
    "startDate": "2024-01-19T00:00:00Z",
    "endDate": "2024-01-20T10:47:00Z",
    "aggregation": "hour"
  },
  "summary": {
    "totalJobsProcessed": 847,
    "averageQueueSize": 12.3,
    "averageWaitTime": "7.8 minutes",
    "peakQueueSize": 45,
    "peakWaitTime": "23 minutes",
    "throughputPerHour": 24.8,
    "efficiency": 87.2
  },
  "hourlyData": [
    {
      "timestamp": "2024-01-20T09:00:00Z",
      "queueSize": {
        "average": 18,
        "peak": 25,
        "minimum": 8
      },
      "waitTime": {
        "average": "9.5 minutes",
        "maximum": "18 minutes",
        "minimum": "2 minutes"
      },
      "throughput": {
        "jobsCompleted": 28,
        "jobsSubmitted": 31,
        "efficiency": 89.3
      },
      "priorityDistribution": {
        "critical": 2,
        "high": 8,
        "normal": 14,
        "low": 1
      }
    }
  ],
  "trends": {
    "queueSizeGrowth": -2.3,
    "waitTimeImprovement": 5.7,
    "throughputIncrease": 12.1,
    "efficiencyChange": 3.4
  },
  "bottlenecks": [
    {
      "timeframe": "2024-01-20T08:30:00Z - 2024-01-20T09:15:00Z",
      "issue": "High memory usage jobs accumulated",
      "impact": "15 minute delay",
      "resolution": "Additional worker allocated"
    }
  ]
}
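
Validating the history query on the client avoids a round trip for obviously invalid input. The sketch below builds the query string from the documented parameters. The helper names are hypothetical, and the UTC `Z` timestamp format is an assumption based on the sample responses.

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlencode

ALLOWED_AGGREGATIONS = {"hour", "day", "week"}
ALLOWED_METRICS = {"queue_size", "wait_time", "throughput", "efficiency"}


def to_utc_iso(moment: datetime) -> str:
    """Format an aware datetime as a UTC timestamp ending in Z."""
    if moment.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def history_query(date_from, date_to, aggregation="hour", metrics=()):
    """Return a URL-encoded query string for the history endpoint."""
    if aggregation not in ALLOWED_AGGREGATIONS:
        raise ValueError(f"unsupported aggregation: {aggregation}")
    unknown = set(metrics) - ALLOWED_METRICS
    if unknown:
        raise ValueError(f"unsupported metrics: {sorted(unknown)}")
    if date_from >= date_to:
        raise ValueError("dateFrom must be earlier than dateTo")

    params = {
        "dateFrom": to_utc_iso(date_from),
        "dateTo": to_utc_iso(date_to),
        "aggregation": aggregation,
    }
    if metrics:
        params["metrics"] = ",".join(metrics)  # documented as comma-separated
    return urlencode(params)


query = history_query(
    datetime(2024, 1, 19, 0, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 20, 10, 47, tzinfo=timezone.utc),
    metrics=["queue_size", "wait_time"],
)
print(parse_qs(query))
```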

Cancel a User's Queued Jobs

DELETE /api/{tenantId}/{projectId}/execution/queue/user/{userId}

Cancels all queued jobs submitted by a specific user. Jobs that are already running continue to completion.

Request Body (optional)

{
  "reason": "User account deactivated",
  "notifyUser": false,
  "cancelJobTypes": ["ProcessMining", "DataEnrichment"],
  "excludeJobIds": ["important-job-id-1", "important-job-id-2"]
}

Response

{
  "userId": "user123",
  "cancelledJobsCount": 5,
  "preservedJobsCount": 2,
  "cancelledJobs": [
    {
      "jobId": "job1-guid",
      "jobName": "Weekly Analysis",
      "priority": "Normal",
      "queuePosition": 8
    }
  ],
  "preservedJobs": [
    {
      "jobId": "important-job-id-1",
      "jobName": "Critical Business Report",
      "reason": "Explicitly excluded"
    }
  ],
  "cancelledAt": "2024-01-20T10:47:00Z",
  "cancelledBy": "admin123"
}
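
Before issuing a bulk cancellation, it can help to preview which jobs the request body would select. The sketch below models one plausible reading of the options: `cancelJobTypes` limits the cancellation to those job types, and `excludeJobIds` always wins. These semantics and the function name `plan_cancellation` are assumptions, not confirmed API behavior.

```python
from typing import Iterable, Optional


def plan_cancellation(
    queued_jobs: list[dict],
    user_id: str,
    cancel_job_types: Optional[Iterable[str]] = None,
    exclude_job_ids: Iterable[str] = (),
) -> dict[str, list[str]]:
    """Preview which of a user's queued jobs would be cancelled or preserved."""
    wanted_types = None if cancel_job_types is None else set(cancel_job_types)
    excluded = set(exclude_job_ids)
    plan: dict[str, list[str]] = {"cancelled": [], "preserved": []}

    for job in queued_jobs:
        if job["submittedBy"] != user_id:
            continue  # jobs from other users are never touched
        if job["jobId"] in excluded:
            plan["preserved"].append(job["jobId"])  # explicitly excluded
        elif wanted_types is None or job["jobType"] in wanted_types:
            plan["cancelled"].append(job["jobId"])
        # Jobs of a type that was not selected stay in the queue unchanged.
    return plan


# Hypothetical queue contents for illustration.
queued = [
    {"jobId": "j1", "submittedBy": "user123", "jobType": "ProcessMining"},
    {"jobId": "j2", "submittedBy": "user123", "jobType": "Notebook"},
    {"jobId": "j3", "submittedBy": "user123", "jobType": "DataEnrichment"},
    {"jobId": "j4", "submittedBy": "user456", "jobType": "ProcessMining"},
]
plan = plan_cancellation(
    queued,
    "user123",
    cancel_job_types=["ProcessMining", "DataEnrichment"],
    exclude_job_ids=["j3"],
)
print(plan)
```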

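Get Queue Predictions

GET /api/{tenantId}/{projectId}/execution/queue/predictions

Provides AI-based predictions of queue behavior, along with optimal submission times and resource allocation recommendations.

Query Parameters

Parameter                Type      Description
horizon                  integer   Prediction horizon in hours (1-24, default: 4)
jobType                  string    Restrict predictions to a specific job type
includeRecommendations   boolean   Include optimization recommendations (default: true)

Response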

{
  "predictionTime": "2024-01-20T10:47:00Z",
  "horizon": 4,
  "predictions": {
    "queueSizeProjection": [
      {
        "time": "2024-01-20T11:00:00Z",
        "expectedQueueSize": 18,
        "confidence": 0.87
      },
      {
        "time": "2024-01-20T12:00:00Z",
        "expectedQueueSize": 12,
        "confidence": 0.82
      }
    ],
    "waitTimeProjection": [
      {
        "time": "2024-01-20T11:00:00Z",
        "averageWaitTime": "6.5 minutes",
        "confidence": 0.85
      }
    ],
    "resourceUtilization": [
      {
        "time": "2024-01-20T11:00:00Z",
        "cpuUtilization": 78,
        "memoryUtilization": 65,
        "efficiency": 89.2
      }
    ]
  },
  "recommendations": {
    "optimalSubmissionTimes": [
      {
        "timeWindow": "2024-01-20T13:00:00Z - 2024-01-20T15:00:00Z",
        "expectedWaitTime": "3-5 minutes",
        "reason": "Low queue activity period"
      }
    ],
    "resourceOptimization": [
      {
        "recommendation": "Add 1 additional worker node",
        "expectedImprovement": "25% reduction in wait times",
        "cost": "Low",
        "priority": "Medium"
      }
    ],
    "jobScheduling": [
      {
        "jobType": "ProcessMining",
        "recommendation": "Schedule during off-peak hours (14:00-16:00)",
        "reason": "Memory-intensive jobs perform better with less contention"
      }
    ]
  },
  "modelInfo": {
    "modelVersion": "2.1.3",
    "lastTrained": "2024-01-19T02:00:00Z",
    "accuracy": 0.84,
    "dataPoints": 10080
  }
}
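
Each projection carries a `confidence` value, so a client may want to ignore low-confidence points before acting on them. The sketch below picks the largest expected queue size among projections above a threshold. The function name `peak_projection` and the 0.8 default are illustrative choices, not part of the API.

```python
from typing import Optional


def peak_projection(predictions: dict, min_confidence: float = 0.8) -> Optional[dict]:
    """Return the projection with the largest expected queue size.

    Only projections whose confidence is at least min_confidence are
    considered. Returns None when no projection qualifies.
    """
    points = [
        p
        for p in predictions["predictions"]["queueSizeProjection"]
        if p["confidence"] >= min_confidence
    ]
    return max(points, key=lambda p: p["expectedQueueSize"], default=None)


# Projections copied from the sample response above.
sample_predictions = {
    "predictions": {
        "queueSizeProjection": [
            {"time": "2024-01-20T11:00:00Z", "expectedQueueSize": 18, "confidence": 0.87},
            {"time": "2024-01-20T12:00:00Z", "expectedQueueSize": 12, "confidence": 0.82},
        ]
    }
}

peak = peak_projection(sample_predictions)
print(peak["time"], peak["expectedQueueSize"])
```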

Example: Queue Management Workflow

The following example shows how to monitor and manage the job queue:

// Assumes tenantId, projectId, and token are defined in the enclosing scope.
// 1. Get the current queue status
const getQueueStatus = async () => {
  const response = await fetch(`/api/${tenantId}/${projectId}/execution/queue?includeEstimates=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const queue = await response.json();
  console.log(`Queue Status: ${queue.queueStatus}`);
  console.log(`Total jobs: ${queue.summary.totalQueuedJobs}`);
  console.log(`Average wait time: ${queue.summary.averageWaitTime}`);

  return queue;
};

// 2. Change a job's priority when needed
const updateJobPriority = async (jobId, newPriority, reason) => {
  const response = await fetch(`/api/${tenantId}/${projectId}/execution/queue/job/${jobId}/priority`, {
    method: 'PUT',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      newPriority: newPriority,
      reason: reason,
      notifyUser: true
    })
  });

  const result = await response.json();
  console.log(`Job ${jobId} priority changed from ${result.previousPriority} to ${result.newPriority}`);
  console.log(`New position: ${result.newQueuePosition} (was ${result.previousQueuePosition})`);
  console.log(`Time saved: ${result.timeSaved}`);

  return result;
};

// 3. Get queue predictions for optimization
const getQueuePredictions = async () => {
  const response = await fetch(`/api/${tenantId}/${projectId}/execution/queue/predictions?horizon=4&includeRecommendations=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const predictions = await response.json();
  console.log('Queue Predictions:');

  predictions.predictions.queueSizeProjection.forEach(prediction => {
    console.log(`  ${prediction.time}: ${prediction.expectedQueueSize} jobs (${Math.round(prediction.confidence * 100)}% confidence)`);
  });

  console.log('Recommendations:');
  predictions.recommendations.optimalSubmissionTimes.forEach(rec => {
    console.log(`  Submit during: ${rec.timeWindow} (${rec.expectedWaitTime} wait)`);
  });

  return predictions;
};

// 4. Monitor a specific job in the queue
const monitorJobInQueue = async (jobId) => {
  const checkQueue = async () => {
    const queue = await getQueueStatus();
    const job = queue.queuedJobs.find(j => j.jobId === jobId);

    if (job) {
      console.log(`Job ${jobId} is at position ${job.queuePosition}`);
      console.log(`Estimated start: ${job.estimatedStartTime}`);
      console.log(`Estimated duration: ${job.estimatedDuration}`);

      // Check again in 2 minutes
      setTimeout(() => checkQueue(), 120000);
    } else {
      console.log(`Job ${jobId} is no longer in queue (likely started or cancelled)`);
    }
  };

  await checkQueue();
};

// 5. Emergency queue management (pause)
const pauseQueue = async (reason, duration) => {
  const response = await fetch(`/api/${tenantId}/${projectId}/execution/queue/control`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      action: 'pause',
      reason: reason,
      duration: duration,
      allowRunningJobsToComplete: true,
      notifyUsers: true
    })
  });

  const result = await response.json();
  console.log(`Queue paused: ${result.status}`);
  console.log(`${result.affectedJobs} jobs affected`);
  console.log(`Estimated delay: ${result.estimatedDelayMinutes} minutes`);

  return result;
};

// Run the queue management workflow
getQueueStatus()
  .then(queue => {
    console.log('Current queue status retrieved');

    // Check whether the queue is getting long
    if (queue.summary.totalQueuedJobs > 30) {
      console.log('Queue is getting long, checking predictions...');
      return getQueuePredictions();
    }

    return null;
  })
  .then(predictions => {
    if (predictions) {
      console.log('Queue predictions retrieved');

      // If the last projection still shows a large queue, consider resource optimization
      const futureQueueSize = predictions.predictions.queueSizeProjection[predictions.predictions.queueSizeProjection.length - 1];
      if (futureQueueSize.expectedQueueSize > 25) {
        console.log('Consider implementing resource optimization recommendations');
        predictions.recommendations.resourceOptimization.forEach(rec => {
          console.log(`- ${rec.recommendation}: ${rec.expectedImprovement}`);
        });
      }
    }
  })
  .catch(error => console.error('Queue management failed:', error));

Python Example

import requests
import time
import json
from datetime import datetime, timedelta

class QueueManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def get_queue_status(self, priority=None, job_type=None, include_estimates=True):
        """Get the current queue status."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue"
        params = {'includeEstimates': str(include_estimates).lower()}

        if priority:
            params['priority'] = priority
        if job_type:
            params['jobType'] = job_type

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def get_jobs_by_priority(self, priority):
        """Get the queued jobs at a given priority level."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/priority/{priority}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def change_job_priority(self, job_id, new_priority, reason, notify_user=True):
        """Change the priority of a queued job."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/priority"
        payload = {
            'newPriority': new_priority,
            'reason': reason,
            'notifyUser': notify_user
        }
        response = requests.put(url, json=payload, headers=self.headers)
        return response.json()

    def move_job_position(self, job_id, new_position, reason):
        """Move a job to a new position within its priority tier."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/position"
        payload = {
            'newPosition': new_position,
            'reason': reason,
            'respectPriorityBoundaries': True
        }
        response = requests.put(url, json=payload, headers=self.headers)
        return response.json()

    def control_queue(self, action, reason, duration=None, scheduled_resume=None):
        """Pause or resume queue processing."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/control"
        payload = {
            'action': action,
            'reason': reason,
            'allowRunningJobsToComplete': True,
            'notifyUsers': True
        }

        if duration:
            payload['duration'] = duration
        if scheduled_resume:
            payload['scheduledResume'] = scheduled_resume.isoformat()

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_queue_history(self, date_from, date_to, aggregation='hour', metrics=None):
        """Get historical queue performance data."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/history"
        params = {
            'dateFrom': date_from.isoformat(),
            'dateTo': date_to.isoformat(),
            'aggregation': aggregation
        }

        if metrics:
            params['metrics'] = ','.join(metrics)

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_user_jobs(self, user_id, reason, job_types=None, exclude_job_ids=None):
        """Cancel all queued jobs submitted by a specific user."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/user/{user_id}"
        payload = {
            'reason': reason,
            'notifyUser': False
        }

        if job_types:
            payload['cancelJobTypes'] = job_types
        if exclude_job_ids:
            payload['excludeJobIds'] = exclude_job_ids

        response = requests.delete(url, json=payload, headers=self.headers)
        return response.json()

    def get_queue_predictions(self, horizon=4, job_type=None, include_recommendations=True):
        """Get AI-based queue predictions."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/predictions"
        params = {
            'horizon': horizon,
            'includeRecommendations': str(include_recommendations).lower()
        }

        if job_type:
            params['jobType'] = job_type

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def monitor_queue_health(self, alert_threshold=30, check_interval=300):
        """Continuously monitor queue health and print an alert when problems appear."""
        while True:
            try:
                queue_status = self.get_queue_status()
                total_jobs = queue_status['summary']['totalQueuedJobs']
                avg_wait = queue_status['summary']['averageWaitTime']

                print(f"Queue Health Check: {total_jobs} jobs, avg wait: {avg_wait}")

                if total_jobs > alert_threshold:
                    print(f"ALERT: Queue size ({total_jobs}) exceeds threshold ({alert_threshold})")

                    # Check the predictions to see whether the situation is expected to improve
                    predictions = self.get_queue_predictions()
                    future_size = predictions['predictions']['queueSizeProjection'][-1]['expectedQueueSize']

                    if future_size > total_jobs:
                        print("WARNING: Queue expected to grow further")
                        print("Resource optimization recommendations:")
                        for rec in predictions['recommendations']['resourceOptimization']:
                            print(f"  - {rec['recommendation']}: {rec['expectedImprovement']}")

                time.sleep(check_interval)

            except Exception as e:
                print(f"Queue monitoring error: {e}")
                time.sleep(60)

# Usage example
manager = QueueManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Get the full queue status
    queue_status = manager.get_queue_status(include_estimates=True)
    print(f"Queue Status: {queue_status['queueStatus']}")
    print(f"Total jobs in queue: {queue_status['summary']['totalQueuedJobs']}")
    print(f"Average wait time: {queue_status['summary']['averageWaitTime']}")
    print(f"Processing capacity: {queue_status['processingCapacity']['currentLoad']}%")

    # Check the High-priority jobs
    high_priority_jobs = manager.get_jobs_by_priority('High')
    print(f"High priority jobs: {high_priority_jobs['jobCount']}")

    # Get queue predictions for the next 4 hours
    predictions = manager.get_queue_predictions(horizon=4)
    print("Queue predictions:")
    for pred in predictions['predictions']['queueSizeProjection']:
        confidence_pct = round(pred['confidence'] * 100)
        print(f"  {pred['time']}: {pred['expectedQueueSize']} jobs ({confidence_pct}% confidence)")

    # Review the recommendations
    if predictions['recommendations']['optimalSubmissionTimes']:
        print("Optimal submission times:")
        for rec in predictions['recommendations']['optimalSubmissionTimes']:
            print(f"  {rec['timeWindow']}: {rec['expectedWaitTime']} wait time")

    # Example: raise a job's priority when needed
    if queue_status['summary']['totalQueuedJobs'] > 20:
        # Find a Normal-priority job and elevate it
        normal_jobs = [job for job in queue_status['queuedJobs'] if job['priority'] == 'Normal']
        if normal_jobs:
            job_to_elevate = normal_jobs[0]
            result = manager.change_job_priority(
                job_to_elevate['jobId'],
                'High',
                'Queue congestion - elevating business critical job'
            )
            print(f"Elevated job {job_to_elevate['jobName']} to High priority")
            print(f"New position: {result['newQueuePosition']} (was {result['previousQueuePosition']})")

    # Get queue history for analysis
    history = manager.get_queue_history(
        datetime.now() - timedelta(hours=24),
        datetime.now(),
        'hour',
        ['queue_size', 'wait_time', 'throughput']
    )

    print(f"24h summary: {history['summary']['totalJobsProcessed']} jobs processed")
    print(f"Peak queue size: {history['summary']['peakQueueSize']}")
    print(f"Average throughput: {history['summary']['throughputPerHour']} jobs/hour")

    # Report any bottlenecks
    if history['bottlenecks']:
        print("Recent bottlenecks:")
        for bottleneck in history['bottlenecks']:
            print(f"  {bottleneck['timeframe']}: {bottleneck['issue']} (Impact: {bottleneck['impact']})")

except Exception as e:
    print(f"Error in queue management: {e}")