Source code for scorecardpipeline.scorecard
# -*- coding: utf-8 -*-
"""
@Time : 2024/4/15 16:52
@Author : itlubber
@Site : itlubber.art
"""
import math
from abc import abstractmethod
import numpy as np
import pandas as pd
from pandas import DataFrame
from scipy import stats
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
class BaseScoreTransformer(BaseEstimator, TransformerMixin):
def __init__(self, down_lmt=300, up_lmt=1000, greater_is_better=True, cutoff=None):
self.down_lmt = down_lmt
self.up_lmt = up_lmt
self.greater_is_better = greater_is_better
self.cutoff = cutoff
@abstractmethod
def predict(self, x):
pass
@staticmethod
def score_clip(score, clip=50):
"""传入评分分数,根据评分分布情况,返回评分等距分箱规则
:param score: 评分数据
:param clip: 区间间隔
:return: list,评分分箱规则
"""
clip_start = max(math.ceil(score.min() / clip) * clip, math.ceil(score.quantile(0.01) / clip) * clip)
clip_end = min(math.ceil(score.max() / clip) * clip, math.ceil(score.quantile(0.99) / clip) * clip)
return [i for i in range(clip_start, clip_end, clip)]
class StandardScoreTransformer(BaseScoreTransformer):
"""Stretch the predicted probability to a normal distributed score."""
    def __init__(self, base_score=660, pdo=75, rate=2, bad_rate=0.15, down_lmt=300, up_lmt=1000, greater_is_better=True, cutoff=None):
super().__init__(down_lmt=down_lmt, up_lmt=up_lmt, greater_is_better=greater_is_better, cutoff=cutoff)
self.base_score = base_score
self.pdo = pdo
self.rate = rate
self.bad_rate = bad_rate
    def fit(self, X, y=None, **fit_params):
self._validate_data(X, reset=True, accept_sparse=False, dtype="numeric", copy=False, force_all_finite=True)
base_score, down_lmt, up_lmt = self.base_score, self.down_lmt, self.up_lmt
if not down_lmt <= base_score <= up_lmt:
raise ValueError("base_score should be greater than {} and less than {}!".format(down_lmt, up_lmt))
bad_rate = self.bad_rate
        if not 0.0 < bad_rate < 1.0:
            raise ValueError("bad_rate should be strictly between 0 and 1!")
base_odds = bad_rate / (1. - bad_rate)
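        # Standard PDO scaling: score = A - B * ln(odds); B = pdo / ln(rate) so that the score changes
        # by pdo whenever the odds change by a factor of rate, and A anchors base_odds to base_score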
if self.greater_is_better:
B = self.pdo / np.log(self.rate)
else:
B = -self.pdo / np.log(self.rate)
        A = base_score + B * np.log(base_odds)
self.A_ = A
self.B_ = B
self.base_odds = base_odds
return self
    def scorecard_scale(self):
"""输出评分卡基准信息,包含 base_odds、base_score、rate、pdo、A、B
:return: pd.DataFrame,评分卡基准信息
"""
scorecard_kedu = pd.DataFrame(
[
["base_odds", self.base_odds, "根据业务经验设置的基础比率(违约概率/正常概率),估算方法:坏客户占比 / (1 - 样本坏客户占比)"],
["base_score", self.base_score, "基础ODDS对应的分数"],
["rate", self.rate, "设置分数的倍率"],
["pdo", self.pdo, "表示分数增长PDO时,ODDS值增长到RATE倍"],
["B", self.A_, "补偿值,计算方式:pdo / ln(rate)"],
["A", self.B_, "刻度,计算方式:base_score - B * ln(base_odds)"],
],
columns=["刻度项", "刻度值", "备注"],
)
return scorecard_kedu
def _transform(self, X):
check_is_fitted(self, ["A_", "B_"])
Xt = self._validate_data(X, reset=False, accept_sparse=False, dtype="numeric", copy=True, force_all_finite=True)
# if not np.all((0 <= Xt) & (Xt <= 1)):
# raise ValueError ("Input should be probabilities between 0 and 1.")
A, B = self.A_, self.B_
down_lmt, up_lmt = self.down_lmt, self.up_lmt
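        # score = A - B * ln(p / (1 - p)), then clipped to [down_lmt, up_lmt]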
points = A - B * np.log(Xt / (1.0 - Xt))
points = np.clip(points, down_lmt, up_lmt)
return points
    def transform(self, X):
data = self._transform(X)
if isinstance(X, DataFrame):
columns = X.columns
index = X.index
return DataFrame(data=data, columns=columns, index=index)
return data
    def predict(self, X):
scores = np.ravel(self._transform(X))
if self.cutoff is None:
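            # Default cutoff: the score corresponding to a predicted probability of 0.5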
cutoff = self._transform([[0.5]])[0][0]
elif not self.down_lmt < self.cutoff < self.up_lmt:
raise ValueError("Cutoff point should be within down_lmt and up_lmt!")
else:
cutoff = self.cutoff
if self.greater_is_better:
            return (scores < cutoff).astype(int)
else:
            return (scores > cutoff).astype(int)
def _inverse_transform(self, X):
check_is_fitted(self, ["A_", "B_"])
Xt = check_array(X, accept_sparse=False, dtype="numeric", copy=True, force_all_finite=True)
down_lmt, up_lmt = self.down_lmt, self.up_lmt
if not np.all(np.logical_and((down_lmt <= Xt), (Xt <= up_lmt))):
raise ValueError("Input should be points between {} and {}".format(down_lmt, up_lmt))
A, B = self.A_, self.B_
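        # Invert score = A - B * ln(p / (1 - p)):  p = 1 - 1 / (exp((A - score) / B) + 1)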
probs = 1.0 - 1.0 / (np.exp((A - Xt) / B) + 1.0)
return probs
    def inverse_transform(self, X):
data = self._inverse_transform(X)
if isinstance(X, DataFrame):
columns = X.columns
index = X.index
return DataFrame(data=data, columns=columns, index=index)
return data
def _more_tags(self):
return {
"allow_nan": False,
}
class NPRoundStandardScoreTransformer(StandardScoreTransformer):
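    """StandardScoreTransformer whose scores are rounded with numpy.round to round_decimals decimals."""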
    def __init__(self, base_score=660, pdo=75, bad_rate=0.15, down_lmt=300, up_lmt=1000, round_decimals=0, greater_is_better=True, cutoff=None):
self.round_decimals = round_decimals
super(NPRoundStandardScoreTransformer, self).__init__(base_score=base_score, pdo=pdo, bad_rate=bad_rate, down_lmt=down_lmt, up_lmt=up_lmt,
greater_is_better=greater_is_better, cutoff=cutoff)
def _transform(self, X):
points = super()._transform(X)
decimals = self.round_decimals
points = np.round(points, decimals=decimals)
return points
class RoundStandardScoreTransformer(StandardScoreTransformer):
"""Stretch the predicted probability to a normal distributed score."""
    def __init__(self, base_score=660, pdo=75, bad_rate=0.15, down_lmt=300, up_lmt=1000, round_decimals=0, greater_is_better=True, cutoff=None):
self.round_decimals = round_decimals
super(RoundStandardScoreTransformer, self).__init__(base_score=base_score, pdo=pdo, bad_rate=bad_rate, down_lmt=down_lmt, up_lmt=up_lmt,
greater_is_better=greater_is_better, cutoff=cutoff)
def _transform(self, X):
points = super()._transform(X)
decimals = self.round_decimals
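        # Round each score with Python's built-in round; assumes a single score column (shape (n, 1))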
points = np.array([[round(x[0], decimals)] for x in points])
return points
class BoxCoxScoreTransformer(BaseScoreTransformer):
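    """Map predicted probabilities to scores with a Box-Cox transform followed by min-max scaling to [down_lmt, up_lmt]."""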
    def __init__(self, down_lmt=300, up_lmt=1000, greater_is_better=True, cutoff=None):
super().__init__(down_lmt=down_lmt, up_lmt=up_lmt, greater_is_better=greater_is_better, cutoff=cutoff)
@staticmethod
def _box_cox_optimize(x):
"""Find and return optimal lambda parameter of the Box-Cox transform by MLE, for observed data x.
We here use scipy builtins which uses the brent optimizer.
"""
# the computation of Lambda is influenced by NaNs so we need to get rid of them
_, lmbda = stats.boxcox(x, lmbda=None)
return lmbda
    def fit(self, X, y=None, **fit_params):
X = check_array(X, accept_sparse=False, dtype="numeric", copy=True, force_all_finite=True)
if np.min(X) <= 0 or np.max(X) >= 1:
raise ValueError("The Box-Cox score transformation can only be applied to strictly positive probabilities")
        if self.greater_is_better:
            X = 1.0 - X
        self.lambdas_ = np.array([self._box_cox_optimize(col) for col in X.T])
        for i, lmbda in enumerate(self.lambdas_):
            X[:, i] = stats.boxcox(X[:, i], lmbda)
        self.scaler_ = MinMaxScaler(feature_range=(self.down_lmt, self.up_lmt)).fit(X)
return self
def _transform(self, X):
check_is_fitted(self, ["lambdas_", "scaler_"])
X = check_array(X, accept_sparse=False, dtype="numeric", copy=True, force_all_finite=True)
if np.min(X) < 0 or np.max(X) > 1:
raise ValueError("The Box-Cox score transformation can only be applied to strictly positive probabilities")
if self.greater_is_better:
X = 1.0 - X
for i, lmbda in enumerate(self.lambdas_):
X[:, i] = stats.boxcox(X[:, i], lmbda)
return self.scaler_.transform(X)
    def transform(self, X):
data = self._transform(X)
if isinstance(X, DataFrame):
columns = X.columns
index = X.index
return DataFrame(data=data, index=index, columns=columns)
return data
    def predict(self, X):
scores = np.ravel(self._transform(X))
if self.cutoff is None:
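            # Default cutoff: push probability 0.5 through the fitted Box-Cox + MinMaxScaler pipeline;
            # only the first column's lambda is used, i.e. a single probability column is assumed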
lmbda = self.lambdas_[0]
if lmbda != 0:
p = (0.5 ** lmbda - 1) / lmbda
else:
p = np.log(0.5)
scaler = self.scaler_
p *= scaler.scale_
p += scaler.min_
if scaler.clip:
if p < scaler.feature_range[0]:
p = scaler.feature_range[0]
elif p > scaler.feature_range[1]:
p = scaler.feature_range[1]
cutoff = p
elif not self.down_lmt < self.cutoff < self.up_lmt:
raise ValueError("Cutoff point should be within 'down_lmt' and 'up_lmt'!")
else:
cutoff = self.cutoff
if self.greater_is_better:
            return (scores < cutoff).astype(int)
else:
            return (scores > cutoff).astype(int)
def _inverse_transform(self, X):
check_is_fitted(self, ["lambdas_", "scaler_"])
X = check_array(X, accept_sparse=False, dtype="numeric", copy=True, force_all_finite=True)
if np.min(X) < self.down_lmt or np.max(X) > self.up_lmt:
raise ValueError("The Box-Cox score inverse transformation can only be applied to strictly bounded scores")
X_inv = self.scaler_.inverse_transform(X)
for i, lmbda in enumerate(self.lambdas_):
            X_inv[:, i] = self._box_cox_inverse_transform(X_inv[:, i], lmbda)
if self.greater_is_better:
X_inv = 1.0 - X_inv
return X_inv
    def inverse_transform(self, X):
data = self._inverse_transform(X)
if isinstance(X, DataFrame):
columns = X.columns
index = X.index
return DataFrame(data=data, index=index, columns=columns)
return data
@staticmethod
    def _box_cox_inverse_transform(x, lmbda):
        """Return the inverse Box-Cox transform of input x with parameter lambda"""
if lmbda == 0:
x_inv = np.exp(x)
else:
x_inv = (x * lmbda + 1) ** (1 / lmbda)
return x_inv
if __name__ == '__main__':
import sys
sys.path.append("../")
from scorecardpipeline import *
import h2o
h2o.init()
test_select = h2o.H2OFrame(load_pickle("/Users/lubberit/Desktop/workspace/scorecardpipeline/examples/model_report/h2o_model/test_select.pkl"))
model_path = '/Users/lubberit/Desktop/workspace/scorecardpipeline/examples/model_report/h2o_model/StackedEnsemble_BestOfFamily_1_AutoML_1_20240415_162619'
best_model = h2o.load_model(model_path)
# score_transform = StandardScoreTransformer(base_score=400, pdo=50, bad_rate=test_select["target"].mean()[0], greater_is_better=True)
score_transform = BoxCoxScoreTransformer(greater_is_better=False)
y_pred = best_model.predict(test_select).as_data_frame()[["p1"]]
score_transform.fit(y_pred)
print(best_model.predict(test_select))
score = score_transform.transform(y_pred)
print(score)
print(score_transform.inverse_transform(score))
# print(score_transform.scorecard_scale())
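    # Minimal self-contained sketch (assumption: no h2o model at hand) of the intended StandardScoreTransformer
    # usage on synthetic bad-probabilities; the demo_* names and parameter values below are illustrative only
    demo_probs = pd.DataFrame({"p1": np.random.uniform(0.01, 0.99, size=1000)})
    demo_transformer = StandardScoreTransformer(base_score=660, pdo=75, bad_rate=0.15, greater_is_better=True)
    demo_scores = demo_transformer.fit(demo_probs).transform(demo_probs)
    print(demo_transformer.scorecard_scale())
    print(demo_scores.describe())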