Python

概要

Pythonエンリッチメントは、mindzieStudioの中で最も強力かつ柔軟なエンリッチメントオペレーターの一つであり、カスタムPythonコードを記述してプロセスマイニングデータの変換、分析、エンリッチメントを行うことを可能にします。このオペレーターはPandas DataFrameを通じてイベントログデータに直接アクセスでき、NumPy、Pandasなどのライブラリやカスタムビジネスロジックをフルに活用して、標準的なエンリッチメントを超えた高度なデータ変換を実現します。

使用頻度は95%で、mindzieStudioで最も一般的に使われるエンリッチメントの一つです。標準的なプロセスマイニング操作と高度なデータサイエンスワークフローの橋渡しを行い、データサイエンティストやアナリストがカスタムアルゴリズム、複雑なビジネスルール、高度な解析をプロセスマイニングパイプライン内で直接適用することを可能にします。このオペレーターはPythonコードの実行をmindzieStudioのデータモデルとシームレスに統合し、データのシリアライズ、型変換、結果のデータセットへの自動統合を行います。

よくある用途

  • 標準の計算機では不可能なカスタムビジネスロジックを用いた複雑なKPIの計算
  • プロセスデータに対する機械学習モデルの予測、分類、クラスタリングの適用
  • イベント属性に対する高度なテキスト処理や自然言語処理
  • 複雑なビジネスルールに基づくカスタム適合性チェックの実装
  • 統計分析や高度な数学演算を用いた派生属性の生成
  • APIの呼び出しや外部ファイルの読み込みを通じた外部データソースの統合
  • ビジネスドメイン固有のカスタムデータ品質チェックや検証ルールの構築

設定

Filter: Pythonスクリプトで処理するケースを限定する任意のフィルターです。これにより、データの特定のサブセットのみを変換対象とし、パフォーマンス向上やターゲット分析が可能になります。フィルターを指定しない場合は、全てのケースがPythonコードで処理されます。

Columns: データセットのうちPythonスクリプトで利用可能とする既存のカラムを選択します。これらはcase_tableおよびevent_tableのDataFrameでアクセス可能になります。メモリ使用量を最小限に抑えパフォーマンスを向上させるために、選択されたカラムのみがPythonに渡されます。CaseIdカラムは自動的に常に含まれます。

Change Columns: Pythonスクリプト内で変更可能な選択済みカラムを指定します。この設定により既存属性値を更新できます。Change Columnsで指定可能なのは、Columnsで選択されたカラムのみです。

New Columns: Pythonスクリプトが新たに作成する属性を定義します。新規カラムごとに以下を指定する必要があります:

  • カラム名:Pythonコード内で使用する内部名
  • 表示名:mindzieStudioで表示されるユーザーフレンドリーな名前
  • データタイプ:データ型(文字列、整数、日時、ブール、倍精度浮動小数点)
  • ソースタイプ:ケースレベルかイベントレベルか
  • フォーマット:属性表示用の任意のフォーマット

Python Code: データに対して実行されるPythonスクリプトです。以下のオブジェクトへアクセスできます:

  • case_table:ケースレベル属性を含むPandas DataFrame
  • event_table:内部イベントインデックス、CaseId、ActivityName、ActivityTime、および選択したイベント属性を含むPandas DataFrame

スクリプトはこれらのDataFrameをインプレースで変更すべきです。変更可能とした既存カラムの値変更や新規定義カラムの追加は自動的に元のデータセットへ統合されます。

Python Image: Python実行環境を指定します。選択肢は以下の通りです:

  • LOCAL:ローカルPython環境を使用(利用可能な場合)
  • Dockerイメージ名:必要なPythonパッケージを含む特定のDockerイメージ
  • Default:mindzie標準のPython環境(一般的なデータサイエンスライブラリを含む)

例 1: 注文処理効率スコアの計算

シナリオ: 注文から回収までのプロセスで、注文額、処理時間、手戻り活動の回数に基づくカスタム効率スコアを計算する必要があります。

設定:

  • Filter: なし(全ケース処理)
  • Columns: OrderValue, CustomerPriority
  • Change Columns: なし
  • New Columns:
    • Column Name: EfficiencyScore
    • Display Name: Efficiency Score
    • Data Type: Double
    • Source Type: Case
  • Python Code:
# Calculate efficiency score for each order
import numpy as np

# Count rework activities per case
rework_activities = ['Order Correction', 'Price Adjustment', 'Approval Retry']
event_table['IsRework'] = event_table['ActivityName'].isin(rework_activities)
rework_counts = event_table.groupby('CaseId')['IsRework'].sum().reset_index()
rework_counts.columns = ['CaseId', 'ReworkCount']

# Calculate case duration in days
case_durations = event_table.groupby('CaseId')['ActivityTime'].agg(['min', 'max'])
case_durations['Duration'] = (case_durations['max'] - case_durations['min']).dt.total_seconds() / 86400
case_durations = case_durations.reset_index()[['CaseId', 'Duration']]

# Merge with case table
case_table = case_table.merge(rework_counts, on='CaseId', how='left')
case_table = case_table.merge(case_durations, on='CaseId', how='left')

# Calculate efficiency score (0-100)
case_table['EfficiencyScore'] = 100 * (
    (case_table['OrderValue'] / case_table['OrderValue'].max()) * 0.4 +
    (1 - case_table['ReworkCount'] / 10) * 0.3 +
    (1 - case_table['Duration'] / 30) * 0.3
)
case_table['EfficiencyScore'] = np.clip(case_table['EfficiencyScore'], 0, 100)

# Clean up temporary columns
case_table = case_table.drop(['ReworkCount', 'Duration'], axis=1)
  • Python Image: LOCAL

出力: 注文額、手戻り最小化、処理時間の短縮に基づき、0から100の範囲で効率スコアを算出するケース属性「Efficiency Score」を作成します。スコアが高いほど効率的な注文処理を示します。

インサイト: このカスタムスコアにより、最も効率的に処理された注文の特定やパフォーマンスのベンチマーク、ベストプラクティスの抽出、プロセス改善の優先順位付けが可能です。

例 2: 異常なイベントシーケンスの検出

シナリオ: 医療患者の治療プロセスにおいて、医療手順のシーケンスが標準プロトコルから逸脱しているケースを特定します。

設定:

  • Filter: なし
  • Columns: PatientAge, Department
  • Change Columns: なし
  • New Columns:
    • Column Name: HasAnomalousSequence
    • Display Name: Anomalous Sequence Detected
    • Data Type: Boolean
    • Source Type: Case
    • Column Name: AnomalyDescription
    • Display Name: Anomaly Description
    • Data Type: String
    • Source Type: Case
  • Python Code:
# Define expected sequence patterns
normal_sequences = [
    ['Registration', 'Triage', 'Examination', 'Treatment', 'Discharge'],
    ['Registration', 'Triage', 'Examination', 'Lab Test', 'Treatment', 'Discharge'],
    ['Registration', 'Emergency Assessment', 'Treatment', 'Observation', 'Discharge']
]

def check_sequence_anomaly(group):
    activities = group.sort_values('ActivityTime')['ActivityName'].tolist()

    # Check for repeated activities
    if len(activities) != len(set(activities)):
        return True, "Repeated activities detected"

    # Check for out-of-order activities
    if 'Discharge' in activities and activities.index('Discharge') < len(activities) - 1:
        return True, "Activities after discharge"

    if 'Registration' in activities and activities.index('Registration') > 0:
        return True, "Registration not first activity"

    # Check if sequence matches any normal pattern
    matches_normal = any(
        all(act in activities for act in normal_seq)
        for normal_seq in normal_sequences
    )

    if not matches_normal and len(activities) > 3:
        return True, "Non-standard sequence pattern"

    return False, ""

# Apply anomaly detection to each case
anomaly_results = event_table.groupby('CaseId').apply(check_sequence_anomaly)

# Add results to case table
case_table['HasAnomalousSequence'] = case_table['CaseId'].map(
    lambda x: anomaly_results[x][0] if x in anomaly_results.index else False
)
case_table['AnomalyDescription'] = case_table['CaseId'].map(
    lambda x: anomaly_results[x][1] if x in anomaly_results.index else ""
)
  • Python Image: LOCAL

出力: 以下の2つの新規ケース属性を作成します:

  • 「Anomalous Sequence Detected」:異常なシーケンスの有無を示すブール値フラグ
  • 「Anomaly Description」:検出された異常の種類を説明するテキスト

インサイト: このエンリッチメントにより、標準的な医療プロトコルから逸脱したケースを特定し、品質保証チームが問題を調査し患者安全を確保するためのサポートとなります。

例 3: サプライヤーパフォーマンス指標の計算

シナリオ: 調達プロセスで、納期、品質問題、注文の完全性に基づく包括的なサプライヤーパフォーマンス指標を算出します。

設定:

  • Filter: ActivityName = "Goods Receipt"
  • Columns: SupplierID, OrderQuantity, ReceivedQuantity
  • Change Columns: なし
  • New Columns:
    • Column Name: OnTimeDeliveryRate
    • Display Name: On-Time Delivery Rate %
    • Data Type: Double
    • Source Type: Case
    • Column Name: QualityScore
    • Display Name: Supplier Quality Score
    • Data Type: Double
    • Source Type: Case
  • Python Code:
# Calculate delivery performance
def calculate_supplier_metrics(group):
    po_created = group[group['ActivityName'] == 'PO Created']['ActivityTime'].min()
    goods_received = group[group['ActivityName'] == 'Goods Receipt']['ActivityTime'].max()

    # Expected delivery time is 5 business days
    expected_days = 5
    actual_days = np.busday_count(po_created.date(), goods_received.date())
    on_time = 1 if actual_days <= expected_days else 0

    # Check for quality issues
    has_quality_issue = 'Quality Issue' in group['ActivityName'].values
    has_return = 'Return to Supplier' in group['ActivityName'].values

    quality_score = 100
    if has_quality_issue:
        quality_score -= 30
    if has_return:
        quality_score -= 40

    return pd.Series({
        'OnTimeDelivery': on_time,
        'QualityScore': quality_score
    })

# Calculate metrics for each case
supplier_metrics = event_table.groupby('CaseId').apply(calculate_supplier_metrics)

# Aggregate by supplier
supplier_performance = case_table.merge(supplier_metrics, left_on='CaseId', right_index=True)
supplier_summary = supplier_performance.groupby('SupplierID').agg({
    'OnTimeDelivery': 'mean',
    'QualityScore': 'mean'
}).reset_index()
supplier_summary.columns = ['SupplierID', 'OnTimeDeliveryRate', 'AvgQualityScore']

# Add back to case table
case_table = case_table.merge(
    supplier_summary[['SupplierID', 'OnTimeDeliveryRate', 'AvgQualityScore']],
    on='SupplierID',
    how='left'
)
case_table['OnTimeDeliveryRate'] = case_table['OnTimeDeliveryRate'] * 100
case_table.rename(columns={'AvgQualityScore': 'QualityScore'}, inplace=True)
  • Python Image: LOCAL

出力: ケースレベルで以下のサプライヤー性能指標を作成します:

  • 「On-Time Delivery Rate %」:このサプライヤーによる納品のオンタイム率(%)
  • 「Supplier Quality Score」:品質問題や返品に基づく0〜100の品質スコア

インサイト: これらの指標により調達チームはサプライヤーパフォーマンスを客観的に評価し、ベンダー選定や改善介入の判断を支援します。

例 4: プロセスコメントのテキストマイニング

シナリオ: ITサービス管理プロセスにおいて、チケットコメントを分析し問題のカテゴリ分類や感情分析を行います。

設定:

  • Filter: なし
  • Columns: TicketDescription, ResolutionNotes
  • Change Columns: なし
  • New Columns:
    • Column Name: IssueCategory
    • Display Name: Issue Category
    • Data Type: String
    • Source Type: Case
    • Column Name: CustomerSentiment
    • Display Name: Customer Sentiment
    • Data Type: String
    • Source Type: Case
  • Python Code:
import re

# Define keywords for categorization
category_keywords = {
    'Hardware': ['laptop', 'desktop', 'printer', 'mouse', 'keyboard', 'monitor', 'hardware'],
    'Software': ['application', 'software', 'program', 'install', 'update', 'crash', 'error'],
    'Network': ['network', 'internet', 'wifi', 'connection', 'vpn', 'firewall'],
    'Access': ['password', 'login', 'access', 'permission', 'authentication', 'account'],
    'Other': []
}

# Sentiment indicators
negative_words = ['slow', 'broken', 'failed', 'cannot', 'unable', 'frustrated', 'urgent', 'critical']
positive_words = ['resolved', 'working', 'fixed', 'thank', 'great', 'excellent', 'happy']

def categorize_issue(text):
    if pd.isna(text):
        return 'Other'
    text_lower = text.lower()
    for category, keywords in category_keywords.items():
        if any(keyword in text_lower for keyword in keywords):
            return category
    return 'Other'

def analyze_sentiment(text):
    if pd.isna(text):
        return 'Neutral'
    text_lower = text.lower()
    negative_count = sum(1 for word in negative_words if word in text_lower)
    positive_count = sum(1 for word in positive_words if word in text_lower)

    if negative_count > positive_count:
        return 'Negative'
    elif positive_count > negative_count:
        return 'Positive'
    else:
        return 'Neutral'

# Apply text analysis
case_table['IssueCategory'] = case_table['TicketDescription'].apply(categorize_issue)
case_table['CustomerSentiment'] = case_table['TicketDescription'].apply(analyze_sentiment)
  • Python Image: LOCAL

出力: 以下の2つのテキスト派生属性を生成します:

  • 「Issue Category」:IT問題のカテゴリ分類(ハードウェア、ソフトウェア、ネットワーク、アクセス、その他)
  • 「Customer Sentiment」:感情分析結果(ポジティブ、ネガティブ、ニュートラル)

インサイト: このエンリッチメントにより、ITサービスマネージャーは問題の分布を理解し、顧客感情に基づいて優先順位付けし、追加支援や教育が必要な領域を特定できます。

例 5: 財務コンプライアンスリスクスコアリング

シナリオ: 財務取引承認プロセスにおいて、複数のリスク要因に基づきコンプライアンスリスクスコアを計算します。

設定:

  • Filter: TransactionType = "Wire Transfer"
  • Columns: TransactionAmount, CustomerCountry, AccountAge, PreviousFlags
  • Change Columns: なし
  • New Columns:
    • Column Name: ComplianceRiskScore
    • Display Name: Compliance Risk Score
    • Data Type: Integer
    • Source Type: Case
    • Column Name: RiskLevel
    • Display Name: Risk Level
    • Data Type: String
    • Source Type: Case
  • Python Code:
# Define risk factors and weights
high_risk_countries = ['Country1', 'Country2', 'Country3']  # Placeholder for actual list
suspicious_amount_threshold = 10000
rapid_transaction_threshold = 5  # transactions per day

def calculate_risk_score(row):
    risk_score = 0

    # Amount risk (0-30 points)
    if row['TransactionAmount'] > suspicious_amount_threshold:
        risk_score += min(30, int(row['TransactionAmount'] / suspicious_amount_threshold * 10))

    # Geographic risk (0-25 points)
    if row['CustomerCountry'] in high_risk_countries:
        risk_score += 25

    # Account age risk (0-20 points)
    if pd.notna(row['AccountAge']) and row['AccountAge'] < 30:
        risk_score += 20 - int(row['AccountAge'] / 30 * 20)

    # Previous flags risk (0-25 points)
    if pd.notna(row['PreviousFlags']) and row['PreviousFlags'] > 0:
        risk_score += min(25, row['PreviousFlags'] * 5)

    return risk_score

# Calculate transaction velocity
transaction_counts = event_table[event_table['ActivityName'] == 'Transaction Initiated'].groupby('CaseId').size()
case_table['TransactionVelocity'] = case_table['CaseId'].map(transaction_counts).fillna(0)

# Calculate risk scores
case_table['ComplianceRiskScore'] = case_table.apply(calculate_risk_score, axis=1)

# Assign risk levels
def assign_risk_level(score):
    if score >= 70:
        return 'High'
    elif score >= 40:
        return 'Medium'
    else:
        return 'Low'

case_table['RiskLevel'] = case_table['ComplianceRiskScore'].apply(assign_risk_level)

# Clean up temporary columns
case_table = case_table.drop(['TransactionVelocity'], axis=1)
  • Python Image: LOCAL

出力: 以下の包括的なリスク評価属性を作成します:

  • 「Compliance Risk Score」:0〜100の数値によるリスクスコア
  • 「Risk Level」:リスクのカテゴリ分類(高、中、低)

インサイト: このリスクスコアリングはコンプライアンスチームに取引レビューの優先順位付け、リスクレベルに基づく承認ワークフローの自動化、規制遵守の確保と誤検知の低減を支援します。

出力

Pythonエンリッチメントオペレーターは、カスタムコードに基づいて新規または変更された属性を生成します:

新規ケース属性: case_table DataFrameに追加された定義済みNew Columnsに該当するカラムは、ケースレベル属性としてデータセットに作成されます。これらはフィルター、計算機、他のエンリッチメントで即時に利用可能です。

新規イベント属性: event_table DataFrameに追加された定義済みNew Columnsに該当するカラムは、イベントレベル属性として作成されます。イベントごとの計算や分類を表現できます。

変更された属性: Change Columnsで指定された既存カラムの値が更新される場合があります。元のデータ型は維持されますが、ビジネスロジックに従って値は変換可能です。

データ型の取り扱い: オペレーターはPython と mindzieStudio間の型変換を自動で行います:

  • Python文字列 → mindzieStudio 文字列
  • Python int32/int64 → mindzieStudio 整数
  • Python float → mindzieStudio 倍精度浮動小数点
  • Python datetime → mindzieStudio 日時
  • Python bool → mindzieStudio ブール

ケースとイベントの除外: データフレームから除外することでケースやイベントを削除する高度な利用も可能です。出力case_tableに存在しないケースはデータセットから削除されます。

これらのエンリッチ属性はmindzieStudioの他の全機能とシームレスに統合され、プロセスマイニング分析ワークフローにおいてカスタムPython変換を活用できます。

関連項目


このドキュメントはmindzie Studioプロセスマイニングプラットフォームの一部です。