Build Data Enrichment Workflows
Create and manage enrichment pipelines to transform and enhance your process mining datasets.
Get Pipeline Details
GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Retrieves comprehensive information about a specific enrichment pipeline including its stages, configuration, and execution metadata.
Parameters
| Parameter | Type | Location | Description |
|---|---|---|---|
| tenantId | GUID | Path | The tenant identifier |
| projectId | GUID | Path | The project identifier |
| pipelineId | GUID | Path | The pipeline identifier |
Response
{
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"projectId": "660e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"pipelineDescription": "Enriches event logs with additional attributes and calculations",
"status": "Active",
"stages": [
{
"stageId": "stage-001",
"stageName": "Data Validation",
"stageType": "Validation",
"order": 1,
"configuration": {
"validateCaseId": true,
"validateTimestamps": true,
"requireActivityNames": true
}
},
{
"stageId": "stage-002",
"stageName": "Time Enrichment",
"stageType": "TimeCalculation",
"order": 2,
"configuration": {
"addDayOfWeek": true,
"addBusinessHours": true,
"timezoneId": "UTC"
}
}
],
"triggers": {
"automatic": true,
"schedule": "0 2 * * *",
"onDataUpdate": true
},
"dateCreated": "2024-01-15T10:30:00Z",
"dateModified": "2024-01-20T14:45:00Z",
"createdBy": "user123",
"lastExecutionDate": "2024-01-20T02:00:00Z",
"lastExecutionStatus": "Success",
"executionCount": 45
}
List All Pipelines
GET /api/{tenantId}/{projectId}/enrichment/pipelines
Retrieves a list of all enrichment pipelines in the project with basic metadata and status information.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| status | string | Filter by pipeline status: Active, Inactive, Failed |
| page | integer | Page number for pagination (default: 1) |
| pageSize | integer | Number of items per page (default: 20, max: 100) |
Response
{
"pipelines": [
{
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"status": "Active",
"stageCount": 5,
"lastExecutionDate": "2024-01-20T02:00:00Z",
"lastExecutionStatus": "Success",
"dateCreated": "2024-01-15T10:30:00Z"
}
],
"totalCount": 12,
"page": 1,
"pageSize": 20,
"hasNextPage": false
}
Create New Pipeline
POST /api/{tenantId}/{projectId}/enrichment/pipeline
Creates a new enrichment pipeline with specified stages and configuration. The pipeline can be configured to run automatically or manually.
Request Body
{
"pipelineName": "Customer Journey Enrichment",
"pipelineDescription": "Enriches customer journey data with demographics and behavior patterns",
"stages": [
{
"stageName": "Customer Data Lookup",
"stageType": "DataLookup",
"order": 1,
"configuration": {
"lookupTable": "customer_demographics",
"joinKey": "customerId",
"selectFields": ["age", "segment", "region"]
}
},
{
"stageName": "Journey Metrics",
"stageType": "Calculation",
"order": 2,
"configuration": {
"calculations": [
{
"fieldName": "journeyDuration",
"formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
"groupBy": "caseId"
},
{
"fieldName": "touchpointCount",
"formula": "COUNT(*)",
"groupBy": "caseId"
}
]
}
}
],
"triggers": {
"automatic": false,
"schedule": null,
"onDataUpdate": true
}
}
Response
Returns 201 Created with the complete pipeline object including generated IDs and timestamps.
Update Pipeline
PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Updates an existing pipeline's configuration, stages, or triggers. Changes take effect on the next execution.
Request Body
{
"pipelineName": "Updated Customer Journey Enrichment",
"pipelineDescription": "Enhanced customer journey data enrichment with ML insights",
"status": "Active",
"triggers": {
"automatic": true,
"schedule": "0 3 * * *",
"onDataUpdate": true
}
}
Response
Returns the updated pipeline object with the same structure as the GET endpoint.
Delete Pipeline
DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Permanently removes a pipeline and all its execution history. This operation cannot be undone and will stop any currently running executions.
Response Codes
- `204 No Content` – Pipeline deleted successfully
- `404 Not Found` – Pipeline not found or access denied
- `409 Conflict` – Pipeline is currently executing and cannot be deleted
Add Stage to Pipeline
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage
Adds a new processing stage to an existing pipeline. The stage will be inserted at the specified order position.
Request Body
{
"stageName": "Process Performance Metrics",
"stageType": "PerformanceCalculation",
"order": 3,
"configuration": {
"metrics": [
{
"name": "cycleTime",
"calculation": "CASE_DURATION",
"unit": "hours"
},
{
"name": "waitTime",
"calculation": "ACTIVITY_WAITING_TIME",
"unit": "hours"
}
],
"aggregations": ["AVG", "MAX", "MIN", "P95"]
}
}
Response
Returns 201 Created with the complete stage object including generated stage ID.
Remove Stage from Pipeline
DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}
Removes a specific stage from the pipeline. Subsequent stages will be reordered automatically.
Response Codes
- `204 No Content` – Stage removed successfully
- `404 Not Found` – Stage not found in pipeline
- `409 Conflict` – Cannot remove stage while pipeline is executing
Example: Complete Pipeline Workflow
This example demonstrates creating and managing an enrichment pipeline. The snippets assume a `token` variable holding a valid bearer token for the API:
// 1. Create a new enrichment pipeline.
// Assumes `token`, `tenantId` and `projectId` are defined in the enclosing
// scope; they are interpolated into the URL / Authorization header below.
// Resolves to the parsed JSON body; throws if the server answers with a
// non-2xx status.
const createPipeline = async () => {
  // Template literal with `${...}` so the real IDs replace the placeholders;
  // a plain '{tenantId}' string would be sent to the server verbatim.
  const response = await fetch(`/api/${tenantId}/${projectId}/enrichment/pipeline`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      pipelineName: 'Order Processing Enrichment',
      pipelineDescription: 'Enriches order data with fulfillment metrics',
      stages: [
        {
          stageName: 'Order Validation',
          stageType: 'Validation',
          order: 1,
          configuration: {
            validateOrderId: true,
            validateCustomerId: true,
            validateAmounts: true
          }
        },
        {
          stageName: 'Fulfillment Time Calculation',
          stageType: 'TimeCalculation',
          order: 2,
          configuration: {
            startActivity: 'Order Received',
            endActivity: 'Order Shipped',
            outputField: 'fulfillmentTime'
          }
        }
      ],
      triggers: {
        automatic: true,
        onDataUpdate: true
      }
    })
  });
  // fetch() only rejects on network failure, so HTTP errors must be checked
  // explicitly instead of being returned as if they were a pipeline object.
  if (!response.ok) {
    throw new Error(`Create pipeline failed: ${response.status} ${response.statusText}`);
  }
  return response.json();
};
// 2. Add a new stage to an existing pipeline.
// Assumes `token`, `tenantId` and `projectId` are defined in the enclosing
// scope. Resolves to the parsed JSON body; throws on a non-2xx status.
const addStage = async (pipelineId) => {
  // Every path segment needs `${...}`; without the `$` the braces are sent literally.
  const response = await fetch(`/api/${tenantId}/${projectId}/enrichment/pipeline/${pipelineId}/stage`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      stageName: 'Customer Segmentation',
      stageType: 'Classification',
      order: 3,
      configuration: {
        segmentationRules: [
          {
            segment: 'VIP',
            condition: 'orderValue > 1000'
          },
          {
            segment: 'Regular',
            condition: 'orderValue <= 1000'
          }
        ]
      }
    })
  });
  // fetch() resolves even for 4xx/5xx responses; surface them as errors.
  if (!response.ok) {
    throw new Error(`Add stage failed: ${response.status} ${response.statusText}`);
  }
  return response.json();
};
// 3. Get pipeline status (fetches the full pipeline details object).
// Assumes `token`, `tenantId` and `projectId` are defined in the enclosing
// scope. Resolves to the parsed JSON body; throws on a non-2xx status.
const getPipelineStatus = async (pipelineId) => {
  const response = await fetch(`/api/${tenantId}/${projectId}/enrichment/pipeline/${pipelineId}`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });
  // fetch() resolves even for 4xx/5xx responses; surface them as errors.
  if (!response.ok) {
    throw new Error(`Get pipeline failed: ${response.status} ${response.statusText}`);
  }
  return response.json();
};
Python Example
import requests
import json
from datetime import datetime
class EnrichmentPipelineManager:
    """Minimal client for the enrichment pipeline REST endpoints.

    Every request sends a bearer token and a JSON content type, and uses a
    per-request timeout. Methods that return a decoded JSON body call
    ``raise_for_status()`` first, so an HTTP error surfaces as
    ``requests.HTTPError`` instead of an error payload that looks like a
    successful result. The delete methods return a bool (True on 204).
    """

    def __init__(self, base_url, tenant_id, project_id, token, timeout=30):
        """Store connection settings.

        Args:
            base_url: Root URL of the instance, e.g. ``https://host``. A
                trailing slash is stripped so request paths are not doubled.
            tenant_id: Tenant identifier placed in every request path.
            project_id: Project identifier placed in every request path.
            token: Bearer token sent in the ``Authorization`` header.
            timeout: Seconds to wait for each HTTP request. ``None`` waits
                indefinitely.
        """
        self.base_url = base_url.rstrip('/')
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.timeout = timeout
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def _url(self, *segments):
        """Build ``{base}/api/{tenant}/{project}/enrichment/<segments joined by '/'>``."""
        path = '/'.join(str(segment) for segment in segments)
        return f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/{path}"

    def _send(self, method, url, **kwargs):
        """Issue an HTTP request with the shared headers and timeout."""
        return requests.request(
            method, url, headers=self.headers, timeout=self.timeout, **kwargs
        )

    def _send_json(self, method, url, **kwargs):
        """Issue a request, raise on an HTTP error status, return the decoded JSON body."""
        response = self._send(method, url, **kwargs)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _update_payload(name, description, status, triggers):
        """Build the PUT body from the fields that were actually supplied.

        Compares against ``None`` rather than truthiness, so deliberate falsy
        values (e.g. ``""`` to clear the description) are still sent.
        """
        fields = {
            'pipelineName': name,
            'pipelineDescription': description,
            'status': status,
            'triggers': triggers,
        }
        return {key: value for key, value in fields.items() if value is not None}

    def create_pipeline(self, name, description, stages, triggers=None):
        """Create a new enrichment pipeline.

        Args:
            name: Pipeline name.
            description: Pipeline description.
            stages: List of stage dicts (``stageName``, ``stageType``,
                ``order``, ``configuration``).
            triggers: Trigger settings. When omitted (``None``), it defaults to
                ``{'automatic': False, 'onDataUpdate': True}``.

        Returns:
            The created pipeline as decoded JSON.

        Raises:
            requests.HTTPError: if the server returns an error status.
        """
        if triggers is None:
            triggers = {'automatic': False, 'onDataUpdate': True}
        payload = {
            'pipelineName': name,
            'pipelineDescription': description,
            'stages': stages,
            'triggers': triggers,
        }
        return self._send_json('POST', self._url('pipeline'), json=payload)

    def get_pipeline(self, pipeline_id):
        """Return the details of one pipeline; raises ``requests.HTTPError`` on error."""
        return self._send_json('GET', self._url('pipeline', pipeline_id))

    def list_pipelines(self, status=None, page=1, page_size=20):
        """List pipelines in the project, optionally filtered by status.

        Args:
            status: Optional status filter (Active, Inactive or Failed); a
                falsy value means no filter.
            page: 1-based page number.
            page_size: Items per page (the API caps this at 100).

        Returns:
            The decoded JSON page (``pipelines``, ``totalCount``, ...).

        Raises:
            requests.HTTPError: if the server returns an error status.
        """
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status
        return self._send_json('GET', self._url('pipelines'), params=params)

    def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
        """Add a stage to an existing pipeline and return the created stage.

        Raises:
            requests.HTTPError: if the server returns an error status.
        """
        payload = {
            'stageName': stage_name,
            'stageType': stage_type,
            'order': order,
            'configuration': configuration,
        }
        return self._send_json(
            'POST', self._url('pipeline', pipeline_id, 'stage'), json=payload
        )

    def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
        """Update a pipeline's name, description, status and/or triggers.

        Only arguments that are not ``None`` are sent.

        Returns:
            The updated pipeline as decoded JSON.

        Raises:
            requests.HTTPError: if the server returns an error status.
        """
        payload = self._update_payload(name, description, status, triggers)
        return self._send_json('PUT', self._url('pipeline', pipeline_id), json=payload)

    def delete_pipeline(self, pipeline_id):
        """Delete a pipeline; return True only if the server answered 204."""
        response = self._send('DELETE', self._url('pipeline', pipeline_id))
        return response.status_code == 204

    def remove_stage(self, pipeline_id, stage_id):
        """Remove one stage from a pipeline; return True only if the server answered 204."""
        response = self._send('DELETE', self._url('pipeline', pipeline_id, 'stage', stage_id))
        return response.status_code == 204
# Usage example: describe the stages, connect, create the pipeline, then
# report how many pipelines are currently in the Active state.

# Three ordered stages; the 'order' value sets each stage's position.
stages = [
    dict(
        stageName='Data Quality Check',
        stageType='Validation',
        order=1,
        configuration=dict(
            checkDuplicates=True,
            validateTimestamps=True,
            checkMissingValues=True,
        ),
    ),
    dict(
        stageName='Process Mining Metrics',
        stageType='ProcessCalculation',
        order=2,
        configuration=dict(
            calculateCycleTime=True,
            calculateWaitingTime=True,
            calculateResourceUtilization=True,
            detectBottlenecks=True,
        ),
    ),
    dict(
        stageName='Anomaly Detection',
        stageType='AnomalyDetection',
        order=3,
        configuration=dict(
            algorithm='isolation_forest',
            threshold=0.1,
            features=['duration', 'cost', 'resourceCount'],
        ),
    ),
]

# Placeholder connection settings -- replace with real values.
manager = EnrichmentPipelineManager(
    base_url='https://your-mindzie-instance.com',
    tenant_id='tenant-guid',
    project_id='project-guid',
    token='your-auth-token',
)

# Runs automatically on a daily cron schedule and whenever the data changes.
pipeline = manager.create_pipeline(
    name='Comprehensive Process Analysis',
    description='End-to-end process analysis with anomaly detection',
    stages=stages,
    triggers=dict(automatic=True, schedule='0 1 * * *', onDataUpdate=True),
)
print(f"Created pipeline: {pipeline['pipelineId']}")

# Only pipelines whose status is 'Active' are counted.
active_pipelines = manager.list_pipelines(status='Active')
print(f"Found {active_pipelines['totalCount']} active pipelines")