Pipeline Uitvoering

Voer Verrijkingspijplijnen Uit

Voer verrijkingspijplijnen uit op datasets, monitor de voortgang en haal verbeterde resultaten op.

Pipeline Uitvoeren

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute

Start de uitvoering van een verrijkingspijplijn op een opgegeven dataset. De uitvoering wordt asynchroon uitgevoerd en retourneert een uitvoerings-ID voor het volgen van de voortgang.

Parameters

Parameter Type Locatie Beschrijving
tenantId GUID Pad De tenant-identificator
projectId GUID Pad De projectidentificator
pipelineId GUID Pad De pijplijnidentificator

Verzoek Body

{
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "executionName": "Monthly Process Analysis",
  "executionDescription": "Enrichment for monthly performance review",
  "parameters": {
    "timeRange": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-31"
    },
    "filterCriteria": {
      "includeWeekends": false,
      "minCaseDuration": "1h"
    },
    "outputOptions": {
      "includeRawData": true,
      "generateSummary": true,
      "exportFormat": "CSV"
    }
  },
  "priority": "Normal",
  "notifyOnCompletion": true
}

Antwoord

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "estimatedDuration": "15-20 minutes",
  "executionName": "Monthly Process Analysis",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "status": "Pending",
      "estimatedDuration": "2-3 minutes"
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "status": "Pending",
      "estimatedDuration": "8-10 minutes"
    }
  ]
}

Uitvoeringsstatus Opvragen

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Haalt de huidige status en voortgangsinformatie op voor een pijplijnuitvoering, inclusief gedetailleerde voortgang per fase.

Antwoord

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Running",
  "progress": 45,
  "currentStage": {
    "stageId": "stage-002",
    "stageName": "Time Enrichment",
    "status": "Running",
    "progress": 60,
    "startTime": "2024-01-20T10:35:00Z",
    "estimatedCompletion": "2024-01-20T10:45:00Z"
  },
  "executionName": "Monthly Process Analysis",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "dateStarted": "2024-01-20T10:32:00Z",
  "estimatedCompletion": "2024-01-20T10:50:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Data Validation",
      "status": "Completed",
      "progress": 100,
      "startTime": "2024-01-20T10:32:00Z",
      "endTime": "2024-01-20T10:35:00Z",
      "duration": "3 minutes",
      "recordsProcessed": 15420,
      "validationResults": {
        "totalRecords": 15420,
        "validRecords": 15418,
        "errors": 2,
        "warnings": 15
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Time Enrichment",
      "status": "Running",
      "progress": 60,
      "startTime": "2024-01-20T10:35:00Z",
      "recordsProcessed": 9252,
      "totalRecords": 15418
    }
  ],
  "metrics": {
    "totalRecords": 15420,
    "processedRecords": 9252,
    "errorCount": 2,
    "warningCount": 15
  }
}

Uitvoeringsresultaten Opvragen

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results

Haalt de definitieve resultaten op van een voltooide pijplijnuitvoering, inclusief verrijkte data, samenvattende statistieken en te downloaden output.

Query Parameters

Parameter Type Beschrijving
format string Responseformaat: summary, full, download (standaard: summary)
includeRawData boolean Inclusief originele dataset in de response (standaard: false)
limit integer Limiteer het aantal teruggegeven records (max: 10000)

Antwoord

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionDate": "2024-01-20T10:48:00Z",
  "totalDuration": "18 minutes",
  "summary": {
    "originalRecords": 15420,
    "enrichedRecords": 15418,
    "newAttributes": 8,
    "dataQualityScore": 98.7,
    "enrichmentCoverage": 99.9
  },
  "enrichedAttributes": [
    {
      "attributeName": "dayOfWeek",
      "attributeType": "string",
      "coverage": 100,
      "uniqueValues": 7,
      "description": "Dag van de week voor elk event"
    },
    {
      "attributeName": "businessHours",
      "attributeType": "boolean",
      "coverage": 100,
      "description": "Of het event tijdens kantooruren plaatsvond"
    },
    {
      "attributeName": "cycleTime",
      "attributeType": "duration",
      "coverage": 99.8,
      "averageValue": "4.2 hours",
      "description": "Tijd van case-start tot voltooiing"
    }
  ],
  "dataQuality": {
    "completeness": 99.9,
    "accuracy": 98.5,
    "consistency": 99.2,
    "validity": 97.8,
    "issues": [
      {
        "type": "Missing Timestamp",
        "count": 2,
        "severity": "High"
      },
      {
        "type": "Invalid Duration",
        "count": 15,
        "severity": "Medium"
      }
    ]
  },
  "downloadUrls": {
    "enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
    "summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
    "dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
  }
}

Lijst van Pipeline Uitvoeringen

GET /api/{tenantId}/{projectId}/enrichment/executions

Haalt een lijst op van alle pijplijnuitvoeringen met filter- en paginatiemogelijkheden. Handig om de uitvoeringgeschiedenis en prestaties te monitoren.

Query Parameters

Parameter Type Beschrijving
pipelineId GUID Filter op specifieke pijplijn
status string Filter op status: Queued, Running, Completed, Failed, Cancelled
dateFrom datetime Filter uitvoeringen vanaf deze datum
dateTo datetime Filter uitvoeringen tot deze datum
page integer Paginanummer voor paginering (standaard: 1)
pageSize integer Aantal items per pagina (standaard: 20, max: 100)

Antwoord

{
  "executions": [
    {
      "executionId": "990e8400-e29b-41d4-a716-446655440000",
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Process Mining Data Enrichment",
      "executionName": "Monthly Process Analysis",
      "status": "Completed",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "dateCompleted": "2024-01-20T10:48:00Z",
      "duration": "18 minutes",
      "recordsProcessed": 15418,
      "priority": "Normal",
      "submittedBy": "user123"
    }
  ],
  "totalCount": 47,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Uitvoering Annuleren

DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Annuleert een lopende of in wachtrij staande pijplijnuitvoering. Voltooide fasen blijven behouden, maar de uitvoering stopt bij de huidige fase.

Verzoek Body (Optioneel)

{
  "reason": "User requested cancellation",
  "preservePartialResults": true
}

Antwoordcodes

  • 200 OK - Uitvoering succesvol geannuleerd
  • 404 Not Found - Uitvoering niet gevonden
  • 409 Conflict - Uitvoering al voltooid of kan niet worden geannuleerd

Mislukte Uitvoering Opnieuw Starten

POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart

Start een mislukte pijplijnuitvoering opnieuw vanaf het punt van falen. Eerder voltooide fasen worden overgeslagen tenzij expliciet gevraagd opnieuw uit te voeren.

Verzoek Body

{
  "restartFromStage": "stage-003",
  "rerunCompletedStages": false,
  "updateParameters": {
    "retryFailedRecords": true,
    "increaseTimeout": true
  }
}

Antwoord

Retourneert 200 OK met een nieuw uitvoeringsobject met een bijgewerkte uitvoering-ID en status.

Voorbeeld: Volledige Uitvoeringsworkflow

Dit voorbeeld demonstreert het uitvoeren van een pijplijn en het monitoren van de voortgang:

// 1. Voer pijplijn uit
const executeEnrichment = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      executionName: 'Customer Journey Analysis',
      executionDescription: 'Enriching customer data with journey metrics',
      parameters: {
        timeRange: {
          startDate: '2024-01-01',
          endDate: '2024-01-31'
        },
        outputOptions: {
          includeRawData: true,
          generateSummary: true,
          exportFormat: 'CSV'
        }
      },
      priority: 'High',
      notifyOnCompletion: true
    })
  });

  return await response.json();
};

// 2. Monitor de voortgang van de uitvoering
const monitorExecution = async (executionId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const execution = await response.json();
    console.log(`Status: ${execution.status}, Voortgang: ${execution.progress}%`);

    if (execution.status === 'Running' || execution.status === 'Queued') {
      // Controleer over 30 seconden opnieuw
      setTimeout(() => checkStatus(), 30000);
    } else if (execution.status === 'Completed') {
      console.log('Uitvoering succesvol voltooid!');
      await getResults(executionId);
    } else if (execution.status === 'Failed') {
      console.log('Uitvoering mislukt:', execution.error);
    }
  };

  await checkStatus();
};

// 3. Resultaten ophalen wanneer voltooid
const getResults = async (executionId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Verrijkingssamenvatting:', results.summary);
  console.log('Download URLs:', results.downloadUrls);

  return results;
};

// Voer de workflow uit
executeEnrichment()
  .then(execution => {
    console.log(`Uitvoering gestart: ${execution.executionId}`);
    return monitorExecution(execution.executionId);
  })
  .catch(error => console.error('Uitvoering mislukt:', error));

Python Voorbeeld

import requests
import time
import json
from datetime import datetime, timedelta

class PipelineExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
        """Voer een verrijkingspijplijn uit"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
        payload = {
            'datasetId': dataset_id,
            'executionName': execution_name,
            'parameters': parameters or {},
            'priority': priority,
            'notifyOnCompletion': True
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_execution_status(self, execution_id):
        """Haal de huidige uitvoeringsstatus op"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
        """Wacht tot uitvoering voltooid is met periodieke statuscontroles"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            status = self.get_execution_status(execution_id)
            print(f"Uitvoering {execution_id}: {status['status']} ({status.get('progress', 0)}%)")

            if status['status'] in ['Completed', 'Failed', 'Cancelled']:
                return status

            time.sleep(poll_interval)

        raise TimeoutError(f"Uitvoering {execution_id} is niet voltooid binnen {timeout} seconden")

    def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
        """Haal uitvoeringsresultaten op"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
        params = {
            'format': format_type,
            'includeRawData': include_raw_data
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_execution(self, execution_id, reason="User cancellation"):
        """Annuleer een lopende uitvoering"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
        """Lijst van pijplijnuitvoeringen met filtermogelijkheden"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
        params = {'page': page, 'pageSize': page_size}

        if pipeline_id:
            params['pipelineId'] = pipeline_id
        if status:
            params['status'] = status
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

# Gebruik voorbeeld
manager = PipelineExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Voer pijplijn uit met aangepaste parameters
execution_params = {
    'timeRange': {
        'startDate': '2024-01-01',
        'endDate': '2024-01-31'
    },
    'filterCriteria': {
        'includeWeekends': False,
        'minCaseDuration': '1h'
    },
    'outputOptions': {
        'includeRawData': True,
        'generateSummary': True,
        'exportFormat': 'CSV'
    }
}

try:
    # Start uitvoering
    execution = manager.execute_pipeline(
        'pipeline-guid',
        'dataset-guid',
        'Monthly Process Analysis',
        execution_params,
        'High'
    )

    print(f"Uitvoering gestart: {execution['executionId']}")
    print(f"Geschatte duur: {execution['estimatedDuration']}")

    # Wacht op voltooiing
    final_status = manager.wait_for_completion(execution['executionId'])

    if final_status['status'] == 'Completed':
        # Haal resultaten op
        results = manager.get_execution_results(execution['executionId'])
        print(f"Verrijking succesvol voltooid!")
        print(f"Originele records: {results['summary']['originalRecords']}")
        print(f"Verrijkte records: {results['summary']['enrichedRecords']}")
        print(f"Data kwaliteitscore: {results['summary']['dataQualityScore']}")
        print(f"Download verrijkte data: {results['downloadUrls']['enrichedDataset']}")
    else:
        print(f"Uitvoering mislukt met status: {final_status['status']}")

except Exception as e:
    print(f"Fout bij uitvoeren van pijplijn: {e}")