実行

ジョブ実行API

プロセスマイニングジョブの実行管理と監視、非同期操作の処理、ジョブ進行状況のリアルタイム追跡を行います。

機能

ジョブキュー

ジョブキューと優先順位を管理します。

キューを表示

ジョブ追跡

ジョブのステータスと進捗を追跡します。

ジョブを追跡

非同期操作

長時間実行される非同期操作を処理します。

非同期操作

ジョブステータスの取得

GET /api/{tenantId}/{projectId}/execution/job/{jobId}

任意の実行ジョブの現在のステータスと詳細情報(進行情報、実行メトリクス、完了ステータスを含む)を取得します。

パラメータ

パラメータ タイプ 場所 説明
tenantId GUID パス テナント識別子
projectId GUID パス プロジェクト識別子
jobId GUID パス ジョブ識別子

レスポンス

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "顧客ジャーニー分析",
  "jobDescription": "顧客接点と行動の包括的な分析",
  "status": "実行中",
  "priority": "高",
  "progress": {
    "percentage": 65,
    "currentStage": "データ処理",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8分32秒"
  },
  "resource": {
    "resourceType": "パイプライン",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "顧客分析パイプライン"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 レコード/秒"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}

すべてのジョブ一覧取得

GET /api/{tenantId}/{projectId}/execution/jobs

プロジェクト内のすべての実行ジョブをページ分割付きで取得します。ステータス、ジョブタイプ、日付範囲でのフィルタリングが可能です。

クエリパラメータ

パラメータ タイプ 説明
status string ステータスでフィルタ:Queued, Running, Completed, Failed, Cancelled
jobType string ジョブタイプでフィルタ:ProcessMining, DataEnrichment, Notebook, Analysis
priority string 優先度でフィルタ:低, 通常, 高, 重大
submittedBy string ジョブを提出したユーザーでフィルタ
dateFrom datetime この日付以降のジョブをフィルタ
dateTo datetime この日付以前のジョブをフィルタ
page integer ページ番号(デフォルト: 1)
pageSize integer 1ページあたりの件数(デフォルト: 20、最大: 100)

レスポンス

{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "顧客ジャーニー分析",
      "status": "実行中",
      "priority": "高",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "顧客分析パイプライン"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "日次売上エンリッチメント",
      "status": "完了",
      "priority": "通常",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 分",
      "submittedBy": "system",
      "resourceName": "売上データパイプライン"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

新規ジョブの提出

POST /api/{tenantId}/{projectId}/execution/job

新しい実行ジョブをシステムに提出します。ジョブは優先度とリソースの空き状況に基づいてキューに登録され処理されます。

リクエストボディ

{
  "jobName": "週次プロセス分析",
  "jobDescription": "プロセスパフォーマンスの自動週次分析",
  "jobType": "ProcessMining",
  "priority": "通常",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}

レスポンス

{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 分",
  "jobName": "週次プロセス分析",
  "priority": "通常",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}

ジョブキャンセル

DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}

キューにあるジョブまたは実行中のジョブをキャンセルします。完了済みのジョブはキャンセルできません。実行中のジョブは可能な限り安全に停止されます。

リクエストボディ(任意)

{
  "reason": "ユーザーによるキャンセル要求",
  "forceTermination": false,
  "preservePartialResults": true
}

レスポンスコード

  • 200 OK - ジョブが正常にキャンセルされました
  • 404 Not Found - ジョブが見つかりません
  • 409 Conflict - ジョブは既に完了しているかキャンセルできません

ジョブ結果の取得

GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results

完了したジョブ実行の結果と出力を取得します。生成された成果物、レポート、データファイルなどが含まれます。

クエリパラメータ

パラメータ タイプ 説明
format string レスポンス形式:summary, detailed, download(デフォルト: summary)
includeArtifacts boolean ダウンロード可能な成果物を含めるか(デフォルト: true)
outputType string 出力タイプでフィルタ:reports, data, models, visualizations

レスポンス

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "完了",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 分",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "顧客ジャーニー分析レポート",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "顧客ジャーニーデータ強化版",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "顧客ジャーニープロセスマップ",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "ジャーニー予測モデル",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 分",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "一部のタイムスタンプは推定算出されました",
        "count": 125
      }
    ]
  }
}

失敗したジョブの再試行

POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry

失敗したジョブを再試行します。パラメータを変更可能です。同じもしくは更新した設定でジョブが再度キューに登録されます。

リクエストボディ

{
  "retryReason": "インフラ問題が解決されたため",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "高",
  "immediateExecution": false
}

レスポンス

更新されたジョブIDと再試行情報を含む新しいジョブオブジェクトと共に 200 OK を返します。

システム実行ステータスの取得

GET /api/{tenantId}/execution/system/status

リソース利用状況、キューの健全性、パフォーマンスメトリクスを含むシステム全体の現在の実行ステータスを取得します。

レスポンス

{
  "systemStatus": "正常",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "稼働中",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "稼働中",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 分",
    "estimatedProcessingTime": "23 分"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 分",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}

例:完全なジョブ管理ワークフロー

この例ではジョブの提出、進行状況の監視、結果取得の手順を示します。

// 1. 新しいジョブを提出
const submitJob = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: '顧客行動分析',
      jobDescription: '顧客インタラクションパターンの週次分析',
      jobType: 'ProcessMining',
      priority: '高',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });

  return await response.json();
};

// 2. ジョブの進捗を監視
const monitorJob = async (jobId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const job = await response.json();
    console.log(`ジョブ ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`現在のステージ: ${job.progress.currentStage}`);
    console.log(`ETA: ${job.progress.estimatedCompletion}`);

    if (job.status === 'Running' || job.status === 'Queued') {
      setTimeout(() => checkStatus(), 30000); // 30秒ごとにチェック
    } else if (job.status === 'Completed') {
      console.log('ジョブは正常に完了しました!');
      await getJobResults(jobId);
    } else if (job.status === 'Failed') {
      console.log('ジョブ失敗:', job.error);
    }
  };

  await checkStatus();
};

// 3. ジョブ結果を取得
const getJobResults = async (jobId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('ジョブ結果:', results.summary);
  console.log('主な出力:', results.results.primaryOutput.downloadUrl);

  // 追加出力のダウンロード
  for (const output of results.results.additionalOutputs) {
    console.log(`${output.type}をダウンロード: ${output.downloadUrl}`);
  }

  return results;
};

// 4. システムステータスを取得
const getSystemStatus = async () => {
  const response = await fetch('/api/{tenantId}/execution/system/status', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const status = await response.json();
  console.log(`システムステータス: ${status.systemStatus}`);
  console.log(`キュー内ジョブ数: ${status.queueStatistics.totalQueuedJobs} 件`);
  console.log(`平均待機時間: ${status.queueStatistics.averageWaitTime}`);

  return status;
};

// ワークフローを実行
submitJob()
  .then(job => {
    console.log(`ジョブを提出しました: ${job.jobId}`);
    console.log(`キュー内位置: ${job.queuePosition}`);
    console.log(`予定開始時間: ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('ジョブワークフローでエラー:', error));

Pythonの例

import requests
import time
import json
from datetime import datetime, timedelta

class ExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """新しい実行ジョブを提出"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_job_status(self, job_id):
        """ジョブの現在のステータスを取得"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """フィルタ付きでジョブ一覧取得"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}

        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """定期的にステータス確認を行いジョブ完了を待つ"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"ジョブ {job_id}: {job['status']} ({job['progress']['percentage']}%)")
            print(f"  現在のステージ: {job['progress']['currentStage']}")
            print(f"  経過時間: {job['progress']['elapsedTime']}")

            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job

            time.sleep(poll_interval)

        raise TimeoutError(f"ジョブ {job_id} は {timeout} 秒以内に完了しませんでした")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """ジョブ実行結果を取得"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_job(self, job_id, reason="ユーザーキャンセル", force=False):
        """実行中またはキュー中のジョブをキャンセル"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="失敗後の再試行", priority=None, modify_params=None):
        """失敗したジョブを再試行"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }

        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_system_status(self):
        """システム全体の実行ステータスを取得"""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        return response.json()

# 使用例
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # システムステータスの確認
    system_status = manager.get_system_status()
    print(f"システムステータス: {system_status['systemStatus']}")
    print(f"キュー内ジョブ数: {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"平均待機時間: {system_status['queueStatistics']['averageWaitTime']}")

    # 包括的なプロセスマイニングジョブを提出
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }

    job = manager.submit_job(
        '月次プロセス分析',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        '高'
    )

    print(f"ジョブを提出しました: {job['jobId']}")
    print(f"キュー内位置: {job['queuePosition']}")
    print(f"予定開始時間: {job['estimatedStartTime']}")

    # 完了まで待機
    final_job = manager.wait_for_completion(job['jobId'])

    if final_job['status'] == 'Completed':
        # 詳細結果を取得
        results = manager.get_job_results(job['jobId'])

        print("ジョブは正常に完了しました!")
        print(f"処理済みレコード数: {results['summary']['recordsProcessed']:,}")
        print(f"データ品質スコア: {results['summary']['dataQualityScore']}")
        print(f"処理効率: {results['summary']['processingEfficiency']}%")

        # プライマリレポートをダウンロード
        print(f"レポートをダウンロード: {results['results']['primaryOutput']['downloadUrl']}")

        # すべての追加出力を表示
        for output in results['results']['additionalOutputs']:
            print(f"{output['type']} をダウンロード: {output['downloadUrl']}")

    else:
        print(f"ジョブはステータス {final_job['status']} で失敗しました")
        if 'error' in final_job:
            print(f"エラー: {final_job['error']}")

except Exception as e:
    print(f"実行ワークフローでエラー発生: {e}")