scorecardpipeline.auto_report 源代码

# -*- coding: utf-8 -*-
"""
@Time    : 2023/12/29 11:17
@Author  : itlubber
@Site    : itlubber.art
"""
import traceback
import warnings

warnings.filterwarnings("ignore")

import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from openpyxl.worksheet.worksheet import Worksheet

from .utils import *
from .processing import *
from .excel_writer import ExcelWriter, dataframe2excel


[文档]def auto_data_testing_report(data: pd.DataFrame, features=None, target="target", overdue=None, dpd=None, date=None, data_summary_comment="", freq="M", excel_writer=None, sheet="分析报告", start_col=2, start_row=2, writer_params={}, bin_params={}, feature_map={}, corr=False, pictures=["bin", "ks", "hist"], suffix=""): """自动数据测试报告,用于三方数据评估或自有评分效果评估 :param suffix: 用于避免未保存excel时,同名图片被覆盖的图片后缀名称 :param corr: 是否需要评估数值类变量之间的相关性,默认为 False,设置为 True 后会输出变量相关性图和表 :param pictures: 需要包含的图片,支持 ["ks", "hist", "bin"] :param data: 需要评估的数据集,需要包含目标变量 :param features: 需要进行分析的特征名称,支持单个字符串传入或列表传入 :param target: 目标变量名称 :param overdue: 逾期天数字段名称, 当传入 overdue 时,会忽略 target 参数 :param dpd: 逾期定义方式,逾期天数 > DPD 为 1,其他为 0,仅 overdue 字段起作用时有用 :param date: 日期列,通常为借款人申请日期或放款日期,可选字段,传入的情况下,结合字段 freq 参数输出不同时间粒度下的好坏客户分布情况 :param freq: 结合 date 日期使用,输出需要统计的粒度,默认 M,即按月统计 :param data_summary_comment: 数据样本概况中需要填入的备注信息,例如 "去除了历史最大逾期天数[0, dpd]内的灰客户" 等 :param excel_writer: 需要保存的excel文件名称或写入器 :param sheet: 需要保存的 sheet 名称,可传入已有的 worksheet 或 文字信息 :param start_col: 开始列 :param start_row: 开始行 :param writer_params: excel写入器初始化参数,仅在 excel_writer 为字符串时有效 :param bin_params: 统计分箱的参数,支持 `feature_bin_stats` 方法的参数 :param feature_map: 特征字典,增加文档可读性使用,默认 {} **参考样例** >>> import numpy as np >>> from scorecardpipeline import * >>> >>> # 加载数据集,标签转换为 0 和 1 >>> target = "creditability" >>> data = germancredit() >>> data[target] = data[target].map({"good": 0, "bad": 1}) >>> data["MOB1"] = [np.random.randint(0, 30) for i in range(len(data))] >>> features = data.columns.drop([target, "MOB1"]).tolist() >>> >>> # 测试报告输出 >>> auto_data_testing_report(data >>> , features=features >>> , target=target >>> , date=None # 传入日期列名,会按 freq 统计不同时间维度好坏样本的分布情况 >>> , freq="M" >>> , data_summary_comment="三方数据测试报告样例,支持同时评估多个不同标签定义下的数据有效性" >>> , excel_writer="三方数据测试报告.xlsx" >>> , sheet="分析报告" >>> , start_col=2 >>> , start_row=2 >>> , writer_params={} >>> , overdue=["MOB1"] >>> , dpd=[15, 7, 3] >>> , bin_params={"method": "dt", "min_bin_size": 0.05, "max_n_bins": 10, "return_cols": ["坏样本数", "坏样本占比", "坏样本率", "LIFT值", "坏账改善", "累积LIFT值", "分档KS值"]} # feature_bin_stats 函数的相关参数 >>> , pictures=['bin', 'ks', 'hist'] # 类别型变量不支持 ks 和 hist >>> , corr=True >>> ) """ init_setting() data = data.copy() if not isinstance(features, (list, tuple)): features = [features] if overdue and not isinstance(overdue, list): overdue = [overdue] if dpd and not isinstance(dpd, list): dpd = [dpd] if overdue: target = f"{overdue[0]} {dpd[0]}+" data[target] = (data[overdue[0]] > dpd[0]).astype(int) if isinstance(excel_writer, ExcelWriter): writer = excel_writer else: writer = ExcelWriter(**writer_params) worksheet = writer.get_sheet_by_name(sheet) if bin_params and "del_grey" in bin_params and bin_params.get("del_grey"): merge_columns = ["指标名称", "指标含义", "分箱"] else: merge_columns = ["指标名称", "指标含义", "分箱", "样本总数", "样本占比"] return_cols = [] if bin_params: if "return_cols" in bin_params and bin_params.get("return_cols"): return_cols = bin_params.pop("return_cols") if not isinstance(return_cols, (list, np.ndarray)): return_cols = [return_cols] return_cols = list(set(return_cols) - set(merge_columns)) else: return_cols = [] max_columns_len = len(merge_columns) + len(return_cols) * len(overdue) * len(dpd) if overdue and len(overdue) > 0 else len(merge_columns) + len(return_cols) end_row, end_col = writer.insert_value2sheet(worksheet, (start_row, start_col), value="数据有效性分析报告", style="header_middle", end_space=(start_row, start_col + max_columns_len - 1)) if date is not None and date in data.columns: if data[date].dtype.name in ["str", "object"]: start_date = pd.to_datetime(data[date]).min().strftime("%Y-%m-%d") end_date = pd.to_datetime(data[date]).max().strftime("%Y-%m-%d") else: start_date = data[date].min().strftime("%Y-%m-%d") end_date = data[date].max().strftime("%Y-%m-%d") dataset_summary = pd.DataFrame( [ ["回溯数据集", start_date, end_date, len(data), data[target].sum(), data[target].sum() / len(data), data_summary_comment] ], columns=["数据集", "开始时间", "结束时间", "样本总数", "坏客户数", "坏客户占比", "备注"], ) end_row, end_col = dataframe2excel(dataset_summary, writer, worksheet, percent_cols=["样本占比", "坏客户占比"], start_row=end_row + 2, title="样本总体分布情况") distribution = distribution_plot(data, date=date, freq=freq, target=target, save=f"model_report/sample_time_distribution{suffix}.png", result=True) end_row, end_col = writer.insert_value2sheet(worksheet, (end_row + 2, start_col), value="样本时间分布情况", style="header", end_space=(end_row + 2, start_col + len(distribution.columns) - 1)) end_row, end_col = writer.insert_pic2sheet(worksheet, f"model_report/sample_time_distribution{suffix}.png", (end_row + 1, start_col), figsize=(720, 370)) end_row, end_col = dataframe2excel(distribution, writer, worksheet, percent_cols=["样本占比", "好样本占比", "坏样本占比", "坏样本率"], condition_cols=["坏样本率"], start_row=end_row) end_row += 2 else: dataset_summary = pd.DataFrame( [ ["回溯数据集", len(data), data[target].sum(), data[target].sum() / len(data), data_summary_comment] ], columns=["数据集", "样本总数", "坏客户数", "坏客户占比", "备注"], ) end_row, end_col = dataframe2excel(dataset_summary, writer, worksheet, percent_cols=["样本占比", "坏客户占比"], start_row=end_row + 2, title="样本总体分布情况") end_row += 2 # 变量相关性 if corr: temp = data[features].select_dtypes(include="number") corr_plot(temp, save=f"model_report/auto_report_corr_plot{suffix}.png", annot=True if len(temp.columns) <= 10 else False, fontsize=14 if len(temp.columns) <= 10 else 12) end_row, end_col = dataframe2excel(temp.corr(), writer, worksheet, color_cols=list(temp.columns), start_row=end_row, figures=[f"model_report/auto_report_corr_plot{suffix}.png"], title="数值类变量相关性", figsize=(min(60 * len(temp.columns), 1080), min(55 * len(temp.columns), 950)), index=True, custom_cols=list(temp.columns), custom_format="0.00") end_row += 2 end_row, end_col = writer.insert_value2sheet(worksheet, (end_row, start_col), value="数值类特征 OR 评分效果评估", style="header_middle", end_space=(end_row, start_col + max_columns_len - 1)) features_iter = tqdm(features) for col in features_iter: features_iter.set_postfix(feature=feature_map.get(col, col)) try: if overdue is None: temp = data[[col, target]] else: temp = data[list(set([col, target] + overdue))] score_table_train = feature_bin_stats(temp, col, overdue=overdue, dpd=dpd, desc=f"{feature_map.get(col, col)}", target=target, **bin_params) if pictures and len(pictures) > 0: if "bin" in pictures: if score_table_train.columns.nlevels > 1: _ = score_table_train[["分箱详情", target]] _.columns = [c[-1] for c in _.columns] else: _ = score_table_train.copy() bin_plot(_, desc=f"{feature_map.get(col, col)}", figsize=(10, 5), anchor=0.935, save=f"model_report/feature_bins_plot_{col}{suffix}.png") if temp[col].dtypes.name not in ['object', 'str', 'category']: if "ks" in pictures: _ = temp.dropna().reset_index(drop=True) has_ks = len(_) > 0 and _[col].nunique() > 1 and _[target].nunique() > 1 if has_ks: ks_plot(_[col], _[target], figsize=(10, 5), title=f"{feature_map.get(col, col)}", save=f"model_report/feature_ks_plot_{col}{suffix}.png") if "hist" in pictures: _ = temp.dropna().reset_index(drop=True) if len(_) > 0: hist_plot(_[col], y_true=_[target], figsize=(10, 6), desc=f"{feature_map.get(col, col)} 好客户 VS 坏客户", bins=30, anchor=1.11, fontsize=14, labels={0: "好客户", 1: "坏客户"}, save=f"model_report/feature_hist_plot_{col}{suffix}.png") end_row, end_col = writer.insert_value2sheet(worksheet, (end_row + 2, start_col), value=f"数据字段: {feature_map.get(col, col)}", style="header", end_space=(end_row + 2, start_col + max_columns_len - 1)) if pictures and len(pictures) > 0: ks_row = end_row + 1 if "bin" in pictures: end_row, end_col = writer.insert_pic2sheet(worksheet, f"model_report/feature_bins_plot_{col}{suffix}.png", (ks_row, start_col), figsize=(600, 350)) if temp[col].dtypes.name not in ['object', 'str', 'category'] and temp[col].isnull().sum() != len(temp): if "ks" in pictures and has_ks: end_row, end_col = writer.insert_pic2sheet(worksheet, f"model_report/feature_ks_plot_{col}{suffix}.png", (ks_row, end_col - 1), figsize=(600, 350)) if "hist" in pictures: end_row, end_col = writer.insert_pic2sheet(worksheet, f"model_report/feature_hist_plot_{col}{suffix}.png", (ks_row, end_col - 1), figsize=(600, 350)) if return_cols: if score_table_train.columns.nlevels > 1 and not isinstance(merge_columns[0], tuple): merge_columns = [("分箱详情", c) for c in merge_columns] end_row, end_col = dataframe2excel(score_table_train[merge_columns + [c for c in score_table_train.columns if (isinstance(c, (tuple, list)) and c[-1] in return_cols) or (not isinstance(c, (tuple, list)) and c in return_cols) or (isinstance(return_cols[0], (tuple, list)) and isinstance(c, (tuple, list)) and c in return_cols)]], writer, worksheet, percent_cols=["样本占比", "好样本占比", "坏样本占比", "坏样本率", "LIFT值", "坏账改善", "累积LIFT值", "累积坏账改善"], condition_cols=["坏样本率", "LIFT值"], merge_column=["指标名称", "指标含义"], merge=True, fill=True, start_row=end_row) else: end_row, end_col = dataframe2excel(score_table_train, writer, worksheet, percent_cols=["样本占比", "好样本占比", "坏样本占比", "坏样本率", "LIFT值", "坏账改善", "累积LIFT值", "累积坏账改善"], condition_cols=["坏样本率", "LIFT值"], merge_column=["指标名称", "指标含义"], merge=True, fill=True, start_row=end_row) except: print(f"数据字段 {col} 分析时发生异常,请排查数据中是否存在异常:\n{traceback.format_exc()}") if not isinstance(excel_writer, ExcelWriter) and not isinstance(sheet, Worksheet): writer.save(excel_writer) return end_row, end_col