自前のデータでPySpark MLに挑戦:
23,534件の麻酔データ:エホチール、エフェドリン、ネオシネジンのいずれかを使用したかどうかを術前データから予想。
術前データ:
ーーーーーーーーーーーーーーーーー
昇圧剤 pressor
診療科コード dept
手術室コード operoom
申し込み区分コード register
麻酔方法コード anesthesia
ASA分類コード ASA
年齢年 age
性別コード sex
身長 height
体重 weight
年齢区分コード age_cat
入室時刻コード time_cat
麻酔開始時刻コード ane_start
予定手術時間 ope_time
手術部位コード ope_portion
体位コード position
ーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーー
昇圧剤 診療科コード 手術室コード 申し込み区分コード 麻酔方法コード ASA分類コード 年齢年 性別コード 身長 体重 年齢区分コード 入室時刻コード 麻酔開始時刻コード 予定手術時間 手術部位コード 体位コード
0 23 10 4 0 0 2 0 87.5 12.8 1 0 0 90 0 0
0 24 13 1 0 0 80 0 157 57.2 9 11 0 180 0 0
0 18 1 1 0 0 64 0 157.5 52.2 7 12 0 60 0 0
0 9 1 4 0 0 71 1 149 48 8 1 2 60 0 0
0 22 14 3 0 0 38 0 167.9 62.8 4 4 4 210 0 0
ーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーー
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# Define an explicit (column name, Spark SQL type) schema for the
# 23,534-row anesthesia dataset, then load the CSV with it.
import pyspark.sql.types as typ

labels = [
    ('pressor', typ.IntegerType()),      # target: any pressor used (0/1)
    ('dept', typ.IntegerType()),
    ('operoom', typ.IntegerType()),
    ('register', typ.IntegerType()),
    ('anesthesia', typ.IntegerType()),
    ('ASA', typ.IntegerType()),
    ('age', typ.IntegerType()),
    ('sex', typ.IntegerType()),
    ('height', typ.DoubleType()),
    ('weight', typ.DoubleType()),
    ('age_cat', typ.IntegerType()),
    ('time_cat', typ.IntegerType()),
    ('ane_start', typ.IntegerType()),
    ('ope_time', typ.IntegerType()),
    ('ope_portion', typ.IntegerType()),
    ('position', typ.IntegerType()),
]

# All fields are declared non-nullable (False).
schema = typ.StructType(
    [typ.StructField(name, dtype, False) for name, dtype in labels]
)

# 'spark' is the SparkSession provided by the interactive environment.
pressor = spark.read.csv(
    '/Users/*******/Documents/PySpark_Data/stats_codes_r4.csv',
    header=True,
    schema=schema,
)

pressor.take(5)
# => [Row(pressor=0, dept=23, operoom=10, register=4, anesthesia=0, ASA=0,
#         age=2, sex=0, height=87.5, weight=12.8, age_cat=1, time_cat=0,
#         ane_start=0, ope_time=90, ope_portion=0, position=0), ...]
#    (first 5 rows, matching the table shown above)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
import pyspark.ml.feature as ft
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline

# Assemble every predictor column into a single 'features' vector.
# FIX: the original used labels[2:], which silently dropped 'dept'
# (labels[0] is the target 'pressor'; labels[1] is 'dept', listed above
# as a preoperative predictor). labels[1:] keeps all 15 predictors —
# the 14-element DenseVector in the original output confirms 'dept'
# was being excluded.
featuresCreator = ft.VectorAssembler(
    inputCols=[name for name, _ in labels[1:]],
    outputCol='features',
)

# Binary logistic regression on the 'pressor' label.
logistic = cl.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='pressor',
)

# Feature assembly then classification, as one pipeline.
pipeline = Pipeline(stages=[featuresCreator, logistic])

# 70/30 train/test split; fixed seed for reproducibility.
pressor_train, pressor_test = pressor.randomSplit([0.7, 0.3], seed=666)

model = pipeline.fit(pressor_train)
test_model = model.transform(pressor_test)
test_model.take(1)
# NOTE: the output shown in the post (features=DenseVector([...]) with
# 14 elements, rawPrediction/probability/prediction columns) was
# produced with the original labels[2:]; with 'dept' restored the
# vector has 15 elements.
1 2 3 4 5 6 7 8 9 10 11 12 |
import pyspark.ml.evaluation as ev

# Score the held-out predictions. The 'probability' column is supplied
# as rawPredictionCol; for ranking metrics (ROC/PR area) this yields
# the same ordering as the raw scores.
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='pressor',
)

roc_auc = evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderROC'})
pr_auc = evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'})
print(roc_auc)
print(pr_auc)
# => 0.8468534434735694
# => 0.84483898495682
Saving the model
1 2 3 4 5 6 7 8 9 10 |
# Persist the (unfitted) pipeline definition to disk, then reload it
# and re-run fit/transform to confirm the round trip works.
pipelinePath = (
    '/Users/*******/Documents/PySpark_Data/'
    'pressor_oneHotEncoder_Logistic_Pipeline'
)
pipeline.write().overwrite().save(pipelinePath)

loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(pressor_train).transform(pressor_test).take(1)
# => [Row(pressor=0, dept=0, operoom=1, ..., features=DenseVector([...]))]
#    (same first row as before saving)
Parameter hyper-tuning
Grid search
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# FIX: 'tune' was used below without ever being imported in the post's
# snippets, which would raise NameError.
import pyspark.ml.tuning as tune

# Fresh estimator: maxIter/regParam now come from the grid instead of
# being fixed.
logistic = cl.LogisticRegression(labelCol='pressor')

# 3 x 3 = 9 hyper-parameter combinations.
grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter, [2, 10, 50]) \
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3]) \
    .build()

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='pressor',
)

# Cross-validation (default 3 folds) over the grid, selecting by the
# evaluator's default metric (areaUnderROC).
cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

# Feature assembly only; the classifier itself is fitted inside the
# CV loop on the already-transformed data.
pipeline = Pipeline(stages=[featuresCreator])
data_transformer = pipeline.fit(pressor_train)

cvModel = cv.fit(data_transformer.transform(pressor_train))

# FIX: the original bound the transformed TEST set to a variable named
# 'data_train', which is misleading; renamed to data_test.
data_test = data_transformer.transform(pressor_test)
results = cvModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))
# => 0.8507106590905532
# => 0.85231052841506

# Pair each grid point with its mean cross-validated metric and report
# the best-scoring combination.
results = [
    (
        [
            {key.name: paramValue}
            for key, paramValue in zip(params.keys(), params.values())
        ],
        metric,
    )
    for params, metric in zip(
        cvModel.getEstimatorParamMaps(), cvModel.avgMetrics
    )
]
sorted(results, key=lambda el: el[1], reverse=True)[0]
# => ([{'maxIter': 50}, {'regParam': 0.01}], 0.8450850618921985)