ジョブキュー
実行キューの管理
ジョブ実行キューの表示と管理、優先度設定、ジョブスケジューリングの制御を行います。
キューのステータス取得
GET /api/{tenantId}/{projectId}/execution/queue
キューに入っているジョブ、優先度、推定処理時間を含む実行キューの現在の状態を取得します。
パラメーター
| パラメーター | タイプ | 場所 | 説明 |
|---|---|---|---|
tenantId |
GUID | Path | テナントの識別子 |
projectId |
GUID | Path | プロジェクトの識別子 |
クエリパラメーター
| パラメーター | タイプ | 説明 |
|---|---|---|
priority |
string | 優先度でフィルター:Critical、High、Normal、Low |
jobType |
string | ジョブタイプでフィルター:ProcessMining、DataEnrichment、Notebook、Analysis |
includeEstimates |
boolean | 詳細なタイミング見積もりを含める(デフォルト:true) |
レスポンス
{
"queueStatus": "Active",
"timestamp": "2024-01-20T10:45:00Z",
"summary": {
"totalQueuedJobs": 23,
"criticalPriorityJobs": 2,
"highPriorityJobs": 7,
"normalPriorityJobs": 12,
"lowPriorityJobs": 2,
"averageWaitTime": "8.5 minutes",
"estimatedProcessingTime": "47 minutes"
},
"processingCapacity": {
"activeWorkers": 4,
"totalWorkers": 6,
"currentLoad": 67,
"maxConcurrentJobs": 8,
"currentlyRunning": 3
},
"queuedJobs": [
{
"jobId": "ff0e8400-e29b-41d4-a716-446655440000",
"jobName": "Customer Analytics Pipeline",
"jobType": "ProcessMining",
"priority": "Critical",
"queuePosition": 1,
"estimatedStartTime": "2024-01-20T10:47:00Z",
"estimatedDuration": "12-15 minutes",
"submittedBy": "user456",
"dateSubmitted": "2024-01-20T10:44:00Z",
"resourceRequirements": {
"cpuUnits": 2,
"memoryGB": 4,
"estimatedDiskUsage": "1.2 GB"
}
},
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"priority": "High",
"queuePosition": 2,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedDuration": "8-10 minutes",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
],
"performanceMetrics": {
"averageJobDuration": "16.3 minutes",
"throughputLastHour": 12,
"queueTrends": {
"currentHourSubmissions": 8,
"peakHourToday": "09:00-10:00",
"averageQueueSize": 15.7
}
}
}
優先度別ジョブ取得
GET /api/{tenantId}/{projectId}/execution/queue/priority/{priority}
特定の優先度でフィルターしたキューのジョブを、詳細な位置情報やタイミングとともに取得します。
パラメーター
| パラメーター | タイプ | 場所 | 説明 |
|---|---|---|---|
priority |
string | Path | 優先度レベル:Critical、High、Normal、Low |
レスポンス
{
"priority": "High",
"jobCount": 7,
"averageWaitTime": "6.2 minutes",
"estimatedProcessingTime": "31 minutes",
"jobs": [
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"jobName": "Daily Sales Analysis",
"jobType": "DataEnrichment",
"queuePosition": 2,
"overallQueuePosition": 3,
"estimatedStartTime": "2024-01-20T11:02:00Z",
"estimatedCompletion": "2024-01-20T11:12:00Z",
"submittedBy": "system",
"dateSubmitted": "2024-01-20T10:30:00Z",
"waitTime": "15 minutes",
"dependencies": [],
"resourceRequirements": {
"cpuUnits": 1,
"memoryGB": 2,
"estimatedDiskUsage": "500 MB"
}
}
]
}
ジョブ優先度変更
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/priority
キュー内のジョブの優先度を更新します。これによりキュー内の位置や推定開始時間が変わる可能性があります。
リクエストボディ
{
"newPriority": "Critical",
"reason": "Business critical analysis required urgently",
"notifyUser": true
}
レスポンス
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"previousPriority": "High",
"newPriority": "Critical",
"previousQueuePosition": 3,
"newQueuePosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:47:00Z",
"timeSaved": "15 minutes",
"updatedBy": "user123",
"updateTime": "2024-01-20T10:46:00Z"
}
ジョブ位置の移動
PUT /api/{tenantId}/{projectId}/execution/queue/job/{jobId}/position
優先度階層内でジョブの位置を手動で調整します。位置変更は同じ優先度レベル内に限定されます。
リクエストボディ
{
"newPosition": 1,
"reason": "Dependencies resolved, can execute earlier",
"respectPriorityBoundaries": true
}
レスポンス
{
"jobId": "00fe8400-e29b-41d4-a716-446655440000",
"priority": "High",
"previousPosition": 3,
"newPosition": 1,
"previousEstimatedStart": "2024-01-20T11:02:00Z",
"newEstimatedStart": "2024-01-20T10:55:00Z",
"affectedJobs": [
{
"jobId": "11fe8400-e29b-41d4-a716-446655440000",
"newPosition": 2,
"newEstimatedStart": "2024-01-20T11:05:00Z"
}
],
"updateTime": "2024-01-20T10:46:00Z"
}
キュー処理の制御
POST /api/{tenantId}/{projectId}/execution/queue/control
メンテナンスや緊急時にキュー処理を一時停止または再開します。実行中のジョブは継続されますが、停止中は新規ジョブが開始されません。
リクエストボディ
{
"action": "pause",
"reason": "System maintenance window",
"duration": 30,
"allowRunningJobsToComplete": true,
"notifyUsers": true,
"scheduledResume": "2024-01-20T12:00:00Z"
}
レスポンス
{
"action": "pause",
"status": "Paused",
"pausedAt": "2024-01-20T10:47:00Z",
"scheduledResume": "2024-01-20T12:00:00Z",
"affectedJobs": 23,
"runningJobsCount": 3,
"estimatedDelayMinutes": 30,
"reason": "System maintenance window",
"pausedBy": "admin123"
}
キュー履歴の取得
GET /api/{tenantId}/{projectId}/execution/queue/history
分析・最適化のために、過去のキューパフォーマンスデータとメトリクスを取得します。
クエリパラメーター
| パラメーター | タイプ | 説明 |
|---|---|---|
dateFrom |
datetime | 履歴データの開始日時 |
dateTo |
datetime | 履歴データの終了日時 |
aggregation |
string | データ集計レベル:hour、day、week(デフォルト:hour) |
metrics |
string | カンマ区切りメトリクス:queue_size、wait_time、throughput、efficiency |
レスポンス
{
"period": {
"startDate": "2024-01-19T00:00:00Z",
"endDate": "2024-01-20T10:47:00Z",
"aggregation": "hour"
},
"summary": {
"totalJobsProcessed": 847,
"averageQueueSize": 12.3,
"averageWaitTime": "7.8 minutes",
"peakQueueSize": 45,
"peakWaitTime": "23 minutes",
"throughputPerHour": 24.8,
"efficiency": 87.2
},
"hourlyData": [
{
"timestamp": "2024-01-20T09:00:00Z",
"queueSize": {
"average": 18,
"peak": 25,
"minimum": 8
},
"waitTime": {
"average": "9.5 minutes",
"maximum": "18 minutes",
"minimum": "2 minutes"
},
"throughput": {
"jobsCompleted": 28,
"jobsSubmitted": 31,
"efficiency": 89.3
},
"priorityDistribution": {
"critical": 2,
"high": 8,
"normal": 14,
"low": 1
}
}
],
"trends": {
"queueSizeGrowth": -2.3,
"waitTimeImprovement": 5.7,
"throughputIncrease": 12.1,
"efficiencyChange": 3.4
},
"bottlenecks": [
{
"timeframe": "2024-01-20T08:30:00Z - 2024-01-20T09:15:00Z",
"issue": "High memory usage jobs accumulated",
"impact": "15 minute delay",
"resolution": "Additional worker allocated"
}
]
}
ユーザーのキュー中ジョブをキャンセル
DELETE /api/{tenantId}/{projectId}/execution/queue/user/{userId}
特定ユーザーが提出したキュー内の全ジョブをキャンセルします。実行中のジョブは完了まで継続されます。
リクエストボディ(オプション)
{
"reason": "User account deactivated",
"notifyUser": false,
"cancelJobTypes": ["ProcessMining", "DataEnrichment"],
"excludeJobIds": ["important-job-id-1", "important-job-id-2"]
}
レスポンス
{
"userId": "user123",
"cancelledJobsCount": 5,
"preservedJobsCount": 2,
"cancelledJobs": [
{
"jobId": "job1-guid",
"jobName": "Weekly Analysis",
"priority": "Normal",
"queuePosition": 8
}
],
"preservedJobs": [
{
"jobId": "important-job-id-1",
"jobName": "Critical Business Report",
"reason": "Explicitly excluded"
}
],
"cancelledAt": "2024-01-20T10:47:00Z",
"cancelledBy": "admin123"
}
キュー予測の取得
GET /api/{tenantId}/{projectId}/execution/queue/predictions
AIを活用したキューの挙動予測、最適な提出時間、リソース割り当て推奨を提供します。
クエリパラメーター
| パラメーター | タイプ | 説明 |
|---|---|---|
horizon |
integer | 予測時間範囲(1-24時間、デフォルト:4) |
jobType |
string | 特定ジョブタイプの予測 |
includeRecommendations |
boolean | 最適化推奨を含める(デフォルト:true) |
レスポンス
{
"predictionTime": "2024-01-20T10:47:00Z",
"horizon": 4,
"predictions": {
"queueSizeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"expectedQueueSize": 18,
"confidence": 0.87
},
{
"time": "2024-01-20T12:00:00Z",
"expectedQueueSize": 12,
"confidence": 0.82
}
],
"waitTimeProjection": [
{
"time": "2024-01-20T11:00:00Z",
"averageWaitTime": "6.5 minutes",
"confidence": 0.85
}
],
"resourceUtilization": [
{
"time": "2024-01-20T11:00:00Z",
"cpuUtilization": 78,
"memoryUtilization": 65,
"efficiency": 89.2
}
]
},
"recommendations": {
"optimalSubmissionTimes": [
{
"timeWindow": "2024-01-20T13:00:00Z - 2024-01-20T15:00:00Z",
"expectedWaitTime": "3-5 minutes",
"reason": "Low queue activity period"
}
],
"resourceOptimization": [
{
"recommendation": "Add 1 additional worker node",
"expectedImprovement": "25% reduction in wait times",
"cost": "Low",
"priority": "Medium"
}
],
"jobScheduling": [
{
"jobType": "ProcessMining",
"recommendation": "Schedule during off-peak hours (14:00-16:00)",
"reason": "Memory-intensive jobs perform better with less contention"
}
]
},
"modelInfo": {
"modelVersion": "2.1.3",
"lastTrained": "2024-01-19T02:00:00Z",
"accuracy": 0.84,
"dataPoints": 10080
}
}
例:キュー管理ワークフロー
以下の例はジョブキューの監視と管理を示しています:
// 1. 現在のキューステータスを取得
const getQueueStatus = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue?includeEstimates=true', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const queue = await response.json();
console.log(`Queue Status: ${queue.queueStatus}`);
console.log(`Total jobs: ${queue.summary.totalQueuedJobs}`);
console.log(`Average wait time: ${queue.summary.averageWaitTime}`);
return queue;
};
// 2. 必要に応じてジョブ優先度を変更
const updateJobPriority = async (jobId, newPriority, reason) => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/queue/job/${jobId}/priority`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
newPriority: newPriority,
reason: reason,
notifyUser: true
})
});
const result = await response.json();
console.log(`Job ${jobId} priority changed from ${result.previousPriority} to ${result.newPriority}`);
console.log(`New position: ${result.newQueuePosition} (was ${result.previousQueuePosition})`);
console.log(`Time saved: ${result.timeSaved}`);
return result;
};
// 3. 最適化のためキュー予測を取得
const getQueuePredictions = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/predictions?horizon=4&includeRecommendations=true', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const predictions = await response.json();
console.log('Queue Predictions:');
predictions.predictions.queueSizeProjection.forEach(prediction => {
console.log(` ${prediction.time}: ${prediction.expectedQueueSize} jobs (${Math.round(prediction.confidence * 100)}% confidence)`);
});
console.log('Recommendations:');
predictions.recommendations.optimalSubmissionTimes.forEach(rec => {
console.log(` Submit during: ${rec.timeWindow} (${rec.expectedWaitTime} wait)`);
});
return predictions;
};
// 4. 特定ジョブのキュー監視
const monitorJobInQueue = async (jobId) => {
const checkQueue = async () => {
const queue = await getQueueStatus();
const job = queue.queuedJobs.find(j => j.jobId === jobId);
if (job) {
console.log(`Job ${jobId} is at position ${job.queuePosition}`);
console.log(`Estimated start: ${job.estimatedStartTime}`);
console.log(`Estimated duration: ${job.estimatedDuration}`);
// 2分後に再チェック
setTimeout(() => checkQueue(), 120000);
} else {
console.log(`Job ${jobId} is no longer in queue (likely started or cancelled)`);
}
};
await checkQueue();
};
// 5. 緊急時のキュー管理(停止)
const pauseQueue = async (reason, duration) => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/queue/control', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
action: 'pause',
reason: reason,
duration: duration,
allowRunningJobsToComplete: true,
notifyUsers: true
})
});
const result = await response.json();
console.log(`Queue paused: ${result.status}`);
console.log(`${result.affectedJobs} jobs affected`);
console.log(`Estimated delay: ${result.estimatedDelayMinutes} minutes`);
return result;
};
// キュー管理ワークフローを実行
getQueueStatus()
.then(queue => {
console.log('Current queue status retrieved');
// キューが長くなってきたか確認
if (queue.summary.totalQueuedJobs > 30) {
console.log('Queue is getting long, checking predictions...');
return getQueuePredictions();
}
return null;
})
.then(predictions => {
if (predictions) {
console.log('Queue predictions retrieved');
// 予測でさらに増加が見込まれるならリソース最適化を検討
const futureQueueSize = predictions.predictions.queueSizeProjection[predictions.predictions.queueSizeProjection.length - 1];
if (futureQueueSize.expectedQueueSize > 25) {
console.log('Consider implementing resource optimization recommendations');
predictions.recommendations.resourceOptimization.forEach(rec => {
console.log(`- ${rec.recommendation}: ${rec.expectedImprovement}`);
});
}
}
})
.catch(error => console.error('Queue management failed:', error));
Pythonの例
import requests
import time
import json
from datetime import datetime, timedelta
class QueueManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def get_queue_status(self, priority=None, job_type=None, include_estimates=True):
"""現在のキューステータスを取得"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue"
params = {'includeEstimates': str(include_estimates).lower()}
if priority:
params['priority'] = priority
if job_type:
params['jobType'] = job_type
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def get_jobs_by_priority(self, priority):
"""優先度レベルでフィルターされたジョブを取得"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/priority/{priority}"
response = requests.get(url, headers=self.headers)
return response.json()
def change_job_priority(self, job_id, new_priority, reason, notify_user=True):
"""キュー内ジョブの優先度を変更"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/priority"
payload = {
'newPriority': new_priority,
'reason': reason,
'notifyUser': notify_user
}
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def move_job_position(self, job_id, new_position, reason):
"""優先度階層内でジョブの位置を移動"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/job/{job_id}/position"
payload = {
'newPosition': new_position,
'reason': reason,
'respectPriorityBoundaries': True
}
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def control_queue(self, action, reason, duration=None, scheduled_resume=None):
"""キュー処理を一時停止または再開"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/control"
payload = {
'action': action,
'reason': reason,
'allowRunningJobsToComplete': True,
'notifyUsers': True
}
if duration:
payload['duration'] = duration
if scheduled_resume:
payload['scheduledResume'] = scheduled_resume.isoformat()
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_queue_history(self, date_from, date_to, aggregation='hour', metrics=None):
"""過去のキューパフォーマンスデータを取得"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/history"
params = {
'dateFrom': date_from.isoformat(),
'dateTo': date_to.isoformat(),
'aggregation': aggregation
}
if metrics:
params['metrics'] = ','.join(metrics)
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_user_jobs(self, user_id, reason, job_types=None, exclude_job_ids=None):
"""特定ユーザーのキュー内ジョブを全てキャンセル"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/queue/user/{user_id}"
payload = {
'reason': reason,
'notifyUser': False
}
if job_types:
payload['cancelJobTypes'] = job_types
if exclude_job_ids:
payload['excludeJobIds'] = exclude_job_ids
response = requests.delete(url, json=payload, headers=self.headers)
return response.json()
def get_queue_predictions(self, horizon=4, job_type=None, include_recommendations=True):
"""AIによるキュー予測を取得"""
url = f"{self.base_url}/api/{self.tenantId}/{self.project_id}/execution/queue/predictions"
params = {
'horizon': horizon,
'includeRecommendations': str(include_recommendations).lower()
}
if job_type:
params['jobType'] = job_type
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def monitor_queue_health(self, alert_threshold=30, check_interval=300):
"""キューの状態を継続的に監視し問題があれば通知"""
while True:
try:
queue_status = self.get_queue_status()
total_jobs = queue_status['summary']['totalQueuedJobs']
avg_wait = queue_status['summary']['averageWaitTime']
print(f"Queue Health Check: {total_jobs} jobs, avg wait: {avg_wait}")
if total_jobs > alert_threshold:
print(f"ALERT: Queue size ({total_jobs}) exceeds threshold ({alert_threshold})")
# 状況改善の予測確認
predictions = self.get_queue_predictions()
future_size = predictions['predictions']['queueSizeProjection'][-1]['expectedQueueSize']
if future_size > total_jobs:
print("WARNING: Queue expected to grow further")
print("Resource optimization recommendations:")
for rec in predictions['recommendations']['resourceOptimization']:
print(f" - {rec['recommendation']}: {rec['expectedImprovement']}")
time.sleep(check_interval)
except Exception as e:
print(f"Queue monitoring error: {e}")
time.sleep(60)
# 利用例
manager = QueueManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
try:
# 包括的なキューステータス取得
queue_status = manager.get_queue_status(include_estimates=True)
print(f"Queue Status: {queue_status['queueStatus']}")
print(f"Total jobs in queue: {queue_status['summary']['totalQueuedJobs']}")
print(f"Average wait time: {queue_status['summary']['averageWaitTime']}")
print(f"Processing capacity: {queue_status['processingCapacity']['currentLoad']}%")
# 優先度が高いジョブを確認
high_priority_jobs = manager.get_jobs_by_priority('High')
print(f"High priority jobs: {high_priority_jobs['jobCount']}")
# 今後4時間のキュー予測取得
predictions = manager.get_queue_predictions(horizon=4)
print("Queue predictions:")
for pred in predictions['predictions']['queueSizeProjection']:
confidence_pct = round(pred['confidence'] * 100)
print(f" {pred['time']}: {pred['expectedQueueSize']} jobs ({confidence_pct}% confidence)")
# 推奨事項の確認
if predictions['recommendations']['optimalSubmissionTimes']:
print("Optimal submission times:")
for rec in predictions['recommendations']['optimalSubmissionTimes']:
print(f" {rec['timeWindow']}: {rec['expectedWaitTime']} wait time")
# 必要に応じてジョブの優先度を上げる例
if queue_status['summary']['totalQueuedJobs'] > 20:
# 正常優先度のジョブを検索して昇格
normal_jobs = [job for job in queue_status['queuedJobs'] if job['priority'] == 'Normal']
if normal_jobs:
job_to_elevate = normal_jobs[0]
result = manager.change_job_priority(
job_to_elevate['jobId'],
'High',
'Queue congestion - elevating business critical job'
)
print(f"Elevated job {job_to_elevate['jobName']} to High priority")
print(f"New position: {result['newQueuePosition']} (was {result['previousPosition']})")
# 分析用にキュー履歴を取得
history = manager.get_queue_history(
datetime.now() - timedelta(hours=24),
datetime.now(),
'hour',
['queue_size', 'wait_time', 'throughput']
)
print(f"24h summary: {history['summary']['totalJobsProcessed']} jobs processed")
print(f"Peak queue size: {history['summary']['peakQueueSize']}")
print(f"Average throughput: {history['summary']['throughputPerHour']} jobs/hour")
# ボトルネックがあれば報告
if history['bottlenecks']:
print("Recent bottlenecks:")
for bottleneck in history['bottlenecks']:
print(f" {bottleneck['timeframe']}: {bottleneck['issue']} (Impact: {bottleneck['impact']})")
except Exception as e:
print(f"Error in queue management: {e}")