Execution

Execute Enrichment Pipelines

Run enrichment pipelines on datasets, monitor progress, and retrieve enhanced results.

Execute Pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute

Triggers the execution of an enrichment pipeline on a specified dataset. The execution runs asynchronously and returns an execution ID for tracking progress.

Parameters

Parameter   Type  Location  Description
tenantId    GUID  Path      The tenant identifier
projectId   GUID  Path      The project identifier
pipelineId  GUID  Path      The pipeline identifier
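
Because all three path parameters are GUIDs, a client can reject malformed identifiers before sending anything. The following is a minimal sketch, not part of the API itself: the helper name `build_execute_url` and the `base_url` argument are assumptions made for illustration, and only the path layout comes from the endpoint above.

```python
from uuid import UUID


def build_execute_url(base_url: str, tenant_id: str, project_id: str, pipeline_id: str) -> str:
    """Return the execute endpoint URL, rejecting IDs that are not valid GUIDs.

    Hypothetical helper: only the path layout is taken from the documentation.
    """
    ids = {"tenantId": tenant_id, "projectId": project_id, "pipelineId": pipeline_id}
    for name, value in ids.items():
        try:
            UUID(value)  # Raises ValueError for malformed GUID strings
        except ValueError as exc:
            raise ValueError(f"{name} is not a valid GUID: {value!r}") from exc
    return (
        f"{base_url.rstrip('/')}/api/{tenant_id}/{project_id}"
        f"/enrichment/pipeline/{pipeline_id}/execute"
    )
```

Validating on the client side turns a typo in an identifier into an immediate, descriptive error instead of a failed request.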

Request Body

{
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "executionName": "Monthly Process Analysis",
  "executionDescription": "Enrichment for monthly performance review",
  "parameters": {
    "timeRange": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-31"
    },
    "filterCriteria": {
      "includeWeekends": false,
      "minCaseDuration": "1h"
    },
    "outputOptions": {
      "includeRawData": true,
      "generateSummary": true,
      "exportFormat": "CSV"
    }
  },
  "priority": "Normal",
  "notifyOnCompletion": true
}

Response

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "estimatedDuration": "15-20 minutes",
  "executionName": "Monthly Process Analysis",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "status": "Pending",
      "estimatedDuration": "2-3 minutes"
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "status": "Pending",
      "estimatedDuration": "8-10 minutes"
    }
  ]
}
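
The `estimatedDuration` values in this response are human-readable strings rather than numbers. If a client wants to work with them numerically, one option is sketched below. It assumes the strings always follow the "N-M minutes" shape seen in the sample, which the documentation does not guarantee; the function names are hypothetical.

```python
import re
from typing import Optional, Tuple

# Assumption: estimates look like "15-20 minutes", as in the sample response.
_RANGE = re.compile(r"^\s*(\d+)\s*-\s*(\d+)\s+minutes?\s*$")


def parse_minutes_range(text: str) -> Optional[Tuple[int, int]]:
    """Return (low, high) in minutes, or None if the text has another shape."""
    match = _RANGE.match(text)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


def stage_estimate_total(response: dict) -> Optional[Tuple[int, int]]:
    """Sum the per-stage estimates; None if any stage cannot be parsed."""
    low_total = high_total = 0
    for stage in response.get("stages", []):
        parsed = parse_minutes_range(stage.get("estimatedDuration", ""))
        if parsed is None:
            return None
        low_total += parsed[0]
        high_total += parsed[1]
    return low_total, high_total
```

Returning `None` for unrecognized strings keeps the caller from treating an unparsed estimate as zero.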

Get Execution Status

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Retrieves the current status and progress information for a pipeline execution, including detailed stage-by-stage progress.

Response

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Running",
  "progress": 45,
  "currentStage": {
    "stageId": "stage-002",
    "stageName": "Time Enrichment",
    "status": "Running",
    "progress": 60,
    "startTime": "2024-01-20T10:35:00Z",
    "estimatedCompletion": "2024-01-20T10:45:00Z"
  },
  "executionName": "Monthly Process Analysis",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "dateStarted": "2024-01-20T10:32:00Z",
  "estimatedCompletion": "2024-01-20T10:50:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "status": "Completed",
      "progress": 100,
      "startTime": "2024-01-20T10:32:00Z",
      "endTime": "2024-01-20T10:35:00Z",
      "duration": "3 minutes",
      "recordsProcessed": 15420,
      "validationResults": {
        "totalRecords": 15420,
        "validRecords": 15418,
        "errors": 2,
        "warnings": 15
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "status": "Running",
      "progress": 60,
      "startTime": "2024-01-20T10:35:00Z",
      "recordsProcessed": 9252,
      "totalRecords": 15418
    }
  ],
  "metrics": {
    "totalRecords": 15420,
    "processedRecords": 9252,
    "errorCount": 2,
    "warningCount": 15
  }
}
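
A status response like the one above can be condensed into a few log lines for monitoring. The sketch below uses only fields shown in the sample (`status`, `progress`, `stages`, `stageName`); the function names are hypothetical, and the set of terminal states is taken from the status values listed for the executions filter.

```python
from typing import List

# Statuses after which polling can stop, per the documented status values.
TERMINAL_STATUSES = {"Completed", "Failed", "Cancelled"}


def is_terminal(status: dict) -> bool:
    """True once the execution can no longer change state."""
    return status["status"] in TERMINAL_STATUSES


def describe_execution(status: dict) -> List[str]:
    """Build one summary line for the execution plus one line per stage."""
    lines = [f"{status['status']}: {status.get('progress', 0)}% overall"]
    for stage in status.get("stages", []):
        lines.append(
            f"  {stage['stageName']}: {stage['status']} ({stage.get('progress', 0)}%)"
        )
    return lines
```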

Get Execution Results

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results

Retrieves the final results of a completed pipeline execution, including enriched data, summary statistics, and downloadable outputs.

Query Parameters

Parameter       Type     Description
format          string   Response format: summary, full, or download (default: summary)
includeRawData  boolean  Include the original dataset in the response (default: false)
limit           integer  Maximum number of records to return (max: 10000)

Response

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionDate": "2024-01-20T10:48:00Z",
  "totalDuration": "18 minutes",
  "summary": {
    "originalRecords": 15420,
    "enrichedRecords": 15418,
    "newAttributes": 8,
    "dataQualityScore": 98.7,
    "enrichmentCoverage": 99.9
  },
  "enrichedAttributes": [
    {
      "attributeName": "dayOfWeek",
      "attributeType": "string",
      "coverage": 100,
      "uniqueValues": 7,
      "description": "Day of the week for each event"
    },
    {
      "attributeName": "businessHours",
      "attributeType": "boolean",
      "coverage": 100,
      "description": "Whether event occurred during business hours"
    },
    {
      "attributeName": "cycleTime",
      "attributeType": "duration",
      "coverage": 99.8,
      "averageValue": "4.2 hours",
      "description": "Time from case start to completion"
    }
  ],
  "dataQuality": {
    "completeness": 99.9,
    "accuracy": 98.5,
    "consistency": 99.2,
    "validity": 97.8,
    "issues": [
      {
        "type": "Missing Timestamp",
        "count": 2,
        "severity": "High"
      },
      {
        "type": "Invalid Duration",
        "count": 15,
        "severity": "Medium"
      }
    ]
  },
  "downloadUrls": {
    "enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
    "summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
    "dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
  }
}
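
Once the results are available, a client will often want to flag attributes that were not fully populated or issues above a given severity. The sketch below reads only fields present in the sample response; the function names and the default threshold are illustrative choices, not part of the API.

```python
from typing import List


def incomplete_attributes(results: dict, threshold: float = 100.0) -> List[str]:
    """Names of enriched attributes whose coverage is below the threshold."""
    return [
        attribute["attributeName"]
        for attribute in results.get("enrichedAttributes", [])
        if attribute.get("coverage", 0) < threshold
    ]


def issues_by_severity(results: dict, severity: str) -> List[dict]:
    """Data quality issues that match the given severity exactly."""
    issues = results.get("dataQuality", {}).get("issues", [])
    return [issue for issue in issues if issue.get("severity") == severity]
```

Applied to the sample response above, the first function would report `cycleTime` (coverage 99.8) and the second would return the "Missing Timestamp" entry for severity `High`.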

List Pipeline Executions

GET /api/{tenantId}/{projectId}/enrichment/executions

Retrieves a list of all pipeline executions with filtering and pagination options. Useful for monitoring execution history and performance.

Query Parameters

Parameter   Type      Description
pipelineId  GUID      Filter by specific pipeline
status      string    Filter by status: Queued, Running, Completed, Failed, Cancelled
dateFrom    datetime  Filter executions from this date
dateTo      datetime  Filter executions to this date
page        integer   Page number for pagination (default: 1)
pageSize    integer   Number of items per page (default: 20, max: 100)

Response

{
  "executions": [
    {
      "executionId": "990e8400-e29b-41d4-a716-446655440000",
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "executionName": "Monthly Process Analysis",
      "status": "Completed",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "dateCompleted": "2024-01-20T10:48:00Z",
      "duration": "18 minutes",
      "recordsProcessed": 15418,
      "priority": "Normal",
      "submittedBy": "user123"
    }
  ],
  "totalCount": 47,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}
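
Since the response reports `hasNextPage`, a client can walk the full history one page at a time. The following sketch keeps the HTTP call out of the loop: `fetch_page` is a hypothetical callable that takes a page number and returns the parsed response body, so any HTTP client can supply it.

```python
from typing import Callable, Iterator


def iter_executions(fetch_page: Callable[[int], dict]) -> Iterator[dict]:
    """Yield every execution across pages until hasNextPage is false.

    fetch_page is a caller-supplied function (an assumption of this sketch)
    that requests one page and returns the decoded JSON body.
    """
    page = 1
    while True:
        body = fetch_page(page)
        yield from body.get("executions", [])
        if not body.get("hasNextPage"):
            break
        page += 1
```

Because this is a generator, a caller that only needs the first few executions can stop iterating without fetching the remaining pages.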

Cancel Execution

DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Cancels a running or queued pipeline execution. Stages that have already completed are preserved; the execution stops at the current stage.

Request Body (Optional)

{
  "reason": "User requested cancellation",
  "preservePartialResults": true
}

Response Codes

  • 200 OK - Execution cancelled successfully
  • 404 Not Found - Execution not found
  • 409 Conflict - Execution already completed or cannot be cancelled
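
The three documented response codes can be mapped to outcomes that calling code can branch on. This is a small sketch; the outcome labels and the function name are made up for illustration, and any code outside the documented three is reported as unexpected rather than guessed at.

```python
# Outcome labels are illustrative; only the status codes come from the list above.
CANCEL_OUTCOMES = {
    200: "cancelled",
    404: "not_found",
    409: "not_cancellable",
}


def interpret_cancel_status(status_code: int) -> str:
    """Translate a cancel response code into a short outcome label."""
    return CANCEL_OUTCOMES.get(status_code, "unexpected")
```

Distinguishing 409 from 404 matters in practice: a 409 usually means the execution finished before the cancel request arrived, so its results may still be retrievable.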

Restart Failed Execution

POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart

Restarts a failed pipeline execution from the point of failure. Previously completed stages are skipped unless rerunCompletedStages is set to true in the request body.

Request Body

{
  "restartFromStage": "stage-003",
  "rerunCompletedStages": false,
  "updateParameters": {
    "retryFailedRecords": true,
    "increaseTimeout": true
  }
}

Response

Returns 200 OK with a new execution object containing the new execution ID and its status.
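
A restart request body can be assembled with a small helper such as the sketch below. It assumes that `restartFromStage` and `updateParameters` may be omitted when not needed, which the documentation does not state explicitly; the function name is hypothetical.

```python
from typing import Optional


def build_restart_payload(
    restart_from_stage: Optional[str] = None,
    rerun_completed_stages: bool = False,
    update_parameters: Optional[dict] = None,
) -> dict:
    """Assemble a restart request body from the documented fields.

    Assumption: fields left unset are simply omitted from the payload.
    """
    payload = {"rerunCompletedStages": rerun_completed_stages}
    if restart_from_stage is not None:
        payload["restartFromStage"] = restart_from_stage
    if update_parameters:
        # Copy so later changes by the caller do not alter the payload
        payload["updateParameters"] = dict(update_parameters)
    return payload
```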

Example: Complete Execution Workflow

This example demonstrates executing a pipeline and monitoring its progress:

// Placeholder values: replace with your own instance URL, IDs, and token
const baseUrl = 'https://your-mindzie-instance.com';
const tenantId = 'tenant-guid';
const projectId = 'project-guid';
const pipelineId = 'pipeline-guid';
const token = 'your-auth-token';
const apiRoot = `${baseUrl}/api/${tenantId}/${projectId}/enrichment`;

// Parse a JSON response, throwing on non-2xx status codes
const parseJson = async (response) => {
  if (!response.ok) {
    throw new Error(`Request failed with HTTP ${response.status}`);
  }
  return response.json();
};

// Resolve after the given number of milliseconds
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

// 1. Execute pipeline
const executeEnrichment = async () => {
  const response = await fetch(`${apiRoot}/pipeline/${pipelineId}/execute`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      executionName: 'Customer Journey Analysis',
      executionDescription: 'Enriching customer data with journey metrics',
      parameters: {
        timeRange: {
          startDate: '2024-01-01',
          endDate: '2024-01-31'
        },
        outputOptions: {
          includeRawData: true,
          generateSummary: true,
          exportFormat: 'CSV'
        }
      },
      priority: 'High',
      notifyOnCompletion: true
    })
  });

  return parseJson(response);
};

// 2. Monitor execution progress until it reaches a final status
const monitorExecution = async (executionId) => {
  while (true) {
    const response = await fetch(`${apiRoot}/execution/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const execution = await parseJson(response);
    console.log(`Status: ${execution.status}, Progress: ${execution.progress}%`);

    if (execution.status === 'Completed') {
      console.log('Execution completed successfully!');
      return getResults(executionId);
    }
    if (execution.status !== 'Running' && execution.status !== 'Queued') {
      // Failed or Cancelled: stop polling and hand back the last status
      console.log(`Execution ended with status ${execution.status}:`, execution.error);
      return execution;
    }

    // Check again in 30 seconds
    await sleep(30000);
  }
};

// 3. Get results when completed
const getResults = async (executionId) => {
  const response = await fetch(`${apiRoot}/execution/${executionId}/results?format=summary`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await parseJson(response);
  console.log('Enrichment Summary:', results.summary);
  console.log('Download URLs:', results.downloadUrls);

  return results;
};

// Execute the workflow
executeEnrichment()
  .then(execution => {
    console.log(`Started execution: ${execution.executionId}`);
    return monitorExecution(execution.executionId);
  })
  .catch(error => console.error('Execution failed:', error));

Python Example

import time

import requests

class PipelineExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
        """Execute an enrichment pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
        payload = {
            'datasetId': dataset_id,
            'executionName': execution_name,
            'parameters': parameters or {},
            'priority': priority,
            'notifyOnCompletion': True
        }
        response = requests.post(url, json=payload, headers=self.headers)
        response.raise_for_status()  # Raise on HTTP errors instead of parsing an error body
        return response.json()

    def get_execution_status(self, execution_id):
        """Get current execution status"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        response = requests.get(url, headers=self.headers)
        response.raise_for_status()  # Raise on HTTP errors instead of parsing an error body
        return response.json()

    def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
        """Wait for execution to complete with periodic status checks"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            status = self.get_execution_status(execution_id)
            print(f"Execution {execution_id}: {status['status']} ({status.get('progress', 0)}%)")

            if status['status'] in ['Completed', 'Failed', 'Cancelled']:
                return status

            time.sleep(poll_interval)

        raise TimeoutError(f"Execution {execution_id} did not complete within {timeout} seconds")

    def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
        """Get execution results"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
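        params = {
            'format': format_type,
            # Send a lowercase boolean; requests would otherwise serialize it as "True"/"False"
            'includeRawData': 'true' if include_raw_data else 'false'
        }
        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()  # Raise on HTTP errors instead of parsing an error body
        return response.json()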

    def cancel_execution(self, execution_id, reason="User cancellation"):
        """Cancel a running execution"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
        """List pipeline executions with filtering"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
        params = {'page': page, 'pageSize': page_size}

        if pipeline_id:
            params['pipelineId'] = pipeline_id
        if status:
            params['status'] = status
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()  # Raise on HTTP errors instead of parsing an error body
        return response.json()

# Usage example
manager = PipelineExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Execute pipeline with custom parameters
execution_params = {
    'timeRange': {
        'startDate': '2024-01-01',
        'endDate': '2024-01-31'
    },
    'filterCriteria': {
        'includeWeekends': False,
        'minCaseDuration': '1h'
    },
    'outputOptions': {
        'includeRawData': True,
        'generateSummary': True,
        'exportFormat': 'CSV'
    }
}

try:
    # Start execution
    execution = manager.execute_pipeline(
        'pipeline-guid',
        'dataset-guid',
        'Monthly Process Analysis',
        execution_params,
        'High'
    )

    print(f"Started execution: {execution['executionId']}")
    print(f"Estimated duration: {execution['estimatedDuration']}")

    # Wait for completion
    final_status = manager.wait_for_completion(execution['executionId'])

    if final_status['status'] == 'Completed':
        # Get results
        results = manager.get_execution_results(execution['executionId'])
        print("Enrichment completed successfully!")
        print(f"Original records: {results['summary']['originalRecords']}")
        print(f"Enriched records: {results['summary']['enrichedRecords']}")
        print(f"Data quality score: {results['summary']['dataQualityScore']}")
        print(f"Download enriched data: {results['downloadUrls']['enrichedDataset']}")
    else:
        print(f"Execution ended with status: {final_status['status']}")

except Exception as e:
    print(f"Error executing pipeline: {e}")