実行
ジョブ実行API
プロセスマイニングジョブの実行管理と監視、非同期操作の処理、ジョブ進行状況のリアルタイム追跡を行います。
機能
ジョブキュー
ジョブキューと優先順位を管理します。
ジョブ追跡
ジョブのステータスと進捗を追跡します。
非同期操作
長時間実行される非同期操作を処理します。
ジョブステータスの取得
GET /api/{tenantId}/{projectId}/execution/job/{jobId}
任意の実行ジョブの現在のステータスと詳細情報(進行情報、実行メトリクス、完了ステータスを含む)を取得します。
パラメータ
| パラメータ | タイプ | 場所 | 説明 |
|---|---|---|---|
tenantId |
GUID | パス | テナント識別子 |
projectId |
GUID | パス | プロジェクト識別子 |
jobId |
GUID | パス | ジョブ識別子 |
レスポンス
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"projectId": "660e8400-e29b-41d4-a716-446655440000",
"jobType": "ProcessMining",
"jobName": "顧客ジャーニー分析",
"jobDescription": "顧客接点と行動の包括的な分析",
"status": "実行中",
"priority": "高",
"progress": {
"percentage": 65,
"currentStage": "データ処理",
"estimatedCompletion": "2024-01-20T11:15:00Z",
"elapsedTime": "8分32秒"
},
"resource": {
"resourceType": "パイプライン",
"resourceId": "770e8400-e29b-41d4-a716-446655440000",
"resourceName": "顧客分析パイプライン"
},
"execution": {
"startTime": "2024-01-20T10:30:00Z",
"submittedBy": "user123",
"executionNode": "worker-node-02",
"memoryUsage": "2.1 GB",
"cpuUsage": "45%",
"diskUsage": "890 MB"
},
"metrics": {
"recordsProcessed": 125430,
"totalRecords": 192850,
"errorCount": 3,
"warningCount": 12,
"averageProcessingRate": "1250 レコード/秒"
},
"dateCreated": "2024-01-20T10:28:00Z",
"lastUpdated": "2024-01-20T10:38:45Z"
}
すべてのジョブ一覧取得
GET /api/{tenantId}/{projectId}/execution/jobs
プロジェクト内のすべての実行ジョブをページ分割付きで取得します。ステータス、ジョブタイプ、日付範囲でのフィルタリングが可能です。
クエリパラメータ
| パラメータ | タイプ | 説明 |
|---|---|---|
status |
string | ステータスでフィルタ:Queued, Running, Completed, Failed, Cancelled |
jobType |
string | ジョブタイプでフィルタ:ProcessMining, DataEnrichment, Notebook, Analysis |
priority |
string | 優先度でフィルタ:低, 通常, 高, 重大 |
submittedBy |
string | ジョブを提出したユーザーでフィルタ |
dateFrom |
datetime | この日付以降のジョブをフィルタ |
dateTo |
datetime | この日付以前のジョブをフィルタ |
page |
integer | ページ番号(デフォルト: 1) |
pageSize |
integer | 1ページあたりの件数(デフォルト: 20、最大: 100) |
レスポンス
{
"jobs": [
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"jobType": "ProcessMining",
"jobName": "顧客ジャーニー分析",
"status": "実行中",
"priority": "高",
"progress": 65,
"startTime": "2024-01-20T10:30:00Z",
"estimatedCompletion": "2024-01-20T11:15:00Z",
"submittedBy": "user123",
"resourceName": "顧客分析パイプライン"
},
{
"jobId": "dd0e8400-e29b-41d4-a716-446655440000",
"jobType": "DataEnrichment",
"jobName": "日次売上エンリッチメント",
"status": "完了",
"priority": "通常",
"progress": 100,
"startTime": "2024-01-20T09:00:00Z",
"endTime": "2024-01-20T09:23:00Z",
"duration": "23 分",
"submittedBy": "system",
"resourceName": "売上データパイプライン"
}
],
"summary": {
"totalJobs": 156,
"runningJobs": 3,
"queuedJobs": 7,
"completedJobs": 142,
"failedJobs": 4
},
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
新規ジョブの提出
POST /api/{tenantId}/{projectId}/execution/job
新しい実行ジョブをシステムに提出します。ジョブは優先度とリソースの空き状況に基づいてキューに登録され処理されます。
リクエストボディ
{
"jobName": "週次プロセス分析",
"jobDescription": "プロセスパフォーマンスの自動週次分析",
"jobType": "ProcessMining",
"priority": "通常",
"resource": {
"resourceType": "Pipeline",
"resourceId": "770e8400-e29b-41d4-a716-446655440000"
},
"parameters": {
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"analysisType": "comprehensive",
"timeWindow": {
"startDate": "2024-01-01",
"endDate": "2024-01-07"
},
"includeAnomalyDetection": true,
"outputFormat": "detailed_report"
},
"scheduling": {
"executeImmediately": true,
"scheduledTime": null,
"timeoutMinutes": 120
},
"notifications": {
"onCompletion": true,
"onFailure": true,
"emailRecipients": ["analyst@company.com"]
}
}
レスポンス
{
"jobId": "ee0e8400-e29b-41d4-a716-446655440000",
"status": "Queued",
"queuePosition": 3,
"estimatedStartTime": "2024-01-20T10:45:00Z",
"estimatedDuration": "45-60 分",
"jobName": "週次プロセス分析",
"priority": "通常",
"dateSubmitted": "2024-01-20T10:30:00Z",
"submittedBy": "user123"
}
ジョブキャンセル
DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}
キューにあるジョブまたは実行中のジョブをキャンセルします。完了済みのジョブはキャンセルできません。実行中のジョブは可能な限り安全に停止されます。
リクエストボディ(任意)
{
"reason": "ユーザーによるキャンセル要求",
"forceTermination": false,
"preservePartialResults": true
}
レスポンスコード
200 OK- ジョブが正常にキャンセルされました404 Not Found- ジョブが見つかりません409 Conflict- ジョブは既に完了しているかキャンセルできません
ジョブ結果の取得
GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results
完了したジョブ実行の結果と出力を取得します。生成された成果物、レポート、データファイルなどが含まれます。
クエリパラメータ
| パラメータ | タイプ | 説明 |
|---|---|---|
format |
string | レスポンス形式:summary, detailed, download(デフォルト: summary) |
includeArtifacts |
boolean | ダウンロード可能な成果物を含めるか(デフォルト: true) |
outputType |
string | 出力タイプでフィルタ:reports, data, models, visualizations |
レスポンス
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"status": "完了",
"completionTime": "2024-01-20T11:12:00Z",
"totalDuration": "42 分",
"success": true,
"summary": {
"recordsProcessed": 192850,
"outputsGenerated": 7,
"dataQualityScore": 94.2,
"processingEfficiency": 87.5
},
"results": {
"primaryOutput": {
"type": "ProcessMiningReport",
"title": "顧客ジャーニー分析レポート",
"format": "html",
"size": "2.3 MB",
"downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
},
"additionalOutputs": [
{
"type": "EnrichedDataset",
"title": "顧客ジャーニーデータ強化版",
"format": "csv",
"recordCount": 192850,
"size": "45.7 MB",
"downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
},
{
"type": "ProcessMap",
"title": "顧客ジャーニープロセスマップ",
"format": "svg",
"size": "890 KB",
"downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
},
{
"type": "AnalyticsModel",
"title": "ジャーニー予測モデル",
"format": "pkl",
"accuracy": 0.89,
"size": "12.4 MB",
"downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
}
]
},
"executionMetrics": {
"totalCpuTime": "38.5 分",
"peakMemoryUsage": "3.2 GB",
"diskIoOperations": 45672,
"networkDataTransfer": "567 MB"
},
"qualityMetrics": {
"dataValidation": {
"totalRecords": 195000,
"validRecords": 192850,
"duplicatesRemoved": 1890,
"invalidRecords": 260
},
"processingErrors": [],
"warnings": [
{
"type": "DataQuality",
"message": "一部のタイムスタンプは推定算出されました",
"count": 125
}
]
}
}
失敗したジョブの再試行
POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry
失敗したジョブを再試行します。パラメータを変更可能です。同じもしくは更新した設定でジョブが再度キューに登録されます。
リクエストボディ
{
"retryReason": "インフラ問題が解決されたため",
"modifyParameters": true,
"updatedParameters": {
"timeoutMinutes": 180,
"retryFailedRecords": true,
"increaseMemoryLimit": true
},
"priority": "高",
"immediateExecution": false
}
レスポンス
更新されたジョブIDと再試行情報を含む新しいジョブオブジェクトと共に 200 OK を返します。
システム実行ステータスの取得
GET /api/{tenantId}/execution/system/status
リソース利用状況、キューの健全性、パフォーマンスメトリクスを含むシステム全体の現在の実行ステータスを取得します。
レスポンス
{
"systemStatus": "正常",
"timestamp": "2024-01-20T10:45:00Z",
"executionNodes": [
{
"nodeId": "worker-node-01",
"status": "稼働中",
"cpuUsage": 67,
"memoryUsage": 78,
"activeJobs": 2,
"jobCapacity": 4
},
{
"nodeId": "worker-node-02",
"status": "稼働中",
"cpuUsage": 45,
"memoryUsage": 56,
"activeJobs": 1,
"jobCapacity": 4
}
],
"queueStatistics": {
"totalQueuedJobs": 15,
"highPriorityJobs": 3,
"normalPriorityJobs": 10,
"lowPriorityJobs": 2,
"averageWaitTime": "4.2 分",
"estimatedProcessingTime": "23 分"
},
"performanceMetrics": {
"jobsCompletedToday": 847,
"averageJobDuration": "18.5 分",
"successRate": 97.8,
"throughputPerHour": 35.2
},
"resourceUtilization": {
"totalCpuCapacity": 1600,
"usedCpuCapacity": 896,
"totalMemoryCapacity": "64 GB",
"usedMemoryCapacity": "38.4 GB",
"diskSpaceAvailable": "2.3 TB"
}
}
例:完全なジョブ管理ワークフロー
この例ではジョブの提出、進行状況の監視、結果取得の手順を示します。
// 1. 新しいジョブを提出
const submitJob = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
jobName: '顧客行動分析',
jobDescription: '顧客インタラクションパターンの週次分析',
jobType: 'ProcessMining',
priority: '高',
resource: {
resourceType: 'Pipeline',
resourceId: '770e8400-e29b-41d4-a716-446655440000'
},
parameters: {
datasetId: '880e8400-e29b-41d4-a716-446655440000',
analysisType: 'comprehensive',
timeWindow: {
startDate: '2024-01-13',
endDate: '2024-01-19'
},
includeAnomalyDetection: true,
outputFormat: 'detailed_report'
},
scheduling: {
executeImmediately: true,
timeoutMinutes: 90
},
notifications: {
onCompletion: true,
onFailure: true,
emailRecipients: ['analyst@company.com']
}
})
});
return await response.json();
};
// 2. ジョブの進捗を監視
const monitorJob = async (jobId) => {
const checkStatus = async () => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const job = await response.json();
console.log(`ジョブ ${jobId}: ${job.status} (${job.progress.percentage}%)`);
console.log(`現在のステージ: ${job.progress.currentStage}`);
console.log(`ETA: ${job.progress.estimatedCompletion}`);
if (job.status === 'Running' || job.status === 'Queued') {
setTimeout(() => checkStatus(), 30000); // 30秒ごとにチェック
} else if (job.status === 'Completed') {
console.log('ジョブは正常に完了しました!');
await getJobResults(jobId);
} else if (job.status === 'Failed') {
console.log('ジョブ失敗:', job.error);
}
};
await checkStatus();
};
// 3. ジョブ結果を取得
const getJobResults = async (jobId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const results = await response.json();
console.log('ジョブ結果:', results.summary);
console.log('主な出力:', results.results.primaryOutput.downloadUrl);
// 追加出力のダウンロード
for (const output of results.results.additionalOutputs) {
console.log(`${output.type}をダウンロード: ${output.downloadUrl}`);
}
return results;
};
// 4. システムステータスを取得
const getSystemStatus = async () => {
const response = await fetch('/api/{tenantId}/execution/system/status', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const status = await response.json();
console.log(`システムステータス: ${status.systemStatus}`);
console.log(`キュー内ジョブ数: ${status.queueStatistics.totalQueuedJobs} 件`);
console.log(`平均待機時間: ${status.queueStatistics.averageWaitTime}`);
return status;
};
// ワークフローを実行
submitJob()
.then(job => {
console.log(`ジョブを提出しました: ${job.jobId}`);
console.log(`キュー内位置: ${job.queuePosition}`);
console.log(`予定開始時間: ${job.estimatedStartTime}`);
return monitorJob(job.jobId);
})
.catch(error => console.error('ジョブワークフローでエラー:', error));
Pythonの例
import requests
import time
import json
from datetime import datetime, timedelta
class ExecutionManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
"""新しい実行ジョブを提出"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
payload = {
'jobName': job_name,
'jobType': job_type,
'priority': priority,
'resource': {
'resourceType': resource_type,
'resourceId': resource_id
},
'parameters': parameters or {},
'scheduling': {
'executeImmediately': True,
'timeoutMinutes': 120
},
'notifications': {
'onCompletion': True,
'onFailure': True
}
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_job_status(self, job_id):
"""ジョブの現在のステータスを取得"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
"""フィルタ付きでジョブ一覧取得"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
params = {'page': page, 'pageSize': page_size}
if status:
params['status'] = status
if job_type:
params['jobType'] = job_type
if date_from:
params['dateFrom'] = date_from.isoformat()
if date_to:
params['dateTo'] = date_to.isoformat()
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
"""定期的にステータス確認を行いジョブ完了を待つ"""
start_time = time.time()
while time.time() - start_time < timeout:
job = self.get_job_status(job_id)
print(f"ジョブ {job_id}: {job['status']} ({job['progress']['percentage']}%)")
print(f" 現在のステージ: {job['progress']['currentStage']}")
print(f" 経過時間: {job['progress']['elapsedTime']}")
if job['status'] in ['Completed', 'Failed', 'Cancelled']:
return job
time.sleep(poll_interval)
raise TimeoutError(f"ジョブ {job_id} は {timeout} 秒以内に完了しませんでした")
def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
"""ジョブ実行結果を取得"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
params = {
'format': format_type,
'includeArtifacts': str(include_artifacts).lower()
}
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_job(self, job_id, reason="ユーザーキャンセル", force=False):
"""実行中またはキュー中のジョブをキャンセル"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
payload = {
'reason': reason,
'forceTermination': force,
'preservePartialResults': True
}
response = requests.delete(url, json=payload, headers=self.headers)
return response.status_code == 200
def retry_job(self, job_id, reason="失敗後の再試行", priority=None, modify_params=None):
"""失敗したジョブを再試行"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
payload = {
'retryReason': reason,
'modifyParameters': modify_params is not None,
'immediateExecution': False
}
if priority:
payload['priority'] = priority
if modify_params:
payload['updatedParameters'] = modify_params
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_system_status(self):
"""システム全体の実行ステータスを取得"""
url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
response = requests.get(url, headers=self.headers)
return response.json()
# 使用例
manager = ExecutionManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
try:
# システムステータスの確認
system_status = manager.get_system_status()
print(f"システムステータス: {system_status['systemStatus']}")
print(f"キュー内ジョブ数: {system_status['queueStatistics']['totalQueuedJobs']}")
print(f"平均待機時間: {system_status['queueStatistics']['averageWaitTime']}")
# 包括的なプロセスマイニングジョブを提出
job_params = {
'datasetId': 'dataset-guid',
'analysisType': 'comprehensive',
'timeWindow': {
'startDate': '2024-01-01',
'endDate': '2024-01-31'
},
'includeAnomalyDetection': True,
'includeProcessVariants': True,
'generateInsights': True,
'outputFormat': 'detailed_report',
'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
'qualityChecks': {
'validateTimestamps': True,
'checkDuplicates': True,
'validateActivities': True
}
}
job = manager.submit_job(
'月次プロセス分析',
'ProcessMining',
'Pipeline',
'pipeline-guid',
job_params,
'高'
)
print(f"ジョブを提出しました: {job['jobId']}")
print(f"キュー内位置: {job['queuePosition']}")
print(f"予定開始時間: {job['estimatedStartTime']}")
# 完了まで待機
final_job = manager.wait_for_completion(job['jobId'])
if final_job['status'] == 'Completed':
# 詳細結果を取得
results = manager.get_job_results(job['jobId'])
print("ジョブは正常に完了しました!")
print(f"処理済みレコード数: {results['summary']['recordsProcessed']:,}")
print(f"データ品質スコア: {results['summary']['dataQualityScore']}")
print(f"処理効率: {results['summary']['processingEfficiency']}%")
# プライマリレポートをダウンロード
print(f"レポートをダウンロード: {results['results']['primaryOutput']['downloadUrl']}")
# すべての追加出力を表示
for output in results['results']['additionalOutputs']:
print(f"{output['type']} をダウンロード: {output['downloadUrl']}")
else:
print(f"ジョブはステータス {final_job['status']} で失敗しました")
if 'error' in final_job:
print(f"エラー: {final_job['error']}")
except Exception as e:
print(f"実行ワークフローでエラー発生: {e}")