パイプライン実行

エンリッチメントパイプラインの実行

データセットに対してエンリッチメントパイプラインを実行し、進行状況を監視し、強化された結果を取得します。

パイプラインの実行

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute

指定されたデータセット上でエンリッチメントパイプラインの実行をトリガーします。実行は非同期で行われ、進行状況を追跡するための実行IDを返します。

パラメータ

パラメータ 種類 場所 説明
tenantId GUID パス テナント識別子
projectId GUID パス プロジェクト識別子
pipelineId GUID パス パイプライン識別子

リクエストボディ

{
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "executionName": "Monthly Process Analysis",
  "executionDescription": "Enrichment for monthly performance review",
  "parameters": {
    "timeRange": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-31"
    },
    "filterCriteria": {
      "includeWeekends": false,
      "minCaseDuration": "1h"
    },
    "outputOptions": {
      "includeRawData": true,
      "generateSummary": true,
      "exportFormat": "CSV"
    }
  },
  "priority": "Normal",
  "notifyOnCompletion": true
}

レスポンス

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "estimatedDuration": "15-20 minutes",
  "executionName": "Monthly Process Analysis",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "status": "Pending",
      "estimatedDuration": "2-3 minutes"
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "status": "Pending",
      "estimatedDuration": "8-10 minutes"
    }
  ]
}

実行ステータスの取得

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

パイプライン実行の現在のステータスおよび進行状況情報を、詳細なステージごとの進捗情報と共に取得します。

レスポンス

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Running",
  "progress": 45,
  "currentStage": {
    "stageId": "stage-002",
    "stageName": "Time Enrichment",
    "status": "Running",
    "progress": 60,
    "startTime": "2024-01-20T10:35:00Z",
    "estimatedCompletion": "2024-01-20T10:45:00Z"
  },
  "executionName": "Monthly Process Analysis",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "dateStarted": "2024-01-20T10:32:00Z",
  "estimatedCompletion": "2024-01-20T10:50:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "status": "Completed",
      "progress": 100,
      "startTime": "2024-01-20T10:32:00Z",
      "endTime": "2024-01-20T10:35:00Z",
      "duration": "3 minutes",
      "recordsProcessed": 15420,
      "validationResults": {
        "totalRecords": 15420,
        "validRecords": 15418,
        "errors": 2,
        "warnings": 15
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "status": "Running",
      "progress": 60,
      "startTime": "2024-01-20T10:35:00Z",
      "recordsProcessed": 9252,
      "totalRecords": 15418
    }
  ],
  "metrics": {
    "totalRecords": 15420,
    "processedRecords": 9252,
    "errorCount": 2,
    "warningCount": 15
  }
}

実行結果の取得

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results

完了したパイプライン実行の最終結果を取得します。強化データ、要約統計、ダウンロード可能な出力が含まれます。

クエリパラメータ

パラメータ 種類 説明
format string レスポンスフォーマット:summary、full、download(デフォルト:summary)
includeRawData boolean 元のデータセットをレスポンスに含めるか(デフォルト:false)
limit integer 返却するレコード数の制限(最大:10000)

レスポンス

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionDate": "2024-01-20T10:48:00Z",
  "totalDuration": "18 minutes",
  "summary": {
    "originalRecords": 15420,
    "enrichedRecords": 15418,
    "newAttributes": 8,
    "dataQualityScore": 98.7,
    "enrichmentCoverage": 99.9
  },
  "enrichedAttributes": [
    {
      "attributeName": "dayOfWeek",
      "attributeType": "string",
      "coverage": 100,
      "uniqueValues": 7,
      "description": "各イベントの曜日"
    },
    {
      "attributeName": "businessHours",
      "attributeType": "boolean",
      "coverage": 100,
      "description": "イベントが営業時間中に発生したかどうか"
    },
    {
      "attributeName": "cycleTime",
      "attributeType": "duration",
      "coverage": 99.8,
      "averageValue": "4.2 hours",
      "description": "ケース開始から完了までの時間"
    }
  ],
  "dataQuality": {
    "completeness": 99.9,
    "accuracy": 98.5,
    "consistency": 99.2,
    "validity": 97.8,
    "issues": [
      {
        "type": "Missing Timestamp",
        "count": 2,
        "severity": "High"
      },
      {
        "type": "Invalid Duration",
        "count": 15,
        "severity": "Medium"
      }
    ]
  },
  "downloadUrls": {
    "enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
    "summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
    "dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
  }
}

パイプライン実行一覧の取得

GET /api/{tenantId}/{projectId}/enrichment/executions

すべてのパイプライン実行の一覧を、フィルタリングおよびページネーションオプション付きで取得します。実行履歴やパフォーマンスの監視に役立ちます。

クエリパラメータ

パラメータ 種類 説明
pipelineId GUID 特定のパイプラインでフィルタリング
status string ステータスでフィルタリング:Queued、Running、Completed、Failed、Cancelled
dateFrom datetime この日付からの実行をフィルタリング
dateTo datetime この日付までの実行をフィルタリング
page integer ページ番号(デフォルト:1)
pageSize integer 1ページあたりのアイテム数(デフォルト:20、最大:100)

レスポンス

{
  "executions": [
    {
      "executionId": "990e8400-e29b-41d4-a716-446655440000",
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "executionName": "Monthly Process Analysis",
      "status": "Completed",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "dateCompleted": "2024-01-20T10:48:00Z",
      "duration": "18 minutes",
      "recordsProcessed": 15418,
      "priority": "Normal",
      "submittedBy": "user123"
    }
  ],
  "totalCount": 47,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

実行のキャンセル

DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

実行中またはキューに入っているパイプライン実行をキャンセルします。完了したステージは保持されますが、実行は現在のステージで停止します。

リクエストボディ(任意)

{
  "reason": "User requested cancellation",
  "preservePartialResults": true
}

レスポンスコード

  • 200 OK - 実行が正常にキャンセルされました
  • 404 Not Found - 実行が見つかりません
  • 409 Conflict - 実行はすでに完了しているか、キャンセルできません

失敗した実行の再起動

POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart

失敗したパイプライン実行を失敗箇所から再起動します。以前に完了したステージは、明示的に再実行を要求しない限りスキップされます。

リクエストボディ

{
  "restartFromStage": "stage-003",
  "rerunCompletedStages": false,
  "updateParameters": {
    "retryFailedRecords": true,
    "increaseTimeout": true
  }
}

レスポンス

更新された実行IDとステータスを含む新しい実行オブジェクトを返します(200 OK)。

例:完了までの実行ワークフロー

この例では、パイプラインの実行とその進捗の監視を示します:

// 1. パイプラインの実行
const executeEnrichment = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      executionName: 'Customer Journey Analysis',
      executionDescription: 'Enriching customer data with journey metrics',
      parameters: {
        timeRange: {
          startDate: '2024-01-01',
          endDate: '2024-01-31'
        },
        outputOptions: {
          includeRawData: true,
          generateSummary: true,
          exportFormat: 'CSV'
        }
      },
      priority: 'High',
      notifyOnCompletion: true
    })
  });

  return await response.json();
};

// 2. 実行進捗の監視
const monitorExecution = async (executionId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const execution = await response.json();
    console.log(`Status: ${execution.status}, Progress: ${execution.progress}%`);

    if (execution.status === 'Running' || execution.status === 'Queued') {
      // 30秒後に再チェック
      setTimeout(() => checkStatus(), 30000);
    } else if (execution.status === 'Completed') {
      console.log('Execution completed successfully!');
      await getResults(executionId);
    } else if (execution.status === 'Failed') {
      console.log('Execution failed:', execution.error);
    }
  };

  await checkStatus();
};

// 3. 実行完了時に結果を取得
const getResults = async (executionId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Enrichment Summary:', results.summary);
  console.log('Download URLs:', results.downloadUrls);

  return results;
};

// ワークフローの実行
executeEnrichment()
  .then(execution => {
    console.log(`Started execution: ${execution.executionId}`);
    return monitorExecution(execution.executionId);
  })
  .catch(error => console.error('Execution failed:', error));

Python例

import requests
import time
import json
from datetime import datetime, timedelta

class PipelineExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
        """エンリッチメントパイプラインを実行する"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
        payload = {
            'datasetId': dataset_id,
            'executionName': execution_name,
            'parameters': parameters or {},
            'priority': priority,
            'notifyOnCompletion': True
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_execution_status(self, execution_id):
        """現在の実行ステータスを取得する"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
        """定期的にステータスを確認しながら実行完了を待つ"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            status = self.get_execution_status(execution_id)
            print(f"Execution {execution_id}: {status['status']} ({status.get('progress', 0)}%)")

            if status['status'] in ['Completed', 'Failed', 'Cancelled']:
                return status

            time.sleep(poll_interval)

        raise TimeoutError(f"Execution {execution_id} did not complete within {timeout} seconds")

    def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
        """実行結果を取得する"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
        params = {
            'format': format_type,
            'includeRawData': include_raw_data
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_execution(self, execution_id, reason="User cancellation"):
        """実行中のジョブをキャンセルする"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
        """フィルタリング付きでパイプライン実行の一覧を取得する"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
        params = {'page': page, 'pageSize': page_size}

        if pipeline_id:
            params['pipelineId'] = pipeline_id
        if status:
            params['status'] = status
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

# 使用例
manager = PipelineExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# カスタムパラメータでパイプラインを実行
execution_params = {
    'timeRange': {
        'startDate': '2024-01-01',
        'endDate': '2024-01-31'
    },
    'filterCriteria': {
        'includeWeekends': False,
        'minCaseDuration': '1h'
    },
    'outputOptions': {
        'includeRawData': True,
        'generateSummary': True,
        'exportFormat': 'CSV'
    }
}

try:
    # 実行開始
    execution = manager.execute_pipeline(
        'pipeline-guid',
        'dataset-guid',
        'Monthly Process Analysis',
        execution_params,
        'High'
    )

    print(f"Started execution: {execution['executionId']}")
    print(f"Estimated duration: {execution['estimatedDuration']}")

    # 完了を待機
    final_status = manager.wait_for_completion(execution['executionId'])

    if final_status['status'] == 'Completed':
        # 結果を取得
        results = manager.get_execution_results(execution['executionId'])
        print(f"Enrichment completed successfully!")
        print(f"Original records: {results['summary']['originalRecords']}")
        print(f"Enriched records: {results['summary']['enrichedRecords']}")
        print(f"Data quality score: {results['summary']['dataQualityScore']}")
        print(f"Download enriched data: {results['downloadUrls']['enrichedDataset']}")
    else:
        print(f"Execution failed with status: {final_status['status']}")

except Exception as e:
    print(f"Error executing pipeline: {e}")