Pipelines

Build Data Enrichment Workflows

Create and manage enrichment pipelines to transform and enhance your process mining datasets.

Get Pipeline Details

GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Retrieves comprehensive information about a specific enrichment pipeline including its stages, configuration, and execution metadata.

Parameters

Parameter    Type   Location   Description
tenantId     GUID   Path       The tenant identifier
projectId    GUID   Path       The project identifier
pipelineId   GUID   Path       The pipeline identifier

Response

{
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "pipelineName": "Process Mining Data Enrichment",
  "pipelineDescription": "Enriches event logs with additional attributes and calculations",
  "status": "Active",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "stageType": "Validation",
      "order": 1,
      "configuration": {
        "validateCaseId": true,
        "validateTimestamps": true,
        "requireActivityNames": true
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "stageType": "TimeCalculation",
      "order": 2,
      "configuration": {
        "addDayOfWeek": true,
        "addBusinessHours": true,
        "timezoneId": "UTC"
      }
    }
  ],
  "triggers": {
    "automatic": true,
    "schedule": "0 2 * * *",
    "onDataUpdate": true
  },
  "dateCreated": "2024-01-15T10:30:00Z",
  "dateModified": "2024-01-20T14:45:00Z",
  "createdBy": "user123",
  "lastExecutionDate": "2024-01-20T02:00:00Z",
  "lastExecutionStatus": "Success",
  "executionCount": 45
}
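
A response like this can be summarized on the client side. The following is a minimal sketch that assumes the response body has already been parsed into a Python dict. The `summarize_pipeline` helper is hypothetical and not part of the API; it uses only the `pipelineName`, `status`, and `stages` fields from the sample response.

```python
def summarize_pipeline(pipeline: dict) -> str:
    """Return a short text summary of a parsed pipeline response."""
    # Sort by the documented "order" field rather than trusting array order.
    stages = sorted(pipeline.get("stages", []), key=lambda s: s["order"])
    lines = [f"{pipeline['pipelineName']} [{pipeline['status']}]"]
    for stage in stages:
        lines.append(f"  {stage['order']}. {stage['stageName']} ({stage['stageType']})")
    return "\n".join(lines)


# Trimmed copy of the sample response, with stages deliberately out of order.
sample = {
    "pipelineName": "Process Mining Data Enrichment",
    "status": "Active",
    "stages": [
        {"stageName": "Time Enrichment", "stageType": "TimeCalculation", "order": 2},
        {"stageName": "Data Validation", "stageType": "Validation", "order": 1},
    ],
}
print(summarize_pipeline(sample))
```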

List All Pipelines

GET /api/{tenantId}/{projectId}/enrichment/pipelines

Retrieves a list of all enrichment pipelines in the project with basic metadata and status information.

Query Parameters

Parameter   Type      Description
status      string    Filter by pipeline status: Active, Inactive, Failed
page        integer   Page number for pagination (default: 1)
pageSize    integer   Number of items per page (default: 20, max: 100)

Response

{
  "pipelines": [
    {
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "status": "Active",
      "stageCount": 5,
      "lastExecutionDate": "2024-01-20T02:00:00Z",
      "lastExecutionStatus": "Success",
      "dateCreated": "2024-01-15T10:30:00Z"
    }
  ],
  "totalCount": 12,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": false
}
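
To collect every pipeline rather than a single page, a client can keep requesting pages until `hasNextPage` is false. The sketch below assumes that reading of the `page` and `hasNextPage` fields. `iter_pipelines` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for the real HTTP call so the example runs without a server.

```python
from typing import Callable, Iterator


def iter_pipelines(fetch_page: Callable[[int], dict]) -> Iterator[dict]:
    """Yield every pipeline by following hasNextPage across pages."""
    page = 1
    while True:
        body = fetch_page(page)
        yield from body["pipelines"]
        if not body.get("hasNextPage"):
            break
        page += 1


def fake_fetch(page: int) -> dict:
    """Stand-in for GET .../enrichment/pipelines?page=N (illustrative data only)."""
    data = {
        1: {"pipelines": [{"pipelineName": "A"}, {"pipelineName": "B"}], "hasNextPage": True},
        2: {"pipelines": [{"pipelineName": "C"}], "hasNextPage": False},
    }
    return data[page]


names = [p["pipelineName"] for p in iter_pipelines(fake_fetch)]
print(names)
```

In real use, `fetch_page` would wrap the list request and return the parsed JSON body for the given page.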

Create New Pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline

Creates a new enrichment pipeline with specified stages and configuration. The pipeline can be configured to run automatically or manually.

Request Body

{
  "pipelineName": "Customer Journey Enrichment",
  "pipelineDescription": "Enriches customer journey data with demographics and behavior patterns",
  "stages": [
    {
      "stageName": "Customer Data Lookup",
      "stageType": "DataLookup",
      "order": 1,
      "configuration": {
        "lookupTable": "customer_demographics",
        "joinKey": "customerId",
        "selectFields": ["age", "segment", "region"]
      }
    },
    {
      "stageName": "Journey Metrics",
      "stageType": "Calculation",
      "order": 2,
      "configuration": {
        "calculations": [
          {
            "fieldName": "journeyDuration",
            "formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
            "groupBy": "caseId"
          },
          {
            "fieldName": "touchpointCount",
            "formula": "COUNT(*)",
            "groupBy": "caseId"
          }
        ]
      }
    }
  ],
  "triggers": {
    "automatic": false,
    "schedule": null,
    "onDataUpdate": true
  }
}

Response

Returns 201 Created with the complete pipeline object including generated IDs and timestamps.
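
When building the request body in code, numbering the stages by hand is easy to get wrong. The sketch below assigns `order` values 1..n from list position. The `build_stages` helper is hypothetical, and the stage configurations are shortened versions of the request body above.

```python
def build_stages(definitions):
    """Turn (name, type, configuration) tuples into stage objects with order 1..n."""
    return [
        {
            "stageName": name,
            "stageType": stage_type,
            "order": index,
            "configuration": configuration,
        }
        for index, (name, stage_type, configuration) in enumerate(definitions, start=1)
    ]


payload = {
    "pipelineName": "Customer Journey Enrichment",
    "pipelineDescription": "Enriches customer journey data with demographics and behavior patterns",
    "stages": build_stages([
        ("Customer Data Lookup", "DataLookup",
         {"lookupTable": "customer_demographics", "joinKey": "customerId"}),
        ("Journey Metrics", "Calculation", {"calculations": []}),
    ]),
    # Manual pipeline that still reacts to data updates, as in the request body above.
    "triggers": {"automatic": False, "schedule": None, "onDataUpdate": True},
}
print([stage["order"] for stage in payload["stages"]])
```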

Update Pipeline

PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Updates an existing pipeline's configuration, stages, or triggers. Changes take effect on the next execution.

Request Body

{
  "pipelineName": "Updated Customer Journey Enrichment",
  "pipelineDescription": "Enhanced customer journey data enrichment with ML insights",
  "status": "Active",
  "triggers": {
    "automatic": true,
    "schedule": "0 3 * * *",
    "onDataUpdate": true
  }
}

Response

Returns the updated pipeline object, with the same structure as the Get Pipeline Details response.

Delete Pipeline

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Permanently removes a pipeline and all its execution history. This operation cannot be undone and will stop any currently running executions.

Response Codes

  • 204 No Content - Pipeline deleted successfully
  • 404 Not Found - Pipeline not found or access denied
  • 409 Conflict - Pipeline is currently executing and cannot be deleted
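
One way to handle these codes is to retry on 409 and treat 204 and 404 as final. This is a sketch under stated assumptions, not prescribed client behavior. `delete_with_retry` is a hypothetical helper, and `send_delete` is any callable that issues the DELETE request and returns the HTTP status code.

```python
import time
from typing import Callable


def delete_with_retry(send_delete: Callable[[], int], attempts: int = 3,
                      delay_seconds: float = 0.0) -> bool:
    """Retry while the API answers 409.

    Returns True on 204 and False on 404; raises on any other status
    or when every attempt returned 409.
    """
    for attempt in range(attempts):
        status = send_delete()
        if status == 204:
            return True
        if status == 404:
            return False
        if status != 409:
            raise RuntimeError(f"Unexpected status code: {status}")
        if attempt < attempts - 1:
            time.sleep(delay_seconds)  # wait before trying again
    raise TimeoutError("Pipeline was still executing after all attempts")


# Stand-in for the HTTP call: busy twice, then deleted.
responses = iter([409, 409, 204])
deleted = delete_with_retry(lambda: next(responses))
print(deleted)
```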

Add Stage to Pipeline

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage

Adds a new processing stage to an existing pipeline. The stage will be inserted at the specified order position.

Request Body

{
  "stageName": "Process Performance Metrics",
  "stageType": "PerformanceCalculation",
  "order": 3,
  "configuration": {
    "metrics": [
      {
        "name": "cycleTime",
        "calculation": "CASE_DURATION",
        "unit": "hours"
      },
      {
        "name": "waitTime",
        "calculation": "ACTIVITY_WAITING_TIME",
        "unit": "hours"
      }
    ],
    "aggregations": ["AVG", "MAX", "MIN", "P95"]
  }
}

Response

Returns 201 Created with the complete stage object, including the generated stage ID.
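
The effect of inserting at an order position can be modeled locally. The sketch below assumes that existing stages at or after the requested position move down by one; the documentation does not spell out this shifting rule, so treat it as an illustration. `insert_stage` is a hypothetical helper that works on plain dicts and makes no API calls.

```python
def insert_stage(stages, new_stage):
    """Return a new list with new_stage inserted and later stages shifted by one."""
    position = new_stage["order"]
    shifted = [
        # Assumption: stages at or after the target position move down one slot.
        {**stage, "order": stage["order"] + 1} if stage["order"] >= position else dict(stage)
        for stage in stages
    ]
    return sorted(shifted + [dict(new_stage)], key=lambda stage: stage["order"])


current = [
    {"stageName": "Data Validation", "order": 1},
    {"stageName": "Time Enrichment", "order": 2},
]
updated = insert_stage(current, {"stageName": "Process Performance Metrics", "order": 2})
for stage in updated:
    print(stage["order"], stage["stageName"])
```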

Remove Stage from Pipeline

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}

Removes a specific stage from the pipeline. Subsequent stages will be reordered automatically.

Response Codes

  • 204 No Content - Stage removed successfully
  • 404 Not Found - Stage not found in pipeline
  • 409 Conflict - Cannot remove stage while pipeline is executing
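
Automatic reordering after a removal can be pictured as renumbering the remaining stages from 1. The sketch below is a local model under that assumption, not the server's implementation. `remove_stage` is a hypothetical helper, and `stage-003` is an illustrative ID that does not appear in the sample responses.

```python
def remove_stage(stages, stage_id):
    """Return a new list without stage_id, renumbering order from 1."""
    ordered = sorted(stages, key=lambda stage: stage["order"])
    remaining = [stage for stage in ordered if stage["stageId"] != stage_id]
    if len(remaining) == len(ordered):
        # Mirrors the 404 case: the stage is not in this pipeline.
        raise KeyError(f"Stage {stage_id!r} not found")
    return [{**stage, "order": index} for index, stage in enumerate(remaining, start=1)]


current = [
    {"stageId": "stage-001", "stageName": "Data Validation", "order": 1},
    {"stageId": "stage-002", "stageName": "Time Enrichment", "order": 2},
    {"stageId": "stage-003", "stageName": "Process Performance Metrics", "order": 3},
]
updated = remove_stage(current, "stage-001")
for stage in updated:
    print(stage["order"], stage["stageId"])
```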

Example: Complete Pipeline Workflow

This example demonstrates creating and managing an enrichment pipeline:

// Placeholder values: replace with your tenant GUID, project GUID, and auth token.
const tenantId = 'tenant-guid';
const projectId = 'project-guid';
const token = 'your-auth-token';
const baseUrl = `/api/${tenantId}/${projectId}/enrichment`;

// 1. Create a new enrichment pipeline
const createPipeline = async () => {
  const response = await fetch(`${baseUrl}/pipeline`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      pipelineName: 'Order Processing Enrichment',
      pipelineDescription: 'Enriches order data with fulfillment metrics',
      stages: [
        {
          stageName: 'Order Validation',
          stageType: 'Validation',
          order: 1,
          configuration: {
            validateOrderId: true,
            validateCustomerId: true,
            validateAmounts: true
          }
        },
        {
          stageName: 'Fulfillment Time Calculation',
          stageType: 'TimeCalculation',
          order: 2,
          configuration: {
            startActivity: 'Order Received',
            endActivity: 'Order Shipped',
            outputField: 'fulfillmentTime'
          }
        }
      ],
      triggers: {
        automatic: true,
        onDataUpdate: true
      }
    })
  });

  // Fail loudly on non-2xx responses instead of parsing an error body.
  if (!response.ok) {
    throw new Error(`Create pipeline failed with status ${response.status}`);
  }
  return response.json();
};

// 2. Add a new stage to an existing pipeline
const addStage = async (pipelineId) => {
  const response = await fetch(`${baseUrl}/pipeline/${pipelineId}/stage`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      stageName: 'Customer Segmentation',
      stageType: 'Classification',
      order: 3,
      configuration: {
        segmentationRules: [
          {
            segment: 'VIP',
            condition: 'orderValue > 1000'
          },
          {
            segment: 'Regular',
            condition: 'orderValue <= 1000'
          }
        ]
      }
    })
  });

  if (!response.ok) {
    throw new Error(`Add stage failed with status ${response.status}`);
  }
  return response.json();
};

// 3. Get pipeline status
const getPipelineStatus = async (pipelineId) => {
  const response = await fetch(`${baseUrl}/pipeline/${pipelineId}`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  if (!response.ok) {
    throw new Error(`Get pipeline failed with status ${response.status}`);
  }
  return response.json();
};

Python Example

import requests

class EnrichmentPipelineManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def create_pipeline(self, name, description, stages, triggers=None):
        """Create a new enrichment pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline"
        payload = {
            'pipelineName': name,
            'pipelineDescription': description,
            'stages': stages,
            'triggers': triggers or {'automatic': False, 'onDataUpdate': True}
        }
        response = requests.post(url, json=payload, headers=self.headers)
        response.raise_for_status()  # raise on 4xx/5xx instead of parsing an error body
        return response.json()

    def get_pipeline(self, pipeline_id):
        """Get pipeline details"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.get(url, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def list_pipelines(self, status=None, page=1, page_size=20):
        """List all pipelines with optional filtering"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipelines"
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status

        response = requests.get(url, params=params, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
        """Add a new stage to an existing pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/stage"
        payload = {
            'stageName': stage_name,
            'stageType': stage_type,
            'order': order,
            'configuration': configuration
        }
        response = requests.post(url, json=payload, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
        """Update pipeline configuration"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        payload = {}
        if name:
            payload['pipelineName'] = name
        if description:
            payload['pipelineDescription'] = description
        if status:
            payload['status'] = status
        if triggers:
            payload['triggers'] = triggers

        response = requests.put(url, json=payload, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def delete_pipeline(self, pipeline_id):
        """Delete a pipeline"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.delete(url, headers=self.headers)
        return response.status_code == 204

# Usage example
manager = EnrichmentPipelineManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Create a comprehensive enrichment pipeline
stages = [
    {
        'stageName': 'Data Quality Check',
        'stageType': 'Validation',
        'order': 1,
        'configuration': {
            'checkDuplicates': True,
            'validateTimestamps': True,
            'checkMissingValues': True
        }
    },
    {
        'stageName': 'Process Mining Metrics',
        'stageType': 'ProcessCalculation',
        'order': 2,
        'configuration': {
            'calculateCycleTime': True,
            'calculateWaitingTime': True,
            'calculateResourceUtilization': True,
            'detectBottlenecks': True
        }
    },
    {
        'stageName': 'Anomaly Detection',
        'stageType': 'AnomalyDetection',
        'order': 3,
        'configuration': {
            'algorithm': 'isolation_forest',
            'threshold': 0.1,
            'features': ['duration', 'cost', 'resourceCount']
        }
    }
]

pipeline = manager.create_pipeline(
    'Comprehensive Process Analysis',
    'End-to-end process analysis with anomaly detection',
    stages,
    {'automatic': True, 'schedule': '0 1 * * *', 'onDataUpdate': True}
)

print(f"Created pipeline: {pipeline['pipelineId']}")

# List all active pipelines
active_pipelines = manager.list_pipelines(status='Active')
print(f"Found {active_pipelines['totalCount']} active pipelines")