Pipeline Uitvoering
Voer Verrijkingspijplijnen Uit
Voer verrijkingspijplijnen uit op datasets, monitor de voortgang en haal verbeterde resultaten op.
Pipeline Uitvoeren
POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute
Start de uitvoering van een verrijkingspijplijn op een opgegeven dataset. De uitvoering wordt asynchroon uitgevoerd en retourneert een uitvoerings-ID voor het volgen van de voortgang.
Parameters
| Parameter | Type | Locatie | Beschrijving |
|---|---|---|---|
tenantId |
GUID | Pad | De tenant-identificator |
projectId |
GUID | Pad | De projectidentificator |
pipelineId |
GUID | Pad | De pijplijnidentificator |
Verzoek Body
{
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"executionName": "Monthly Process Analysis",
"executionDescription": "Enrichment for monthly performance review",
"parameters": {
"timeRange": {
"startDate": "2024-01-01",
"endDate": "2024-01-31"
},
"filterCriteria": {
"includeWeekends": false,
"minCaseDuration": "1h"
},
"outputOptions": {
"includeRawData": true,
"generateSummary": true,
"exportFormat": "CSV"
}
},
"priority": "Normal",
"notifyOnCompletion": true
}
Antwoord
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"status": "Queued",
"estimatedDuration": "15-20 minutes",
"executionName": "Monthly Process Analysis",
"dateSubmitted": "2024-01-20T10:30:00Z",
"priority": "Normal",
"stages": [
{
"stageId": "stage-001",
"stageName": "Data Validation",
"status": "Pending",
"estimatedDuration": "2-3 minutes"
},
{
"stageId": "stage-002",
"stageName": "Time Enrichment",
"status": "Pending",
"estimatedDuration": "8-10 minutes"
}
]
}
Uitvoeringsstatus Opvragen
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Haalt de huidige status en voortgangsinformatie op voor een pijplijnuitvoering, inclusief gedetailleerde voortgang per fase.
Antwoord
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"status": "Running",
"progress": 45,
"currentStage": {
"stageId": "stage-002",
"stageName": "Time Enrichment",
"status": "Running",
"progress": 60,
"startTime": "2024-01-20T10:35:00Z",
"estimatedCompletion": "2024-01-20T10:45:00Z"
},
"executionName": "Monthly Process Analysis",
"dateSubmitted": "2024-01-20T10:30:00Z",
"dateStarted": "2024-01-20T10:32:00Z",
"estimatedCompletion": "2024-01-20T10:50:00Z",
"priority": "Normal",
"stages": [
{
"stageId": "stage-001",
"stageName": "Data Validation",
"status": "Completed",
"progress": 100,
"startTime": "2024-01-20T10:32:00Z",
"endTime": "2024-01-20T10:35:00Z",
"duration": "3 minutes",
"recordsProcessed": 15420,
"validationResults": {
"totalRecords": 15420,
"validRecords": 15418,
"errors": 2,
"warnings": 15
}
},
{
"stageId": "stage-002",
"stageName": "Time Enrichment",
"status": "Running",
"progress": 60,
"startTime": "2024-01-20T10:35:00Z",
"recordsProcessed": 9252,
"totalRecords": 15418
}
],
"metrics": {
"totalRecords": 15420,
"processedRecords": 9252,
"errorCount": 2,
"warningCount": 15
}
}
Uitvoeringsresultaten Opvragen
GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results
Haalt de definitieve resultaten op van een voltooide pijplijnuitvoering, inclusief verrijkte data, samenvattende statistieken en te downloaden output.
Query Parameters
| Parameter | Type | Beschrijving |
|---|---|---|
format |
string | Responseformaat: summary, full, download (standaard: summary) |
includeRawData |
boolean | Inclusief originele dataset in de response (standaard: false) |
limit |
integer | Limiteer het aantal teruggegeven records (max: 10000) |
Antwoord
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"status": "Completed",
"completionDate": "2024-01-20T10:48:00Z",
"totalDuration": "18 minutes",
"summary": {
"originalRecords": 15420,
"enrichedRecords": 15418,
"newAttributes": 8,
"dataQualityScore": 98.7,
"enrichmentCoverage": 99.9
},
"enrichedAttributes": [
{
"attributeName": "dayOfWeek",
"attributeType": "string",
"coverage": 100,
"uniqueValues": 7,
"description": "Dag van de week voor elk event"
},
{
"attributeName": "businessHours",
"attributeType": "boolean",
"coverage": 100,
"description": "Of het event tijdens kantooruren plaatsvond"
},
{
"attributeName": "cycleTime",
"attributeType": "duration",
"coverage": 99.8,
"averageValue": "4.2 hours",
"description": "Tijd van case-start tot voltooiing"
}
],
"dataQuality": {
"completeness": 99.9,
"accuracy": 98.5,
"consistency": 99.2,
"validity": 97.8,
"issues": [
{
"type": "Missing Timestamp",
"count": 2,
"severity": "High"
},
{
"type": "Invalid Duration",
"count": 15,
"severity": "Medium"
}
]
},
"downloadUrls": {
"enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
"summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
"dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
}
}
Lijst van Pipeline Uitvoeringen
GET /api/{tenantId}/{projectId}/enrichment/executions
Haalt een lijst op van alle pijplijnuitvoeringen met filter- en paginatiemogelijkheden. Handig om de uitvoeringgeschiedenis en prestaties te monitoren.
Query Parameters
| Parameter | Type | Beschrijving |
|---|---|---|
pipelineId |
GUID | Filter op specifieke pijplijn |
status |
string | Filter op status: Queued, Running, Completed, Failed, Cancelled |
dateFrom |
datetime | Filter uitvoeringen vanaf deze datum |
dateTo |
datetime | Filter uitvoeringen tot deze datum |
page |
integer | Paginanummer voor paginering (standaard: 1) |
pageSize |
integer | Aantal items per pagina (standaard: 20, max: 100) |
Antwoord
{
"executions": [
{
"executionId": "990e8400-e29b-41d4-a716-446655440000",
"pipelineId": "770e8400-e29b-41d4-a716-446655440000",
"pipelineName": "Process Mining Data Enrichment",
"executionName": "Monthly Process Analysis",
"status": "Completed",
"dateSubmitted": "2024-01-20T10:30:00Z",
"dateCompleted": "2024-01-20T10:48:00Z",
"duration": "18 minutes",
"recordsProcessed": 15418,
"priority": "Normal",
"submittedBy": "user123"
}
],
"totalCount": 47,
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
Uitvoering Annuleren
DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}
Annuleert een lopende of in wachtrij staande pijplijnuitvoering. Voltooide fasen blijven behouden, maar de uitvoering stopt bij de huidige fase.
Verzoek Body (Optioneel)
{
"reason": "User requested cancellation",
"preservePartialResults": true
}
Antwoordcodes
200 OK- Uitvoering succesvol geannuleerd404 Not Found- Uitvoering niet gevonden409 Conflict- Uitvoering al voltooid of kan niet worden geannuleerd
Mislukte Uitvoering Opnieuw Starten
POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart
Start een mislukte pijplijnuitvoering opnieuw vanaf het punt van falen. Eerder voltooide fasen worden overgeslagen tenzij expliciet gevraagd opnieuw uit te voeren.
Verzoek Body
{
"restartFromStage": "stage-003",
"rerunCompletedStages": false,
"updateParameters": {
"retryFailedRecords": true,
"increaseTimeout": true
}
}
Antwoord
Retourneert 200 OK met een nieuw uitvoeringsobject met een bijgewerkte uitvoering-ID en status.
Voorbeeld: Volledige Uitvoeringsworkflow
Dit voorbeeld demonstreert het uitvoeren van een pijplijn en het monitoren van de voortgang:
// 1. Voer pijplijn uit
const executeEnrichment = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
datasetId: '880e8400-e29b-41d4-a716-446655440000',
executionName: 'Customer Journey Analysis',
executionDescription: 'Enriching customer data with journey metrics',
parameters: {
timeRange: {
startDate: '2024-01-01',
endDate: '2024-01-31'
},
outputOptions: {
includeRawData: true,
generateSummary: true,
exportFormat: 'CSV'
}
},
priority: 'High',
notifyOnCompletion: true
})
});
return await response.json();
};
// 2. Monitor de voortgang van de uitvoering
const monitorExecution = async (executionId) => {
const checkStatus = async () => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const execution = await response.json();
console.log(`Status: ${execution.status}, Voortgang: ${execution.progress}%`);
if (execution.status === 'Running' || execution.status === 'Queued') {
// Controleer over 30 seconden opnieuw
setTimeout(() => checkStatus(), 30000);
} else if (execution.status === 'Completed') {
console.log('Uitvoering succesvol voltooid!');
await getResults(executionId);
} else if (execution.status === 'Failed') {
console.log('Uitvoering mislukt:', execution.error);
}
};
await checkStatus();
};
// 3. Resultaten ophalen wanneer voltooid
const getResults = async (executionId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const results = await response.json();
console.log('Verrijkingssamenvatting:', results.summary);
console.log('Download URLs:', results.downloadUrls);
return results;
};
// Voer de workflow uit
executeEnrichment()
.then(execution => {
console.log(`Uitvoering gestart: ${execution.executionId}`);
return monitorExecution(execution.executionId);
})
.catch(error => console.error('Uitvoering mislukt:', error));
Python Voorbeeld
import requests
import time
import json
from datetime import datetime, timedelta
class PipelineExecutionManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
"""Voer een verrijkingspijplijn uit"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
payload = {
'datasetId': dataset_id,
'executionName': execution_name,
'parameters': parameters or {},
'priority': priority,
'notifyOnCompletion': True
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_execution_status(self, execution_id):
"""Haal de huidige uitvoeringsstatus op"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
"""Wacht tot uitvoering voltooid is met periodieke statuscontroles"""
start_time = time.time()
while time.time() - start_time < timeout:
status = self.get_execution_status(execution_id)
print(f"Uitvoering {execution_id}: {status['status']} ({status.get('progress', 0)}%)")
if status['status'] in ['Completed', 'Failed', 'Cancelled']:
return status
time.sleep(poll_interval)
raise TimeoutError(f"Uitvoering {execution_id} is niet voltooid binnen {timeout} seconden")
def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
"""Haal uitvoeringsresultaten op"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
params = {
'format': format_type,
'includeRawData': include_raw_data
}
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_execution(self, execution_id, reason="User cancellation"):
"""Annuleer een lopende uitvoering"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
payload = {
'reason': reason,
'preservePartialResults': True
}
response = requests.delete(url, json=payload, headers=self.headers)
return response.status_code == 200
def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
"""Lijst van pijplijnuitvoeringen met filtermogelijkheden"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
params = {'page': page, 'pageSize': page_size}
if pipeline_id:
params['pipelineId'] = pipeline_id
if status:
params['status'] = status
if date_from:
params['dateFrom'] = date_from.isoformat()
if date_to:
params['dateTo'] = date_to.isoformat()
response = requests.get(url, params=params, headers=self.headers)
return response.json()
# Gebruik voorbeeld
manager = PipelineExecutionManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
# Voer pijplijn uit met aangepaste parameters
execution_params = {
'timeRange': {
'startDate': '2024-01-01',
'endDate': '2024-01-31'
},
'filterCriteria': {
'includeWeekends': False,
'minCaseDuration': '1h'
},
'outputOptions': {
'includeRawData': True,
'generateSummary': True,
'exportFormat': 'CSV'
}
}
try:
# Start uitvoering
execution = manager.execute_pipeline(
'pipeline-guid',
'dataset-guid',
'Monthly Process Analysis',
execution_params,
'High'
)
print(f"Uitvoering gestart: {execution['executionId']}")
print(f"Geschatte duur: {execution['estimatedDuration']}")
# Wacht op voltooiing
final_status = manager.wait_for_completion(execution['executionId'])
if final_status['status'] == 'Completed':
# Haal resultaten op
results = manager.get_execution_results(execution['executionId'])
print(f"Verrijking succesvol voltooid!")
print(f"Originele records: {results['summary']['originalRecords']}")
print(f"Verrijkte records: {results['summary']['enrichedRecords']}")
print(f"Data kwaliteitscore: {results['summary']['dataQualityScore']}")
print(f"Download verrijkte data: {results['downloadUrls']['enrichedDataset']}")
else:
print(f"Uitvoering mislukt met status: {final_status['status']}")
except Exception as e:
print(f"Fout bij uitvoeren van pijplijn: {e}")