Overview
The Python enrichment is one of the most powerful and flexible enrichment operators in mindzieStudio, enabling you to write custom Python code to transform, analyze, and enrich your process mining data. This operator provides direct access to your event log data through Pandas DataFrames, allowing you to leverage the full Python ecosystem including libraries like NumPy, Pandas, and custom business logic to create sophisticated data transformations that go beyond standard enrichments.
With a usage frequency of 95%, Python is one of the most commonly used enrichments in mindzieStudio. It bridges the gap between standard process mining operations and advanced data science workflows, enabling data scientists and analysts to apply custom algorithms, complex business rules, and advanced analytics directly within their process mining pipeline. The operator seamlessly integrates Python code execution with mindzieStudio's data model, automatically handling data serialization, type conversion, and result integration back into your dataset.
Common Uses
- Calculate complex KPIs that require custom business logic not available in standard calculators
- Apply machine learning models for prediction, classification, or clustering directly on process data
- Perform advanced text processing and natural language processing on event attributes
- Implement custom conformance checks based on complex business rules
- Create derived attributes using statistical analysis and advanced mathematical operations
- Integrate external data sources by calling APIs or reading external files within Python code
- Build custom data quality checks and validation rules specific to your business domain
Settings
Filter: Optional filter to limit which cases are processed by the Python script. This allows you to apply transformations only to specific subsets of your data, improving performance and enabling targeted analysis. When no filter is applied, the Python code processes all cases in the dataset.
Columns: Select which existing columns from your dataset should be made available to the Python script. These columns will be accessible in the case_table and event_table DataFrames. Only selected columns are passed to Python to minimize memory usage and improve performance. The CaseId column is always included automatically.
Change Columns: Specify which of the selected columns can be modified by your Python script. This setting allows you to update existing attribute values while maintaining data integrity. Only columns that were selected in the Columns setting can be marked for modification.
New Columns: Define new attributes that your Python script will create. For each new column, you must specify:
- Column Name: The internal name used in Python code
- Display Name: The user-friendly name shown in mindzieStudio
- Data Type: The data type (String, Integer, DateTime, Boolean, Double)
- Source Type: Whether the attribute is added at Case or Event level
- Format: Optional display format for the attribute
Python Code: The Python script that will be executed on your data. Your code has access to:
- case_table: Pandas DataFrame containing case-level attributes
- event_table: Pandas DataFrame containing event-level data with columns for InternalEventIndex, CaseId, ActivityName, ActivityTime, and any selected event attributes
The script should modify these DataFrames in place. Any changes to existing columns (marked in Change Columns) or additions of new columns (defined in New Columns) will be automatically integrated back into your dataset.
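To illustrate the basic pattern, a minimal script might look like the following sketch. The CaseDurationDays column is purely illustrative and would need to be defined under New Columns as a Double at Case level:
import pandas as pd  # case_table and event_table are provided as pandas DataFrames

# Duration of each case in days, from first to last event
durations = event_table.groupby('CaseId')['ActivityTime'].agg(['min', 'max'])
durations['CaseDurationDays'] = (durations['max'] - durations['min']).dt.total_seconds() / 86400
durations = durations.reset_index()[['CaseId', 'CaseDurationDays']]

# Attach the new column; it is picked up because its name matches the defined New Column
case_table = case_table.merge(durations, on='CaseId', how='left')
As in the examples below, the script simply rebinds case_table; the resulting DataFrames are read back when execution finishes and integrated into the dataset.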
Python Image: Specifies the Python execution environment. Options include:
- LOCAL: Uses local Python installation (if available)
- Docker image name: Specific Docker image with required Python packages
- Default: mindzie's standard Python environment with common data science libraries
Examples
Example 1: Calculate Order Processing Efficiency Score
Scenario: In an order-to-cash process, you need to calculate a custom efficiency score based on order value, processing time, and number of rework activities.
Settings:
- Filter: None (process all cases)
- Columns: OrderValue, CustomerPriority
- Change Columns: None
- New Columns:
- Column Name: EfficiencyScore
- Display Name: Efficiency Score
- Data Type: Double
- Source Type: Case
- Python Code:
# Calculate efficiency score for each order
import numpy as np
# Count rework activities per case
rework_activities = ['Order Correction', 'Price Adjustment', 'Approval Retry']
event_table['IsRework'] = event_table['ActivityName'].isin(rework_activities)
rework_counts = event_table.groupby('CaseId')['IsRework'].sum().reset_index()
rework_counts.columns = ['CaseId', 'ReworkCount']
# Calculate case duration in days
case_durations = event_table.groupby('CaseId')['ActivityTime'].agg(['min', 'max'])
case_durations['Duration'] = (case_durations['max'] - case_durations['min']).dt.total_seconds() / 86400
case_durations = case_durations.reset_index()[['CaseId', 'Duration']]
# Merge with case table
case_table = case_table.merge(rework_counts, on='CaseId', how='left')
case_table = case_table.merge(case_durations, on='CaseId', how='left')
# Calculate efficiency score (0-100)
case_table['EfficiencyScore'] = 100 * (
    (case_table['OrderValue'] / case_table['OrderValue'].max()) * 0.4 +
    (1 - case_table['ReworkCount'] / 10) * 0.3 +
    (1 - case_table['Duration'] / 30) * 0.3
)
case_table['EfficiencyScore'] = np.clip(case_table['EfficiencyScore'], 0, 100)
# Clean up temporary columns
case_table = case_table.drop(['ReworkCount', 'Duration'], axis=1)
- Python Image: LOCAL
Output: Creates a new case attribute "Efficiency Score" with values ranging from 0 to 100, where higher scores indicate more efficient order processing based on the combination of order value, minimal rework, and faster processing time.
Insights: This custom score helps identify which orders are processed most efficiently and can be used to benchmark performance, identify best practices, and prioritize process improvement initiatives.
Example 2: Detect Anomalous Event Sequences
Scenario: In a healthcare patient treatment process, identify cases where the sequence of medical procedures deviates from standard protocols.
Settings:
- Filter: None
- Columns: PatientAge, Department
- Change Columns: None
- New Columns:
- Column Name: HasAnomalousSequence
- Display Name: Anomalous Sequence Detected
- Data Type: Boolean
- Source Type: Case
- Column Name: AnomalyDescription
- Display Name: Anomaly Description
- Data Type: String
- Source Type: Case
- Python Code:
# Define expected sequence patterns
normal_sequences = [
    ['Registration', 'Triage', 'Examination', 'Treatment', 'Discharge'],
    ['Registration', 'Triage', 'Examination', 'Lab Test', 'Treatment', 'Discharge'],
    ['Registration', 'Emergency Assessment', 'Treatment', 'Observation', 'Discharge']
]

def check_sequence_anomaly(group):
    activities = group.sort_values('ActivityTime')['ActivityName'].tolist()
    # Check for repeated activities
    if len(activities) != len(set(activities)):
        return True, "Repeated activities detected"
    # Check for out-of-order activities
    if 'Discharge' in activities and activities.index('Discharge') < len(activities) - 1:
        return True, "Activities after discharge"
    if 'Registration' in activities and activities.index('Registration') > 0:
        return True, "Registration not first activity"
    # Check whether the case contains all activities of at least one normal pattern
    matches_normal = any(
        all(act in activities for act in normal_seq)
        for normal_seq in normal_sequences
    )
    if not matches_normal and len(activities) > 3:
        return True, "Non-standard sequence pattern"
    return False, ""

# Apply anomaly detection to each case
anomaly_results = event_table.groupby('CaseId').apply(check_sequence_anomaly)

# Add results to case table
case_table['HasAnomalousSequence'] = case_table['CaseId'].map(
    lambda x: anomaly_results[x][0] if x in anomaly_results.index else False
)
case_table['AnomalyDescription'] = case_table['CaseId'].map(
    lambda x: anomaly_results[x][1] if x in anomaly_results.index else ""
)
- Python Image: LOCAL
Output: Creates two new case attributes:
- "Anomalous Sequence Detected": Boolean flag indicating if the case has an unusual sequence
- "Anomaly Description": Text description explaining the type of anomaly detected
Insights: This enrichment helps identify cases that deviate from standard medical protocols, enabling quality assurance teams to investigate potential issues and ensure patient safety.
Example 3: Calculate Supplier Performance Metrics
Scenario: In a procurement process, calculate comprehensive supplier performance metrics based on delivery times, quality issues, and order completeness.
Settings:
- Filter: ActivityName = "Goods Receipt"
- Columns: SupplierID, OrderQuantity, ReceivedQuantity
- Change Columns: None
- New Columns:
- Column Name: OnTimeDeliveryRate
- Display Name: On-Time Delivery Rate %
- Data Type: Double
- Source Type: Case
- Column Name: QualityScore
- Display Name: Supplier Quality Score
- Data Type: Double
- Source Type: Case
- Python Code:
import numpy as np
import pandas as pd

# Calculate delivery performance
def calculate_supplier_metrics(group):
    po_created = group[group['ActivityName'] == 'PO Created']['ActivityTime'].min()
    goods_received = group[group['ActivityName'] == 'Goods Receipt']['ActivityTime'].max()
    # Skip cases missing either milestone
    if pd.isna(po_created) or pd.isna(goods_received):
        return pd.Series({'OnTimeDelivery': np.nan, 'QualityScore': np.nan})
    # Expected delivery time is 5 business days
    expected_days = 5
    actual_days = np.busday_count(po_created.date(), goods_received.date())
    on_time = 1 if actual_days <= expected_days else 0
    # Check for quality issues
    has_quality_issue = 'Quality Issue' in group['ActivityName'].values
    has_return = 'Return to Supplier' in group['ActivityName'].values
    quality_score = 100
    if has_quality_issue:
        quality_score -= 30
    if has_return:
        quality_score -= 40
    return pd.Series({
        'OnTimeDelivery': on_time,
        'QualityScore': quality_score
    })

# Calculate metrics for each case
supplier_metrics = event_table.groupby('CaseId').apply(calculate_supplier_metrics)

# Aggregate by supplier
supplier_performance = case_table.merge(supplier_metrics, left_on='CaseId', right_index=True)
supplier_summary = supplier_performance.groupby('SupplierID').agg({
    'OnTimeDelivery': 'mean',
    'QualityScore': 'mean'
}).reset_index()
supplier_summary.columns = ['SupplierID', 'OnTimeDeliveryRate', 'AvgQualityScore']

# Add back to case table
case_table = case_table.merge(
    supplier_summary[['SupplierID', 'OnTimeDeliveryRate', 'AvgQualityScore']],
    on='SupplierID',
    how='left'
)
case_table['OnTimeDeliveryRate'] = case_table['OnTimeDeliveryRate'] * 100
case_table.rename(columns={'AvgQualityScore': 'QualityScore'}, inplace=True)
- Python Image: LOCAL
Output: Creates supplier performance metrics at the case level:
- "On-Time Delivery Rate %": Percentage of orders delivered on time by this supplier
- "Supplier Quality Score": Quality score from 0-100 based on quality issues and returns
Insights: These metrics enable procurement teams to evaluate supplier performance objectively, support vendor selection decisions, and identify suppliers requiring performance improvement interventions.
Example 4: Text Mining on Process Comments
Scenario: In an IT service management process, analyze ticket comments to categorize issues and detect sentiment.
Settings:
- Filter: None
- Columns: TicketDescription, ResolutionNotes
- Change Columns: None
- New Columns:
- Column Name: IssueCategory
- Display Name: Issue Category
- Data Type: String
- Source Type: Case
- Column Name: CustomerSentiment
- Display Name: Customer Sentiment
- Data Type: String
- Source Type: Case
- Python Code:
import pandas as pd

# Define keywords for categorization
category_keywords = {
    'Hardware': ['laptop', 'desktop', 'printer', 'mouse', 'keyboard', 'monitor', 'hardware'],
    'Software': ['application', 'software', 'program', 'install', 'update', 'crash', 'error'],
    'Network': ['network', 'internet', 'wifi', 'connection', 'vpn', 'firewall'],
    'Access': ['password', 'login', 'access', 'permission', 'authentication', 'account'],
    'Other': []
}

# Sentiment indicators
negative_words = ['slow', 'broken', 'failed', 'cannot', 'unable', 'frustrated', 'urgent', 'critical']
positive_words = ['resolved', 'working', 'fixed', 'thank', 'great', 'excellent', 'happy']

def categorize_issue(text):
    if pd.isna(text):
        return 'Other'
    text_lower = text.lower()
    for category, keywords in category_keywords.items():
        if any(keyword in text_lower for keyword in keywords):
            return category
    return 'Other'

def analyze_sentiment(text):
    if pd.isna(text):
        return 'Neutral'
    text_lower = text.lower()
    negative_count = sum(1 for word in negative_words if word in text_lower)
    positive_count = sum(1 for word in positive_words if word in text_lower)
    if negative_count > positive_count:
        return 'Negative'
    elif positive_count > negative_count:
        return 'Positive'
    else:
        return 'Neutral'

# Apply text analysis to the ticket description
case_table['IssueCategory'] = case_table['TicketDescription'].apply(categorize_issue)
case_table['CustomerSentiment'] = case_table['TicketDescription'].apply(analyze_sentiment)
- Python Image: LOCAL
Output: Creates two text-derived attributes:
- "Issue Category": Categorization of the IT issue (Hardware, Software, Network, Access, Other)
- "Customer Sentiment": Sentiment analysis result (Positive, Negative, Neutral)
Insights: This enrichment enables IT service managers to understand issue distribution, prioritize based on customer sentiment, and identify areas requiring additional support resources or training.
Example 5: Financial Compliance Risk Scoring
Scenario: In a financial transaction approval process, calculate a compliance risk score based on multiple risk factors.
Settings:
- Filter: TransactionType = "Wire Transfer"
- Columns: TransactionAmount, CustomerCountry, AccountAge, PreviousFlags
- Change Columns: None
- New Columns:
- Column Name: ComplianceRiskScore
- Display Name: Compliance Risk Score
- Data Type: Integer
- Source Type: Case
- Column Name: RiskLevel
- Display Name: Risk Level
- Data Type: String
- Source Type: Case
- Python Code:
import pandas as pd

# Define risk factors and weights
high_risk_countries = ['Country1', 'Country2', 'Country3']  # Placeholder for actual list
suspicious_amount_threshold = 10000

def calculate_risk_score(row):
    risk_score = 0
    # Amount risk (0-30 points)
    if row['TransactionAmount'] > suspicious_amount_threshold:
        risk_score += min(30, int(row['TransactionAmount'] / suspicious_amount_threshold * 10))
    # Geographic risk (0-25 points)
    if row['CustomerCountry'] in high_risk_countries:
        risk_score += 25
    # Account age risk (0-20 points)
    if pd.notna(row['AccountAge']) and row['AccountAge'] < 30:
        risk_score += 20 - int(row['AccountAge'] / 30 * 20)
    # Previous flags risk (0-25 points)
    if pd.notna(row['PreviousFlags']) and row['PreviousFlags'] > 0:
        risk_score += int(min(25, row['PreviousFlags'] * 5))
    return risk_score

# Calculate risk scores
case_table['ComplianceRiskScore'] = case_table.apply(calculate_risk_score, axis=1)

# Assign risk levels
def assign_risk_level(score):
    if score >= 70:
        return 'High'
    elif score >= 40:
        return 'Medium'
    else:
        return 'Low'

case_table['RiskLevel'] = case_table['ComplianceRiskScore'].apply(assign_risk_level)
- Python Image: LOCAL
Output: Creates comprehensive risk assessment attributes:
- "Compliance Risk Score": Numerical risk score from 0-100
- "Risk Level": Categorical risk classification (High, Medium, Low)
Insights: This risk scoring enables compliance teams to prioritize transaction reviews, automate approval workflows based on risk levels, and ensure regulatory compliance while minimizing false positives.
Output
The Python enrichment operator produces new or modified attributes based on your custom code:
New Case Attributes: Any columns added to the case_table DataFrame that match the defined New Columns will be created as case-level attributes in your dataset. These attributes are available immediately for use in filters, calculators, and other enrichments.
New Event Attributes: Any columns added to the event_table DataFrame that match the defined New Columns will be created as event-level attributes. These can capture event-specific calculations or classifications.
Modified Attributes: Existing columns specified in Change Columns can have their values updated. The original data type must be maintained, but values can be transformed according to your business logic.
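As a sketch of these two patterns (the column names below are hypothetical), an event-level New Column and an update to a changeable column might look like this:
import pandas as pd

# Hypothetical event-level New Column "TimeSincePreviousSec" (Double, Event level):
# seconds elapsed since the previous event of the same case
event_table = event_table.sort_values(['CaseId', 'ActivityTime'])
event_table['TimeSincePreviousSec'] = (
    event_table.groupby('CaseId')['ActivityTime'].diff().dt.total_seconds()
)

# Hypothetical Change Column "Region": normalize values while keeping the String type
case_table['Region'] = case_table['Region'].str.strip().str.upper()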
Data Type Handling: The operator automatically handles type conversion between Python and mindzieStudio data types:
- Python strings → mindzieStudio String
- Python int32/int64 → mindzieStudio Integer
- Python float → mindzieStudio Double
- Python datetime → mindzieStudio DateTime
- Python bool → mindzieStudio Boolean
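Although this conversion is automatic, explicitly casting result columns before the script ends is a simple safeguard when a calculation might produce an unexpected dtype. A short sketch with hypothetical column names:
# Align pandas dtypes with the declared New Column data types (hypothetical names)
case_table['Score'] = case_table['Score'].astype('float64')                      # Double
case_table['ReworkCount'] = case_table['ReworkCount'].fillna(0).astype('int64')  # Integer
case_table['IsLate'] = case_table['IsLate'].fillna(False).astype(bool)           # Boolean
case_table['Category'] = case_table['Category'].astype(str)                      # String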
Case and Event Removal: Advanced usage allows removing cases or events by filtering them out of the respective DataFrames. Cases not present in the output case_table will be removed from the dataset.
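If a script needs to remove cases, a sketch along these lines (with a hypothetical column and threshold) keeps both DataFrames consistent:
# Drop cases below a hypothetical value threshold, keeping both tables in sync
keep_ids = case_table.loc[case_table['OrderValue'] >= 100, 'CaseId']
case_table = case_table[case_table['CaseId'].isin(keep_ids)]
event_table = event_table[event_table['CaseId'].isin(keep_ids)]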
The enriched attributes integrate seamlessly with all other mindzieStudio features, enabling you to leverage custom Python transformations throughout your process mining analysis workflow.
See Also
- AI Case Prediction - Use machine learning models for prediction
- Attribute Calculator - Create simple derived attributes without code
- Filter Cases - Filter data before processing
- Representative Case Attribute - Extract representative values from events
This documentation is part of the mindzie Studio process mining platform.