Python

Übersicht

Die Python-Anreicherung ist einer der leistungsfähigsten und flexibelsten Anreicherungsoperatoren in mindzieStudio. Sie ermöglicht es Ihnen, benutzerdefinierten Python-Code zu schreiben, um Ihre Process-Mining-Daten zu transformieren, zu analysieren und anzureichern. Dieser Operator bietet direkten Zugriff auf Ihre Ereignisprotokolldaten über Pandas DataFrames, sodass Sie das gesamte Python-Ökosystem einschließlich Bibliotheken wie NumPy, Pandas und benutzerdefinierter Geschäftslogik nutzen können, um anspruchsvolle Datenumwandlungen zu erstellen, die über Standardanreicherungen hinausgehen.

Mit einer Nutzungsfrequenz von 95 % ist Python eine der am häufigsten verwendeten Anreicherungen in mindzieStudio. Es überbrückt die Lücke zwischen Standard-Process-Mining-Operationen und fortgeschrittenen Data-Science-Workflows und ermöglicht Datenwissenschaftlern und Analysten, benutzerdefinierte Algorithmen, komplexe Geschäftsregeln und fortgeschrittene Analysen direkt innerhalb ihrer Process-Mining-Pipeline anzuwenden. Der Operator integriert die Ausführung von Python-Code nahtlos in das Datenmodell von mindzieStudio, übernimmt automatisch die Datenserialisierung, Typkonvertierung und die Integration der Ergebnisse zurück in Ihren Datensatz.

Häufige Anwendungsfälle

  • Berechnung komplexer KPIs, die benutzerdefinierte Geschäftslogik erfordern, welche in Standard-Berechnern nicht verfügbar ist
  • Anwendung von Machine-Learning-Modellen für Vorhersage, Klassifizierung oder Clustering direkt auf Prozessdaten
  • Durchführung fortgeschrittener Textverarbeitung und Verarbeitung natürlicher Sprache auf Ereignisattributen
  • Umsetzung benutzerdefinierter Konformitätsprüfungen basierend auf komplexen Geschäftsregeln
  • Erstellung abgeleiteter Attribute mittels statistischer Analyse und fortgeschrittener mathematischer Operationen
  • Integration externer Datenquellen durch Aufrufe von APIs oder das Einlesen externer Dateien innerhalb des Python-Codes
  • Erstellung benutzerdefinierter Datenqualitätsprüfungen und Validierungsregeln, die speziell auf Ihr Geschäftsfeld zugeschnitten sind

Einstellungen

Filter: Optionaler Filter, um festzulegen, welche Fälle durch das Python-Skript verarbeitet werden. Dadurch können Sie Transformationen nur auf bestimmte Teildatensätze anwenden, was die Leistung verbessert und gezielte Analysen ermöglicht. Wenn kein Filter gesetzt ist, verarbeitet der Python-Code alle Fälle im Datensatz.

Spalten: Auswahl, welche bestehenden Spalten Ihres Datensatzes für das Python-Skript verfügbar gemacht werden sollen. Diese Spalten sind in den DataFrames case_table und event_table zugänglich. Nur ausgewählte Spalten werden an Python übergeben, um den Speicherverbrauch zu minimieren und die Performance zu verbessern. Die Spalte CaseId ist immer automatisch enthalten.

Spalten ändern: Legen Sie fest, welche der ausgewählten Spalten durch Ihr Python-Skript modifiziert werden können. Diese Einstellung erlaubt es, bestehende Attributwerte zu aktualisieren und gleichzeitig die Datenintegrität beizubehalten. Nur Spalten, die in der Einstellung Spalten gewählt wurden, können hier zur Änderung markiert werden.

Neue Spalten: Definieren Sie neue Attribute, die Ihr Python-Skript erzeugt. Für jede neue Spalte müssen Sie angeben:

  • Spaltenname: Der interne Name, der im Python-Code verwendet wird
  • Anzeigename: Der benutzerfreundliche Name, der in mindzieStudio angezeigt wird
  • Datentyp: Der Datentyp (String, Integer, DateTime, Boolean, Double)
  • Quelltyp: Ob das Attribut auf Case- oder Event-Ebene hinzugefügt wird
  • Format: Optionales Anzeigeformat für das Attribut

Python-Code: Das Python-Skript, das auf Ihre Daten ausgeführt wird. Ihr Code hat Zugriff auf:

  • case_table: Pandas DataFrame mit Fall-attributen auf Case-Ebene
  • event_table: Pandas DataFrame mit Ereignis-Daten, einschließlich der Spalten InternalEventIndex, CaseId, ActivityName, ActivityTime sowie der ausgewählten Ereignisattribute

Das Skript sollte diese DataFrames direkt modifizieren. Alle Änderungen an bestehenden Spalten (in Spalten ändern markiert) oder Ergänzungen neuer Spalten (in Neue Spalten definiert) werden automatisch wieder in Ihren Datensatz integriert.

Python Image: Legt die Python-Ausführungsumgebung fest. Optionen sind:

  • LOCAL: Verwendet eine lokale Python-Installation (sofern verfügbar)
  • Docker-Image-Name: Spezifisches Docker-Image mit den benötigten Python-Paketen
  • Default: Das Standard-Python-Umfeld von mindzie mit gängigen Data-Science-Bibliotheken

Beispiele

Beispiel 1: Berechnung des Effizienz-Scores der Auftragsbearbeitung

Szenario: Im Order-to-Cash-Prozess soll ein benutzerdefinierter Effizienz-Score berechnet werden, basierend auf Auftragswert, Bearbeitungszeit und Anzahl der Nacharbeiten.

Einstellungen:

  • Filter: Kein Filter (alle Fälle verarbeiten)
  • Spalten: OrderValue, CustomerPriority
  • Spalten ändern: Keine
  • Neue Spalten:
    • Spaltenname: EfficiencyScore
    • Anzeigename: Efficiency Score
    • Datentyp: Double
    • Quelltyp: Case
  • Python-Code:
# Berechnung des Effizienz-Scores für jede Bestellung
import numpy as np

# Nacharbeitsaktivitäten zählen pro Fall
rework_activities = ['Order Correction', 'Price Adjustment', 'Approval Retry']
event_table['IsRework'] = event_table['ActivityName'].isin(rework_activities)
rework_counts = event_table.groupby('CaseId')['IsRework'].sum().reset_index()
rework_counts.columns = ['CaseId', 'ReworkCount']

# Fall-Dauer in Tagen berechnen
case_durations = event_table.groupby('CaseId')['ActivityTime'].agg(['min', 'max'])
case_durations['Duration'] = (case_durations['max'] - case_durations['min']).dt.total_seconds() / 86400
case_durations = case_durations.reset_index()[['CaseId', 'Duration']]

# Mit Fall-Tabelle zusammenführen
case_table = case_table.merge(rework_counts, on='CaseId', how='left')
case_table = case_table.merge(case_durations, on='CaseId', how='left')

# Effizienz-Score berechnen (0-100)
case_table['EfficiencyScore'] = 100 * (
    (case_table['OrderValue'] / case_table['OrderValue'].max()) * 0.4 +
    (1 - case_table['ReworkCount'] / 10) * 0.3 +
    (1 - case_table['Duration'] / 30) * 0.3
)
case_table['EfficiencyScore'] = np.clip(case_table['EfficiencyScore'], 0, 100)

# Temporäre Spalten entfernen
case_table = case_table.drop(['ReworkCount', 'Duration'], axis=1)
  • Python Image: LOCAL

Ergebnis: Erstellt ein neues Case-Attribut "Efficiency Score" mit Werten von 0 bis 100, wobei höhere Werte eine effizientere Auftragsbearbeitung anzeigen, basierend auf der Kombination von Auftragswert, minimaler Nacharbeit und schnellerer Bearbeitungszeit.

Erkenntnisse: Dieser benutzerdefinierte Score hilft dabei, die effizientesten Aufträge zu identifizieren und ermöglicht Benchmarking, Erkennung von Best Practices und Priorisierung von Prozessverbesserungsmaßnahmen.

Beispiel 2: Erkennung anomaler Ereignisfolgen

Szenario: In einem medizinischen Patientenbehandlungsprozess sollen Fälle identifiziert werden, bei denen die Reihenfolge der medizinischen Prozeduren von Standardprotokollen abweicht.

Einstellungen:

  • Filter: Kein Filter
  • Spalten: PatientAge, Department
  • Spalten ändern: Keine
  • Neue Spalten:
    • Spaltenname: HasAnomalousSequence
    • Anzeigename: Anomalous Sequence Detected
    • Datentyp: Boolean
    • Quelltyp: Case
    • Spaltenname: AnomalyDescription
    • Anzeigename: Anomaly Description
    • Datentyp: String
    • Quelltyp: Case
  • Python-Code:
# Erwartete Normal-Sequenzen definieren
normal_sequences = [
    ['Registration', 'Triage', 'Examination', 'Treatment', 'Discharge'],
    ['Registration', 'Triage', 'Examination', 'Lab Test', 'Treatment', 'Discharge'],
    ['Registration', 'Emergency Assessment', 'Treatment', 'Observation', 'Discharge']
]

def check_sequence_anomaly(group):
    activities = group.sort_values('ActivityTime')['ActivityName'].tolist()

    # Wiederholte Aktivitäten prüfen
    if len(activities) != len(set(activities)):
        return True, "Repeated activities detected"

    # Aktivitäten nach Entlassung prüfen
    if 'Discharge' in activities and activities.index('Discharge') < len(activities) - 1:
        return True, "Activities after discharge"

    # Registrierung muss erste Aktivität sein
    if 'Registration' in activities and activities.index('Registration') > 0:
        return True, "Registration not first activity"

    # Prüfen, ob die Sequenz zu einem Normalmuster passt
    matches_normal = any(
        all(act in activities for act in normal_seq)
        for normal_seq in normal_sequences
    )

    if not matches_normal and len(activities) > 3:
        return True, "Non-standard sequence pattern"

    return False, ""

# Anomalieerkennung auf jeden Fall anwenden
anomaly_results = event_table.groupby('CaseId').apply(check_sequence_anomaly)

# Ergebnisse zur Fall-Tabelle hinzufügen
case_table['HasAnomalousSequence'] = case_table['CaseId'].map(
    lambda x: anomaly_results[x][0] if x in anomaly_results.index else False
)
case_table['AnomalyDescription'] = case_table['CaseId'].map(
    lambda x: anomaly_results[x][1] if x in anomaly_results.index else ""
)
  • Python Image: LOCAL

Ergebnis: Erstellt zwei neue Case-Attribute:

  • "Anomalous Sequence Detected": Boolean-Flag, das angibt, ob eine ungewöhnliche Reihenfolge vorliegt
  • "Anomaly Description": Textbeschreibung zur Art der erkannten Anomalie

Erkenntnisse: Diese Anreicherung hilft, Fälle zu identifizieren, die von Standardprotokollen abweichen, und unterstützt Qualitätsmanagementteams dabei, potenzielle Probleme zu untersuchen und die Patientensicherheit sicherzustellen.

Beispiel 3: Berechnung von Lieferantenleistungs-Kennzahlen

Szenario: Im Beschaffungsprozess sollen umfassende Kennzahlen zur Lieferantenleistung auf Basis von Lieferzeiten, Qualitätsproblemen und Vollständigkeit der Bestellungen berechnet werden.

Einstellungen:

  • Filter: ActivityName = "Goods Receipt"
  • Spalten: SupplierID, OrderQuantity, ReceivedQuantity
  • Spalten ändern: Keine
  • Neue Spalten:
    • Spaltenname: OnTimeDeliveryRate
    • Anzeigename: On-Time Delivery Rate %
    • Datentyp: Double
    • Quelltyp: Case
    • Spaltenname: QualityScore
    • Anzeigename: Supplier Quality Score
    • Datentyp: Double
    • Quelltyp: Case
  • Python-Code:
# Lieferleistung berechnen
def calculate_supplier_metrics(group):
    po_created = group[group['ActivityName'] == 'PO Created']['ActivityTime'].min()
    goods_received = group[group['ActivityName'] == 'Goods Receipt']['ActivityTime'].max()

    # Erwartete Lieferzeit sind 5 Werktage
    expected_days = 5
    actual_days = np.busday_count(po_created.date(), goods_received.date())
    on_time = 1 if actual_days <= expected_days else 0

    # Qualitätsprobleme prüfen
    has_quality_issue = 'Quality Issue' in group['ActivityName'].values
    has_return = 'Return to Supplier' in group['ActivityName'].values

    quality_score = 100
    if has_quality_issue:
        quality_score -= 30
    if has_return:
        quality_score -= 40

    return pd.Series({
        'OnTimeDelivery': on_time,
        'QualityScore': quality_score
    })

# Kennzahlen für jeden Fall berechnen
supplier_metrics = event_table.groupby('CaseId').apply(calculate_supplier_metrics)

# Nach Lieferanten aggregieren
supplier_performance = case_table.merge(supplier_metrics, left_on='CaseId', right_index=True)
supplier_summary = supplier_performance.groupby('SupplierID').agg({
    'OnTimeDelivery': 'mean',
    'QualityScore': 'mean'
}).reset_index()
supplier_summary.columns = ['SupplierID', 'OnTimeDeliveryRate', 'AvgQualityScore']

# Zurück in Fall-Tabelle mergen
case_table = case_table.merge(
    supplier_summary[['SupplierID', 'OnTimeDeliveryRate', 'AvgQualityScore']],
    on='SupplierID',
    how='left'
)
case_table['OnTimeDeliveryRate'] = case_table['OnTimeDeliveryRate'] * 100
case_table.rename(columns={'AvgQualityScore': 'QualityScore'}, inplace=True)
  • Python Image: LOCAL

Ergebnis: Erstellt Lieferantenleistungskennzahlen auf Case-Ebene:

  • "On-Time Delivery Rate %": Prozentsatz der pünktlich gelieferten Bestellungen des jeweiligen Lieferanten
  • "Supplier Quality Score": Qualitätsbewertung von 0 bis 100 basierend auf Qualitätsproblemen und Rücksendungen

Erkenntnisse: Diese Kennzahlen ermöglichen es Beschaffungsteams, die Lieferantenleistung objektiv zu bewerten, Lieferantenauswahlentscheidungen zu unterstützen und Lieferanten zu identifizieren, bei denen Verbesserungsmaßnahmen erforderlich sind.

Beispiel 4: Text Mining auf Prozesskommentaren

Szenario: Im IT-Service-Management-Prozess sollen Ticket-Kommentare analysiert werden, um Probleme zu kategorisieren und die Stimmung zu erkennen.

Einstellungen:

  • Filter: Kein Filter
  • Spalten: TicketDescription, ResolutionNotes
  • Spalten ändern: Keine
  • Neue Spalten:
    • Spaltenname: IssueCategory
    • Anzeigename: Issue Category
    • Datentyp: String
    • Quelltyp: Case
    • Spaltenname: CustomerSentiment
    • Anzeigename: Customer Sentiment
    • Datentyp: String
    • Quelltyp: Case
  • Python-Code:
import re

# Schlüsselwörter zur Kategorisierung definieren
category_keywords = {
    'Hardware': ['laptop', 'desktop', 'printer', 'mouse', 'keyboard', 'monitor', 'hardware'],
    'Software': ['application', 'software', 'program', 'install', 'update', 'crash', 'error'],
    'Network': ['network', 'internet', 'wifi', 'connection', 'vpn', 'firewall'],
    'Access': ['password', 'login', 'access', 'permission', 'authentication', 'account'],
    'Other': []
}

# Sentiment-Indikatoren
negative_words = ['slow', 'broken', 'failed', 'cannot', 'unable', 'frustrated', 'urgent', 'critical']
positive_words = ['resolved', 'working', 'fixed', 'thank', 'great', 'excellent', 'happy']

def categorize_issue(text):
    if pd.isna(text):
        return 'Other'
    text_lower = text.lower()
    for category, keywords in category_keywords.items():
        if any(keyword in text_lower for keyword in keywords):
            return category
    return 'Other'

def analyze_sentiment(text):
    if pd.isna(text):
        return 'Neutral'
    text_lower = text.lower()
    negative_count = sum(1 for word in negative_words if word in text_lower)
    positive_count = sum(1 for word in positive_words if word in text_lower)

    if negative_count > positive_count:
        return 'Negative'
    elif positive_count > negative_count:
        return 'Positive'
    else:
        return 'Neutral'

# Textanalyse anwenden
case_table['IssueCategory'] = case_table['TicketDescription'].apply(categorize_issue)
case_table['CustomerSentiment'] = case_table['TicketDescription'].apply(analyze_sentiment)
  • Python Image: LOCAL

Ergebnis: Erstellt zwei textbasierte Attribute:

  • "Issue Category": Kategorisierung des IT-Problems (Hardware, Software, Netzwerk, Zugriff, Andere)
  • "Customer Sentiment": Ergebnis der Sentiment-Analyse (Positiv, Negativ, Neutral)

Erkenntnisse: Diese Anreicherung hilft IT-Service-Managern, die Problembereiche zu verstehen, Priorisierungen anhand der Kundenzufriedenheit vorzunehmen und Bereiche zu erkennen, die zusätzlichen Support oder Schulungen erfordern.

Beispiel 5: Finanzielle Compliance-Risikobewertung

Szenario: In einem Finanztransaktionsgenehmigungsprozess soll ein Risiko-Score für Compliance auf Basis mehrerer Risikofaktoren berechnet werden.

Einstellungen:

  • Filter: TransactionType = "Wire Transfer"
  • Spalten: TransactionAmount, CustomerCountry, AccountAge, PreviousFlags
  • Spalten ändern: Keine
  • Neue Spalten:
    • Spaltenname: ComplianceRiskScore
    • Anzeigename: Compliance Risk Score
    • Datentyp: Integer
    • Quelltyp: Case
    • Spaltenname: RiskLevel
    • Anzeigename: Risk Level
    • Datentyp: String
    • Quelltyp: Case
  • Python-Code:
# Risikofaktoren und Gewichtungen definieren
high_risk_countries = ['Country1', 'Country2', 'Country3']  # Platzhalter für tatsächliche Liste
suspicious_amount_threshold = 10000
rapid_transaction_threshold = 5  # Transaktionen pro Tag

def calculate_risk_score(row):
    risk_score = 0

    # Betragsrisiko (0-30 Punkte)
    if row['TransactionAmount'] > suspicious_amount_threshold:
        risk_score += min(30, int(row['TransactionAmount'] / suspicious_amount_threshold * 10))

    # Geografisches Risiko (0-25 Punkte)
    if row['CustomerCountry'] in high_risk_countries:
        risk_score += 25

    # Kontodauer-Risiko (0-20 Punkte)
    if pd.notna(row['AccountAge']) and row['AccountAge'] < 30:
        risk_score += 20 - int(row['AccountAge'] / 30 * 20)

    # Frühere Flags-Risiko (0-25 Punkte)
    if pd.notna(row['PreviousFlags']) and row['PreviousFlags'] > 0:
        risk_score += min(25, row['PreviousFlags'] * 5)

    return risk_score

# Berechnung der Transaktionsgeschwindigkeit
transaction_counts = event_table[event_table['ActivityName'] == 'Transaction Initiated'].groupby('CaseId').size()
case_table['TransactionVelocity'] = case_table['CaseId'].map(transaction_counts).fillna(0)

# Risikowerte berechnen
case_table['ComplianceRiskScore'] = case_table.apply(calculate_risk_score, axis=1)

# Risiko-Level zuweisen
def assign_risk_level(score):
    if score >= 70:
        return 'High'
    elif score >= 40:
        return 'Medium'
    else:
        return 'Low'

case_table['RiskLevel'] = case_table['ComplianceRiskScore'].apply(assign_risk_level)

# Temporäre Spalten entfernen
case_table = case_table.drop(['TransactionVelocity'], axis=1)
  • Python Image: LOCAL

Ergebnis: Erstellt umfassende Risikobewertungsattribute:

  • "Compliance Risk Score": Numerischer Risikowert von 0 bis 100
  • "Risk Level": Kategorische Risiko-Einstufung (Hoch, Mittel, Niedrig)

Erkenntnisse: Diese Risikobewertung unterstützt Compliance-Teams dabei, die Priorisierung der Transaktionsüberprüfung vorzunehmen, Genehmigungsworkflows basierend auf Risiko-Leveln zu automatisieren und regulatorische Vorgaben einzuhalten, während Fehlalarme minimiert werden.

Ausgabe

Der Python-Anreicherungsoperator erzeugt neue oder modifizierte Attribute basierend auf Ihrem benutzerdefinierten Code:

Neue Case-Attribute: Alle Spalten, die zum case_table DataFrame hinzugefügt werden und den in Neue Spalten definierten entsprechen, werden als Case-Ebenen-Attribute in Ihrem Datensatz erstellt. Diese Attribute stehen sofort für Filter, Berechner und andere Anreicherungen zur Verfügung.

Neue Event-Attribute: Spalten, die zum event_table DataFrame hinzugefügt werden und mit den definierten Neuen Spalten übereinstimmen, werden als Event-Ebenen-Attribute erstellt. Diese können ereignisspezifische Berechnungen oder Klassifikationen enthalten.

Geänderte Attribute: Bereits vorhandene Spalten, die in Spalten ändern ausgewählt wurden, können aktualisierte Werte erhalten. Der ursprüngliche Datentyp muss beibehalten werden, Werte können aber gemäß Ihrer Geschäftslogik transformiert werden.

Datentyp-Handhabung: Der Operator übernimmt automatisch die Typkonvertierung zwischen Python- und mindzieStudio-Datentypen:

  • Python strings → mindzieStudio String
  • Python int32/int64 → mindzieStudio Integer
  • Python float → mindzieStudio Double
  • Python datetime → mindzieStudio DateTime
  • Python bool → mindzieStudio Boolean

Entfernung von Cases und Events: In fortgeschrittener Nutzung können Fälle oder Events entfernt werden, indem sie aus den jeweiligen DataFrames herausgefiltert werden. Fälle, die im Ausgabe-case_table nicht mehr vorhanden sind, werden aus dem Datensatz gelöscht.

Die angereicherten Attribute integrieren sich nahtlos mit allen weiteren mindzieStudio-Funktionalitäten, sodass Sie benutzerdefinierte Python-Transformationen in Ihrem gesamten Process-Mining-Analyse-Workflow nutzen können.

Siehe auch


Diese Dokumentation ist Teil der mindzie Studio Process-Mining-Plattform.