Yürütme

İş Yürütme API'si

Süreç madenciliği işlerinin yürütülmesini yönetin ve izleyin, eşzamansız işlemleri yönetin ve iş ilerlemesini gerçek zamanlı takip edin.

Özellikler

İş Kuyruğu

İş kuyruğunu ve önceliklerini yönetin.

Kuyruğu Görüntüle

İş Takibi

İş durumunu ve ilerlemesini takip edin.

İşleri Takip Et

Eşzamansız İşlemler

Uzun süreli eşzamansız işlemleri yönetin.

Eşzamansız İşlemler

İş Durumunu Al

GET /api/{tenantId}/{projectId}/execution/job/{jobId}

Herhangi bir yürütme işinin mevcut durumunu ve detaylarını, ilerleme bilgileri, yürütme metrikleri ve tamamlanma durumunu içerecek şekilde getirir.

Parametreler

Parametre Tür Yer Açıklama
tenantId GUID Yol Kiracı tanımlayıcısı
projectId GUID Yol Proje tanımlayıcısı
jobId GUID Yol İş tanımlayıcısı

Yanıt

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "projectId": "660e8400-e29b-41d4-a716-446655440000",
  "jobType": "ProcessMining",
  "jobName": "Customer Journey Analysis",
  "jobDescription": "Müşteri temas noktaları ve davranışlarının kapsamlı analizi",
  "status": "Running",
  "priority": "High",
  "progress": {
    "percentage": 65,
    "currentStage": "Data Processing",
    "estimatedCompletion": "2024-01-20T11:15:00Z",
    "elapsedTime": "8 dakika 32 saniye"
  },
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000",
    "resourceName": "Müşteri Analitik Boru Hattı"
  },
  "execution": {
    "startTime": "2024-01-20T10:30:00Z",
    "submittedBy": "user123",
    "executionNode": "worker-node-02",
    "memoryUsage": "2.1 GB",
    "cpuUsage": "45%",
    "diskUsage": "890 MB"
  },
  "metrics": {
    "recordsProcessed": 125430,
    "totalRecords": 192850,
    "errorCount": 3,
    "warningCount": 12,
    "averageProcessingRate": "1250 kayıt/saniye"
  },
  "dateCreated": "2024-01-20T10:28:00Z",
  "lastUpdated": "2024-01-20T10:38:45Z"
}

Tüm İşleri Listele

GET /api/{tenantId}/{projectId}/execution/jobs

Projede yer alan tüm yürütme işlerinin sayfalı listesini, durum, iş türü ve tarih aralıklarına göre filtreleme seçenekleri ile birlikte getirir.

Sorgu Parametreleri

Parametre Tür Açıklama
status string Duruma göre filtrele: Queued, Running, Completed, Failed, Cancelled
jobType string İş türüne göre filtrele: ProcessMining, DataEnrichment, Notebook, Analysis
priority string Önceliğe göre filtrele: Low, Normal, High, Critical
submittedBy string İş gönderen kullanıcıya göre filtrele
dateFrom datetime Bu tarihten itibaren işleri filtrele
dateTo datetime Bu tarihe kadar işleri filtrele
page integer Sayfa numarası (varsayılan: 1)
pageSize integer Sayfa başına öğe sayısı (varsayılan: 20, max: 100)

Yanıt

{
  "jobs": [
    {
      "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
      "jobType": "ProcessMining",
      "jobName": "Customer Journey Analysis",
      "status": "Running",
      "priority": "High",
      "progress": 65,
      "startTime": "2024-01-20T10:30:00Z",
      "estimatedCompletion": "2024-01-20T11:15:00Z",
      "submittedBy": "user123",
      "resourceName": "Müşteri Analitik Boru Hattı"
    },
    {
      "jobId": "dd0e8400-e29b-41d4-a716-446655440000",
      "jobType": "DataEnrichment",
      "jobName": "Daily Sales Enrichment",
      "status": "Completed",
      "priority": "Normal",
      "progress": 100,
      "startTime": "2024-01-20T09:00:00Z",
      "endTime": "2024-01-20T09:23:00Z",
      "duration": "23 dakika",
      "submittedBy": "system",
      "resourceName": "Satış Verisi Boru Hattı"
    }
  ],
  "summary": {
    "totalJobs": 156,
    "runningJobs": 3,
    "queuedJobs": 7,
    "completedJobs": 142,
    "failedJobs": 4
  },
  "page": 1,
  "pageSize": 20,
  "hasNextPage": true
}

Yeni İş Gönder

POST /api/{tenantId}/{projectId}/execution/job

Sisteme yeni bir yürütme işi gönderir. İş, öncelik ve kaynak kullanılabilirliğine göre sıraya alınır ve işlenir.

İstek Gövdesi

{
  "jobName": "Weekly Process Analysis",
  "jobDescription": "Süreç performansının otomatik haftalık analizi",
  "jobType": "ProcessMining",
  "priority": "Normal",
  "resource": {
    "resourceType": "Pipeline",
    "resourceId": "770e8400-e29b-41d4-a716-446655440000"
  },
  "parameters": {
    "datasetId": "880e8400-e29b-41d4-a716-446655440000",
    "analysisType": "comprehensive",
    "timeWindow": {
      "startDate": "2024-01-01",
      "endDate": "2024-01-07"
    },
    "includeAnomalyDetection": true,
    "outputFormat": "detailed_report"
  },
  "scheduling": {
    "executeImmediately": true,
    "scheduledTime": null,
    "timeoutMinutes": 120
  },
  "notifications": {
    "onCompletion": true,
    "onFailure": true,
    "emailRecipients": ["analyst@company.com"]
  }
}

Yanıt

{
  "jobId": "ee0e8400-e29b-41d4-a716-446655440000",
  "status": "Queued",
  "queuePosition": 3,
  "estimatedStartTime": "2024-01-20T10:45:00Z",
  "estimatedDuration": "45-60 dakika",
  "jobName": "Weekly Process Analysis",
  "priority": "Normal",
  "dateSubmitted": "2024-01-20T10:30:00Z",
  "submittedBy": "user123"
}

İşi İptal Et

DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}

Sıradaki veya çalışan bir işi iptal eder. Tamamlanmış işler iptal edilemez. Çalışan işler mümkün olduğunda nazikçe durdurulur.

İstek Gövdesi (Opsiyonel)

{
  "reason": "Kullanıcı iptal talebinde bulundu",
  "forceTermination": false,
  "preservePartialResults": true
}

Yanıt Kodları

  • 200 OK - İş başarıyla iptal edildi
  • 404 Not Found - İş bulunamadı
  • 409 Conflict - İş zaten tamamlanmış veya iptal edilemiyor

İş Sonuçlarını Al

GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results

Tamamlanmış bir iş yürütmesinin sonuçlarını ve çıktıları, oluşturulan artefaktlar, raporlar ve veri dosyaları ile birlikte getirir.

Sorgu Parametreleri

Parametre Tür Açıklama
format string Yanıt formatı: summary, detailed, download (varsayılan: summary)
includeArtifacts boolean İndirilebilir artefaktları yanıt içinde dahil et (varsayılan: true)
outputType string Çıktı türüne göre filtrele: raporlar, veri, modeller, görselleştirmeler

Yanıt

{
  "jobId": "cc0e8400-e29b-41d4-a716-446655440000",
  "status": "Completed",
  "completionTime": "2024-01-20T11:12:00Z",
  "totalDuration": "42 dakika",
  "success": true,
  "summary": {
    "recordsProcessed": 192850,
    "outputsGenerated": 7,
    "dataQualityScore": 94.2,
    "processingEfficiency": 87.5
  },
  "results": {
    "primaryOutput": {
      "type": "ProcessMiningReport",
      "title": "Customer Journey Analysis Report",
      "format": "html",
      "size": "2.3 MB",
      "downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
    },
    "additionalOutputs": [
      {
        "type": "EnrichedDataset",
        "title": "Customer Journey Data Enhanced",
        "format": "csv",
        "recordCount": 192850,
        "size": "45.7 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
      },
      {
        "type": "ProcessMap",
        "title": "Customer Journey Process Map",
        "format": "svg",
        "size": "890 KB",
        "downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
      },
      {
        "type": "AnalyticsModel",
        "title": "Journey Prediction Model",
        "format": "pkl",
        "accuracy": 0.89,
        "size": "12.4 MB",
        "downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
      }
    ]
  },
  "executionMetrics": {
    "totalCpuTime": "38.5 dakika",
    "peakMemoryUsage": "3.2 GB",
    "diskIoOperations": 45672,
    "networkDataTransfer": "567 MB"
  },
  "qualityMetrics": {
    "dataValidation": {
      "totalRecords": 195000,
      "validRecords": 192850,
      "duplicatesRemoved": 1890,
      "invalidRecords": 260
    },
    "processingErrors": [],
    "warnings": [
      {
        "type": "DataQuality",
        "message": "Bazı zaman damgaları çıkarımla belirlendi",
        "count": 125
      }
    ]
  }
}

Başarısız İşleri Tekrar Dene

POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry

Başarısız olmuş bir iş yürütmesini, isteğe bağlı parametre değişiklikleri ile tekrar dener. İş, aynı veya güncellenmiş konfigürasyonla kuyruğa yeniden alınır.

İstek Gövdesi

{
  "retryReason": "Altyapı sorunu çözüldü",
  "modifyParameters": true,
  "updatedParameters": {
    "timeoutMinutes": 180,
    "retryFailedRecords": true,
    "increaseMemoryLimit": true
  },
  "priority": "High",
  "immediateExecution": false
}

Yanıt

Güncellenmiş iş kimliği ve yeniden deneme bilgileri içeren yeni bir iş nesnesi ile 200 OK döner.

Sistem Yürütme Durumunu Al

GET /api/{tenantId}/execution/system/status

Sistem genelindeki mevcut yürütme durumunu, kaynak kullanımı, kuyruk sağlığı ve performans metrikleri dahilinde getirir.

Yanıt

{
  "systemStatus": "Healthy",
  "timestamp": "2024-01-20T10:45:00Z",
  "executionNodes": [
    {
      "nodeId": "worker-node-01",
      "status": "Active",
      "cpuUsage": 67,
      "memoryUsage": 78,
      "activeJobs": 2,
      "jobCapacity": 4
    },
    {
      "nodeId": "worker-node-02",
      "status": "Active",
      "cpuUsage": 45,
      "memoryUsage": 56,
      "activeJobs": 1,
      "jobCapacity": 4
    }
  ],
  "queueStatistics": {
    "totalQueuedJobs": 15,
    "highPriorityJobs": 3,
    "normalPriorityJobs": 10,
    "lowPriorityJobs": 2,
    "averageWaitTime": "4.2 dakika",
    "estimatedProcessingTime": "23 dakika"
  },
  "performanceMetrics": {
    "jobsCompletedToday": 847,
    "averageJobDuration": "18.5 dakika",
    "successRate": 97.8,
    "throughputPerHour": 35.2
  },
  "resourceUtilization": {
    "totalCpuCapacity": 1600,
    "usedCpuCapacity": 896,
    "totalMemoryCapacity": "64 GB",
    "usedMemoryCapacity": "38.4 GB",
    "diskSpaceAvailable": "2.3 TB"
  }
}

Örnek: Tam İş Yönetimi İş Akışı

Bu örnek bir iş gönderme, ilerlemesini izleme ve sonuçlarını alma sürecini göstermektedir:

// 1. Yeni bir iş gönder
const submitJob = async () => {
  const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      jobName: 'Customer Behavior Analysis',
      jobDescription: 'Müşteri etkileşim kalıplarının haftalık analizi',
      jobType: 'ProcessMining',
      priority: 'High',
      resource: {
        resourceType: 'Pipeline',
        resourceId: '770e8400-e29b-41d4-a716-446655440000'
      },
      parameters: {
        datasetId: '880e8400-e29b-41d4-a716-446655440000',
        analysisType: 'comprehensive',
        timeWindow: {
          startDate: '2024-01-13',
          endDate: '2024-01-19'
        },
        includeAnomalyDetection: true,
        outputFormat: 'detailed_report'
      },
      scheduling: {
        executeImmediately: true,
        timeoutMinutes: 90
      },
      notifications: {
        onCompletion: true,
        onFailure: true,
        emailRecipients: ['analyst@company.com']
      }
    })
  });

  return await response.json();
};

// 2. İş ilerlemesini izle
const monitorJob = async (jobId) => {
  const checkStatus = async () => {
    const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });

    const job = await response.json();
    console.log(`İş ${jobId}: ${job.status} (${job.progress.percentage}%)`);
    console.log(`Mevcut aşama: ${job.progress.currentStage}`);
    console.log(`Tahmini tamamlanma: ${job.progress.estimatedCompletion}`);

    if (job.status === 'Running' || job.status === 'Queued') {
      setTimeout(() => checkStatus(), 30000); // Her 30 saniyede bir kontrol et
    } else if (job.status === 'Completed') {
      console.log('İş başarıyla tamamlandı!');
      await getJobResults(jobId);
    } else if (job.status === 'Failed') {
      console.log('İş başarısız oldu:', job.error);
    }
  };

  await checkStatus();
};

// 3. İş sonuçlarını al
const getJobResults = async (jobId) => {
  const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const results = await response.json();
  console.log('İş Sonuçları:', results.summary);
  console.log('Birincil Çıktı:', results.results.primaryOutput.downloadUrl);

  // Ek çıktıları indir
  for (const output of results.results.additionalOutputs) {
    console.log(`İndir ${output.type}: ${output.downloadUrl}`);
  }

  return results;
};

// 4. Sistem durumunu al
const getSystemStatus = async () => {
  const response = await fetch('/api/{tenantId}/execution/system/status', {
    headers: {
      'Authorization': `Bearer ${token}`
    }
  });

  const status = await response.json();
  console.log(`Sistem Durumu: ${status.systemStatus}`);
  console.log(`Kuyruk: ${status.queueStatistics.totalQueuedJobs} iş bekliyor`);
  console.log(`Ortalama bekleme süresi: ${status.queueStatistics.averageWaitTime}`);

  return status;
};

// İş akışını çalıştır
submitJob()
  .then(job => {
    console.log(`Gönderilen iş: ${job.jobId}`);
    console.log(`Kuyruk pozisyonu: ${job.queuePosition}`);
    console.log(`Tahmini başlama: ${job.estimatedStartTime}`);
    return monitorJob(job.jobId);
  })
  .catch(error => console.error('İş akışı başarısız oldu:', error));

Python Örneği

import requests
import time
import json
from datetime import datetime, timedelta

class ExecutionManager:
    def __init__(self, base_url, tenant_id, project_id, token):
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
        """Yeni bir yürütme işi gönder"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
        payload = {
            'jobName': job_name,
            'jobType': job_type,
            'priority': priority,
            'resource': {
                'resourceType': resource_type,
                'resourceId': resource_id
            },
            'parameters': parameters or {},
            'scheduling': {
                'executeImmediately': True,
                'timeoutMinutes': 120
            },
            'notifications': {
                'onCompletion': True,
                'onFailure': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_job_status(self, job_id):
        """Mevcut iş durumunu al"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
        """İşleri opsiyonel filtrelerle listele"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
        params = {'page': page, 'pageSize': page_size}

        if status:
            params['status'] = status
        if job_type:
            params['jobType'] = job_type
        if date_from:
            params['dateFrom'] = date_from.isoformat()
        if date_to:
            params['dateTo'] = date_to.isoformat()

        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
        """İş tamamlanana kadar periyodik durum kontrolleriyle bekle"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            job = self.get_job_status(job_id)
            print(f"İş {job_id}: {job['status']} ({job['progress']['percentage']}%)")
            print(f"  Mevcut aşama: {job['progress']['currentStage']}")
            print(f"  Geçen süre: {job['progress']['elapsedTime']}")

            if job['status'] in ['Completed', 'Failed', 'Cancelled']:
                return job

            time.sleep(poll_interval)

        raise TimeoutError(f"İş {job_id}, {timeout} saniye içinde tamamlanmadı")

    def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
        """İş yürütme sonuçlarını al"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
        params = {
            'format': format_type,
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_job(self, job_id, reason="Kullanıcı iptali", force=False):
        """Çalışan veya sıralanan bir işi iptal et"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
        payload = {
            'reason': reason,
            'forceTermination': force,
            'preservePartialResults': True
        }
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.status_code == 200

    def retry_job(self, job_id, reason="Başarısızlık sonrası yeniden dene", priority=None, modify_params=None):
        """Başarısız olmuş bir işi yeniden dene"""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
        payload = {
            'retryReason': reason,
            'modifyParameters': modify_params is not None,
            'immediateExecution': False
        }

        if priority:
            payload['priority'] = priority
        if modify_params:
            payload['updatedParameters'] = modify_params

        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_system_status(self):
        """Sistem genelinde yürütme durumunu al"""
        url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
        response = requests.get(url, headers=self.headers)
        return response.json()

# Kullanım örneği
manager = ExecutionManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Sistem durumunu kontrol et
    system_status = manager.get_system_status()
    print(f"Sistem Durumu: {system_status['systemStatus']}")
    print(f"Kuyruktaki işler: {system_status['queueStatistics']['totalQueuedJobs']}")
    print(f"Ortalama bekleme süresi: {system_status['queueStatistics']['averageWaitTime']}")

    # Kapsamlı bir süreç madenciliği işi gönder
    job_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'includeAnomalyDetection': True,
        'includeProcessVariants': True,
        'generateInsights': True,
        'outputFormat': 'detailed_report',
        'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
        'qualityChecks': {
            'validateTimestamps': True,
            'checkDuplicates': True,
            'validateActivities': True
        }
    }

    job = manager.submit_job(
        'Monthly Process Analytics',
        'ProcessMining',
        'Pipeline',
        'pipeline-guid',
        job_params,
        'High'
    )

    print(f"Gönderilen iş: {job['jobId']}")
    print(f"Kuyruk pozisyonu: {job['queuePosition']}")
    print(f"Tahmini başlama: {job['estimatedStartTime']}")

    # Tamamlanmasını bekle
    final_job = manager.wait_for_completion(job['jobId'])

    if final_job['status'] == 'Completed':
        # Detaylı sonuçları al
        results = manager.get_job_results(job['jobId'])

        print("İş başarıyla tamamlandı!")
        print(f"İşlenen kayıt sayısı: {results['summary']['recordsProcessed']:,}")
        print(f"Veri kalitesi puanı: {results['summary']['dataQualityScore']}")
        print(f"İşleme verimliliği: {results['summary']['processingEfficiency']}%")

        # Birincil raporu indir
        print(f"Rapor indir: {results['results']['primaryOutput']['downloadUrl']}")

        # Tüm ek çıktıları listele
        for output in results['results']['additionalOutputs']:
            print(f"İndir {output['type']}: {output['downloadUrl']}")

    else:
        print(f"İş başarısız oldu, durum: {final_job['status']}")
        if 'error' in final_job:
            print(f"Hata: {final_job['error']}")

except Exception as e:
    print(f"Yürütme iş akışında hata: {e}")