scorecardpipeline.rule 源代码

# -*- coding: utf-8 -*-
"""
@Time    : 2024/2/26 12:00
@Author  : itlubber
@Site    : itlubber.art
"""
import numpy as np
import numexpr as ne
from enum import Enum

import pandas as pd
from pandas import DataFrame
from sklearn.utils import check_array
from sklearn.metrics import f1_score, recall_score, accuracy_score, precision_score

from .processing import feature_bin_stats, Combiner
from .excel_writer import dataframe2excel


def _get_context(X, feature_names):
    return {name: X[:, i] for i, name in enumerate(feature_names)}


def _apply_expr_on_array(expr, X, feature_names):
    ctx = _get_context(X, feature_names)
    return ne.evaluate(expr, local_dict=ctx)


class RuleState(str, Enum):
    INITIALIZED = "initialized"
    APPLIED = "applied"


class RuleStateError(RuntimeError):
    pass


class RuleUnAppliedError(RuleStateError):
    pass


# 中級操作符
op_dict = {"GT": ">", "LT": "<", "EQ": "==", "ADD": "+", "GE": ">=", "LE": "<=", "SUBTRACT": "-", "MULTIPLY": "*", "DIVIDE": "/", "OR": "|", "AND": "&"}
# 数据类型 int, float-->float, 目前不支持 str
value_type_dict = {"int": float, "float": float, "string": str, "bool": bool}
# if_part, then_part, else_part
part_dict = ["if", "then", "else"]


# max_index: 数据的列数     feature_list: 列的名称
[文档]def json2expr(data, max_index, feature_list): if data.keys()._contains_("operator"): op = data.get("operator") params = data.get("params") if op == "FEATURE_INDEX": # 取变量,一个值{判断变量,索引是否正常} feature = params[0].get("feature") if params[0].get("index") >= max_index: # json中的索引异常: index >= 数据列数 raise ValueError("index error") if feature not in feature_list: # 变量异常:变量名不在数据的列名中 raise ValueError("{} do not belong to the data ".format(feature)) return feature elif op in op_dict: # 两个值,递归 value_list = [json2expr(params[0], max_index, feature_list), json2expr(params[1], max_index, feature_list)] return "(" + str(value_list[0]) + op_dict[op] + str(value_list[1]) + ")" else: # op 不在op_dict报错 raise TypeError("The operator: {} is invalid".format(op)) if data.keys().__contains__("value"): value_type = data.get("value_type") value = data["value"] # 对取到值的类型做转换, 不在类型字典中的值报错 if not value_type_dict.get(value_type): raise ValueError("Data type error!") return value_type_dict.get(value_type)(value)
[文档]class Rule:
[文档] def __init__(self, expr): # expr 既可以传递字符串,也可以传递dict """规则集 :param expr: 类似 DataFrame 的 query 方法传参方式即可,目前仅支持数值型变量规则 **参考样例** >>> from scorecardpipeline import * >>> target = "creditability" >>> data = germancredit() >>> data[target] = data[target].map({"good": 0, "bad": 1}) >>> data = data.select_dtypes("number") # 暂不支持字符型规则 >>> rule1 = Rule("duration_in_month < 10") >>> rule2 = Rule("credit_amount < 500") >>> rule1.report(data, target=target) >>> rule2.report(data, target=target) >>> (rule1 | rule2).report(data, target=target) >>> (rule1 & rule2).report(data, target=target) """ self._state = RuleState.INITIALIZED self.expr = expr
def __str__(self): return f"Rule({repr(self.expr)})" def __repr__(self): return f"Rule({repr(self.expr)})"
[文档] def predict(self, X: DataFrame, part=""): # dict预测对应part_dict 、字符串表达式对应"、"其他情况报错 if not isinstance(X, DataFrame): raise ValueError("Rule can only predict on DataFrame.") feature_names = X.columns.values.tolist() # 取数据的列名 X = check_array(X, dtype=None, ensure_2d=True, force_all_finite="allow-nan") if isinstance(self.expr, dict): # dict部分 if part not in part_dict: raise TypeError("Part : {} not in ['if','then','else']".format(part)) if not self.expr[part]: # 没有返回值的情况[] return list() dict2expr = json2expr(self.expr[part], X.shape[1], feature_names) if not isinstance(dict2expr, str): # 返回Value (类型已经做过转换),对其扩充 --> [value] * Len(X) result = [dict2expr] * len(X) else: # 表达式在进行计算 result = _apply_expr_on_array(dict2expr, X, feature_names) result = result.tolist() if not isinstance(result, list): # result 只有一个数值时,对其扩充 --> [value] * len(X) result = [result] * len(X) elif isinstance(self.expr, str): # 字符串表达式部分 if part != "": raise TypeError('The part of the expression must be ""') result = _apply_expr_on_array(self.expr, X, feature_names) else: raise TypeError("Rule currently only supports dict and expression") self.result_ = result return result
[文档] def report(self, datasets: pd.DataFrame, target="target", overdue=None, dpd=None, del_grey=False, desc="", filter_cols=None, prior_rules=None) -> pd.DataFrame: """规则效果报告表格输出 :param datasets: 数据集,需要包含 目标变量 或 逾期天数,当不包含目标变量时,会通过逾期天数计算目标变量,同时需要传入逾期定义的DPD天数 :param target: 目标变量名称,默认 target :param desc: 规则相关的描述,会出现在返回的表格当中 :param filter_cols: 指定返回的字段列表,默认不传 :param prior_rules: 先验规则,可以传入先验规则先筛选数据后再评估规则效果 :param overdue: 逾期天数字段名称 :param dpd: 逾期定义方式,逾期天数 > DPD 为 1,其他为 0,仅 overdue 字段起作用时有用 :param del_grey: 是否删除逾期天数 (0, dpd] 的数据,仅 overdue 字段起作用时有用 :return: pd.DataFrame,规则效果评估表 """ return_cols = ['指标名称', "指标含义", '分箱', '样本总数', '样本占比', '好样本数', '好样本占比', '坏样本数', '坏样本占比', '坏样本率', 'LIFT值', '坏账改善'] if desc is None or desc == "" and "指标含义" in return_cols: return_cols.remove("指标含义") rule_expr = self.expr def _report_one_rule(data, target, desc='', prior_rules=None): if prior_rules: prior_tables = prior_rules.report(data, target=target, desc=desc, prior_rules=None) prior_tables["规则分类"] = "先验规则" temp = data[~prior_rules.predict(data)] rule_result = pd.DataFrame({rule_expr: np.where(self.predict(temp), "命中", "未命中"), "target": temp[target].tolist()}) else: prior_tables = pd.DataFrame(columns=return_cols) rule_result = pd.DataFrame({rule_expr: np.where(self.predict(data), "命中", "未命中"), "target": data[target].tolist()}) combiner = Combiner(target=target) combiner.load({rule_expr: [["命中"], ["未命中"]]}) table = feature_bin_stats(rule_result, rule_expr, combiner=combiner, desc=desc, return_cols=return_cols) # 准确率、精确率、召回率、F1分数 metrics = pd.DataFrame({ "分箱": ["命中", "未命中"], "准确率": [accuracy_score(rule_result["target"], rule_result[rule_expr].map({"命中": 1, "未命中": 0})), accuracy_score(rule_result["target"], rule_result[rule_expr].map({"命中": 0, "未命中": 1}))], "精确率": [precision_score(rule_result["target"], rule_result[rule_expr].map({"命中": 1, "未命中": 0})), precision_score(rule_result["target"], rule_result[rule_expr].map({"命中": 0, "未命中": 1}))], "召回率": [recall_score(rule_result["target"], rule_result[rule_expr].map({"命中": 1, "未命中": 0})), recall_score(rule_result["target"], rule_result[rule_expr].map({"命中": 0, "未命中": 1}))], "F1分数": [f1_score(rule_result["target"], rule_result[rule_expr].map({"命中": 1, "未命中": 0})), f1_score(rule_result["target"], rule_result[rule_expr].map({"命中": 0, "未命中": 1}))], }) table = table.merge(metrics, on="分箱", how="left") if prior_rules: # prior_tables.insert(loc=0, column="规则分类", value=["先验规则"] * len(prior_tables)) table.insert(loc=0, column="规则分类", value=["验证规则"] * len(table)) table = pd.concat([prior_tables, table]) #.set_index(["规则分类"]) else: table.insert(loc=0, column="规则分类", value=["验证规则"] * len(table)) return table if isinstance(del_grey, bool) and del_grey: merge_columns = ["规则分类", "指标名称", "分箱"] else: merge_columns = ["规则分类", "指标名称", "分箱", "样本总数", "样本占比"] if overdue is not None: if not isinstance(overdue, list): overdue = [overdue] if not isinstance(dpd, list): dpd = [dpd] for i, col in enumerate(overdue): for j, d in enumerate(dpd): _datasets = datasets.copy() _datasets[f"{col}_{d}"] = (_datasets[col] > d).astype(int) if isinstance(del_grey, bool) and del_grey: _datasets = _datasets.query(f"({col} > {d}) | ({col} == 0)").reset_index(drop=True) if "指标含义" in return_cols: merge_columns.insert(0, "指标含义") if i == 0 and j == 0: table = _report_one_rule(_datasets, f"{col}_{d}", desc=desc, prior_rules=prior_rules) #.rename(columns={"坏账改善": f"{col} {d}+改善"}) table.columns = pd.MultiIndex.from_tuples([("规则详情", c) if c in merge_columns else (f"{col} DPD{d}+", c) for c in table.columns]) else: _table = _report_one_rule(_datasets, f"{col}_{d}", desc=desc, prior_rules=prior_rules) #.rename(columns={"坏账改善": f"{col} {d}+改善"}) _table.columns = pd.MultiIndex.from_tuples([("规则详情", c) if c in merge_columns else (f"{col} DPD{d}+", c) for c in _table.columns]) # table = table.merge(_table[["规则分类", "分箱", f"{col} {d}+改善"]], on=["规则分类", "分箱"]) table = table.merge(_table, on=[("规则详情", c) for c in merge_columns]) else: _datasets = datasets.copy() table = _report_one_rule(_datasets, target, desc=desc, prior_rules=prior_rules) if filter_cols: if not isinstance(filter_cols, list): filter_cols = [filter_cols] return table[[c for c in table.columns if (isinstance(c, tuple) and c[-1] in filter_cols + merge_columns) or (not isinstance(c, tuple) and c in filter_cols + merge_columns)]] return table
[文档] def result(self): if self._state != RuleState.APPLIED: raise RuleUnAppliedError("Invoke `predict` to make a rule applied.") return self.result_
def __eq__(self, other): if not isinstance(other, Rule): raise TypeError(f"Input should be of type Rule, got {type(other)} instead.") if self._state != other._state: raise RuleStateError(f"Input rule should be of the same state.") res = self.expr == other.expr if self._state == RuleState.INITIALIZED: return res return res and np.all(self.result() == other.result()) # rule combinations def __or__(self, other): if not isinstance(other, Rule): raise TypeError(f"Input should be of type Rule, got {type(other)} instead.") if self._state != other._state: raise RuleStateError(f"Input rule should be of the same state.") if isinstance(self.expr, str): r = Rule(f"({self.expr}) | ({other.expr})") if self._state == RuleState.INITIALIZED: return r r.result_ = np.logical_or(self.result(), other.result()) r._state = RuleState.APPLIED return r elif isinstance(self.expr, dict): self.new_dict = {} # 汇总成新的json self.new_dict["name"] = str(self.expr.get("name")) + str(other.expr.get("name")) self.new_dict["description"] = str(self.expr.get("description")) + " || " + str(other.expr.get("description")) self.new_dict["output"] = self.expr.get("output") # if_part if_dict = {} if_dict["value_type"] = "bool" if_dict["operator"] = "OR" if_dict["params"] = list() if_dict["params"].append(self.expr.get("if")) if_dict["params"].append(other.expr.get("if")) self.new_dict["if"] = if_dict # then_part then_part = {} if not self.expr.get("then") and not other.expr.get("then"): # 两条规则的then都为空 then_part = {} elif not self.expr.get("then"): # 一条规则的then存在 then_part = other.expr.get("then") elif not other.expr.get("then"): then_part = self.expr.get("then") else: # 两条规则的then都存在 if self.expr.get("then").get("value_type") != other.expr.get("then").get("value_type"): raise TypeError("两个规则then_part类型要一致") if self.expr.get("then").get("value_type") != "bool": raise TypeError("两个规则之间or运算, 类型需要设置为bool类型") then_part["value_type"] = "bool" then_part["operator"] = "OR" then_part["params"] = list() then_part["params"].append(self.expr.get("then")) then_part["params"].append(other.expr.get("then")) self.new_dict["then"] = then_part # else_part else_part = {} # self.else或者other.else存在为空的情况 if not self.expr.get("else") and not other.expr.get("else"): else_part = {} elif not self.expr.get("else"): # 一条规则的then存在 else_part = other.expr.get("else") elif not other.expr.get("else"): else_part = self.expr.get("else") else: if self.expr.get("then").get("value_type") != other.expr.get("then").get("value_type"): raise TypeError("两个规则else part类型要一致") if self.expr.get("then").get("value_type") != "bool": raise TypeError("两个规则之间or运算, 类型需要设置为bool类型") else_part["value_type"] = "bool" else_part["operator"] = "OR" else_part["params"] = list() else_part["params"].append(self.expr.get("else")) else_part["params"].append(other.expr.get("else")) self.new_dict["else"] = else_part return Rule(self.new_dict) def __and__(self, other): if not isinstance(other, Rule): raise TypeError(f"Input should be of type Rule, got {type(other)} instead.") if self._state != other._state: raise RuleStateError(f"Input rule should be of the same state.") if isinstance(self.expr, str): # 表达式 r = Rule(f"({self.expr}) & ({other.expr})") if self._state == RuleState.INITIALIZED: return r r.result_ = np.logical_and(self.result(), other.result()) r._state = RuleState.APPLIED return r elif isinstance(self.expr, dict): # dict self.new_dict = {} # 汇总成新的json self.new_dict["name"] = str(self.expr.get("name")) + str(other.expr.get("name")) self.new_dict["description"] = str(self.expr.get("description")) + " && " + str(other.expr.get("description")) self.new_dict["output"] = self.expr.get("output") # if_part if_dict = {} if_dict["value_type"] = "bool" if_dict["operator"] = "AND" if_dict["params"] = list() if_dict["params"].append(self.expr.get("if")) if_dict["params"].append(other.expr.get("if")) self.new_dict["if"] = if_dict # then_part then_part = {} if not self.expr.get("then") and not other.expr.get("then"): # 两条规则的then都为空 then_part = {} elif not self.expr.get("then"): # 一条规则的then存在 then_part = other.expr.get("then") elif not other.expr.get("then"): then_part = self.expr.get("then") else: # 两条规则的then都存在 if self.expr["then"].get("value_type") != other.expr["then"].get("value_type"): raise TypeError("两个规则then_part类型要一致") if self.expr.get("then").get("value_type") != "bool": raise TypeError("两个规则之间and运算, 类型需要设置为bool类型") then_part["value_type"] = "bool" then_part["operator"] = "AND" then_part["params"] = list() then_part["params"].append(self.expr.get("then")) then_part["params"].append(other.expr.get("then")) self.new_dict["then"] = then_part # else_part else_part = {} # self.else 或者other.else 存在为空的情况 if not self.expr.get("else") and not other.expr.get("else"): else_part = {} elif not self.expr.get("else"): # 一条规则的then存在 else_part = other.expr.get("else") elif not other.expr.get("else"): else_part = self.expr.get("else") else: if self.expr.get("else").get("value_type") != other.expr.get("else").get("value_type"): raise TypeError("两个规则else_part类型要一致") if self.expr.get("then").get("value_type") != "bool": raise TypeError("两个规则之间and运算, 类型需要设置为bool类型") else_part["value_type"] = "bool" else_part["operator"] = "AND" else_part["params"] = list() else_part["params"].append(self.expr.get("else")) else_part["params"].append(other.expr.get("else")) self.new_dict["else"] = else_part return Rule(self.new_dict) def __xor__(self, other): if not isinstance(other, Rule): raise TypeError(f"Input should be of type Rule, got {type(other)} instead.") if self._state != other._state: raise RuleStateError(f"Input rule should be of the same state.") r = Rule(f"({self.expr}) ^ ({other.expr})") if self._state == RuleState.INITIALIZED: return r r.result_ = np.logical_xor(self.result(), other.result()) r._state = RuleState.APPLIED return r def __mul__(self, other): return self.__or__(other) def __invert__(self): r = Rule(f"~({self.expr})") if self._state == RuleState.INITIALIZED: return r r.result_ = np.logical_not(self.result()) r._state = RuleState.APPLIED return r
[文档] @staticmethod def save(report, excel_writer, sheet_name=None, merge_column=None, percent_cols=None, condition_cols=None, custom_cols=None, custom_format="#,##0", color_cols=None, start_col=2, start_row=2, **kwargs): """保存规则结果至excel中,参数与 https://scorecardpipeline.itlubber.art/scorecardpipeline.html#scorecardpipeline.dataframe2excel 一致 """ if merge_column: merge_column = [c for c in report.columns if (isinstance(c, tuple) and c[-1] in merge_column) or (not isinstance(c, tuple) and c in merge_column)] if percent_cols: percent_cols = [c for c in report.columns if (isinstance(c, tuple) and c[-1] in percent_cols) or (not isinstance(c, tuple) and c in percent_cols)] if condition_cols: condition_cols = [c for c in report.columns if (isinstance(c, tuple) and c[-1] in condition_cols) or (not isinstance(c, tuple) and c in condition_cols)] if custom_cols: custom_cols = [c for c in report.columns if (isinstance(c, tuple) and c[-1] in custom_cols) or (not isinstance(c, tuple) and c in custom_cols)] if color_cols: color_cols = [c for c in report.columns if (isinstance(c, tuple) and c[-1] in color_cols) or (not isinstance(c, tuple) and c in color_cols)] end_row, end_col = dataframe2excel(report, excel_writer, sheet_name=sheet_name, merge_column=merge_column, percent_cols=percent_cols, condition_cols=condition_cols, custom_cols=custom_cols, custom_format=custom_format, color_cols=color_cols, start_col=start_col, start_row=start_row, **kwargs) return end_row, end_col