Python

Vue d'ensemble

L'enrichissement Python est l'un des opérateurs d'enrichissement les plus puissants et flexibles dans mindzieStudio, vous permettant d'écrire du code Python personnalisé pour transformer, analyser et enrichir vos données de fouille de processus. Cet opérateur offre un accès direct à vos données de journal d'événements via des DataFrames Pandas, vous permettant de tirer parti de l'écosystème Python complet, y compris des bibliothèques comme NumPy, Pandas, et une logique métier personnalisée pour créer des transformations de données sophistiquées qui vont au-delà des enrichissements standards.

Avec une fréquence d'utilisation de 95 %, Python est l'un des enrichissements les plus couramment utilisés dans mindzieStudio. Il comble le fossé entre les opérations standards de fouille de processus et les flux de travail avancés de science des données, permettant aux data scientists et analystes d'appliquer des algorithmes personnalisés, des règles métier complexes et des analyses avancées directement au sein de leur pipeline de fouille de processus. L'opérateur intègre parfaitement l'exécution de code Python avec le modèle de données de mindzieStudio, gérant automatiquement la sérialisation des données, la conversion des types et l'intégration des résultats dans votre jeu de données.

Usages courants

  • Calculer des indicateurs clés complexes nécessitant une logique métier personnalisée non disponible dans les calculateurs standards
  • Appliquer des modèles d'apprentissage automatique pour la prédiction, la classification ou le regroupement directement sur les données de processus
  • Effectuer un traitement avancé de texte et de traitement du langage naturel sur les attributs des événements
  • Mettre en œuvre des vérifications de conformité personnalisées basées sur des règles métier complexes
  • Créer des attributs dérivés en utilisant des analyses statistiques et des opérations mathématiques avancées
  • Intégrer des sources de données externes en appelant des API ou en lisant des fichiers externes via du code Python
  • Construire des contrôles qualité de données personnalisés et des règles de validation spécifiques à votre domaine métier

Paramètres

Filter : Filtre optionnel pour limiter les cas traités par le script Python. Cela vous permet d'appliquer des transformations uniquement à des sous-ensembles spécifiques de vos données, améliorant les performances et permettant une analyse ciblée. Lorsqu'aucun filtre n'est appliqué, le code Python traite tous les cas du jeu de données.

Columns : Sélectionnez les colonnes existantes de votre jeu de données qui seront mises à disposition du script Python. Ces colonnes seront accessibles dans les DataFrames case_table et event_table. Seules les colonnes sélectionnées sont transmises à Python afin de minimiser l'utilisation mémoire et d'améliorer les performances. La colonne CaseId est toujours incluse automatiquement.

Change Columns : Spécifiez quelles colonnes sélectionnées peuvent être modifiées par votre script Python. Ce paramètre vous permet de mettre à jour les valeurs d'attributs existants tout en maintenant l'intégrité des données. Seules les colonnes déjà sélectionnées dans Columns peuvent être marquées pour modification.

New Columns : Définissez les nouveaux attributs que votre script Python créera. Pour chaque nouvelle colonne, vous devez spécifier :

  • Nom de colonne : Le nom interne utilisé dans le code Python
  • Nom affiché : Le nom convivial affiché dans mindzieStudio
  • Type de données : Le type de données (String, Integer, DateTime, Boolean, Double)
  • Type source : Si l'attribut est ajouté au niveau Case ou Event
  • Format : Format d'affichage optionnel pour l'attribut

Python Code : Le script Python qui sera exécuté sur vos données. Votre code a accès à :

  • case_table : DataFrame Pandas contenant les attributs au niveau des cas
  • event_table : DataFrame Pandas contenant les données au niveau des événements avec des colonnes pour InternalEventIndex, CaseId, ActivityName, ActivityTime et les attributs d'événements sélectionnés

Le script doit modifier ces DataFrames sur place. Toute modification des colonnes existantes (marquées dans Change Columns) ou ajouts de nouvelles colonnes (définies dans New Columns) seront automatiquement intégrés dans votre jeu de données.

Python Image : Spécifie l'environnement d'exécution Python. Options disponibles :

  • LOCAL : Utilise l'installation Python locale (si disponible)
  • Nom d'image Docker : Image Docker spécifique avec les paquets Python requis
  • Par défaut : Environnement Python standard de mindzie avec les bibliothèques courantes de data science

Exemples

Exemple 1 : Calculer le score d'efficacité du traitement des commandes

Scénario : Dans un processus order-to-cash, vous devez calculer un score d'efficacité personnalisé basé sur la valeur de la commande, le temps de traitement et le nombre d'activités de retouche.

Paramètres :

  • Filter : Aucun (traiter tous les cas)
  • Columns : OrderValue, CustomerPriority
  • Change Columns : Aucun
  • New Columns :
    • Nom de colonne : EfficiencyScore
    • Nom affiché : Score d'efficacité
    • Type de données : Double
    • Type source : Case
  • Python Code :
# Calculate efficiency score for each order
import numpy as np

# Count rework activities per case
rework_activities = ['Order Correction', 'Price Adjustment', 'Approval Retry']
event_table['IsRework'] = event_table['ActivityName'].isin(rework_activities)
rework_counts = event_table.groupby('CaseId')['IsRework'].sum().reset_index()
rework_counts.columns = ['CaseId', 'ReworkCount']

# Calculate case duration in days
case_durations = event_table.groupby('CaseId')['ActivityTime'].agg(['min', 'max'])
case_durations['Duration'] = (case_durations['max'] - case_durations['min']).dt.total_seconds() / 86400
case_durations = case_durations.reset_index()[['CaseId', 'Duration']]

# Merge with case table
case_table = case_table.merge(rework_counts, on='CaseId', how='left')
case_table = case_table.merge(case_durations, on='CaseId', how='left')

# Calculate efficiency score (0-100)
case_table['EfficiencyScore'] = 100 * (
    (case_table['OrderValue'] / case_table['OrderValue'].max()) * 0.4 +
    (1 - case_table['ReworkCount'] / 10) * 0.3 +
    (1 - case_table['Duration'] / 30) * 0.3
)
case_table['EfficiencyScore'] = np.clip(case_table['EfficiencyScore'], 0, 100)

# Clean up temporary columns
case_table = case_table.drop(['ReworkCount', 'Duration'], axis=1)
  • Python Image : LOCAL

Résultat : Crée un nouvel attribut de cas "Score d'efficacité" avec des valeurs allant de 0 à 100, où des scores plus élevés indiquent un traitement plus efficace des commandes basé sur la combinaison de la valeur de la commande, un minimum de retouches et un temps de traitement plus rapide.

Insights : Ce score personnalisé aide à identifier les commandes traitées le plus efficacement et peut être utilisé pour mesurer la performance, identifier les meilleures pratiques et prioriser les initiatives d'amélioration des processus.

Exemple 2 : Détecter des séquences d'événements anormales

Scénario : Dans un processus de traitement de patients en santé, identifier les cas où la séquence des procédures médicales dévie des protocoles standard.

Paramètres :

  • Filter : Aucun
  • Columns : PatientAge, Department
  • Change Columns : Aucun
  • New Columns :
    • Nom de colonne : HasAnomalousSequence
    • Nom affiché : Séquence anormale détectée
    • Type de données : Boolean
    • Type source : Case
    • Nom de colonne : AnomalyDescription
    • Nom affiché : Description de l'anomalie
    • Type de données : String
    • Type source : Case
  • Python Code :
# Define expected sequence patterns
normal_sequences = [
    ['Registration', 'Triage', 'Examination', 'Treatment', 'Discharge'],
    ['Registration', 'Triage', 'Examination', 'Lab Test', 'Treatment', 'Discharge'],
    ['Registration', 'Emergency Assessment', 'Treatment', 'Observation', 'Discharge']
]

def check_sequence_anomaly(group):
    activities = group.sort_values('ActivityTime')['ActivityName'].tolist()

    # Check for repeated activities
    if len(activities) != len(set(activities)):
        return True, "Repeated activities detected"

    # Check for out-of-order activities
    if 'Discharge' in activities and activities.index('Discharge') < len(activities) - 1:
        return True, "Activities after discharge"

    if 'Registration' in activities and activities.index('Registration') > 0:
        return True, "Registration not first activity"

    # Check if sequence matches any normal pattern
    matches_normal = any(
        all(act in activities for act in normal_seq)
        for normal_seq in normal_sequences
    )

    if not matches_normal and len(activities) > 3:
        return True, "Non-standard sequence pattern"

    return False, ""

# Apply anomaly detection to each case
anomaly_results = event_table.groupby('CaseId').apply(check_sequence_anomaly)

# Add results to case table
case_table['HasAnomalousSequence'] = case_table['CaseId'].map(
    lambda x: anomaly_results[x][0] if x in anomaly_results.index else False
)
case_table['AnomalyDescription'] = case_table['CaseId'].map(
    lambda x: anomaly_results[x][1] if x in anomaly_results.index else ""
)
  • Python Image : LOCAL

Résultat : Crée deux nouveaux attributs de cas :

  • "Séquence anormale détectée" : indicateur booléen indiquant si le cas a une séquence inhabituelle
  • "Description de l'anomalie" : description textuelle expliquant le type d'anomalie détecté

Insights : Cet enrichissement aide à identifier les cas qui dévient des protocoles médicaux standards, permettant aux équipes d'assurance qualité d'enquêter sur les problèmes potentiels et d'assurer la sécurité des patients.

Exemple 3 : Calculer les métriques de performance des fournisseurs

Scénario : Dans un processus d'approvisionnement, calculer des métriques complètes de performance des fournisseurs basées sur les délais de livraison, les problèmes de qualité et la complétude des commandes.

Paramètres :

  • Filter : ActivityName = "Goods Receipt"
  • Columns : SupplierID, OrderQuantity, ReceivedQuantity
  • Change Columns : Aucun
  • New Columns :
    • Nom de colonne : OnTimeDeliveryRate
    • Nom affiché : Taux de livraison à temps %
    • Type de données : Double
    • Type source : Case
    • Nom de colonne : QualityScore
    • Nom affiché : Score qualité fournisseur
    • Type de données : Double
    • Type source : Case
  • Python Code :
# Calculate delivery performance
def calculate_supplier_metrics(group):
    po_created = group[group['ActivityName'] == 'PO Created']['ActivityTime'].min()
    goods_received = group[group['ActivityName'] == 'Goods Receipt']['ActivityTime'].max()

    # Expected delivery time is 5 business days
    expected_days = 5
    actual_days = np.busday_count(po_created.date(), goods_received.date())
    on_time = 1 if actual_days <= expected_days else 0

    # Check for quality issues
    has_quality_issue = 'Quality Issue' in group['ActivityName'].values
    has_return = 'Return to Supplier' in group['ActivityName'].values

    quality_score = 100
    if has_quality_issue:
        quality_score -= 30
    if has_return:
        quality_score -= 40

    return pd.Series({
        'OnTimeDelivery': on_time,
        'QualityScore': quality_score
    })

# Calculate metrics for each case
supplier_metrics = event_table.groupby('CaseId').apply(calculate_supplier_metrics)

# Aggregate by supplier
supplier_performance = case_table.merge(supplier_metrics, left_on='CaseId', right_index=True)
supplier_summary = supplier_performance.groupby('SupplierID').agg({
    'OnTimeDelivery': 'mean',
    'QualityScore': 'mean'
}).reset_index()
supplier_summary.columns = ['SupplierID', 'OnTimeDeliveryRate', 'AvgQualityScore']

# Add back to case table
case_table = case_table.merge(
    supplier_summary[['SupplierID', 'OnTimeDeliveryRate', 'AvgQualityScore']],
    on='SupplierID',
    how='left'
)
case_table['OnTimeDeliveryRate'] = case_table['OnTimeDeliveryRate'] * 100
case_table.rename(columns={'AvgQualityScore': 'QualityScore'}, inplace=True)
  • Python Image : LOCAL

Résultat : Crée des métriques de performance fournisseur au niveau des cas :

  • "Taux de livraison à temps %" : Pourcentage de commandes livrées à temps par ce fournisseur
  • "Score qualité fournisseur" : Score qualité de 0 à 100 basé sur les problèmes qualité et les retours

Insights : Ces métriques permettent aux équipes d'approvisionnement d'évaluer objectivement la performance des fournisseurs, de soutenir les décisions de sélection des fournisseurs et d'identifier ceux nécessitant des actions d'amélioration.

Exemple 4 : Fouille de texte sur les commentaires de processus

Scénario : Dans un processus de gestion IT, analyser les commentaires des tickets pour catégoriser les problèmes et détecter le sentiment.

Paramètres :

  • Filter : Aucun
  • Columns : TicketDescription, ResolutionNotes
  • Change Columns : Aucun
  • New Columns :
    • Nom de colonne : IssueCategory
    • Nom affiché : Catégorie de problème
    • Type de données : String
    • Type source : Case
    • Nom de colonne : CustomerSentiment
    • Nom affiché : Sentiment client
    • Type de données : String
    • Type source : Case
  • Python Code :
import re

# Define keywords for categorization
category_keywords = {
    'Hardware': ['laptop', 'desktop', 'printer', 'mouse', 'keyboard', 'monitor', 'hardware'],
    'Software': ['application', 'software', 'program', 'install', 'update', 'crash', 'error'],
    'Network': ['network', 'internet', 'wifi', 'connection', 'vpn', 'firewall'],
    'Access': ['password', 'login', 'access', 'permission', 'authentication', 'account'],
    'Other': []
}

# Sentiment indicators
negative_words = ['slow', 'broken', 'failed', 'cannot', 'unable', 'frustrated', 'urgent', 'critical']
positive_words = ['resolved', 'working', 'fixed', 'thank', 'great', 'excellent', 'happy']

def categorize_issue(text):
    if pd.isna(text):
        return 'Other'
    text_lower = text.lower()
    for category, keywords in category_keywords.items():
        if any(keyword in text_lower for keyword in keywords):
            return category
    return 'Other'

def analyze_sentiment(text):
    if pd.isna(text):
        return 'Neutral'
    text_lower = text.lower()
    negative_count = sum(1 for word in negative_words if word in text_lower)
    positive_count = sum(1 for word in positive_words if word in text_lower)

    if negative_count > positive_count:
        return 'Negative'
    elif positive_count > negative_count:
        return 'Positive'
    else:
        return 'Neutral'

# Apply text analysis
case_table['IssueCategory'] = case_table['TicketDescription'].apply(categorize_issue)
case_table['CustomerSentiment'] = case_table['TicketDescription'].apply(analyze_sentiment)
  • Python Image : LOCAL

Résultat : Crée deux attributs dérivés de texte :

  • "Catégorie de problème" : catégorisation du problème IT (Hardware, Software, Network, Access, Other)
  • "Sentiment client" : résultat de l'analyse de sentiment (Positif, Négatif, Neutre)

Insights : Cet enrichissement permet aux responsables IT de comprendre la distribution des problèmes, de prioriser selon le sentiment client et d'identifier les domaines nécessitant un soutien supplémentaire ou une formation.

Exemple 5 : Scoring du risque de conformité financière

Scénario : Dans un processus d'approbation de transaction financière, calculer un score de risque de conformité basé sur plusieurs facteurs de risque.

Paramètres :

  • Filter : TransactionType = "Wire Transfer"
  • Columns : TransactionAmount, CustomerCountry, AccountAge, PreviousFlags
  • Change Columns : Aucun
  • New Columns :
    • Nom de colonne : ComplianceRiskScore
    • Nom affiché : Score de risque de conformité
    • Type de données : Integer
    • Type source : Case
    • Nom de colonne : RiskLevel
    • Nom affiché : Niveau de risque
    • Type de données : String
    • Type source : Case
  • Python Code :
# Define risk factors and weights
high_risk_countries = ['Country1', 'Country2', 'Country3']  # Placeholder for actual list
suspicious_amount_threshold = 10000
rapid_transaction_threshold = 5  # transactions per day

def calculate_risk_score(row):
    risk_score = 0

    # Amount risk (0-30 points)
    if row['TransactionAmount'] > suspicious_amount_threshold:
        risk_score += min(30, int(row['TransactionAmount'] / suspicious_amount_threshold * 10))

    # Geographic risk (0-25 points)
    if row['CustomerCountry'] in high_risk_countries:
        risk_score += 25

    # Account age risk (0-20 points)
    if pd.notna(row['AccountAge']) and row['AccountAge'] < 30:
        risk_score += 20 - int(row['AccountAge'] / 30 * 20)

    # Previous flags risk (0-25 points)
    if pd.notna(row['PreviousFlags']) and row['PreviousFlags'] > 0:
        risk_score += min(25, row['PreviousFlags'] * 5)

    return risk_score

# Calculate transaction velocity
transaction_counts = event_table[event_table['ActivityName'] == 'Transaction Initiated'].groupby('CaseId').size()
case_table['TransactionVelocity'] = case_table['CaseId'].map(transaction_counts).fillna(0)

# Calculate risk scores
case_table['ComplianceRiskScore'] = case_table.apply(calculate_risk_score, axis=1)

# Assign risk levels
def assign_risk_level(score):
    if score >= 70:
        return 'High'
    elif score >= 40:
        return 'Medium'
    else:
        return 'Low'

case_table['RiskLevel'] = case_table['ComplianceRiskScore'].apply(assign_risk_level)

# Clean up temporary columns
case_table = case_table.drop(['TransactionVelocity'], axis=1)
  • Python Image : LOCAL

Résultat : Crée des attributs complets d'évaluation du risque :

  • "Score de risque de conformité" : score numérique de risque de 0 à 100
  • "Niveau de risque" : classification catégorielle du risque (Haut, Moyen, Faible)

Insights : Ce scoring permet aux équipes conformité de prioriser les revues de transactions, d'automatiser les workflows d'approbation basés sur le niveau de risque et de garantir la conformité réglementaire tout en minimisant les faux positifs.

Sortie

L'opérateur d'enrichissement Python génère de nouveaux attributs ou modifie des attributs existants basés sur votre code personnalisé :

Nouveaux attributs de cas : Toutes les colonnes ajoutées au DataFrame case_table correspondant aux New Columns définies seront créées en tant qu'attributs au niveau du cas dans votre jeu de données. Ces attributs sont immédiatement disponibles pour une utilisation dans les filtres, calculateurs et autres enrichissements.

Nouveaux attributs d'événements : Toutes les colonnes ajoutées au DataFrame event_table correspondant aux New Columns définies seront créées en tant qu'attributs au niveau des événements. Ceux-ci peuvent capturer des calculs ou classifications spécifiques à chaque événement.

Attributs modifiés : Les colonnes existantes spécifiées dans Change Columns peuvent voir leurs valeurs mises à jour. Le type de données d'origine doit être conservé, mais les valeurs peuvent être transformées selon votre logique métier.

Gestion des types de données : L'opérateur gère automatiquement la conversion des types entre Python et mindzieStudio :

  • Chaînes Python → String mindzieStudio
  • int32/int64 Python → Integer mindzieStudio
  • float Python → Double mindzieStudio
  • datetime Python → DateTime mindzieStudio
  • bool Python → Boolean mindzieStudio

Suppression de cas et événements : L'utilisation avancée permet de supprimer des cas ou événements en les filtrant hors des DataFrames respectifs. Les cas absents du case_table de sortie seront supprimés du jeu de données.

Les attributs enrichis s'intègrent parfaitement avec toutes les autres fonctionnalités de mindzieStudio, vous permettant de tirer parti de transformations Python personnalisées tout au long de votre analyse de fouille de processus.

Voir aussi


Cette documentation fait partie de la plateforme de fouille de processus mindzie Studio.