Verrijkingspijplijnen
Bouw workflows voor gegevensverrijking
Maak en beheer verrijkingspijplijnen om uw process mining datasets te transformeren en te verrijken.
Haal details van pijplijn op
GET /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Haalt uitgebreide informatie op over een specifieke verrijkingspijplijn, inclusief de fasen, configuratie en uitvoeringsmetadata.
Parameters
| Parameter | Type | Locatie | Beschrijving |
|---|---|---|---|
tenantId |
GUID | Pad | De tenant-identificatie |
projectId |
GUID | Pad | De projectidentificatie |
pipelineId |
GUID | Pad | De pijplijnidentificatie |
Antwoord
{
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"projectId": "660e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"pipelineDescription": "Enriches event logs with additional attributes and calculations",
"status": "Active",
"stages": [
{
"stageId": "stage-001",
"stageName": "Data Validation",
"stageType": "Validation",
"order": 1,
"configuration": {
"validateCaseId": true,
"validateTimestamps": true,
"requireActivityNames": true
}
},
{
"stageId": "stage-002",
"stageName": "Time Enrichment",
"stageType": "TimeCalculation",
"order": 2,
"configuration": {
"addDayOfWeek": true,
"addBusinessHours": true,
"timezoneId": "UTC"
}
}
],
"triggers": {
"automatic": true,
"schedule": "0 2 * * *",
"onDataUpdate": true
},
"dateCreated": "2024-01-15T10:30:00Z",
"dateModified": "2024-01-20T14:45:00Z",
"createdBy": "user123",
"lastExecutionDate": "2024-01-20T02:00:00Z",
"lastExecutionStatus": "Success",
"executionCount": 45
}
Lijst van alle pijplijnen
GET /api/{tenantId}/{projectId}/enrichment/pipelines
Haalt een lijst op van alle verrijkingspijplijnen binnen het project met basisgegevens en statusinformatie.
Query Parameters
| Parameter | Type | Beschrijving |
|---|---|---|
status |
string | Filter op pijplijnstatus: Active, Inactive, Failed |
page |
integer | Paginanummer voor paginering (standaard: 1) |
pageSize |
integer | Aantal items per pagina (standaard: 20, max: 100) |
Antwoord
{
"pipelines": [
{
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"status": "Active",
"stageCount": 5,
"lastExecutionDate": "2024-01-20T02:00:00Z",
"lastExecutionStatus": "Success",
"dateCreated": "2024-01-15T10:30:00Z"
}
],
"totalCount": 12,
"page": 1,
"pageSize": 20,
"hasNextPage": false
}
Maak nieuwe pijplijn
POST /api/{tenantId}/{projectId}/enrichment/pipeline
Maakt een nieuwe verrijkingspijplijn aan met opgegeven fasen en configuratie. De pijplijn kan automatisch of handmatig worden uitgevoerd.
Verzoek lichaam
{
"pipelineName": "Customer Journey Enrichment",
"pipelineDescription": "Enriches customer journey data with demographics and behavior patterns",
"stages": [
{
"stageName": "Customer Data Lookup",
"stageType": "DataLookup",
"order": 1,
"configuration": {
"lookupTable": "customer_demographics",
"joinKey": "customerId",
"selectFields": ["age", "segment", "region"]
}
},
{
"stageName": "Journey Metrics",
"stageType": "Calculation",
"order": 2,
"configuration": {
"calculations": [
{
"fieldName": "journeyDuration",
"formula": "LAST_TIMESTAMP - FIRST_TIMESTAMP",
"groupBy": "caseId"
},
{
"fieldName": "touchpointCount",
"formula": "COUNT(*)",
"groupBy": "caseId"
}
]
}
}
],
"triggers": {
"automatic": false,
"schedule": null,
"onDataUpdate": true
}
}
Antwoord
Geeft 201 Created terug met het volledige pijplijnobject inclusief gegenereerde IDs en tijdstempels.
Pijplijn bijwerken
PUT /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Wijzigt de configuratie, fasen of triggers van een bestaande pijplijn. Wijzigingen worden bij de volgende uitvoering toegepast.
Verzoek lichaam
{
"pipelineName": "Updated Customer Journey Enrichment",
"pipelineDescription": "Enhanced customer journey data enrichment with ML insights",
"status": "Active",
"triggers": {
"automatic": true,
"schedule": "0 3 * * *",
"onDataUpdate": true
}
}
Antwoord
Geeft het bijgewerkte pijplijnobject terug met dezelfde structuur als de GET endpoint.
Pijplijn verwijderen
DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}
Verwijdert permanent een pijplijn en alle uitvoeringgeschiedenis. Deze bewerking kan niet ongedaan worden gemaakt en stopt lopende uitvoeringen.
Antwoord codes
204 No Content- Pijplijn succesvol verwijderd404 Not Found- Pijplijn niet gevonden of toegang geweigerd409 Conflict- Pijplijn wordt momenteel uitgevoerd en kan niet worden verwijderd
Fase toevoegen aan pijplijn
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage
Voegt een nieuwe verwerkingsfase toe aan een bestaande pijplijn. De fase wordt ingevoegd op de opgegeven volgorde.
Verzoek lichaam
{
"stageName": "Process Performance Metrics",
"stageType": "PerformanceCalculation",
"order": 3,
"configuration": {
"metrics": [
{
"name": "cycleTime",
"calculation": "CASE_DURATION",
"unit": "hours"
},
{
"name": "waitTime",
"calculation": "ACTIVITY_WAITING_TIME",
"unit": "hours"
}
],
"aggregations": ["AVG", "MAX", "MIN", "P95"]
}
}
Antwoord
Geeft 201 Created terug met het volledige fase-object inclusief gegenereerde fase-ID.
Fase verwijderen uit pijplijn
DELETE /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/stage/{stageId}
Verwijdert een specifieke fase uit de pijplijn. Volgende fasen worden automatisch opnieuw geordend.
Antwoord codes
204 No Content- Fase succesvol verwijderd404 Not Found- Fase niet gevonden in pijplijn409 Conflict- Kan fase niet verwijderen terwijl pijplijn wordt uitgevoerd
Voorbeeld: Complete pijplijnworkflow
Dit voorbeeld toont het aanmaken en beheren van een verrijkingspijplijn:
// 1. Maak een nieuwe verrijkingspijplijn
const createPipeline = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
pipelineName: 'Order Processing Enrichment',
pipelineDescription: 'Enriches order data with fulfillment metrics',
stages: [
{
stageName: 'Order Validation',
stageType: 'Validation',
order: 1,
configuration: {
validateOrderId: true,
validateCustomerId: true,
validateAmounts: true
}
},
{
stageName: 'Fulfillment Time Calculation',
stageType: 'TimeCalculation',
order: 2,
configuration: {
startActivity: 'Order Received',
endActivity: 'Order Shipped',
outputField: 'fulfillmentTime'
}
}
],
triggers: {
automatic: true,
onDataUpdate: true
}
})
});
return await response.json();
};
// 2. Voeg een nieuwe fase toe aan een bestaande pijplijn
const addStage = async (pipelineId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}/stage`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
stageName: 'Customer Segmentation',
stageType: 'Classification',
order: 3,
configuration: {
segmentationRules: [
{
segment: 'VIP',
condition: 'orderValue > 1000'
},
{
segment: 'Regular',
condition: 'orderValue <= 1000'
}
]
}
})
});
return await response.json();
};
// 3. Haal pijplijnstatus op
const getPipelineStatus = async (pipelineId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/pipeline/${pipelineId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
return await response.json();
};
Python voorbeeld
import requests
import json
from datetime import datetime
class EnrichmentPipelineManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def create_pipeline(self, name, description, stages, triggers=None):
"""Maak een nieuwe verrijkingspijplijn aan"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline"
payload = {
'pipelineName': name,
'pipelineDescription': description,
'stages': stages,
'triggers': triggers or {'automatic': False, 'onDataUpdate': True}
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_pipeline(self, pipeline_id):
"""Haal pijplijngegevens op"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def list_pipelines(self, status=None, page=1, page_size=20):
"""Lijst alle pijplijnen met optionele filtering"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipelines"
params = {'page': page, 'pageSize': page_size}
if status:
params['status'] = status
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def add_stage(self, pipeline_id, stage_name, stage_type, order, configuration):
"""Voeg een nieuwe fase toe aan een bestaande pijplijn"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/stage"
payload = {
'stageName': stage_name,
'stageType': stage_type,
'order': order,
'configuration': configuration
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def update_pipeline(self, pipeline_id, name=None, description=None, status=None, triggers=None):
"""Werk de pijplijnconfiguratie bij"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
payload = {}
if name:
payload['pipelineName'] = name
if description:
payload['pipelineDescription'] = description
if status:
payload['status'] = status
if triggers:
payload['triggers'] = triggers
response = requests.put(url, json=payload, headers=self.headers)
return response.json()
def delete_pipeline(self, pipeline_id):
"""Verwijder een pijplijn"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}"
response = requests.delete(url, headers=self.headers)
return response.status_code == 204
# Gebruiksvoorbeeld
manager = EnrichmentPipelineManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
# Maak een uitgebreide verrijkingspijplijn aan
stages = [
{
'stageName': 'Data Quality Check',
'stageType': 'Validation',
'order': 1,
'configuration': {
'checkDuplicates': True,
'validateTimestamps': True,
'checkMissingValues': True
}
},
{
'stageName': 'Process Mining Metrics',
'stageType': 'ProcessCalculation',
'order': 2,
'configuration': {
'calculateCycleTime': True,
'calculateWaitingTime': True,
'calculateResourceUtilization': True,
'detectBottlenecks': True
}
},
{
'stageName': 'Anomaly Detection',
'stageType': 'AnomalyDetection',
'order': 3,
'configuration': {
'algorithm': 'isolation_forest',
'threshold': 0.1,
'features': ['duration', 'cost', 'resourceCount']
}
}
]
pipeline = manager.create_pipeline(
'Comprehensive Process Analysis',
'End-to-end process analysis with anomaly detection',
stages,
{'automatic': True, 'schedule': '0 1 * * *', 'onDataUpdate': True}
)
print(f"Created pipeline: {pipeline['pipelineId']}")
# Lijst alle actieve pijplijnen op
active_pipelines = manager.list_pipelines(status='Active')
print(f"Found {active_pipelines['totalCount']} active pipelines")