# Source code for scorecardpipeline.rule_extraction
# -*- coding: utf-8 -*-
"""
@Time : 2024/2/29 13:29
@Author : itlubber
@Site : itlubber.art
"""
import warnings
import os
import re
import graphviz
import dtreeviz
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import font_manager
from openpyxl.worksheet.worksheet import Worksheet
import category_encoders as ce
from optbinning import OptimalBinning
from sklearn.tree import DecisionTreeClassifier
from .rule import Rule
from .utils import init_setting
from .excel_writer import ExcelWriter, dataframe2excel
[文档]class DecisionTreeRuleExtractor:
def __init__(self, target="target", labels=None, feature_map=None, nan=-1., max_iter=128, writer=None, seed=None, theme_color="2639E9", decimal=4):
    """Set up the decision-tree based rule mining helper.

    :param target: name of the label column in the data set, default ``"target"``
    :param labels: two-element list of class display names, index 0 for good samples and index 1 for bad samples; ``None`` (default) means ``["positive", "negative"]``
    :param feature_map: mapping from feature name to a readable description, used to make rules easier to read in reports; ``None`` (default) means an empty mapping
    :param nan: value used to fill missing values before trees are fitted, default -1
    :param max_iter: maximum number of trees fitted on the data set; one feature is dropped after every tree before the next one is fitted, default 128
    :param writer: an existing ExcelWriter to append to; when falsy a new ExcelWriter is created with ``theme_color``
    :param seed: random seed kept on the instance for reproducibility, default None
    :param theme_color: theme colour handed to a newly created ExcelWriter, default ``2639E9``
    :param decimal: number of decimals kept for split thresholds and the overall bad rate, default 4
    """
    self.decimal = decimal
    self.seed = seed
    self.nan = nan
    self.target = target
    # Resolve the defaults per instance: list/dict literals in the signature
    # would be created once and shared by every extractor.
    self.labels = ["positive", "negative"] if labels is None else labels
    self.theme_color = theme_color
    self.feature_map = {} if feature_map is None else feature_map

    self.decision_trees = []
    self.max_iter = max_iter
    self.target_enc = None
    self.feature_names = None
    self.dt_rules = pd.DataFrame()

    # Cursor into the worksheet that receives the mined rules.
    self.end_row = 2
    self.start_col = 2
    self.describe_columns = ["组合策略", "命中数", "命中率", "好样本数", "好样本占比", "坏样本数", "坏样本占比", "坏样本率", "LIFT值", "坏账改善", "准确率", "精确率", "召回率", "F1分数", "样本整体坏率"]

    init_setting()

    if writer:
        self.writer = writer
    else:
        self.writer = ExcelWriter(theme_color=self.theme_color)
def encode_cat_features(self, X, y):
    """Target-encode the object / categorical columns of ``X``.

    On the first call that sees such columns a ``ce.TargetEncoder`` is fitted
    on them and stored in ``self.target_enc``, together with a
    ``target_mapping`` dict (column -> {raw value: encoded value}) built from
    the data passed in. Later calls reuse the stored encoder.

    :param X: feature frame without the label column
    :param y: labels used to fit the encoder on the first call
    :return: ``X`` itself when it has no object / categorical columns; otherwise a new frame in which the encoded columns replace the raw ones and are placed after the untouched columns
    """
    # Keep the frame's own column order. Going through set() made the order
    # of the encoded columns depend on string hashing. "category" is the
    # selector pandas documents for categorical columns.
    cat_features = list(X.select_dtypes(include=[object, "category"]).columns)
    if not cat_features:
        return X

    first_fit = self.target_enc is None
    if first_fit:
        self.target_enc = ce.TargetEncoder(cols=cat_features)
        self.target_enc.fit(X[cat_features], y)

    X_TE = X.join(self.target_enc.transform(X[cat_features]).add_suffix("_target"))

    if first_fit:
        # Remember which raw value maps to which encoded value.
        self.target_enc.target_mapping = {}
        for col in cat_features:
            pairs = X_TE[[col, f"{col}_target"]].drop_duplicates()
            self.target_enc.target_mapping[col] = dict(zip(pairs[col], pairs[f"{col}_target"]))

    X_TE = X_TE.drop(columns=cat_features)
    return X_TE.rename(columns={f"{c}_target": c for c in cat_features})
def get_dt_rules(self, tree, feature_names):
    """Turn a fitted decision tree into a list of ``Rule`` objects.

    One rule is produced for every non-root node: the conjunction of the
    split conditions on the path from the root to that node. Thresholds are
    rounded to ``self.decimal`` decimals.

    The previous implementation stored the rule under the *splitting* node's
    id, so the ``>`` branch overwrote the ``<=`` branch and every left-branch
    rule was missing from the result. Rules are now keyed by the child node.

    :param tree: fitted tree exposing ``tree_`` with ``children_left``, ``children_right``, ``feature`` and ``threshold`` arrays
    :param feature_names: sequence mapping a feature index to its name
    :return: list of ``Rule`` objects in depth-first (left before right) order
    """
    tree_ = tree.tree_
    left = tree_.children_left
    right = tree_.children_right
    rules = dict()

    def recurse(node, path):
        feature_index = tree_.feature[node]
        if feature_index == -2:  # -2 marks a leaf: nothing to split on
            return

        name = feature_names[feature_index]
        thd = np.round(tree_.threshold[node], self.decimal)
        for child, operator in ((left[node], "<="), (right[node], ">")):
            condition = Rule("{} {} {}".format(name, operator, thd))
            # The root has no path yet, so its conditions stand alone.
            rules[child] = condition if path is None else path & condition
            recurse(child, rules[child])

    recurse(0, None)
    return list(rules.values())
def select_dt_rules(self, decision_tree, x, y, lift=0., max_samples=1., save=None, verbose=False, drop=False):
    """Evaluate the rules of one tree and keep those passing the filters.

    Every rule from ``get_dt_rules`` is evaluated with ``Rule.report`` on
    ``x`` joined with ``y``; only the ``命中`` (hit) row of each report is
    kept. Rules are then filtered by ``LIFT值 >= lift`` and
    ``命中率 <= max_samples``. When at least one rule survives, the tree is
    rendered with dtreeviz and optionally displayed and/or saved.

    :param decision_tree: fitted tree classifier
    :param x: feature frame the tree was fitted on
    :param y: label series; its ``name`` is used as the target column
    :param lift: minimum LIFT value a rule must reach, default 0.
    :param max_samples: maximum hit rate a rule may have, default 1.
    :param save: path of the picture to write for the tree; falsy disables saving
    :param verbose: display the report and the tree through IPython
    :param drop: also return the name of a feature to remove before the next tree
    :return: ``(report, n_rules)``, or ``(report, feature, n_rules)`` when ``drop`` is true; ``feature`` is the most important feature when rules survived, otherwise the least important one
    :raises AttributeError: when dtreeviz fails with an AttributeError while building the model
    """
    rules = self.get_dt_rules(decision_tree, x.columns)

    try:
        viz_model = dtreeviz.model(
            decision_tree,
            X_train=x,
            y_train=y,
            feature_names=x.columns,
            target_name=self.target,
            class_names=self.labels,
        )
    except AttributeError as err:
        # Raising a bare string is itself a TypeError on Python 3, which hid
        # this hint; raise a real exception that carries it.
        raise AttributeError("请检查 dtreeviz 版本") from err

    hit_reports = [rule.report(x.join(y), target=y.name).query("分箱 == '命中'") for rule in rules]
    if hit_reports:
        # Concatenate once instead of growing the frame rule by rule.
        rules_reports = pd.concat(hit_reports)
        rules_reports = rules_reports.rename(columns={"指标名称": "组合策略", "样本总数": "命中数", "样本占比": "命中率"}).drop(columns=["分箱"])
        rules_reports["样本整体坏率"] = round(y.mean(), self.decimal)
        rules_reports = rules_reports.query(f"LIFT值 >= {lift} & 命中率 <= {max_samples}").reset_index(drop=True)
    else:
        # A tree without any split yields no rule; report that as "nothing
        # found" instead of failing on the missing report columns.
        rules_reports = pd.DataFrame(columns=self.describe_columns)

    if len(rules_reports) > 0:
        decision_tree_viz = viz_model.view(
            scale=1.5,
            orientation='LR',
            colors={
                "classes": [None, None, ["#2639E9", "#F76E6C"], ["#2639E9", "#F76E6C", "#FE7715", "#FFFFFF"]],
                "arrow": "#2639E9",
                'text_wedge': "#F76E6C",
                "pie": "#2639E9",
                "tile_alpha": 1,
                "legend_edge": "#FFFFFF",
            },
            ticks_fontsize=10,
            label_fontsize=10,
            fontname=plt.rcParams['font.family'],
        )

        if verbose:
            from IPython.core.display_functions import display

            if self.feature_map is not None and len(self.feature_map) > 0:
                display(rules_reports.replace(self.feature_map, regex=True))
            else:
                display(rules_reports)
            display(decision_tree_viz)

        if save:
            save_dir = os.path.dirname(save)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)

            cache_svg = "combine_rules_cache.svg"
            try:
                try:
                    decision_tree_viz.save(cache_svg)
                except graphviz.backend.execute.ExecutableNotFound:
                    print("请确保您已安装 graphviz 程序并且正确配置了 PATH 路径。可参考: https://stackoverflow.com/questions/35064304/runtimeerror-make-sure-the-graphviz-executables-are-on-your-systems-path-aft")

                try:
                    import cairosvg
                    cairosvg.svg2png(url=cache_svg, write_to=save, dpi=240)
                except Exception:
                    # Fallback renderer. Raster output with fmt/dpi is the
                    # renderPM API; renderPDF.drawToFile rejects them.
                    from reportlab.graphics import renderPM
                    from svglib.svglib import svg2rlg
                    drawing = svg2rlg(cache_svg)
                    renderPM.drawToFile(drawing, save, fmt="PNG", dpi=240)
            finally:
                # Remove the intermediate files even when the export failed.
                for cache_file in (cache_svg, "combine_rules_cache"):
                    if os.path.isfile(cache_file):
                        os.remove(cache_file)

    if not drop:
        return rules_reports, len(rules_reports)

    importances = decision_tree.feature_importances_
    # Rules found: drop the dominant feature so the next tree explores other
    # ones. Nothing found: drop the least useful feature instead.
    position = np.argmax(importances) if len(rules_reports) > 0 else np.argmin(importances)
    return rules_reports, decision_tree.feature_names_in_[position], len(rules_reports)
def query_dt_rules(self, x, y, parsed_rules=None):
    """Evaluate already mined rules on a data set.

    :param x: feature frame
    :param y: label series; its ``name`` is used as the target column
    :param parsed_rules: either an iterable of ``Rule`` objects or a frame whose ``组合策略`` column holds rule expressions (duplicates are evaluated once)
    :return: one ``命中`` (hit) report row per rule plus the overall bad rate in ``样本整体坏率``; an empty frame with ``self.describe_columns`` when there is no rule to evaluate
    """
    if isinstance(parsed_rules, pd.DataFrame):
        parsed_rules = [Rule(r) for r in parsed_rules["组合策略"].unique()]

    parsed_rules = [] if parsed_rules is None else list(parsed_rules)
    if not parsed_rules:
        # Nothing to evaluate: previously this failed on None (TypeError) or
        # on the missing report columns (KeyError).
        return pd.DataFrame(columns=self.describe_columns)

    # Concatenate once instead of growing the frame rule by rule.
    rules_reports = pd.concat([rule.report(x.join(y), target=y.name).query("分箱 == '命中'") for rule in parsed_rules])
    rules_reports = rules_reports.rename(columns={"指标名称": "组合策略", "样本总数": "命中数", "样本占比": "命中率"}).drop(columns=["分箱"])
    rules_reports["样本整体坏率"] = round(y.mean(), self.decimal)
    return rules_reports
def insert_dt_rules(self, parsed_rules, end_row, start_col, save=None, sheet=None, figsize=(500, 350)):
    """Write a rule report, and optionally a picture, into a worksheet.

    :param parsed_rules: rule report frame to write
    :param end_row: last used row; the table starts on the row after it
    :param start_col: first column of the table and of the picture
    :param save: path of a picture to insert below the table; ``None`` skips the picture
    :param sheet: a ``Worksheet`` to write to, or a sheet name looked up through the writer (``决策树组合策略挖掘`` when falsy)
    :param figsize: picture size handed to ``insert_pic2sheet``
    :return: ``(end_row, end_col)`` reported by the last insert performed
    """
    if isinstance(sheet, Worksheet):
        target_sheet = sheet
    else:
        target_sheet = self.writer.get_sheet_by_name(sheet or "决策树组合策略挖掘")

    percent_columns = ['好样本占比', '坏样本占比', '命中率', '坏样本率', '样本整体坏率', 'LIFT值', '坏账改善', '准确率', '精确率', '召回率', 'F1分数']
    highlighted_columns = ["坏样本率", "LIFT值"]

    last_row, last_col = dataframe2excel(
        parsed_rules,
        self.writer,
        sheet_name=target_sheet,
        start_row=end_row + 1,
        start_col=start_col,
        percent_cols=percent_columns,
        condition_cols=highlighted_columns,
    )

    if save is None:
        return last_row, last_col

    # The picture goes directly below the table, aligned with its left edge.
    last_row, last_col = self.writer.insert_pic2sheet(target_sheet, save, (last_row + 1, start_col), figsize=figsize)
    return last_row, last_col
def fit(self, x, y=None, max_depth=2, lift=0., max_samples=1., min_score=None, verbose=False, *args, **kwargs):
    """Mine combined rules by fitting a sequence of shallow decision trees.

    Each round fits a tree on the remaining features, keeps the rules that
    pass the ``lift`` / ``max_samples`` filters, writes them to the
    ``策略详情`` sheet and drops one feature before the next round.

    :param x: data set that contains the label column ``self.target``
    :param y: ignored; labels are always read from ``x[self.target]``
    :param max_depth: maximum tree depth, i.e. the largest number of conditions combined in one rule, default 2
    :param lift: minimum LIFT value of a kept rule, default 0. (keep all)
    :param max_samples: maximum hit rate of a kept rule, default 1.0 (keep all)
    :param min_score: minimum training score from ``DecisionTreeClassifier.score`` (mean accuracy); mining stops once a tree scores below it
    :param verbose: display intermediate reports and trees (only useful inside Jupyter)
    :param kwargs: extra ``DecisionTreeClassifier`` parameters, see https://scikit-learn.org/stable/modules/generated/sklearn.tree.DecisionTreeClassifier.html
    :return: ``self``
    """
    worksheet = self.writer.get_sheet_by_name("策略详情")

    y = x[self.target]
    X_TE = self.encode_cat_features(x.drop(columns=[self.target]), y)
    X_TE = X_TE.fillna(self.nan)
    self.feature_names = list(X_TE.columns)

    # Honour the seed given to the constructor unless the caller passes an
    # explicit random_state; seed=None keeps the scikit-learn default.
    kwargs.setdefault("random_state", self.seed)

    for i in range(self.max_iter):
        n_features = len(X_TE.columns)
        # Stop when no features are left, or too few to build a rule of the
        # requested depth (max_depth=None places no such requirement).
        if n_features == 0 or (max_depth is not None and n_features < max_depth):
            break

        decision_tree = DecisionTreeClassifier(max_depth=max_depth, *args, **kwargs)
        decision_tree = decision_tree.fit(X_TE, y)

        if min_score is not None and decision_tree.score(X_TE, y) < min_score:
            break

        picture = f"model_report/auto_mining_rules/combiner_rules_{i}.png"
        try:
            parsed_rules, remove, total_rules = self.select_dt_rules(decision_tree, X_TE, y, lift=lift, max_samples=max_samples, verbose=verbose, save=picture, drop=True)

            if len(parsed_rules) > 0:
                self.dt_rules = pd.concat([self.dt_rules, parsed_rules]).reset_index(drop=True)

                if self.writer is not None:
                    if self.feature_map is not None and len(self.feature_map) > 0:
                        parsed_rules["组合策略"] = parsed_rules["组合策略"].replace(self.feature_map, regex=True)
                    self.end_row, _ = self.insert_dt_rules(parsed_rules, self.end_row, self.start_col, save=picture, figsize=(500, 100 * total_rules), sheet=worksheet)

            X_TE = X_TE.drop(columns=remove)
            self.decision_trees.append(decision_tree)
        except Exception:
            import traceback
            traceback.print_exc()
            # No feature was dropped, so the next round would refit the same
            # data and fail the same way up to max_iter times; stop instead.
            break

    if len(self.dt_rules) <= 0:
        print(f"未挖掘到有效策略, 可以考虑适当调整预设的筛选参数, 降低 lift / 提高 max_samples, 当前筛选标准为: 提取 lift >= {lift} 且 max_samples <= {max_samples} 的策略")

    return self
def transform(self, x, y=None):
    """Evaluate the mined rules on another data set.

    :param x: data set that contains the label column ``self.target``
    :param y: ignored; labels are always read from ``x[self.target]``
    :return: rule report for ``x`` with feature names replaced through ``self.feature_map`` when it is non-empty; an empty frame with ``self.describe_columns`` when no rules have been mined
    """
    y = x[self.target]
    features = self.encode_cat_features(x.drop(columns=[self.target]), y)
    features = features.fillna(self.nan)

    # Guard clause: without mined rules there is nothing to evaluate.
    if self.dt_rules is None or len(self.dt_rules) <= 0:
        return pd.DataFrame(columns=self.describe_columns)

    report = self.query_dt_rules(features, y, parsed_rules=self.dt_rules)

    has_feature_map = self.feature_map is not None and len(self.feature_map) > 0
    if has_feature_map:
        report["组合策略"] = report["组合策略"].replace(self.feature_map, regex=True)

    return report
def report(self, valid=None, sheet="组合策略汇总", save=None):
    """Write the mined rules, and their results on validation data, to Excel.

    :param valid: validation data; a single DataFrame, a list/tuple of DataFrames or a dict mapping a title to a DataFrame. Entries that are not non-empty DataFrames are skipped
    :param sheet: name of the worksheet receiving the summary; when falsy the ``决策树组合策略挖掘`` sheet is used and writing continues below the current ``end_row``
    :param save: path the workbook is saved to; falsy skips saving
    :return: tuple with the training report followed by one report per processed validation data set
    """
    worksheet = self.writer.get_sheet_by_name(sheet or "决策树组合策略挖掘")
    if sheet:
        self.writer.workbook.move_sheet(sheet, -1)

    parsed_rules_train = self.dt_rules.copy()
    if self.feature_map is not None and len(self.feature_map) > 0:
        parsed_rules_train["组合策略"] = parsed_rules_train["组合策略"].replace(self.feature_map, regex=True)

    # A dedicated sheet starts at row 2; otherwise continue below what is there.
    title_row = 2 if sheet else self.end_row + 2
    self.end_row, _ = self.writer.insert_value2sheet(
        worksheet,
        (title_row, self.start_col),
        value="组合策略: 训练集",
        style="header_middle",
        end_space=(title_row, self.start_col + len(parsed_rules_train.columns) - 1),
    )
    self.end_row, _ = self.insert_dt_rules(parsed_rules_train, self.end_row, self.start_col, sheet=worksheet)

    outputs = (parsed_rules_train,)

    # Bring every accepted shape of ``valid`` into (title, dataset) pairs so
    # that a single loop can write them.
    if isinstance(valid, pd.DataFrame):
        titled_datasets = [("组合策略: 验证集", valid)]
    elif isinstance(valid, (list, tuple)):
        titled_datasets = [(f"组合策略: 验证集 {i + 1}", dataset) for i, dataset in enumerate(valid)]
    elif isinstance(valid, dict):
        titled_datasets = [(f"组合策略: {k}", dataset) for k, dataset in valid.items()]
    else:
        titled_datasets = []

    for title, dataset in titled_datasets:
        if not (isinstance(dataset, pd.DataFrame) and len(dataset) > 0):
            continue

        parsed_rules_val = self.transform(dataset)
        header_row = self.end_row + 2
        self.end_row, _ = self.writer.insert_value2sheet(
            worksheet,
            (header_row, self.start_col),
            value=title,
            style="header_middle",
            end_space=(header_row, self.start_col + len(parsed_rules_val.columns) - 1),
        )
        self.end_row, _ = self.insert_dt_rules(parsed_rules_val, self.end_row, self.start_col, sheet=worksheet)
        outputs = outputs + (parsed_rules_val,)

    if save:
        self.writer.save(save)

    return outputs