Long-Running Operations
Handle asynchronous operations with callbacks, webhooks, and real-time status updates.
Start Async Operation
POST /api/{tenantId}/{projectId}/async/operation
Initiates a long-running asynchronous operation and returns an operation ID for tracking. Supports callback URLs and webhook notifications.
Parameters
| Parameter | Type | Location | Description |
|---|---|---|---|
| tenantId | GUID | Path | The tenant identifier |
| projectId | GUID | Path | The project identifier |
Request Body
{
"operationType": "ProcessMiningAnalysis",
"operationName": "Comprehensive Customer Journey Analysis",
"operationDescription": "Deep analysis of customer interaction patterns with advanced ML algorithms",
"priority": "High",
"parameters": {
"datasetId": "880e8400-e29b-41d4-a716-446655440000",
"analysisType": "comprehensive",
"timeWindow": {
"startDate": "2024-01-01",
"endDate": "2024-01-31"
},
"algorithmSettings": {
"useAdvancedML": true,
"enableAnomalyDetection": true,
"performanceOptimization": "high_accuracy"
},
"outputOptions": {
"generateReports": true,
"createVisualizations": true,
"exportFormats": ["PDF", "CSV", "JSON"]
}
},
"callbacks": {
"onProgress": "https://your-app.com/webhooks/progress",
"onCompletion": "https://your-app.com/webhooks/completion",
"onError": "https://your-app.com/webhooks/error"
},
"notifications": {
"email": ["analyst@company.com", "manager@company.com"],
"slack": {
"channel": "#process-mining",
"mentionUsers": ["@analyst", "@data-team"]
}
},
"timeout": 7200,
"retryPolicy": {
"maxRetries": 3,
"retryDelay": 300,
"backoffMultiplier": 2.0
}
}
Response
{
"operationId": "op-ff0e8400-e29b-41d4-a716-446655440000",
"operationType": "ProcessMiningAnalysis",
"operationName": "Comprehensive Customer Journey Analysis",
"status": "Initiated",
"estimatedDuration": "45-60 minutes",
"estimatedCompletion": "2024-01-20T12:15:00Z",
"trackingUrl": "/api/{tenantId}/{projectId}/async/operation/op-ff0e8400-e29b-41d4-a716-446655440000",
"webhooksRegistered": 3,
"priority": "High",
"dateCreated": "2024-01-20T11:15:00Z",
"timeoutAt": "2024-01-20T13:15:00Z",
"resourcesAllocated": {
"cpuUnits": 4,
"memoryGB": 8,
"estimatedCost": "$2.45"
}
}
Get Operation Status
GET /api/{tenantId}/{projectId}/async/operation/{operationId}
Retrieves the current status and progress of an asynchronous operation, including detailed execution information and estimated completion times.
Parameters
| Parameter | Type | Location | Description |
|---|---|---|---|
| operationId | string | Path | The operation identifier |
Response
{
"operationId": "op-ff0e8400-e29b-41d4-a716-446655440000",
"operationType": "ProcessMiningAnalysis",
"operationName": "Comprehensive Customer Journey Analysis",
"status": "Running",
"progress": {
"percentage": 67,
"currentPhase": "Machine Learning Analysis",
"phasesCompleted": 2,
"totalPhases": 3,
"startTime": "2024-01-20T11:18:00Z",
"elapsedTime": "23 minutes 15 seconds",
"estimatedRemaining": "15-20 minutes",
"estimatedCompletion": "2024-01-20T12:05:00Z"
},
"execution": {
"executionId": "exec-aa1e8400-e29b-41d4-a716-446655440000",
"workerNode": "async-worker-03",
"resourceUsage": {
"cpuUsage": 78,
"memoryUsage": "6.2 GB",
"diskUsage": "1.3 GB",
"networkIO": "45 MB"
},
"processedRecords": 125430,
"totalRecords": 187250,
"processingRate": "1890 records/minute"
},
"phases": [
{
"phaseName": "Data Loading & Validation",
"status": "Completed",
"startTime": "2024-01-20T11:18:00Z",
"endTime": "2024-01-20T11:25:00Z",
"duration": "7 minutes",
"recordsProcessed": 187250,
"validationResults": {
"validRecords": 187248,
"errorRecords": 2,
"dataQualityScore": 99.9
}
},
{
"phaseName": "Process Discovery",
"status": "Completed",
"startTime": "2024-01-20T11:25:00Z",
"endTime": "2024-01-20T11:38:00Z",
"duration": "13 minutes",
"results": {
"activitiesDiscovered": 52,
"processVariants": 347,
"uniquePaths": 289
}
},
{
"phaseName": "Machine Learning Analysis",
"status": "Running",
"startTime": "2024-01-20T11:38:00Z",
"progress": 72,
"currentActivity": "Training anomaly detection models",
"modelsTraining": 3,
"modelsCompleted": 2
},
{
"phaseName": "Report Generation",
"status": "Pending",
"estimatedStartTime": "2024-01-20T11:55:00Z",
"estimatedDuration": "8-10 minutes"
}
],
"callbacks": {
"progressCallbacksSent": 15,
"lastProgressCallback": "2024-01-20T11:40:00Z",
"callbacksSuccessful": 15,
"callbacksFailed": 0
},
"dateCreated": "2024-01-20T11:15:00Z",
"timeoutAt": "2024-01-20T13:15:00Z",
"priority": "High"
}
List Async Operations
GET /api/{tenantId}/{projectId}/async/operations
Retrieves a list of asynchronous operations with filtering and pagination options. Useful for monitoring multiple long-running operations.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| status | string | Filter by status: Initiated, Running, Completed, Failed, Cancelled, Timeout |
| operationType | string | Filter by operation type: ProcessMiningAnalysis, DataEnrichment, ReportGeneration |
| priority | string | Filter by priority: Low, Normal, High, Critical |
| dateFrom | datetime | Filter operations from this date |
| dateTo | datetime | Filter operations to this date |
| includeDetails | boolean | Include detailed execution information (default: false) |
| page | integer | Page number for pagination (default: 1) |
| pageSize | integer | Number of items per page (default: 20, max: 100) |
Response
{
"operations": [
{
"operationId": "op-ff0e8400-e29b-41d4-a716-446655440000",
"operationType": "ProcessMiningAnalysis",
"operationName": "Comprehensive Customer Journey Analysis",
"status": "Running",
"progress": 67,
"priority": "High",
"startTime": "2024-01-20T11:18:00Z",
"estimatedCompletion": "2024-01-20T12:05:00Z",
"currentPhase": "Machine Learning Analysis",
"resourceUsage": {
"cpuUsage": 78,
"memoryUsage": "6.2 GB"
}
},
{
"operationId": "op-gg1e8400-e29b-41d4-a716-446655440000",
"operationType": "DataEnrichment",
"operationName": "Sales Data Processing",
"status": "Completed",
"progress": 100,
"priority": "Normal",
"startTime": "2024-01-20T10:45:00Z",
"endTime": "2024-01-20T11:10:00Z",
"duration": "25 minutes",
"recordsProcessed": 89420
}
],
"summary": {
"totalOperations": 47,
"running": 3,
"completed": 41,
"failed": 2,
"cancelled": 1
},
"page": 1,
"pageSize": 20,
"hasNextPage": true
}
Cancel Async Operation
DELETE /api/{tenantId}/{projectId}/async/operation/{operationId}
Cancels a running or pending asynchronous operation. The operation will be stopped gracefully, preserving any completed work.
Request Body (Optional)
{
"reason": "User requested cancellation due to changed requirements",
"preservePartialResults": true,
"forceTermination": false,
"notifyCallbacks": true
}
Response
{
"operationId": "op-ff0e8400-e29b-41d4-a716-446655440000",
"status": "Cancelled",
"cancellationTime": "2024-01-20T11:42:00Z",
"reason": "User requested cancellation due to changed requirements",
"progressAtCancellation": 67,
"phaseAtCancellation": "Machine Learning Analysis",
"partialResults": {
"available": true,
"completedPhases": 2,
"downloadUrls": [
"https://api.mindzie.com/downloads/partial-results-ff0e8400.zip"
]
},
"resourcesReleased": {
"cpuUnits": 4,
"memoryGB": 8,
"costSaved": "$1.20"
},
"cancelledBy": "user123"
}
Get Operation Results
GET /api/{tenantId}/{projectId}/async/operation/{operationId}/results
Retrieves the complete results of a finished asynchronous operation, including all generated outputs, reports, and downloadable artifacts.
Query Parameters
| Parameter | Type | Description |
|---|---|---|
| format | string | Response format: summary, detailed, download (default: summary) |
| includeArtifacts | boolean | Include downloadable artifacts in response (default: true) |
| phase | string | Get results from specific phase only |
Response
{
"operationId": "op-ff0e8400-e29b-41d4-a716-446655440000",
"operationType": "ProcessMiningAnalysis",
"operationName": "Comprehensive Customer Journey Analysis",
"status": "Completed",
"completionTime": "2024-01-20T12:03:00Z",
"totalDuration": "45 minutes",
"success": true,
"summary": {
"recordsAnalyzed": 187248,
"processVariants": 347,
"anomaliesDetected": 23,
"modelsGenerated": 3,
"reportsCreated": 5,
"dataQualityScore": 94.7,
"overallConfidenceScore": 91.2
},
"phaseResults": [
{
"phaseName": "Data Loading & Validation",
"status": "Completed",
"results": {
"recordsLoaded": 187250,
"validRecords": 187248,
"dataQualityScore": 99.9,
"validationErrors": [
{
"type": "Missing Timestamp",
"count": 2,
"resolved": true
}
]
}
},
{
"phaseName": "Process Discovery",
"status": "Completed",
"results": {
"processModel": {
"activities": 52,
"transitions": 178,
"variants": 347,
"complexity": "Medium-High"
},
"performanceMetrics": {
"averageCycleTime": "4.2 hours",
"medianCycleTime": "3.1 hours",
"bottleneckActivities": ["Review Application", "Manager Approval"],
"efficiency": 78.3
}
}
},
{
"phaseName": "Machine Learning Analysis",
"status": "Completed",
"results": {
"anomalies": {
"detected": 23,
"highSeverity": 5,
"mediumSeverity": 12,
"lowSeverity": 6,
"falsePositiveRate": 0.03
},
"predictions": {
"cycleTimePrediction": {
"accuracy": 0.89,
"meanAbsoluteError": "0.3 hours"
},
"pathPrediction": {
"accuracy": 0.92,
"confidence": 0.87
}
},
"patterns": {
"frequentPatterns": 15,
"rarePatterns": 8,
"criticalPaths": 3
}
}
}
],
"artifacts": [
{
"name": "Process Mining Analysis Report",
"type": "Report",
"format": "PDF",
"size": "3.2 MB",
"downloadUrl": "https://api.mindzie.com/downloads/report-ff0e8400.pdf",
"description": "Comprehensive analysis report with insights and recommendations"
},
{
"name": "Process Model Visualization",
"type": "Visualization",
"format": "SVG",
"size": "890 KB",
"downloadUrl": "https://api.mindzie.com/downloads/process-map-ff0e8400.svg",
"description": "Interactive process flow diagram"
},
{
"name": "Anomaly Detection Results",
"type": "Dataset",
"format": "CSV",
"size": "1.8 MB",
"downloadUrl": "https://api.mindzie.com/downloads/anomalies-ff0e8400.csv",
"description": "Detailed anomaly analysis with severity scores"
},
{
"name": "Predictive Models",
"type": "Model",
"format": "PKL",
"size": "45.7 MB",
"downloadUrl": "https://api.mindzie.com/downloads/models-ff0e8400.zip",
"description": "Trained ML models for cycle time and path prediction"
}
],
"performance": {
"totalExecutionTime": "45 minutes",
"resourceUtilization": {
"averageCpuUsage": 72,
"peakMemoryUsage": "7.8 GB",
"totalCpuHours": 3.0,
"totalCost": "$2.31"
},
"throughput": "4161 records/minute",
"efficiency": 87.2
},
"recommendations": [
{
"category": "Process Optimization",
"priority": "High",
"recommendation": "Focus on reducing wait times in 'Manager Approval' activity",
"expectedImprovement": "25% reduction in overall cycle time"
},
{
"category": "Data Quality",
"priority": "Medium",
"recommendation": "Implement automated timestamp validation",
"expectedImprovement": "Improved data quality score to 99.5%"
}
]
}
Register Webhook
POST /api/{tenantId}/{projectId}/async/webhooks
Registers a webhook endpoint to receive real-time notifications about asynchronous operations. Supports multiple event types and custom filtering.
Request Body
{
"webhookUrl": "https://your-app.com/webhooks/async-operations",
"webhookName": "Main Operations Webhook",
"events": [
"operation.started",
"operation.progress",
"operation.phase.completed",
"operation.completed",
"operation.failed",
"operation.cancelled"
],
"filters": {
"operationTypes": ["ProcessMiningAnalysis", "DataEnrichment"],
"priorities": ["High", "Critical"],
"minProgressIncrement": 10
},
"authentication": {
"type": "hmac-sha256",
"secret": "your-webhook-secret-key"
},
"retryPolicy": {
"maxRetries": 5,
"retryDelay": 60,
"backoffMultiplier": 2.0,
"maxDelay": 3600
},
"headers": {
"X-Source": "mindzie-api",
"X-Environment": "production"
}
}
Response
{
"webhookId": "wh-123e8400-e29b-41d4-a716-446655440000",
"webhookUrl": "https://your-app.com/webhooks/async-operations",
"webhookName": "Main Operations Webhook",
"status": "Active",
"eventsSubscribed": [
"operation.started",
"operation.progress",
"operation.phase.completed",
"operation.completed",
"operation.failed",
"operation.cancelled"
],
"filters": {
"operationTypes": ["ProcessMiningAnalysis", "DataEnrichment"],
"priorities": ["High", "Critical"],
"minProgressIncrement": 10
},
"createdAt": "2024-01-20T11:45:00Z",
"lastDelivery": null,
"deliveryStats": {
"totalDeliveries": 0,
"successfulDeliveries": 0,
"failedDeliveries": 0,
"averageResponseTime": null
}
}
Retry Failed Operation
POST /api/{tenantId}/{projectId}/async/operation/{operationId}/retry
Retries a failed asynchronous operation with optional parameter modifications. Can resume from the point of failure or restart completely.
Request Body
{
"retryMode": "resume",
"retryReason": "Infrastructure issue resolved, retrying with increased resources",
"modifyParameters": true,
"updatedParameters": {
"algorithmSettings": {
"useAdvancedML": true,
"enableAnomalyDetection": true,
"performanceOptimization": "high_throughput"
},
"resourceAllocation": {
"cpuUnits": 6,
"memoryGB": 12,
"priority": "Critical"
}
},
"retryPolicy": {
"maxRetries": 2,
"retryDelay": 180,
"backoffMultiplier": 1.5
},
"newTimeout": 10800,
"preserveOriginalResults": true
}
Response
{
"originalOperationId": "op-ff0e8400-e29b-41d4-a716-446655440000",
"newOperationId": "op-retry-ff0e8400-e29b-41d4-a716-446655440000",
"retryMode": "resume",
"retryNumber": 1,
"resumeFromPhase": "Machine Learning Analysis",
"status": "Initiated",
"estimatedDuration": "20-25 minutes",
"estimatedCompletion": "2024-01-20T12:30:00Z",
"preservedResults": {
"phasesPreserved": 2,
"recordsProcessed": 187248,
"progressSaved": 45
},
"resourcesAllocated": {
"cpuUnits": 6,
"memoryGB": 12,
"estimatedCost": "$3.20"
},
"retryAttemptDate": "2024-01-20T12:05:00Z"
}
Submit Batch Operations
POST /api/{tenantId}/{projectId}/async/batch
Submits multiple asynchronous operations as a batch with dependencies and coordination. Useful for complex workflows requiring multiple interconnected operations.
Request Body
{
"batchName": "Monthly Process Mining Pipeline",
"batchDescription": "Complete monthly analysis workflow with multiple datasets",
"operations": [
{
"operationName": "Data Preparation",
"operationType": "DataEnrichment",
"priority": "High",
"operationKey": "data-prep",
"parameters": {
"datasetId": "dataset-1",
"cleaningRules": ["remove_duplicates", "fix_timestamps"],
"outputFormat": "processed_csv"
}
},
{
"operationName": "Process Discovery",
"operationType": "ProcessMiningAnalysis",
"priority": "High",
"operationKey": "discovery",
"dependencies": ["data-prep"],
"parameters": {
"algorithm": "alpha_miner_enhanced",
"enableVariantAnalysis": true
}
},
{
"operationName": "Performance Analysis",
"operationType": "ProcessMiningAnalysis",
"priority": "Normal",
"operationKey": "performance",
"dependencies": ["discovery"],
"parameters": {
"enableBottleneckDetection": true,
"generateOptimizationRecommendations": true
}
}
],
"batchCallbacks": {
"onBatchStart": "https://your-app.com/webhooks/batch-start",
"onOperationComplete": "https://your-app.com/webhooks/operation-complete",
"onBatchComplete": "https://your-app.com/webhooks/batch-complete"
},
"failurePolicy": {
"stopOnFirstFailure": false,
"continueIndependentOperations": true,
"retryFailedOperations": true
}
}
Response
{
"batchId": "batch-567e8400-e29b-41d4-a716-446655440000",
"batchName": "Monthly Process Mining Pipeline",
"status": "Initiated",
"operations": [
{
"operationKey": "data-prep",
"operationId": "op-prep-890e8400-e29b-41d4-a716-446655440000",
"status": "Running",
"dependencies": [],
"estimatedDuration": "15 minutes"
},
{
"operationKey": "discovery",
"operationId": "op-disc-901e8400-e29b-41d4-a716-446655440000",
"status": "Pending",
"dependencies": ["data-prep"],
"estimatedStartTime": "2024-01-20T12:20:00Z"
},
{
"operationKey": "performance",
"operationId": "op-perf-012e8400-e29b-41d4-a716-446655440000",
"status": "Pending",
"dependencies": ["discovery"],
"estimatedStartTime": "2024-01-20T12:45:00Z"
}
],
"totalOperations": 3,
"estimatedBatchDuration": "75-90 minutes",
"estimatedBatchCompletion": "2024-01-20T13:45:00Z",
"batchStartTime": "2024-01-20T12:05:00Z",
"trackingUrl": "/api/{tenantId}/{projectId}/async/batch/batch-567e8400-e29b-41d4-a716-446655440000"
}
Example: Complete Async Operation Workflow
This example demonstrates the full lifecycle of asynchronous operations:
// 1. Register a webhook so the app receives real-time operation notifications.
const registerWebhook = async () => {
  // Subscription payload kept separate from the transport call for clarity.
  const subscription = {
    webhookUrl: 'https://your-app.com/webhooks/async-operations',
    webhookName: 'Process Mining Webhook',
    events: [
      'operation.started',
      'operation.progress',
      'operation.completed',
      'operation.failed'
    ],
    filters: {
      operationTypes: ['ProcessMiningAnalysis'],
      priorities: ['High', 'Critical'],
      minProgressIncrement: 15
    },
    authentication: {
      type: 'hmac-sha256',
      secret: 'your-secret-key'
    }
  };

  const res = await fetch('/api/{tenantId}/{projectId}/async/webhooks', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify(subscription)
  });
  return res.json();
};
// 2. Kick off a complex asynchronous process-mining analysis.
const startAsyncAnalysis = async () => {
  // Full operation request: analysis parameters, callback URLs, notifications.
  const analysisRequest = {
    operationType: 'ProcessMiningAnalysis',
    operationName: 'Advanced Customer Journey Analysis',
    operationDescription: 'Deep ML-powered analysis with anomaly detection',
    priority: 'High',
    parameters: {
      datasetId: '880e8400-e29b-41d4-a716-446655440000',
      analysisType: 'comprehensive',
      timeWindow: {
        startDate: '2024-01-01',
        endDate: '2024-01-31'
      },
      algorithmSettings: {
        useAdvancedML: true,
        enableAnomalyDetection: true,
        performanceOptimization: 'high_accuracy'
      },
      outputOptions: {
        generateReports: true,
        createVisualizations: true,
        exportFormats: ['PDF', 'CSV', 'JSON']
      }
    },
    callbacks: {
      onProgress: 'https://your-app.com/webhooks/progress',
      onCompletion: 'https://your-app.com/webhooks/completion',
      onError: 'https://your-app.com/webhooks/error'
    },
    notifications: {
      email: ['analyst@company.com'],
      slack: {
        channel: '#process-mining',
        mentionUsers: ['@analyst']
      }
    },
    timeout: 7200
  };

  const res = await fetch('/api/{tenantId}/{projectId}/async/operation', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify(analysisRequest)
  });
  return res.json();
};
// 3. Monitor operation progress until it reaches a terminal state.
// FIX: the original used a fire-and-forget setTimeout recursion, so the
// awaited promise resolved after the FIRST status check and failures in
// later polls were unhandled; it also read `operation.progress.percentage`
// unguarded, which throws on payloads that omit `progress`.
const monitorOperation = async (operationId) => {
  const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  for (;;) {
    const response = await fetch(`/api/{tenantId}/{projectId}/async/operation/${operationId}`, {
      headers: {
        'Authorization': `Bearer ${token}`
      }
    });
    const operation = await response.json();
    // Guard: terminal/failed responses may not carry a progress object.
    const progress = operation.progress || {};
    console.log(`Operation ${operationId}: ${operation.status} (${progress.percentage}%)`);
    console.log(`Current phase: ${progress.currentPhase}`);
    console.log(`ETA: ${progress.estimatedCompletion}`);
    if (operation.status === 'Completed') {
      console.log('Operation completed successfully!');
      await getOperationResults(operationId);
      return;
    }
    if (operation.status === 'Failed') {
      console.log('Operation failed, attempting retry...');
      await retryOperation(operationId);
      return;
    }
    if (operation.status !== 'Running') {
      // Cancelled / Timeout / unknown terminal state — stop polling.
      return;
    }
    await delay(60000); // Check every minute
  }
};
// 4. Fetch detailed results (including artifacts) for a finished operation.
const getOperationResults = async (operationId) => {
  const url = `/api/{tenantId}/{projectId}/async/operation/${operationId}/results?format=detailed&includeArtifacts=true`;
  const res = await fetch(url, {
    headers: { 'Authorization': `Bearer ${token}` }
  });
  const results = await res.json();
  console.log('Operation Results:', results.summary);
  console.log('Generated Artifacts:');
  for (const artifact of results.artifacts) {
    console.log(`- ${artifact.name} (${artifact.format}): ${artifact.downloadUrl}`);
  }
  return results;
};
// 5. Retry a failed operation, resuming from the point of failure with an
// upgraded resource allocation, then monitor the replacement operation.
const retryOperation = async (operationId) => {
  const retryRequest = {
    retryMode: 'resume',
    retryReason: 'Automatic retry with increased resources',
    modifyParameters: true,
    updatedParameters: {
      resourceAllocation: {
        cpuUnits: 6,
        memoryGB: 12,
        priority: 'Critical'
      }
    },
    newTimeout: 10800
  };

  const res = await fetch(`/api/{tenantId}/{projectId}/async/operation/${operationId}/retry`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify(retryRequest)
  });
  const retryResult = await res.json();
  console.log(`Retry operation started: ${retryResult.newOperationId}`);
  // Monitor the retry operation
  await monitorOperation(retryResult.newOperationId);
  return retryResult;
};
// 6. Submit a dependent batch pipeline: clean -> analyze -> report.
const submitBatchOperations = async () => {
  // Operation keys ('clean', 'analyze', 'report') wire up the dependencies.
  const batchRequest = {
    batchName: 'Complete Process Mining Pipeline',
    batchDescription: 'End-to-end analysis with data prep and reporting',
    operations: [
      {
        operationName: 'Data Cleaning',
        operationType: 'DataEnrichment',
        priority: 'High',
        operationKey: 'clean',
        parameters: {
          datasetId: 'raw-dataset-123',
          cleaningRules: ['remove_duplicates', 'fix_timestamps', 'validate_activities']
        }
      },
      {
        operationName: 'Process Analysis',
        operationType: 'ProcessMiningAnalysis',
        priority: 'High',
        operationKey: 'analyze',
        dependencies: ['clean'],
        parameters: {
          analysisType: 'comprehensive',
          enableML: true,
          generateInsights: true
        }
      },
      {
        operationName: 'Report Generation',
        operationType: 'ReportGeneration',
        priority: 'Normal',
        operationKey: 'report',
        dependencies: ['analyze'],
        parameters: {
          reportType: 'executive_summary',
          includeVisualizations: true,
          exportFormats: ['PDF', 'PowerPoint']
        }
      }
    ],
    failurePolicy: {
      stopOnFirstFailure: false,
      continueIndependentOperations: true,
      retryFailedOperations: true
    }
  };

  const res = await fetch('/api/{tenantId}/{projectId}/async/batch', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify(batchRequest)
  });
  return res.json();
};
// Execute the complete async workflow: webhook registration -> start -> monitor.
const runAsyncWorkflow = async () => {
  try {
    console.log('Starting async operation workflow...');

    // Register webhook
    const webhook = await registerWebhook();
    console.log(`Webhook registered: ${webhook.webhookId}`);

    // Start operation
    const operation = await startAsyncAnalysis();
    console.log(`Operation started: ${operation.operationId}`);
    console.log(`Estimated completion: ${operation.estimatedCompletion}`);

    // Monitor progress
    await monitorOperation(operation.operationId);
  } catch (error) {
    console.error('Async workflow failed:', error);
  }
};

// Start the workflow
runAsyncWorkflow();
Python Example
import requests
import time
import json
import hmac
import hashlib
from datetime import datetime, timedelta
class AsyncOperationManager:
    """Client helper for the mindzie asynchronous-operations REST API.

    Wraps the /async endpoints: starting, listing, cancelling and retrying
    operations, fetching results, registering webhooks, submitting batches,
    and polling an operation until it reaches a terminal state.
    """

    def __init__(self, base_url, tenant_id, project_id, token):
        # Instance base URL plus the tenant/project ids used in every route.
        self.base_url = base_url
        self.tenant_id = tenant_id
        self.project_id = project_id
        # Bearer-token headers shared by every request.
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }

    def start_operation(self, operation_type, name, parameters, priority='Normal', timeout=3600):
        """Start an asynchronous operation; returns the API's creation response."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/async/operation"
        payload = {
            'operationType': operation_type,
            'operationName': name,
            'priority': priority,
            'parameters': parameters,
            'timeout': timeout,
            'callbacks': {
                'onProgress': 'https://your-app.com/webhooks/progress',
                'onCompletion': 'https://your-app.com/webhooks/completion',
                'onError': 'https://your-app.com/webhooks/error'
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def get_operation_status(self, operation_id):
        """Get the current status/progress payload for one operation."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/async/operation/{operation_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()

    def list_operations(self, status=None, operation_type=None, page=1, page_size=20):
        """List async operations with optional status/type filtering and paging."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/async/operations"
        params = {'page': page, 'pageSize': page_size}
        if status:
            params['status'] = status
        if operation_type:
            params['operationType'] = operation_type
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def cancel_operation(self, operation_id, reason="User cancellation"):
        """Cancel a running operation, asking the API to preserve partial results."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/async/operation/{operation_id}"
        payload = {
            'reason': reason,
            'preservePartialResults': True,
            'notifyCallbacks': True
        }
        # The cancel endpoint accepts an optional JSON body on DELETE.
        response = requests.delete(url, json=payload, headers=self.headers)
        return response.json()

    def get_operation_results(self, operation_id, format_type='detailed', include_artifacts=True):
        """Fetch results of a finished operation."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/async/operation/{operation_id}/results"
        params = {
            'format': format_type,
            # Query string expects a lowercase string boolean ("true"/"false").
            'includeArtifacts': str(include_artifacts).lower()
        }
        response = requests.get(url, params=params, headers=self.headers)
        return response.json()

    def retry_operation(self, operation_id, retry_mode='resume', updated_params=None):
        """Retry a failed operation, optionally overriding its parameters."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/async/operation/{operation_id}/retry"
        payload = {
            'retryMode': retry_mode,
            'retryReason': 'Automatic retry with optimization',
            'modifyParameters': updated_params is not None
        }
        if updated_params:
            payload['updatedParameters'] = updated_params
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def register_webhook(self, webhook_url, events, filters=None):
        """Register a webhook endpoint for operation notifications."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/async/webhooks"
        payload = {
            'webhookUrl': webhook_url,
            'webhookName': 'Python SDK Webhook',
            'events': events,
            'filters': filters or {},
            'authentication': {
                'type': 'hmac-sha256',
                'secret': 'your-webhook-secret'
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def submit_batch_operations(self, batch_name, operations, failure_policy=None):
        """Submit multiple operations as one coordinated batch."""
        url = f"{self.base_url}/api/{self.tenant_id}/{self.project_id}/async/batch"
        payload = {
            'batchName': batch_name,
            'operations': operations,
            'failurePolicy': failure_policy or {
                'stopOnFirstFailure': False,
                'continueIndependentOperations': True,
                'retryFailedOperations': True
            }
        }
        response = requests.post(url, json=payload, headers=self.headers)
        return response.json()

    def wait_for_completion(self, operation_id, check_interval=60, timeout=7200):
        """Poll the operation until it finishes, fails, or the wait times out.

        Returns the final status payload; raises TimeoutError if no terminal
        state ('Completed', 'Failed', 'Cancelled', 'Timeout') is reached
        within `timeout` seconds.
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            operation = self.get_operation_status(operation_id)
            status = operation['status']
            # FIX: 'progress' may be absent from payloads for operations that
            # failed before any phase ran; use .get() so the poll loop does
            # not raise KeyError on such responses.
            progress = operation.get('progress') or {}
            percentage = progress.get('percentage', 'n/a')
            print(f"Operation {operation_id}: {status} ({percentage}%)")
            if progress.get('currentPhase'):
                print(f"  Current phase: {progress['currentPhase']}")
            if status == 'Completed':
                print("Operation completed successfully!")
                return operation
            elif status in ['Failed', 'Cancelled', 'Timeout']:
                print(f"Operation ended with status: {status}")
                return operation
            time.sleep(check_interval)
        raise TimeoutError(f"Operation {operation_id} did not complete within {timeout} seconds")

    def verify_webhook_signature(self, payload, signature, secret):
        """Verify an HMAC-SHA256 webhook signature in 'sha256=<hexdigest>' form."""
        expected_signature = hmac.new(
            secret.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        # Constant-time comparison avoids timing side channels.
        return hmac.compare_digest(signature, f"sha256={expected_signature}")
# Usage example: drive a full analysis workflow against a mindzie instance.
manager = AsyncOperationManager(
    'https://your-mindzie-instance.com',
    'tenant-guid',
    'project-guid',
    'your-auth-token'
)

try:
    # Register webhook for notifications
    webhook = manager.register_webhook(
        'https://your-app.com/webhooks/async',
        ['operation.started', 'operation.progress', 'operation.completed', 'operation.failed'],
        {'operationTypes': ['ProcessMiningAnalysis'], 'priorities': ['High', 'Critical']}
    )
    print(f"Webhook registered: {webhook['webhookId']}")

    # Start comprehensive process mining operation
    operation_params = {
        'datasetId': 'dataset-guid',
        'analysisType': 'comprehensive',
        'timeWindow': {
            'startDate': '2024-01-01',
            'endDate': '2024-01-31'
        },
        'algorithmSettings': {
            'useAdvancedML': True,
            'enableAnomalyDetection': True,
            'performanceOptimization': 'high_accuracy'
        },
        'outputOptions': {
            'generateReports': True,
            'createVisualizations': True,
            'exportFormats': ['PDF', 'CSV', 'JSON']
        }
    }

    operation = manager.start_operation(
        'ProcessMiningAnalysis',
        'Advanced Customer Journey Analysis',
        operation_params,
        'High',
        7200
    )
    print(f"Operation started: {operation['operationId']}")
    print(f"Estimated duration: {operation['estimatedDuration']}")
    print(f"Estimated completion: {operation['estimatedCompletion']}")

    # Wait for completion
    final_operation = manager.wait_for_completion(operation['operationId'])

    if final_operation['status'] == 'Completed':
        # Get detailed results
        results = manager.get_operation_results(operation['operationId'])
        print("Operation completed successfully!")
        print(f"Records analyzed: {results['summary']['recordsAnalyzed']:,}")
        print(f"Process variants: {results['summary']['processVariants']}")
        print(f"Anomalies detected: {results['summary']['anomaliesDetected']}")
        print(f"Data quality score: {results['summary']['dataQualityScore']}")

        print("\nGenerated artifacts:")
        for artifact in results['artifacts']:
            print(f"- {artifact['name']} ({artifact['format']}): {artifact['downloadUrl']}")

        print("\nRecommendations:")
        for rec in results['recommendations']:
            print(f"- {rec['category']} ({rec['priority']}): {rec['recommendation']}")
    else:
        print(f"Operation failed with status: {final_operation['status']}")
        # Try to retry if failed
        if final_operation['status'] == 'Failed':
            print("Attempting to retry operation...")
            retry_result = manager.retry_operation(
                operation['operationId'],
                'resume',
                {'resourceAllocation': {'cpuUnits': 6, 'memoryGB': 12}}
            )
            print(f"Retry operation started: {retry_result['newOperationId']}")
except Exception as e:
    print(f"Error in async operation workflow: {e}")