Exécution
API d'Exécution des Tâches
Gérez et surveillez l'exécution des tâches de process mining, gérez les opérations asynchrones et suivez la progression des tâches en temps réel.
Fonctionnalités
File d'attente des tâches
Gérez la file d'attente des tâches et leurs priorités.
Suivi des tâches
Suivez le statut et la progression des tâches.
Opérations asynchrones
Gérez les opérations asynchrones de longue durée.
Obtenir le statut d'une tâche
GET /api/{tenantId}/{projectId}/execution/job/{jobId}
Récupère le statut actuel et les détails de toute tâche d'exécution, incluant les informations de progression, les métriques d'exécution et le statut d'achèvement.
Paramètres
| Paramètre | Type | Emplacement | Description |
|---|---|---|---|
tenantId |
GUID | Chemin | L'identifiant du locataire |
projectId |
GUID | Chemin | L'identifiant du projet |
jobId |
GUID | Chemin | L'identifiant de la tâche |
Réponse
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"projectId": "660e8400-e29b-41d4-a716-446655440000",
"jobType": "ProcessMining",
"jobName": "Customer Journey Analysis",
"jobDescription": "Comprehensive analysis of customer touchpoints and behaviors",
"status": "Running",
"priority": "High",
"progress": {
"percentage": 65,
"currentStage": "Data Processing",
"estimatedCompletion": "2024-01-20T11:15:00Z",
"elapsedTime": "8 minutes 32 seconds"
},
"resource": {
"resourceType": "Pipeline",
"resourceId": "770e8400-e29b-41d4-a716-446655440000",
"resourceName": "Customer Analytics Pipeline"
},
"execution": {
"startTime": "2024-01-20T10:30:00Z",
"submittedBy": "user123",
"executionNode": "worker-node-02",
"memoryUsage": "2.1 GB",
"cpuUsage": "45%",
"diskUsage": "890 MB"
},
"metrics": {
"recordsProcessed": 125430,
"totalRecords": 192850,
"errorCount": 3,
"warningCount": 12,
"averageProcessingRate": "1250 records/second"
},
"dateCreated": "2024-01-20T10:28:00Z",
"lastUpdated": "2024-01-20T10:38:45Z"
}
Lister toutes les tâches
GET /api/{tenantId}/{projectId}/execution/jobs
Récupère une liste paginée de toutes les tâches d'exécution du projet avec des options de filtrage par statut, type de tâche et plages de dates.
Paramètres de requête
| Paramètre | Type | Description |
|---|---|---|
status |
string | Filtrer par statut : Queued, Running, Completed, Failed, Cancelled |
jobType |
string | Filtrer par type de tâche : ProcessMining, DataEnrichment, Notebook, Analysis |
priority |
string | Filtrer par priorité : Low, Normal, High, Critical |
submittedBy |
string | Filtrer par utilisateur ayant soumis la tâche |
dateFrom |
datetime | Filtrer les tâches à partir de cette date |
dateTo |
datetime | Filtrer les tâches jusqu'à cette date |
page |
integer | Numéro de page pour la pagination (par défaut : 1) |
pageSize |
integer | Nombre d'éléments par page (par défaut : 20, max : 100) |
Réponse
{
"jobs": [
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"jobType": "ProcessMining",
"jobName": "Customer Journey Analysis",
"status": "Running",
"priority": "High",
"progress": 65,
"startTime": "2024-01-20T10:30:00Z",
"estimatedCompletion": "2024-01-20T11:15:00Z",
"submittedBy": "user123",
"resourceName": "Customer Analytics Pipeline"
},
{
"jobId": "dd0e8400-e29b-41d4-a716-446655440000",
"jobType": "DataEnrichment",
"jobName": "Daily Sales Enrichment",
"status": "Completed",
"priority": "Normal",
"progress": 100,
"startTime": "2024-01-20T09:00:00Z",
"endTime": "2024-01-20T09:23:00Z",
"duration": "23 minutes",
"submittedBy": "system",
"resourceName": "Sales Data Pipeline"
}
],
"summary": {
"totalJobs": 156,
"runningJobs": 3,
"queuedJobs": 7,
"completedJobs": 142,
"failedJobs": 4
},
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
Soumettre une nouvelle tâche
POST /api/{tenantId}/{projectId}/execution/job
Soumet une nouvelle tâche d'exécution au système. La tâche sera mise en file d'attente et traitée selon la priorité et la disponibilité des ressources.
Corps de la requête
{
"jobName": "Weekly Process Analysis",
"jobDescription": "Automated weekly analysis of process performance",
"jobType": "ProcessMining",
"priority": "Normal",
"resource": {
"resourceType": "Pipeline",
"resourceId": "770e8400-e29b-41d4-a716-446655440000"
},
"parameters": {
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"analysisType": "comprehensive",
"timeWindow": {
"startDate": "2024-01-01",
"endDate": "2024-01-07"
},
"includeAnomalyDetection": true,
"outputFormat": "detailed_report"
},
"scheduling": {
"executeImmediately": true,
"scheduledTime": null,
"timeoutMinutes": 120
},
"notifications": {
"onCompletion": true,
"onFailure": true,
"emailRecipients": ["analyst@company.com"]
}
}
Réponse
{
"jobId": "ee0e8400-e29b-41d4-a716-446655440000",
"status": "Queued",
"queuePosition": 3,
"estimatedStartTime": "2024-01-20T10:45:00Z",
"estimatedDuration": "45-60 minutes",
"jobName": "Weekly Process Analysis",
"priority": "Normal",
"dateSubmitted": "2024-01-20T10:30:00Z",
"submittedBy": "user123"
}
Annuler une tâche
DELETE /api/{tenantId}/{projectId}/execution/job/{jobId}
Annule une tâche en file d'attente ou en cours d'exécution. Les tâches terminées ne peuvent pas être annulées. Les tâches en cours seront arrêtées proprement si possible.
Corps de la requête (optionnel)
{
"reason": "User requested cancellation",
"forceTermination": false,
"preservePartialResults": true
}
Codes de réponse
200 OK- Tâche annulée avec succès404 Not Found- Tâche non trouvée409 Conflict- Tâche déjà terminée ou ne peut pas être annulée
Obtenir les résultats d'une tâche
GET /api/{tenantId}/{projectId}/execution/job/{jobId}/results
Récupère les résultats et sorties d'une tâche d'exécution terminée, incluant les artefacts générés, rapports et fichiers de données.
Paramètres de requête
| Paramètre | Type | Description |
|---|---|---|
format |
string | Format de réponse : summary, detailed, download (par défaut : summary) |
includeArtifacts |
boolean | Inclure les artefacts téléchargeables dans la réponse (par défaut : true) |
outputType |
string | Filtrer par type de sortie : reports, data, models, visualizations |
Réponse
{
"jobId": "cc0e8400-e29b-41d4-a716-446655440000",
"status": "Completed",
"completionTime": "2024-01-20T11:12:00Z",
"totalDuration": "42 minutes",
"success": true,
"summary": {
"recordsProcessed": 192850,
"outputsGenerated": 7,
"dataQualityScore": 94.2,
"processingEfficiency": 87.5
},
"results": {
"primaryOutput": {
"type": "ProcessMiningReport",
"title": "Customer Journey Analysis Report",
"format": "html",
"size": "2.3 MB",
"downloadUrl": "https://api.mindzie.com/downloads/report-cc0e8400.html"
},
"additionalOutputs": [
{
"type": "EnrichedDataset",
"title": "Customer Journey Data Enhanced",
"format": "csv",
"recordCount": 192850,
"size": "45.7 MB",
"downloadUrl": "https://api.mindzie.com/downloads/data-cc0e8400.csv"
},
{
"type": "ProcessMap",
"title": "Customer Journey Process Map",
"format": "svg",
"size": "890 KB",
"downloadUrl": "https://api.mindzie.com/downloads/map-cc0e8400.svg"
},
{
"type": "AnalyticsModel",
"title": "Journey Prediction Model",
"format": "pkl",
"accuracy": 0.89,
"size": "12.4 MB",
"downloadUrl": "https://api.mindzie.com/downloads/model-cc0e8400.pkl"
}
]
},
"executionMetrics": {
"totalCpuTime": "38.5 minutes",
"peakMemoryUsage": "3.2 GB",
"diskIoOperations": 45672,
"networkDataTransfer": "567 MB"
},
"qualityMetrics": {
"dataValidation": {
"totalRecords": 195000,
"validRecords": 192850,
"duplicatesRemoved": 1890,
"invalidRecords": 260
},
"processingErrors": [],
"warnings": [
{
"type": "DataQuality",
"message": "Some timestamps had to be inferred",
"count": 125
}
]
}
}
Retenter une tâche échouée
POST /api/{tenantId}/{projectId}/execution/job/{jobId}/retry
Retente l'exécution d'une tâche échouée avec une modification optionnelle des paramètres. La tâche sera remise en file d'attente avec la même ou une configuration mise à jour.
Corps de la requête
{
"retryReason": "Infrastructure issue resolved",
"modifyParameters": true,
"updatedParameters": {
"timeoutMinutes": 180,
"retryFailedRecords": true,
"increaseMemoryLimit": true
},
"priority": "High",
"immediateExecution": false
}
Réponse
Retourne 200 OK avec un nouvel objet tâche contenant un nouvel ID de tâche et les informations de tentative.
Obtenir le statut d'exécution du système
GET /api/{tenantId}/execution/system/status
Récupère le statut courant d'exécution à l'échelle du système incluant l'utilisation des ressources, la santé de la file d'attente et les métriques de performance.
Réponse
{
"systemStatus": "Healthy",
"timestamp": "2024-01-20T10:45:00Z",
"executionNodes": [
{
"nodeId": "worker-node-01",
"status": "Active",
"cpuUsage": 67,
"memoryUsage": 78,
"activeJobs": 2,
"jobCapacity": 4
},
{
"nodeId": "worker-node-02",
"status": "Active",
"cpuUsage": 45,
"memoryUsage": 56,
"activeJobs": 1,
"jobCapacity": 4
}
],
"queueStatistics": {
"totalQueuedJobs": 15,
"highPriorityJobs": 3,
"normalPriorityJobs": 10,
"lowPriorityJobs": 2,
"averageWaitTime": "4.2 minutes",
"estimatedProcessingTime": "23 minutes"
},
"performanceMetrics": {
"jobsCompletedToday": 847,
"averageJobDuration": "18.5 minutes",
"successRate": 97.8,
"throughputPerHour": 35.2
},
"resourceUtilization": {
"totalCpuCapacity": 1600,
"usedCpuCapacity": 896,
"totalMemoryCapacity": "64 GB",
"usedMemoryCapacity": "38.4 GB",
"diskSpaceAvailable": "2.3 TB"
}
}
Exemple : Flux complet de gestion des tâches
Cet exemple illustre la soumission d'une tâche, le suivi de sa progression et la récupération des résultats :
// 1. Soumettre une nouvelle tâche
const submitJob = async () => {
const response = await fetch('/api/{tenantId}/{projectId}/execution/job', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
jobName: 'Customer Behavior Analysis',
jobDescription: 'Weekly analysis of customer interaction patterns',
jobType: 'ProcessMining',
priority: 'High',
resource: {
resourceType: 'Pipeline',
resourceId: '770e8400-e29b-41d4-a716-446655440000'
},
parameters: {
datasetId: '880e8400-e29b-41d4-a716-446655440000',
analysisType: 'comprehensive',
timeWindow: {
startDate: '2024-01-13',
endDate: '2024-01-19'
},
includeAnomalyDetection: true,
outputFormat: 'detailed_report'
},
scheduling: {
executeImmediately: true,
timeoutMinutes: 90
},
notifications: {
onCompletion: true,
onFailure: true,
emailRecipients: ['analyst@company.com']
}
})
});
return await response.json();
};
// 2. Surveiller la progression de la tâche
const monitorJob = async (jobId) => {
const checkStatus = async () => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const job = await response.json();
console.log(`Job ${jobId}: ${job.status} (${job.progress.percentage}%)`);
console.log(`Current stage: ${job.progress.currentStage}`);
console.log(`ETA: ${job.progress.estimatedCompletion}`);
if (job.status === 'Running' || job.status === 'Queued') {
setTimeout(() => checkStatus(), 30000); // Vérifier toutes les 30 secondes
} else if (job.status === 'Completed') {
console.log('Tâche terminée avec succès !');
await getJobResults(jobId);
} else if (job.status === 'Failed') {
console.log('Tâche échouée :', job.error);
}
};
await checkStatus();
};
// 3. Obtenir les résultats de la tâche
const getJobResults = async (jobId) => {
const response = await fetch(`/api/{tenantId}/{projectId}/execution/job/${jobId}/results?format=detailed&includeArtifacts=true`, {
headers: {
'Authorization': `Bearer ${token}`
}
});
const results = await response.json();
console.log('Résultats de la tâche :', results.summary);
console.log('Sortie principale :', results.results.primaryOutput.downloadUrl);
// Télécharger les sorties additionnelles
for (const output of results.results.additionalOutputs) {
console.log(`Télécharger ${output.type} : ${output.downloadUrl}`);
}
return results;
};
// 4. Obtenir le statut système
const getSystemStatus = async () => {
const response = await fetch('/api/{tenantId}/execution/system/status', {
headers: {
'Authorization': `Bearer ${token}`
}
});
const status = await response.json();
console.log(`Statut du système : ${status.systemStatus}`);
console.log(`File d'attente : ${status.queueStatistics.totalQueuedJobs} tâches en attente`);
console.log(`Temps d'attente moyen : ${status.queueStatistics.averageWaitTime}`);
return status;
};
// Exécuter le flux de travail
submitJob()
.then(job => {
console.log(`Tâche soumise : ${job.jobId}`);
console.log(`Position dans la file d'attente : ${job.queuePosition}`);
console.log(`Début estimé : ${job.estimatedStartTime}`);
return monitorJob(job.jobId);
})
.catch(error => console.error('Échec du flux de tâches :', error));
Exemple Python
import requests
import time
import json
from datetime import datetime, timedelta
class ExecutionManager:
def __init__(self, base_url, tenant_id, project_id, token):
self.base_url = base_url
self.tenant_id = tenant_id
self.project_id = project_id
self.headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
def submit_job(self, job_name, job_type, resource_type, resource_id, parameters=None, priority="Normal"):
"""Soumettre une nouvelle tâche d'exécution"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job"
payload = {
'jobName': job_name,
'jobType': job_type,
'priority': priority,
'resource': {
'resourceType': resource_type,
'resourceId': resource_id
},
'parameters': parameters or {},
'scheduling': {
'executeImmediately': True,
'timeoutMinutes': 120
},
'notifications': {
'onCompletion': True,
'onFailure': True
}
}
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_job_status(self, job_id):
"""Obtenir le statut actuel de la tâche"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
response = requests.get(url, headers=self.headers)
return response.json()
def list_jobs(self, status=None, job_type=None, date_from=None, date_to=None, page=1, page_size=20):
"""Lister les tâches avec filtrage optionnel"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/jobs"
params = {'page': page, 'pageSize': page_size}
if status:
params['status'] = status
if job_type:
params['jobType'] = job_type
if date_from:
params['dateFrom'] = date_from.isoformat()
if date_to:
params['dateTo'] = date_to.isoformat()
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def wait_for_completion(self, job_id, poll_interval=30, timeout=3600):
"""Attendre la complétion de la tâche avec vérifications périodiques de statut"""
start_time = time.time()
while time.time() - start_time < timeout:
job = self.get_job_status(job_id)
print(f"Tâche {job_id} : {job['status']} ({job['progress']['percentage']}%)")
print(f" Étape actuelle : {job['progress']['currentStage']}")
print(f" Temps écoulé : {job['progress']['elapsedTime']}")
if job['status'] in ['Completed', 'Failed', 'Cancelled']:
return job
time.sleep(poll_interval)
raise TimeoutError(f"La tâche {job_id} n'a pas terminé dans les {timeout} secondes")
def get_job_results(self, job_id, format_type="detailed", include_artifacts=True):
"""Obtenir les résultats de l'exécution de la tâche"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/results"
params = {
'format': format_type,
'includeArtifacts': str(include_artifacts).lower()
}
response = requests.get(url, params=params, headers=self.headers)
return response.json()
def cancel_job(self, job_id, reason="User cancellation", force=False):
"""Annuler une tâche en cours ou en file d'attente"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}"
payload = {
'reason': reason,
'forceTermination': force,
'preservePartialResults': True
}
response = requests.delete(url, json=payload, headers=self.headers)
return response.status_code == 200
def retry_job(self, job_id, reason="Retry after failure", priority=None, modify_params=None):
"""Retenter une tâche échouée"""
url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/execution/job/{job_id}/retry"
payload = {
'retryReason': reason,
'modifyParameters': modify_params is not None,
'immediateExecution': False
}
if priority:
payload['priority'] = priority
if modify_params:
payload['updatedParameters'] = modify_params
response = requests.post(url, json=payload, headers=self.headers)
return response.json()
def get_system_status(self):
"""Obtenir le statut d'exécution global du système"""
url = f"{self.base_url}/api/{self.tenant_id}/execution/system/status"
response = requests.get(url, headers=self.headers)
return response.json()
# Exemple d'utilisation
manager = ExecutionManager(
'https://your-mindzie-instance.com',
'tenant-guid',
'project-guid',
'your-auth-token'
)
try:
# Vérifier le statut du système
system_status = manager.get_system_status()
print(f"Statut du système : {system_status['systemStatus']}")
print(f"Tâches en file d'attente : {system_status['queueStatistics']['totalQueuedJobs']}")
print(f"Temps d'attente moyen : {system_status['queueStatistics']['averageWaitTime']}")
# Soumettre une tâche de process mining complète
job_params = {
'datasetId': 'dataset-guid',
'analysisType': 'comprehensive',
'timeWindow': {
'startDate': '2024-01-01',
'endDate': '2024-01-31'
},
'includeAnomalyDetection': True,
'includeProcessVariants': True,
'generateInsights': True,
'outputFormat': 'detailed_report',
'performanceMetrics': ['cycle_time', 'waiting_time', 'resource_utilization'],
'qualityChecks': {
'validateTimestamps': True,
'checkDuplicates': True,
'validateActivities': True
}
}
job = manager.submit_job(
'Monthly Process Analytics',
'ProcessMining',
'Pipeline',
'pipeline-guid',
job_params,
'High'
)
print(f"Tâche soumise : {job['jobId']}")
print(f"Position dans la file d'attente : {job['queuePosition']}")
print(f"Début estimé : {job['estimatedStartTime']}")
# Attendre la complétion
final_job = manager.wait_for_completion(job['jobId'])
if final_job['status'] == 'Completed':
# Obtenir les résultats détaillés
results = manager.get_job_results(job['jobId'])
print("Tâche terminée avec succès !")
print(f"Enregistrements traités : {results['summary']['recordsProcessed']:,}")
print(f"Score de qualité des données : {results['summary']['dataQualityScore']}")
print(f"Efficacité du traitement : {results['summary']['processingEfficiency']}%")
# Télécharger le rapport principal
print(f"Télécharger le rapport : {results['results']['primaryOutput']['downloadUrl']}")
# Lister toutes les sorties additionnelles
for output in results['results']['additionalOutputs']:
print(f"Télécharger {output['type']} : {output['downloadUrl']}")
else:
print(f"Tâche échouée avec le statut : {final_job['status']}")
if 'error' in final_job:
print(f"Erreur : {final_job['error']}")
except Exception as e:
print(f"Erreur dans le flux d'exécution : {e}")