DataFrameに対するMachine LearningライブラリーであるMLを試す。入門PySpark Ch-06
———————————————-
データは、前回と同じく乳児の生存確率を予測するためのデータセットを用いる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# Load the births dataset into a DataFrame using an explicit schema.
import pyspark.sql.types as typ

# (column name, Spark SQL type) pairs, in schema order.
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType()),
]

# Every field is declared non-nullable (third StructField argument is False).
schema = typ.StructType(
    [typ.StructField(name, dtype, False) for name, dtype in labels]
)

# `spark` is not defined in this snippet; presumably the SparkSession supplied
# by the PySpark shell or notebook -- confirm.
births = spark.read.csv(
    '/Users/*******/Documents/PySpark_Data/births_transformed.csv.gz',
    header=True,
    schema=schema,
)

births.take(5)
# Output recorded in the original post:
# [Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=29, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=99, MOTHER_PRE_WEIGHT=999, MOTHER_DELIVERY_WEIGHT=999, MOTHER_WEIGHT_GAIN=99, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0),
#  Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=22, FATHER_COMBINED_AGE=29, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=65, MOTHER_PRE_WEIGHT=180, MOTHER_DELIVERY_WEIGHT=198, MOTHER_WEIGHT_GAIN=18, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0),
#  Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=38, FATHER_COMBINED_AGE=40, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=155, MOTHER_DELIVERY_WEIGHT=167, MOTHER_WEIGHT_GAIN=12, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0),
#  Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=39, FATHER_COMBINED_AGE=42, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=60, MOTHER_PRE_WEIGHT=128, MOTHER_DELIVERY_WEIGHT=152, MOTHER_WEIGHT_GAIN=24, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=1),
#  Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=18, FATHER_COMBINED_AGE=99, CIG_BEFORE=6, CIG_1_TRI=4, CIG_2_TRI=2, CIG_3_TRI=2, MOTHER_HEIGHT_IN=61, MOTHER_PRE_WEIGHT=110, MOTHER_DELIVERY_WEIGHT=130, MOTHER_WEIGHT_GAIN=20, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0)]
Create transformers
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# Build the feature transformers and the classifier used by the pipeline.
import pyspark.ml.classification as cl
import pyspark.ml.feature as ft

# BIRTH_PLACE is declared as a string column in the schema; add an integer
# copy that the one-hot encoder takes as its input.
births = births.withColumn(
    'BIRTH_PLACE_INT',
    births['BIRTH_PLACE'].cast(typ.IntegerType()),
)

encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT',
    outputCol='BIRTH_PLACE_VEC',
)

# labels[2:] skips the first two schema entries (the label column and the raw
# BIRTH_PLACE string); the encoded birth place vector is appended instead.
featuresCreator = ft.VectorAssembler(
    inputCols=[name for name, _ in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features',
)

logistic = cl.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT',
)
Create a pipeline
1 2 3 4 5 6 7 |
from pyspark.ml import Pipeline

# Stages are listed in execution order: encode BIRTH_PLACE_INT, assemble the
# feature vector, then the logistic regression estimator.
pipeline = Pipeline(
    stages=[
        encoder,
        featuresCreator,
        logistic,
    ]
)
Fit the model
1 2 3 4 5 6 7 8 9 |
# Split the data 70/30 with a fixed seed, fit the pipeline on the training
# part and score the held-out part.
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)

model = pipeline.fit(births_train)
test_model = model.transform(births_test)

test_model.take(1)
# Output recorded in the original post:
# [Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0545, -1.0545]), probability=DenseVector([0.7416, 0.2584]), prediction=0.0)]
Model performance
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# Evaluate the predictions made on the held-out set.
import pyspark.ml.evaluation as ev

# The 'probability' column produced by the model is supplied as the
# evaluator's rawPredictionCol.
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

# The metric is selected per call by overriding metricName in the param map.
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))
Saving the model
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# --- Save and reload the unfitted Pipeline definition ----------------------
pipelinePath = '/Users/*******/Documents/PySpark_Data/infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

# The reloaded Pipeline is fitted again before it is used to transform.
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(births_train).transform(births_test).take(1)
# Output recorded in the original post:
# [Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0545, -1.0545]), probability=DenseVector([0.7416, 0.2584]), prediction=0.0)]

# --- Save and reload the fitted PipelineModel ------------------------------
from pyspark.ml import PipelineModel

modelPath = '/Users/*******/Documents/PySpark_Data/infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

# The reloaded PipelineModel is used to transform directly, with no refit.
loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)
Parameter hyper-tuning
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# Grid search over LogisticRegression hyper-parameters with cross-validation.
import pyspark.ml.tuning as tune

logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')

# 3 x 3 grid: every maxIter value is combined with every regParam value.
grid = (
    tune.ParamGridBuilder()
    .addGrid(logistic.maxIter, [2, 10, 50])
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3])
    .build()
)

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

# Only the feature-building stages go in this pipeline; the estimator itself
# is handed to the CrossValidator above.
pipeline = Pipeline(stages=[encoder, featuresCreator])
data_transformer = pipeline.fit(births_train)

cvModel = cv.fit(data_transformer.transform(births_train))

# Score the held-out set with the cross-validated model.
data_test = data_transformer.transform(births_test)
predictions = cvModel.transform(data_test)

print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'}))
# Output recorded in the original post:
# 0.7404526641072416
# 0.7157767684747429

# Pair each parameter map (as a list of {name: value} dicts) with its
# average metric.
results = [
    ([{param.name: value} for param, value in params.items()], metric)
    for params, metric in zip(
        cvModel.getEstimatorParamMaps(),
        cvModel.avgMetrics,
    )
]

# Parameter combination with the highest average metric.
max(results, key=lambda el: el[1])
# Output recorded in the original post:
# ([{'maxIter': 50}, {'regParam': 0.01}], 0.738652833807851)
Train-Validation splitting
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# Hyper-parameter search with a single train/validation split, using only the
# top 5 features picked by a chi-squared selector.
selector = ft.ChiSqSelector(
    numTopFeatures=5,
    featuresCol=featuresCreator.getOutputCol(),
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures',
)

# Param keys are bound to the estimator instance they are read from, so the
# grid is built from this `logistic` rather than reused from an earlier
# LogisticRegression instance.
grid = (
    tune.ParamGridBuilder()
    .addGrid(logistic.maxIter, [2, 10, 50])
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3])
    .build()
)

pipeline = Pipeline(stages=[encoder, featuresCreator, selector])
data_transformer = pipeline.fit(births_train)

tvs = tune.TrainValidationSplit(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

tvsModel = tvs.fit(data_transformer.transform(births_train))

# Score the held-out set with the selected model.
data_test = data_transformer.transform(births_test)
results = tvsModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))
# Output recorded in the original post (there the grid came from an earlier
# LogisticRegression instance, so values may differ):
# 0.7294296314442145
# 0.7037759446410553
Regression
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# Regression: predict MOTHER_WEIGHT_GAIN with gradient-boosted trees.
features = [
    'MOTHER_AGE_YEARS', 'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT', 'DIABETES_PRE',
    'DIABETES_GEST', 'HYP_TENS_PRE',
    'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',
    'CIG_BEFORE', 'CIG_1_TRI',
    'CIG_2_TRI', 'CIG_3_TRI',
]

# NOTE(review): features[1:] leaves 'MOTHER_AGE_YEARS' out of the assembled
# vector -- confirm whether skipping the first entry is intended.
featuresCreator = ft.VectorAssembler(
    inputCols=features[1:],
    outputCol='features',
)

selector = ft.ChiSqSelector(
    numTopFeatures=6,
    featuresCol=featuresCreator.getOutputCol(),
    outputCol="selectedFeatures",
    labelCol='MOTHER_WEIGHT_GAIN',
)

import pyspark.ml.regression as reg

# Train on the selector's output column; without featuresCol the regressor
# falls back to its default 'features' column and the selection stage has no
# effect on the model.
regressor = reg.GBTRegressor(
    maxIter=15,
    maxDepth=3,
    labelCol='MOTHER_WEIGHT_GAIN',
    featuresCol=selector.getOutputCol(),
)

pipeline = Pipeline(stages=[featuresCreator, selector, regressor])

weightGain = pipeline.fit(births_train)

evaluator = ev.RegressionEvaluator(
    predictionCol="prediction",
    labelCol='MOTHER_WEIGHT_GAIN',
)

print(evaluator.evaluate(
    weightGain.transform(births_test),
    {evaluator.metricName: 'r2'},
))
# r2 recorded in the original post (there the regressor had no featuresCol
# set, so the value may differ):
# 0.48862170400240335