Distributed Model Scoring on Spark
Distributed model scoring on Spark boils down to either calling the scoring function directly on an RDD, or wrapping the scoring logic in a udf / pandas udf and invoking it on a DataFrame.
*This version mainly applies to models with fewer than 255 features; beyond 255, it fails because Python caps the number of function parameters (each feature column is passed to the udf as a separate argument).
Since almost all of our current models use more than 255 features, this version is effectively deprecated.
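For the RDD route mentioned above, a minimal sketch looks like the following. This is illustrative only: `bc_model` (a broadcast sklearn-style model), `feature_list`, and `main_keys` are assumed names, not part of the script below, and the data is assumed to already be numeric.

```python
# Hypothetical sketch of the RDD route: score each partition in one batch
# instead of row by row. Assumes `bc_model` is a broadcast sklearn-style
# model and `feature_list` / `main_keys` are plain Python lists of column names.
import pandas as pd

def score_partition(rows):
    rows = list(rows)
    if not rows:
        return
    # Build one feature matrix per partition, then score it in a single call.
    X = pd.DataFrame([[r[f] for f in feature_list] for r in rows],
                     columns=feature_list).astype(float)
    scores = bc_model.value.predict_proba(X)[:, 1]
    for row, s in zip(rows, scores):
        yield (*[row[k] for k in main_keys], float(s))

scored_rdd = data.rdd.mapPartitions(score_partition)
```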
```python
# -*- coding:utf-8 -*-
import sys
import time

import numpy as np
import pandas as pd
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
import xgboost as xgb
import lightgbm as lgb

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import NumericType, DoubleType


def printWithTime(msg, level='info'):
    ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    if level == 'info':
        print(ts, "[INFO]", msg)
    elif level == 'error':
        print(ts, "[ERROR]", msg)


def main():
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    printWithTime("Spark Session Created.")

    # Positional arguments: model type, model file, feature-list file,
    # input/output Hive tables, and comma-separated primary-key columns.
    model_type = sys.argv[1]
    model_path = sys.argv[2]
    feature_path = sys.argv[3]
    input_table_name = sys.argv[4]
    output_table_name = sys.argv[5]
    main_key_list = sys.argv[6].split(",")

    if not output_table_name.startswith("dp_data_db."):
        printWithTime("output_table_name must start with dp_data_db.", 'error')
        sys.exit(1)

    printWithTime("Loading model from {}".format(model_path))
    if model_type in ["xgb-sklearn", "lgb-sklearn"]:
        model = joblib.load(model_path)
    elif model_type == "xgb-booster":
        model = xgb.Booster(model_file=model_path)
    elif model_type == "lgb-booster":
        model = lgb.Booster(model_file=model_path)
    else:
        printWithTime("Model type {} not supported.".format(model_type), 'error')
        sys.exit(1)

    printWithTime("Loading feature list from {}".format(feature_path))
    feature_list = joblib.load(feature_path)

    printWithTime("Lazy loading data from {}".format(input_table_name))
    data = spark.sql(f"select * from {input_table_name}")

    # Cast every non-numeric feature column to DoubleType so the model
    # receives a purely numeric matrix.
    printWithTime("Convert all features to DoubleType")
    str_cols = [f.name for f in data.schema.fields
                if not isinstance(f.dataType, NumericType)]
    str_features = [f for f in feature_list if f in str_cols]
    for f in str_features:
        printWithTime("Converting {} to DoubleType".format(f))
        data = data.withColumn(f, data[f].cast(DoubleType()))

    # Broadcast the model and feature list so each executor gets one copy.
    model = sc.broadcast(model)
    feature_list = sc.broadcast(feature_list)

    # Each feature column arrives as a separate pd.Series argument; this is
    # exactly why this version is capped by Python's parameter limit.
    if model_type in ["xgb-sklearn", "lgb-sklearn"]:
        def predict_(feature_list):
            @pandas_udf(returnType=DoubleType())
            def predict_score(*features):
                X = pd.concat(features, axis=1).values
                X = pd.DataFrame(X, columns=feature_list)
                y = model.value.predict_proba(X)[:, 1]
                return pd.Series(y)
            return predict_score
    elif model_type == "xgb-booster":
        def predict_(feature_list):
            @pandas_udf(returnType=DoubleType())
            def predict_score(*features):
                X = pd.concat(features, axis=1).values
                X = xgb.DMatrix(X, missing=np.nan)
                y = model.value.predict(X)
                return pd.Series(y)
            return predict_score
    elif model_type == "lgb-booster":
        def predict_(feature_list):
            @pandas_udf(returnType=DoubleType())
            def predict_score(*features):
                X = pd.concat(features, axis=1).values
                y = model.value.predict(X)
                return pd.Series(y)
            return predict_score

    data = data.withColumn("pred", predict_(feature_list.value)(
        *[data[f] for f in feature_list.value]))
    data = data.select(*main_key_list, "pred")

    printWithTime("Start to predict...")
    data.repartition(10).write.format('hive').mode(
        "overwrite").saveAsTable(output_table_name)
    printWithTime("Model score prediction finished.")
    printWithTime("Result has been saved to {}".format(output_table_name))


if __name__ == '__main__':
    main()
```
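Submission follows the positional arguments read from `sys.argv`; for example (script name, paths, and table names are placeholders): `spark-submit score.py xgb-sklearn /path/model.pkl /path/features.pkl dp_data_db.input_table dp_data_db.output_table user_id,dt`.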
To get past the feature-count limit, the idea is to pack all features into a single array column and unpack it inside the udf. The snippet below relies on the same imports and Spark session as the script above; `fake_feature` and `fakemodel` stand in for the real feature list and trained model.
```python
# step 1: pack the feature columns into a single array column
# (assumes `import pyspark.sql.functions as F`)
data = data.withColumn(
    "feature_group", F.array(*[F.col(c) for c in fake_feature]))

# step 2: define the scoring function; the udf now takes ONE array column,
# so the 255-parameter limit no longer applies
def predict_(feature_list):
    @pandas_udf(returnType=DoubleType())
    def predict_score(features):
        # each element of `features` is one row's feature array
        X = pd.DataFrame(list(features), columns=feature_list).astype(float)
        y = model.value.predict_proba(X)[:, 1]
        return pd.Series(y)
    return predict_score

# step 3: broadcast the model and feature list
model = sc.broadcast(fakemodel)
feature_list = sc.broadcast(fake_feature)

# step 4: score
data = data.withColumn(
    "testscore", predict_(feature_list.value)(F.col("feature_group")))
```
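To sanity-check the array route end to end, here is a minimal local sketch. Everything in it (the toy data, `clf`, the four feature columns) is illustrative; it reuses the `predict_` defined in step 2 above and the imports from the main script.

```python
# Hypothetical end-to-end check: train a tiny model locally, score it
# through the array-based udf, and eyeball the results against
# predict_proba. A real model with hundreds of features works the same way.
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sklearn.linear_model import LogisticRegression

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

feature_cols = ["f0", "f1", "f2", "f3"]
rng = np.random.default_rng(0)
pdf = pd.DataFrame(rng.normal(size=(200, 4)), columns=feature_cols)
clf = LogisticRegression().fit(pdf, (pdf["f0"] + pdf["f1"] > 0).astype(int))

data = spark.createDataFrame(pdf)
model = sc.broadcast(clf)                 # names match the snippet above
feature_list = sc.broadcast(feature_cols)

data = data.withColumn("feature_group",
                       F.array(*[F.col(c) for c in feature_cols]))
data = data.withColumn("score",
                       predict_(feature_list.value)(F.col("feature_group")))

# Compare a few rows of udf output against local scoring.
data.select("score").show(5)
print(clf.predict_proba(pdf)[:5, 1])
```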