Verrijkingspijplijnen

Bouw workflows voor gegevensverrijking

Maak en beheer verrijkingspijplijnen om uw process mining datasets te transformeren en te verrijken.

Haal details van pijplijn op

GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Haalt uitgebreide informatie op over een specifieke verrijkingspijplijn, inclusief de fasen, configuratie en uitvoeringsmetadata.

Parameters

Parameter Type Locatie Beschrijving
tenantId GUID Pad De tenant-identificatie
projectId GUID Pad De projectidentificatie
pipelineId GUID Pad De pijplijnidentificatie

Antwoord

{
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "pipelineName": "Process Mining Data Enrichment",
  "pipelineDescription": "Enriches event logs with additional attributes and calculations",
  "status": "Active",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "stageType": "Validation",
      "order": 1,
      "configuration": {
        "validateCaseId": true,
        "validateTimestamps": true,
        "requireActivityNames": true
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "stageType": "TimeCalculation",
      "order": 2,
      "configuration": {
        "addDayOfWeek": true,
        "addBusinessHours": true,
        "timezoneId": "UTC"
      }
    }
  ],
  "triggers": {
    "automatic": true,
    "schedule": "0 2 * * *",
    "onDataUpdate": true
  },
  "dateCreated": "2024-01-15T10:30:00Z",
  "dateModified": "2024-01-20T14:45:00Z",
  "createdBy": "user123",
  "lastExecutionDate": "2024-01-20T02:00:00Z",
  "lastExecutionStatus": "Success",
  "executionCount": 45
}

Lijst van alle pijplijnen

GET /api/{tenantId}/{projectId}/enrichment/pipelines

Haalt een lijst op van alle verrijkingspijplijnen binnen het project met basisgegevens en statusinformatie.

Query Parameters

Parameter Type Beschrijving
status string Filter op pijplijnstatus: Active, Inactive, Failed
page integer Paginanummer voor paginering (standaard: 1)
pageSize integer Aantal items per pagina (standaard: 20, max: 100)

Antwoord

{
  "pipelines": [
    {
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "status": "Active",
      "stageCount": 5,
      "lastExecutionDate": "2024-01-20T02:00:00Z",
      "lastExecutionStatus": "Success",
      "dateCreated": "2024-01-15T10:30:00Z"
    }
  ],
  "totalCount": 12,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": false
}

Maak nieuwe pijplijn

POST /api/{tenantId}/{projectId}/enrichment/pipeline

Maakt een nieuwe verrijkingspijplijn aan met opgegeven fasen en configuratie. De pijplijn kan automatisch of handmatig worden uitgevoerd.

Verzoek lichaam

{
  "pipelineName": "Customer Journey Enrichment",
  "pipelineDescription": "Enriches customer journey data with demographics and behavior patterns",
  "stages": [
    {
      "stageName": "Customer Data Lookup",
      "stageType": "DataLookup",
      "order": 1,
      "configuration": {
        "lookupTable": "customer_demographics",
        "joinKey": "customerId",
        "selectFields": ["age", "segment", "region"]
      }
    },
    {
      "stageName": "Journey Metrics",
      "stageType": "Calculation",
      "order": 2,
      "configuration": {
        "calculations": [
          {
            "fieldName": "journeyDuration",
            "formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
            "groupBy": "caseId"
          },
          {
            "fieldName": "touchpointCount",
            "formula": "COUNT(*)",
            "groupBy": "caseId"
          }
        ]
      }
    }
  ],
  "triggers": {
    "automatic": false,
    "schedule": null,
    "onDataUpdate": true
  }
}

Antwoord

Geeft 201 Created terug met het volledige pijplijnobject inclusief gegenereerde IDs en tijdstempels.

Pijplijn bijwerken

PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Wijzigt de configuratie, fasen of triggers van een bestaande pijplijn. Wijzigingen worden bij de volgende uitvoering toegepast.

Verzoek lichaam

{
  "pipelineName": "Updated Customer Journey Enrichment",
  "pipelineDescription": "Enhanced customer journey data enrichment with ML insights",
  "status": "Active",
  "triggers": {
    "automatic": true,
    "schedule": "0 3 * * *",
    "onDataUpdate": true
  }
}

Antwoord

Geeft het bijgewerkte pijplijnobject terug met dezelfde structuur als de GET endpoint.

Pijplijn verwijderen

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}

Verwijdert permanent een pijplijn en alle uitvoeringgeschiedenis. Deze bewerking kan niet ongedaan worden gemaakt en stopt lopende uitvoeringen.

Antwoord codes

  • 204 No Content - Pijplijn succesvol verwijderd
  • 404 Not Found - Pijplijn niet gevonden of toegang geweigerd
  • 409 Conflict - Pijplijn wordt momenteel uitgevoerd en kan niet worden verwijderd

Fase toevoegen aan pijplijn

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage

Voegt een nieuwe verwerkingsfase toe aan een bestaande pijplijn. De fase wordt ingevoegd op de opgegeven volgorde.

Verzoek lichaam

{
  "stageName": "Process Performance Metrics",
  "stageType": "PerformanceCalculation",
  "order": 3,
  "configuration": {
    "metrics": [
      {
        "name": "cycleTime",
        "calculation": "CASE_DURATION",
        "unit": "hours"
      },
      {
        "name": "waitTime",
        "calculation": "ACTIVITY_WAITING_TIME",
        "unit": "hours"
      }
    ],
    "aggregations": ["AVG", "MAX", "MIN", "P95"]
  }
}

Antwoord

Geeft 201 Created terug met het volledige fase-object inclusief gegenereerde fase-ID.

Fase verwijderen uit pijplijn

DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}

Verwijdert een specifieke fase uit de pijplijn. Volgende fasen worden automatisch opnieuw geordend.

Antwoord codes

  • 204 No Content - Fase succesvol verwijderd
  • 404 Not Found - Fase niet gevonden in pijplijn
  • 409 Conflict - Kan fase niet verwijderen terwijl pijplijn wordt uitgevoerd

Voorbeeld: Complete pijplijnworkflow

Dit voorbeeld toont het aanmaken en beheren van een verrijkingspijplijn:

// 1. Maak een nieuwe verrijkingspijplijn
const createPipeline = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      pipelineName: 'Order Processing Enrichment',
      pipelineDescription: 'Enriches order data with fulfillment metrics',
      stages: [
        {
          stageName: 'Order Validation',
          stageType: 'Validation',
          order: 1,
          configuration: {
            validateOrderId: true,
            validateCustomerId: true,
            validateAmounts: true
          }
        },
        {
          stageName: 'Fulfillment Time Calculation',
          stageType: 'TimeCalculation',
          order: 2,
          configuration: {
            startActivity: 'Order Received',
            endActivity: 'Order Shipped',
            outputField: 'fulfillmentTime'
          }
        }
      ],
      triggers: {
        automatic: true,
        onDataUpdate: true
      }
    })
  });

  return await response.json();
};

// 2. Voeg een nieuwe fase toe aan een bestaande pijplijn
const addStage = async (pipelineId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}/stage`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      stageName: 'Customer Segmentation',
      stageType: 'Classification',
      order: 3,
      configuration: {
        segmentationRules: [
          {
            segment: 'VIP',
            condition: 'orderValue > 1000'
          },
          {
            segment: 'Regular',
            condition: 'orderValue <= 1000'
          }
        ]
      }
    })
  });

  return await response.json();
};

// 3. Haal pijplijnstatus op
const getPipelineStatus = async (pipelineId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  return await response.json();
};

Python voorbeeld

import requests
import json
from datetime import datetime

class EnrichmentPipelineManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def create_pipeline(self, name, description, stages, triggers=None):
        """Maak een nieuwe verrijkingspijplijn aan"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline"
        payload = {
            'pipelineName': name,
            'pipelineDescription': description,
            'stages': stages,
            'triggers': triggers or {'automatic': False, 'onDataUpdate': True}
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_pipeline(self, pipeline_id):
        """Haal pijplijngegevens op"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_pipelines(self, status=None, page=1, page_size=20):
        """Lijst alle pijplijnen met optionele filtering"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipelines"
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
        """Voeg een nieuwe fase toe aan een bestaande pijplijn"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/stage"
        payload = {
            'stageName': stage_name,
            'stageType': stage_type,
            'order': order,
            'configuration': configuration
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
        """Werk de pijplijnconfiguratie bij"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        payload = {}
        if name:
            payload['pipelineName'] = name
        if description:
            payload['pipelineDescription'] = description
        if status:
            payload['status'] = status
        if triggers:
            payload['triggers'] = triggers

        response = requests.put(url, json=payload, headers=self.headers)
        return response.json()

    def delete_pipeline(self, pipeline_id):
        """Verwijder een pijplijn"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
        response = requests.delete(url, headers=self.headers)
        return response.status_code == 204

# Gebruiksvoorbeeld
manager = EnrichmentPipelineManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Maak een uitgebreide verrijkingspijplijn aan
stages = [
    {
        'stageName': 'Data Quality Check',
        'stageType': 'Validation',
        'order': 1,
        'configuration': {
            'checkDuplicates': True,
            'validateTimestamps': True,
            'checkMissingValues': True
        }
    },
    {
        'stageName': 'Process Mining Metrics',
        'stageType': 'ProcessCalculation',
        'order': 2,
        'configuration': {
            'calculateCycleTime': True,
            'calculateWaitingTime': True,
            'calculateResourceUtilization': True,
            'detectBottlenecks': True
        }
    },
    {
        'stageName': 'Anomaly Detection',
        'stageType': 'AnomalyDetection',
        'order': 3,
        'configuration': {
            'algorithm': 'isolation_forest',
            'threshold': 0.1,
            'features': ['duration', 'cost', 'resourceCount']
        }
    }
]

pipeline = manager.create_pipeline(
    'Comprehensive Process Analysis',
    'End-to-end process analysis with anomaly detection',
    stages,
    {'automatic': True, 'schedule': '0 1 * * *', 'onDataUpdate': True}
)

print(f"Created pipeline: {pipeline['pipelineId']}")

# Lijst alle actieve pijplijnen op
active_pipelines = manager.list_pipelines(status='Active')
print(f"Found {active_pipelines['totalCount']} active pipelines")