aka-chestnut
2023-12-29

Distributed Model Scoring on Spark

Distributed model scoring on Spark boils down to one of two things: calling a scoring function directly on an RDD, or wrapping the scoring logic in a udf / pandas udf and calling it on a DataFrame.
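For the RDD route, a minimal sketch using mapPartitions is shown below. All names here are illustrative, not from the script that follows: it assumes a SparkSession `spark`, a DataFrame `df` with an `id` column, a fitted sklearn-style model `clf`, and a Python list `feature_list`.

    import pandas as pd

    bc_model = spark.sparkContext.broadcast(clf)  # one copy per executor

    def score_partition(rows):
        # batch the whole partition into one DataFrame instead of
        # calling predict once per row
        pdf = pd.DataFrame([r.asDict() for r in rows])
        if pdf.empty:
            return iter([])
        scores = bc_model.value.predict_proba(pdf[feature_list])[:, 1]
        return zip(pdf["id"].tolist(), scores.tolist())

    scored = df.rdd.mapPartitions(score_partition)
    result = spark.createDataFrame(scored, ["id", "score"])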

    *This version mainly applies when the model has fewer than 255 features; with more than 255, the call fails because of Python's limit on the number of function arguments.

    Our current models almost all exceed 255 features, so this version is essentially deprecated.

    # -*- coding:utf-8 -*-
    import sys
    import time
    import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23+
    import numpy as np
    import pandas as pd
    import xgboost as xgb
    import lightgbm as lgb
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import NumericType, DoubleType

    def printWithTime(msg, level='info'):
        # timestamped log helper
        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        if level == 'info':
            print(ts, " [INFO]", msg)
        elif level == 'error':
            print(ts, " [ERROR]", msg)

    def main():
        spark = SparkSession.builder.getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("ERROR")
        printWithTime("Spark Session Created.")
        # positional arguments: model type, model file, feature-list file,
        # input table, output table, comma-separated primary-key columns
        model_type = sys.argv[1]
        model_path = sys.argv[2]
        feature_path = sys.argv[3]
        input_table_name = sys.argv[4]
        output_table_name = sys.argv[5]
        main_key_list = sys.argv[6].split(",")
        if not output_table_name.startswith("dp_data_db."):
            printWithTime("output_table_name must start with dp_data_db.", 'error')
            sys.exit(1)
        printWithTime("Loading model from {}".format(model_path))
        if model_type in ["xgb-sklearn", "lgb-sklearn"]:
            model = joblib.load(model_path)
        elif model_type == "xgb-booster":
            model = xgb.Booster(model_file=model_path)
        elif model_type == "lgb-booster":
            model = lgb.Booster(model_file=model_path)
        else:
            printWithTime("Model type {} not supported.".format(model_type), 'error')
            sys.exit(1)
        printWithTime("Loading feature list from {}".format(feature_path))
        feature_list = joblib.load(feature_path)
        printWithTime("Lazy loading data from {}".format(input_table_name))
        data = spark.sql(f"select * from {input_table_name}")
        printWithTime("Converting all features to DoubleType")
        # cast any non-numeric feature columns to double before scoring
        str_cols = [f.name for f in data.schema.fields
                    if not isinstance(f.dataType, NumericType)]
        str_features = [f for f in feature_list if f in str_cols]
        for f in str_features:
            printWithTime("Converting {} to DoubleType".format(f))
            data = data.withColumn(f, data[f].cast(DoubleType()))
        # broadcast the model and feature list so each executor gets one copy
        model = sc.broadcast(model)
        feature_list = sc.broadcast(feature_list)
        # build a pandas_udf for the given model type; each feature column
        # arrives as its own pd.Series argument (hence the 255-argument limit)
        if model_type in ["xgb-sklearn", "lgb-sklearn"]:
            def predict_(feature_list):
                @pandas_udf(returnType=DoubleType())
                def predict_score(*features):
                    X = pd.concat(features, axis=1).values
                    X = pd.DataFrame(X, columns=feature_list)
                    y = model.value.predict_proba(X)[:, 1]
                    return pd.Series(y)
                return predict_score
        elif model_type == "xgb-booster":
            def predict_(feature_list):
                @pandas_udf(returnType=DoubleType())
                def predict_score(*features):
                    X = pd.concat(features, axis=1).values
                    X = xgb.DMatrix(X, missing=np.nan)
                    y = model.value.predict(X)
                    return pd.Series(y)
                return predict_score
        elif model_type == "lgb-booster":
            def predict_(feature_list):
                @pandas_udf(returnType=DoubleType())
                def predict_score(*features):
                    X = pd.concat(features, axis=1).values
                    y = model.value.predict(X)
                    return pd.Series(y)
                return predict_score
        data = data.withColumn("pred", predict_(feature_list.value)(
            *[data[f] for f in feature_list.value]))
        data = data.select(*main_key_list, "pred")
        printWithTime("Start to predict...")
        data.repartition(10).write.format('hive').mode(
            "overwrite").saveAsTable(output_table_name)
        printWithTime("Model score prediction finished.")
        printWithTime("Result has been saved to {}".format(output_table_name))

    if __name__ == '__main__':
        main()
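The script is driven entirely by the six positional arguments. A hypothetical invocation might look like the following, where the script name, paths, table names, and key columns are all placeholders:

    spark-submit score_model.py \
        xgb-sklearn \
        /path/to/model.pkl \
        /path/to/feature_list.pkl \
        dp_data_db.input_features \
        dp_data_db.model_scores \
        user_id,dt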
    

    To get around the feature-count limit, the idea is to pack the features into a single array column, then unpack it inside the udf:

    import pyspark.sql.functions as F

    # step 1: pack the feature columns into a single array column
    data = data.withColumn("feature_group", F.array(*[F.col(c) for c in fake_feature]))
    # step 2: define the scoring function; the udf now receives one array column
    def predict_(feature_list):
        @pandas_udf(returnType=DoubleType())
        def predict_score(features):
            # each element of `features` is one row's packed feature array
            X = pd.DataFrame(list(features), columns=feature_list).astype(float)
            y = model.value.predict_proba(X)[:, 1]
            return pd.Series(y)
        return predict_score
    # step 3: broadcast the model and the feature list
    model = sc.broadcast(fakemodel)
    feature_list = sc.broadcast(fake_feature)
    # step 4: score
    data = data.withColumn('testscore', predict_(feature_list.value)(F.col("feature_group")))
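The same packing trick carries over to the Booster formats from the first script. A sketch for the xgb-booster case, assuming the same `model`/`feature_list` broadcasts, the `feature_group` column from step 1, and the numpy/xgboost imports from the first script (`predict_booster_` is just an illustrative name):

    # xgb-booster variant: unpack the array column, then score via DMatrix
    def predict_booster_(feature_list):
        @pandas_udf(returnType=DoubleType())
        def predict_score(features):
            # each element of `features` is one row's packed feature array
            X = np.array(features.tolist(), dtype=float)
            y = model.value.predict(xgb.DMatrix(X, missing=np.nan))
            return pd.Series(y)
        return predict_score

    data = data.withColumn('testscore',
                           predict_booster_(feature_list.value)(F.col("feature_group")))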
    
Tags: #risk-control #risk-control-algorithms #Spark #Python
Last updated: 2023/12/29, 15:15:13