Pipeline Yürütme

Zenginleştirme Pipeline'larını Çalıştırın

Veri setleri üzerinde zenginleştirme pipeline'larını çalıştırın, ilerlemeyi izleyin ve geliştirilmiş sonuçları alın.

Pipeline Çalıştır

POST /api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute

Belirtilen bir veri seti üzerinde zenginleştirme pipeline'ının çalışmasını tetikler. Çalışma asenkron olarak gerçekleşir ve ilerlemenin takibi için bir yürütme ID'si döner.

Parametreler

Parametre Tür Konum Açıklama
tenantId GUID Path Kiracı tanımlayıcısı
projectId GUID Path Proje tanımlayıcısı
pipelineId GUID Path Pipeline tanımlayıcısı

İstek Gövdesi

{
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "executionName": "Aylık Süreç Analizi",
  "executionDescription": "Aylık performans incelemesi için zenginleştirme",
  "parameters": {
    "timeRange": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-31"
    },
    "filterCriteria": {
      "includeWeekends": false,
      "minCaseDuration": "1h"
    },
    "outputOptions": {
      "includeRawData": true,
      "generateSummary": true,
      "exportFormat": "CSV"
    }
  },
  "priority": "Normal",
  "notifyOnCompletion": true
}

Yanıt

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "estimatedDuration": "15-20 dakika",
  "executionName": "Aylık Süreç Analizi",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Veri Doğrulama",
      "status": "Pending",
      "estimatedDuration": "2-3 dakika"
    },
    {
      "stageId": "stage-002",
      "stageName": "Zaman Zenginleştirme",
      "status": "Pending",
      "estimatedDuration": "8-10 dakika"
    }
  ]
}

Yürütme Durumunu Al

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Bir pipeline yürütmesinin mevcut durumunu ve ilerleme bilgisini, aşama aşama detaylı ilerleme dahil olmak üzere getirir.

Yanıt

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
  "datasetId": "880e8400-e29b-41d4-a716-446655440000",
  "status": "Running",
  "progress": 45,
  "currentStage": {
    "stageId": "stage-002",
    "stageName": "Zaman Zenginleştirme",
    "status": "Running",
    "progress": 60,
    "startTime": "2024-01-20T10:35:00Z",
    "estimatedCompletion": "2024-01-20T10:45:00Z"
  },
  "executionName": "Aylık Süreç Analizi",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "dateStarted": "2024-01-20T10:32:00Z",
  "estimatedCompletion": "2024-01-20T10:50:00Z",
  "priority": "Normal",
  "stages": [
    {
      "stageId": "stage-001",
      "stageName": "Veri Doğrulama",
      "status": "Completed",
      "progress": 100,
      "startTime": "2024-01-20T10:32:00Z",
      "endTime": "2024-01-20T10:35:00Z",
      "duration": "3 dakika",
      "recordsProcessed": 15420,
      "validationResults": {
        "totalRecords": 15420,
        "validRecords": 15418,
        "errors": 2,
        "warnings": 15
      }
    },
    {
      "stageId": "stage-002",
      "stageName": "Zaman Zenginleştirme",
      "status": "Running",
      "progress": 60,
      "startTime": "2024-01-20T10:35:00Z",
      "recordsProcessed": 9252,
      "totalRecords": 15418
    }
  ],
  "metrics": {
    "totalRecords": 15420,
    "processedRecords": 9252,
    "errorCount": 2,
    "warningCount": 15
  }
}

Yürütme Sonuçlarını Al

GET /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/results

Tamamlanmış bir pipeline yürütmesinin nihai sonuçlarını, zenginleştirilmiş verileri, özet istatistikleri ve indirilebilir çıktılarını getirir.

Sorgu Parametreleri

Parametre Tür Açıklama
format string Yanıt formatı: summary, full, download (varsayılan: summary)
includeRawData boolean Orijinal veri setini yanıtla dahil et (varsayılan: false)
limit integer Döndürülecek kayıt sayısını sınırla (maksimum: 10000)

Yanıt

{
  "executionId": "990e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionDate": "2024-01-20T10:48:00Z",
  "totalDuration": "18 dakika",
  "summary": {
    "originalRecords": 15420,
    "enrichedRecords": 15418,
    "newAttributes": 8,
    "dataQualityScore": 98.7,
    "enrichmentCoverage": 99.9
  },
  "enrichedAttributes": [
    {
      "attributeName": "dayOfWeek",
      "attributeType": "string",
      "coverage": 100,
      "uniqueValues": 7,
      "description": "Her olay için haftanın günü"
    },
    {
      "attributeName": "businessHours",
      "attributeType": "boolean",
      "coverage": 100,
      "description": "Olayın çalışma saatlerinde gerçekleşip gerçekleşmediği"
    },
    {
      "attributeName": "cycleTime",
      "attributeType": "duration",
      "coverage": 99.8,
      "averageValue": "4.2 saat",
      "description": "Vaka başlangıcından tamamlanmaya kadar geçen süre"
    }
  ],
  "dataQuality": {
    "completeness": 99.9,
    "accuracy": 98.5,
    "consistency": 99.2,
    "validity": 97.8,
    "issues": [
      {
        "type": "Eksik Zaman Damgası",
        "count": 2,
        "severity": "Yüksek"
      },
      {
        "type": "Geçersiz Süre",
        "count": 15,
        "severity": "Orta"
      }
    ]
  },
  "downloadUrls": {
    "enrichedDataset": "https://api.mindzie.com/downloads/enriched-990e8400.csv",
    "summary": "https://api.mindzie.com/downloads/summary-990e8400.pdf",
    "dataQualityReport": "https://api.mindzie.com/downloads/quality-990e8400.html"
  }
}

Pipeline Yürütmelerini Listele

GET /api/{tenantId}/{projectId}/enrichment/executions

Filtreleme ve sayfalama seçenekleri ile tüm pipeline yürütmelerinin listesini getirir. Yürütme geçmişi ve performansını izlemek için kullanışlıdır.

Sorgu Parametreleri

Parametre Tür Açıklama
pipelineId GUID Belirli bir pipeline’a göre filtrele
status string Duruma göre filtrele: Queued, Running, Completed, Failed, Cancelled
dateFrom datetime Bu tarihten itibaren yürütmeleri filtrele
dateTo datetime Bu tarihe kadar yürütmeleri filtrele
page integer Sayfa numarası (varsayılan: 1)
pageSize integer Sayfa başına öğe sayısı (varsayılan: 20, maksimum: 100)

Yanıt

{
  "executions": [
    {
      "executionId": "990e8400-e29b-41d4-a716-446655440000",
      "pipelineId": "770e8400-e29b-41d4-a716-446655440000",
      "pipelineName": "Süreç Madenciliği Veri Zenginleştirme",
      "executionName": "Aylık Süreç Analizi",
      "status": "Completed",
      "dateSubmitted": "2024-01-20T10:30:00Z",
      "dateCompleted": "2024-01-20T10:48:00Z",
      "duration": "18 dakika",
      "recordsProcessed": 15418,
      "priority": "Normal",
      "submittedBy": "user123"
    }
  ],
  "totalCount": 47,
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Yürütmeyi İptal Et

DELETE /api/{tenantId}/{projectId}/enrichment/execution/{executionId}

Çalışmakta olan veya kuyruğa alınmış bir pipeline yürütmesini iptal eder. Tamamlanmış aşamalar korunur ancak yürütme mevcut aşamada durur.

İstek Gövdesi (Opsiyonel)

{
  "reason": "Kullanıcı iptal talebinde bulundu",
  "preservePartialResults": true
}

Yanıt Kodları

  • 200 OK - Yürütme başarıyla iptal edildi
  • 404 Not Found - Yürütme bulunamadı
  • 409 Conflict - Yürütme zaten tamamlandı veya iptal edilemiyor

Başarısız Yürütmeyi Yeniden Başlat

POST /api/{tenantId}/{projectId}/enrichment/execution/{executionId}/restart

Başarısız olmuş pipeline yürütmesini başarısızlık noktasından yeniden başlatır. Daha önce tamamlanmış aşamalar açıkça tekrar çalıştırılması istenmedikçe atlanır.

İstek Gövdesi

{
  "restartFromStage": "stage-003",
  "rerunCompletedStages": false,
  "updateParameters": {
    "retryFailedRecords": true,
    "increaseTimeout": true
  }
}

Yanıt

Güncellenmiş yürütme ID'si ve durumu içeren yeni bir yürütme nesnesi ile 200 OK döner.

Örnek: Tam Yürütme İş Akışı

Bu örnek bir pipeline’ı çalıştırmayı ve ilerlemesini izlemeyi göstermektedir:

// 1. Pipeline'ı çalıştır
const executeEnrichment = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/enrichment/pipeline/{pipelineId}/execute', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      executionName: 'Müşteri Yolculuğu Analizi',
      executionDescription: 'Müşteri verilerini yolculuk metrikleri ile zenginleştirme',
      parameters: {
        timeRange: {
          startDate: '2024-01-01',
          endDate: '2024-01-31'
        },
        outputOptions: {
          includeRawData: true,
          generateSummary: true,
          exportFormat: 'CSV'
        }
      },
      priority: 'High',
      notifyOnCompletion: true
    })
  });

  return await response.json();
};

// 2. Yürütme ilerlemesini takip et
const monitorExecution = async (executionId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const execution = await response.json();
    console.log(`Durum: ${execution.status}, İlerleme: %${execution.progress}`);

    if (execution.status === 'Running' || execution.status === 'Queued') {
      // 30 saniye sonra tekrar kontrol et
      setTimeout(() => checkStatus(), 30000);
    } else if (execution.status === 'Completed') {
      console.log('Yürütme başarıyla tamamlandı!');
      await getResults(executionId);
    } else if (execution.status === 'Failed') {
      console.log('Yürütme başarısız oldu:', execution.error);
    }
  };

  await checkStatus();
};

// 3. Tamamlandığında sonuçları al
const getResults = async (executionId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/enrichment/execution/${executionId}/results?format=summary`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('Zenginleştirme Özeti:', results.summary);
  console.log('İndirme URL’leri:', results.downloadUrls);

  return results;
};

// İş akışını çalıştır
executeEnrichment()
  .then(execution => {
    console.log(`Başlatılan yürütme: ${execution.executionId}`);
    return monitorExecution(execution.executionId);
  })
  .catch(error => console.error('Yürütme başarısız:', error));

Python Örneği

import requests
import time
import json
from datetime import datetime, timedelta

class PipelineExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def execute_pipeline(self, pipeline_id, dataset_id, execution_name, parameters=None, priority="Normal"):
        """Bir zenginleştirme pipeline'ını çalıştır"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/pipeline/{pipeline_id}/execute"
        payload = {
            'datasetId': dataset_id,
            'executionName': execution_name,
            'parameters': parameters or {},
            'priority': priority,
            'notifyOnCompletion': True
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_execution_status(self, execution_id):
        """Mevcut yürütme durumunu al"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def wait_for_completion(self, execution_id, poll_interval=30, timeout=3600):
        """Periyodik durum kontrolleri ile yürütme tamamlanana kadar bekle"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            status = self.get_execution_status(execution_id)
            print(f"{execution_id} yürütmesi: {status['status']} (%{status.get('progress', 0)})")

            if status['status'] in ['Completed', 'Failed', 'Cancelled']:
                return status

            time.sleep(poll_interval)

        raise TimeoutError(f"{execution_id} yürütmesi {timeout} saniye içinde tamamlanmadı")

    def get_execution_results(self, execution_id, format_type="summary", include_raw_data=False):
        """Yürütme sonuçlarını al"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}/results"
        params = {
            'format': format_type,
            'includeRawData': include_raw_data
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_execution(self, execution_id, reason="Kullanıcı iptali"):
        """Çalışmakta olan yürütmeyi iptal et"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/execution/{execution_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def list_executions(self, pipeline_id=None, status=None, date_from=None, date_to=None, page=1, page_size=20):
        """Filtreleme ile pipeline yürütmelerini listele"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/enrichment/executions"
        params = {'page': page, 'pageSize': page_size}

        if pipeline_id:
            params['pipelineId'] = pipeline_id
        if status:
            params['status'] = status
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

# Kullanım örneği
manager = PipelineExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

# Özel parametrelerle pipeline çalıştır
execution_params = {
    'timeRange': {
        'startDate': '2024-01-01',
        'endDate': '2024-01-31'
    },
    'filterCriteria': {
        'includeWeekends': False,
        'minCaseDuration': '1h'
    },
    'outputOptions': {
        'includeRawData': True,
        'generateSummary': True,
        'exportFormat': 'CSV'
    }
}

try:
    # Yürütmeyi başlat
    execution = manager.execute_pipeline(
        'pipeline-guid',
        'dataset-guid',
        'Aylık Süreç Analizi',
        execution_params,
        'High'
    )

    print(f"Başlatılan yürütme: {execution['executionId']}")
    print(f"Tahmini süre: {execution['estimatedDuration']}")

    # Tamamlanmasını bekle
    final_status = manager.wait_for_completion(execution['executionId'])

    if final_status['status'] == 'Completed':
        # Sonuçları al
        results = manager.get_execution_results(execution['executionId'])
        print(f"Zenginleştirme başarıyla tamamlandı!")
        print(f"Orjinal kayıt sayısı: {results['summary']['originalRecords']}")
        print(f"Zenginleştirilmiş kayıt sayısı: {results['summary']['enrichedRecords']}")
        print(f"Veri kalite puanı: {results['summary']['dataQualityScore']}")
        print(f"Zenginleştirilmiş veriyi indir: {results['downloadUrls']['enrichedDataset']}")
    else:
        print(f"Yürütme durumuyla başarısız oldu: {final_status['status']}")

except Exception as e:
    print(f"Pipeline yürütme sırasında hata oluştu: {e}")