Execute Enrichment Pipelines
Run enrichment pipelines on datasets, monitor progress, and retrieve enhanced results.
Execute Pipeline
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute
Triggers the execution of an enrichment pipeline on a specified dataset. The execution runs asynchronously and returns an execution ID for tracking progress.
Parameters
| Parameter | Type | Location | Description |
|---|---|---|---|
| tenantId | GUID | Path | The tenant identifier |
| projectId | GUID | Path | The project identifier |
| pipelineId | GUID | Path | The pipeline identifier |
Request Body
{
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "executionName": "Monthly Process Analysis",
  "executionDescription": "Enrichment for monthly performance review",
  "parameters": {
    "timeRange": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-31"
    },
    "filterCriteria": {
      "includeWeekends": false,
      "minCaseDuration": "1h"
    },
    "outputOptions": {
      "includeRawData": true,
      "generateSummary": true,
      "exportFormat": "CSV"
    }
  },
  "priority": "Normal",
  "notifyOnCompletion": true
}
Response
{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "estimatedDuration": "15-20 minutes",
  "executionName": "Monthly Process Analysis",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "status": "Pending",
      "estimatedDuration": "2-3 minutes"
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "status": "Pending",
      "estimatedDuration": "8-10 minutes"
    }
  ]
}
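As a quick sanity check before building a full client, the request above can be submitted with a few lines of Python. This is a minimal sketch, assuming the optional parameters block can be omitted; the instance URL, token, and IDs are placeholders, and the complete workflow examples at the end of this page show the same call in more detail.

import requests

base_url = "https://your-mindzie-instance.com"   # placeholder instance URL
tenant_id, project_id = "tenant-guid", "project-guid"
pipeline_id = "pipeline-guid"
headers = {"Authorization": "Bearer your-auth-token"}

# Submit the execution request documented above (only the core fields are sent here)
response = requests.post(
    f"{base_url}/api/{tenant_id}/{project_id}/enrichment/pipeline/{pipeline_id}/execute",
    headers=headers,
    json={
        "datasetId": "880e8400-e29b-41d4-a716-446655440000",
        "executionName": "Monthly Process Analysis",
        "priority": "Normal",
        "notifyOnCompletion": True,
    },
)
response.raise_for_status()
execution = response.json()

# The call is asynchronous: keep the executionId for status polling
print(execution["executionId"], execution["status"])
for stage in execution.get("stages", []):
    print(f"  {stage['stageName']}: {stage['estimatedDuration']}")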
Get Execution Status
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Retrieves the current status and progress information for a pipeline execution, including detailed stage-by-stage progress.
Response
{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Running",
  "progress": 45,
  "currentStage": {
    "stageId": "stage-002",
    "stageName": "Time Enrichment",
    "status": "Running",
    "progress": 60,
    "startTime": "2024-01-20T10:35:00Z",
    "estimatedCompletion": "2024-01-20T10:45:00Z"
  },
  "executionName": "Monthly Process Analysis",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "dateStarted": "2024-01-20T10:32:00Z",
  "estimatedCompletion": "2024-01-20T10:50:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "status": "Completed",
      "progress": 100,
      "startTime": "2024-01-20T10:32:00Z",
      "endTime": "2024-01-20T10:35:00Z",
      "duration": "3 minutes",
      "recordsProcessed": 15420,
      "validationResults": {
        "totalRecords": 15420,
        "validRecords": 15418,
        "errors": 2,
        "warnings": 15
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "status": "Running",
      "progress": 60,
      "startTime": "2024-01-20T10:35:00Z",
      "recordsProcessed": 9252,
      "totalRecords": 15418
    }
  ],
  "metrics": {
    "totalRecords": 15420,
    "processedRecords": 9252,
    "errorCount": 2,
    "warningCount": 15
  }
}
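The stages array makes it possible to report progress per stage rather than only the top-level percentage. The sketch below prints a stage-by-stage summary from the response above; placeholders are the same as in the earlier sketch, and the full polling loop is shown in the workflow examples at the end of this page.

import requests

base_url = "https://your-mindzie-instance.com"   # placeholder instance URL
tenant_id, project_id = "tenant-guid", "project-guid"
execution_id = "990e8400-e29b-41d4-a716-446655440000"
headers = {"Authorization": "Bearer your-auth-token"}

status = requests.get(
    f"{base_url}/api/{tenant_id}/{project_id}/enrichment/execution/{execution_id}",
    headers=headers,
).json()

print(f"Overall: {status['status']} ({status.get('progress', 0)}%)")
for stage in status["stages"]:
    processed = stage.get("recordsProcessed")
    total = stage.get("totalRecords")
    counts = f" ({processed}/{total} records)" if processed is not None and total is not None else ""
    print(f"  {stage['stageName']}: {stage['status']}{counts}")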
Get Execution Results
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results
Retrieves the final results of a completed pipeline execution, including enriched data, summary statistics, and downloadable outputs.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| format | string | Response format: summary, full, download (default: summary) |
| includeRawData | boolean | Include original dataset in response (default: false) |
| limit | integer | Limit number of records returned (max: 10000) |
Response
{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionDate": "2024-01-20T10:48:00Z",
  "totalDuration": "18 minutes",
  "summary": {
    "originalRecords": 15420,
    "enrichedRecords": 15418,
    "newAttributes": 8,
    "dataQualityScore": 98.7,
    "enrichmentCoverage": 99.9
  },
  "enrichedAttributes": [
    {
      "attributeName": "dayOfWeek",
      "attributeType": "string",
      "coverage": 100,
      "uniqueValues": 7,
      "description": "Day of the week for each event"
    },
    {
      "attributeName": "businessHours",
      "attributeType": "boolean",
      "coverage": 100,
      "description": "Whether event occurred during business hours"
    },
    {
      "attributeName": "cycleTime",
      "attributeType": "duration",
      "coverage": 99.8,
      "averageValue": "4.2 hours",
      "description": "Time from case start to completion"
    }
  ],
  "dataQuality": {
    "completeness": 99.9,
    "accuracy": 98.5,
    "consistency": 99.2,
    "validity": 97.8,
    "issues": [
      {
        "type": "Missing Timestamp",
        "count": 2,
        "severity": "High"
      },
      {
        "type": "Invalid Duration",
        "count": 15,
        "severity": "Medium"
      }
    ]
  },
  "downloadUrls": {
    "enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
    "summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
    "dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
  }
}
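Once the status is Completed, the entries under downloadUrls can be fetched directly. Below is a minimal sketch that saves the enriched dataset to disk; whether those URLs require the same bearer token is an assumption, so adjust the headers to match your deployment.

import requests

base_url = "https://your-mindzie-instance.com"   # placeholder instance URL
tenant_id, project_id = "tenant-guid", "project-guid"
execution_id = "990e8400-e29b-41d4-a716-446655440000"
headers = {"Authorization": "Bearer your-auth-token"}

results = requests.get(
    f"{base_url}/api/{tenant_id}/{project_id}/enrichment/execution/{execution_id}/results",
    params={"format": "summary"},
    headers=headers,
).json()

# Stream the enriched CSV to a local file
download_url = results["downloadUrls"]["enrichedDataset"]
with requests.get(download_url, headers=headers, stream=True) as download:
    download.raise_for_status()
    with open("enriched-dataset.csv", "wb") as f:
        for chunk in download.iter_content(chunk_size=8192):
            f.write(chunk)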
List Pipeline Executions
GET /api/{tenantId}/{projectId}/enrichment/executions
Retrieves a list of all pipeline executions with filtering and pagination options. Useful for monitoring execution history and performance.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| pipelineId | GUID | Filter by specific pipeline |
| status | string | Filter by status: Queued, Running, Completed, Failed, Cancelled |
| dateFrom | datetime | Filter executions from this date |
| dateTo | datetime | Filter executions to this date |
| page | integer | Page number for pagination (default: 1) |
| pageSize | integer | Number of items per page (default: 20, max: 100) |
Response
{
  "executions": [
    {
      "executionId": "990e8400-e29b-41d4-a716-446655440000",
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "executionName": "Monthly Process Analysis",
      "status": "Completed",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "dateCompleted": "2024-01-20T10:48:00Z",
      "duration": "18 minutes",
      "recordsProcessed": 15418,
      "priority": "Normal",
      "submittedBy": "user123"
    }
  ],
  "totalCount": 47,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}
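Because the listing is paginated, retrieving the full execution history means following hasNextPage. Here is a short sketch that walks all completed executions, using the documented query parameters and the same placeholders as the sketches above.

import requests

base_url = "https://your-mindzie-instance.com"   # placeholder instance URL
tenant_id, project_id = "tenant-guid", "project-guid"
headers = {"Authorization": "Bearer your-auth-token"}

page = 1
while True:
    batch = requests.get(
        f"{base_url}/api/{tenant_id}/{project_id}/enrichment/executions",
        params={"status": "Completed", "page": page, "pageSize": 100},
        headers=headers,
    ).json()
    for execution in batch["executions"]:
        print(execution["executionName"], execution["status"], execution.get("duration"))
    if not batch.get("hasNextPage"):
        break
    page += 1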
Cancel Execution
DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Cancels a running or queued pipeline execution. Completed stages will be preserved, but the execution will stop at the current stage.
Request Body (Optional)
{
  "reason": "User requested cancellation",
  "preservePartialResults": true
}
Response Codes
- 200 OK - Execution cancelled successfully
- 404 Not Found - Execution not found
- 409 Conflict - Execution has already completed or cannot be cancelled
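A small cancellation sketch that handles the 409 case explicitly; the optional body mirrors the request shown above, and the placeholders follow the earlier sketches.

import requests

base_url = "https://your-mindzie-instance.com"   # placeholder instance URL
tenant_id, project_id = "tenant-guid", "project-guid"
execution_id = "990e8400-e29b-41d4-a716-446655440000"
headers = {"Authorization": "Bearer your-auth-token"}

response = requests.delete(
    f"{base_url}/api/{tenant_id}/{project_id}/enrichment/execution/{execution_id}",
    headers=headers,
    json={"reason": "User requested cancellation", "preservePartialResults": True},
)
if response.status_code == 200:
    print("Execution cancelled")
elif response.status_code == 409:
    print("Execution has already completed or cannot be cancelled")
else:
    response.raise_for_status()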
Restart Failed Execution
POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart
Restarts a failed pipeline execution from the point of failure. Previously completed stages are skipped unless rerunCompletedStages is set to true.
Request Body
{
  "restartFromStage": "stage-003",
  "rerunCompletedStages": false,
  "updateParameters": {
    "retryFailedRecords": true,
    "increaseTimeout": true
  }
}
Response
Returns 200 OK with a new execution object containing the new execution ID and its initial status.
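The restart endpoint is not covered by the workflow examples below, so here is a minimal sketch of the call. The stage ID and parameter names are taken from the request body above; treat the exact set of supported updateParameters keys as deployment-specific.

import requests

base_url = "https://your-mindzie-instance.com"   # placeholder instance URL
tenant_id, project_id = "tenant-guid", "project-guid"
execution_id = "990e8400-e29b-41d4-a716-446655440000"
headers = {"Authorization": "Bearer your-auth-token"}

response = requests.post(
    f"{base_url}/api/{tenant_id}/{project_id}/enrichment/execution/{execution_id}/restart",
    headers=headers,
    json={
        "restartFromStage": "stage-003",
        "rerunCompletedStages": False,
        "updateParameters": {"retryFailedRecords": True, "increaseTimeout": True},
    },
)
response.raise_for_status()

# The restart returns a new execution object; track progress under the new ID
new_execution = response.json()
print("New execution:", new_execution["executionId"], new_execution["status"])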
Example: Complete Execution Workflow
This example demonstrates executing a pipeline and monitoring its progress:
// 1. Execute pipeline
const executeEnrichment = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      executionName: 'Customer Journey Analysis',
      executionDescription: 'Enriching customer data with journey metrics',
      parameters: {
        timeRange: {
          startDate: '2024-01-01',
          endDate: '2024-01-31'
        },
        outputOptions: {
          includeRawData: true,
          generateSummary: true,
          exportFormat: 'CSV'
        }
      },
      priority: 'High',
      notifyOnCompletion: true
    })
  });
  return await response.json();
};

// 2. Monitor execution progress
const monitorExecution = async (executionId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });
    const execution = await response.json();
    console.log(`Status: ${execution.status}, Progress: ${execution.progress}%`);
    if (execution.status === 'Running' || execution.status === 'Queued') {
      // Check again in 30 seconds
      setTimeout(() => checkStatus(), 30000);
    } else if (execution.status === 'Completed') {
      console.log('Execution completed successfully!');
      await getResults(executionId);
    } else if (execution.status === 'Failed') {
      console.log('Execution failed:', execution.error);
    }
  };
  await checkStatus();
};

// 3. Get results when completed
const getResults = async (executionId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });
  const results = await response.json();
  console.log('Enrichment Summary:', results.summary);
  console.log('Download URLs:', results.downloadUrls);
  return results;
};

// Execute the workflow
executeEnrichment()
  .then(execution => {
    console.log(`Started execution: ${execution.executionId}`);
    return monitorExecution(execution.executionId);
  })
  .catch(error => console.error('Execution failed:', error));
Python Example
import requests
import time
import json
from datetime import datetime, timedelta
class PipelineExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
        """Execute an enrichment pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
        payload = {
            'datasetId': dataset_id,
            'executionName': execution_name,
            'parameters': parameters or {},
            'priority': priority,
            'notifyOnCompletion': True
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_execution_status(self, execution_id):
        """Get current execution status"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
        """Wait for execution to complete with periodic status checks"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            status = self.get_execution_status(execution_id)
            print(f"Execution {execution_id}: {status['status']} ({status.get('progress', 0)}%)")
            if status['status'] in ['Completed', 'Failed', 'Cancelled']:
                return status
            time.sleep(poll_interval)
        raise TimeoutError(f"Execution {execution_id} did not complete within {timeout} seconds")

    def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
        """Get execution results"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
        params = {
            'format': format_type,
            'includeRawData': include_raw_data
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_execution(self, execution_id, reason="User cancellation"):
        """Cancel a running execution"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
        """List pipeline executions with filtering"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
        params = {'page': page, 'pageSize': page_size}
        if pipeline_id:
            params['pipelineId'] = pipeline_id
        if status:
            params['status'] = status
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()
# Usage example
manager = PipelineExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Execute pipeline with custom parameters
execution_params = {
    'timeRange': {
        'startDate': '2024-01-01',
        'endDate': '2024-01-31'
    },
    'filterCriteria': {
        'includeWeekends': False,
        'minCaseDuration': '1h'
    },
    'outputOptions': {
        'includeRawData': True,
        'generateSummary': True,
        'exportFormat': 'CSV'
    }
}

try:
    # Start execution
    execution = manager.execute_pipeline(
        'pipeline-guid',
        'dataset-guid',
        'Monthly Process Analysis',
        execution_params,
        'High'
    )
    print(f"Started execution: {execution['executionId']}")
    print(f"Estimated duration: {execution['estimatedDuration']}")

    # Wait for completion
    final_status = manager.wait_for_completion(execution['executionId'])

    if final_status['status'] == 'Completed':
        # Get results
        results = manager.get_execution_results(execution['executionId'])
        print("Enrichment completed successfully!")
        print(f"Original records: {results['summary']['originalRecords']}")
        print(f"Enriched records: {results['summary']['enrichedRecords']}")
        print(f"Data quality score: {results['summary']['dataQualityScore']}")
        print(f"Download enriched data: {results['downloadUrls']['enrichedDataset']}")
    else:
        print(f"Execution failed with status: {final_status['status']}")
except Exception as e:
    print(f"Error executing pipeline: {e}")