前回に続いて、23,534件の麻酔データ:エホチール、エフェドリン、ネオシネジンのいずれかを使用したかどうかを術前データから予想。今回は、RDDとMLlibを用いる。
ーーーーーーーーーーーーーーーーーーーーーーーー
MLlib package of PySpark
Load and transform the data
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# Declare the 16-column schema explicitly (all columns non-nullable)
# and load the anesthesia CSV into the `pressor` DataFrame.
import pyspark.sql.types as typ

_int = typ.IntegerType()
_dbl = typ.DoubleType()

# (column name, Spark SQL type) pairs; height/weight are the only doubles.
column_types = [
    ('pressor', _int), ('dept', _int), ('operoom', _int),
    ('register', _int), ('anesthesia', _int), ('ASA', _int),
    ('age', _int), ('sex', _int), ('height', _dbl), ('weight', _dbl),
    ('age_cat', _int), ('time_cat', _int), ('ane_start', _int),
    ('ope_time', _int), ('ope_portion', _int), ('position', _int),
]

schema = typ.StructType(
    [typ.StructField(name, dtype, False) for name, dtype in column_types]
)

pressor = spark.read.csv(
    '/Users/*******/Documents/PySpark_Data/stats_codes_r4.csv',
    header=True,
    schema=schema,
)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# Column-wise descriptive statistics via MLlib's Statistics.colStats:
# print each column's mean and standard deviation (sqrt of variance).
import pyspark.mllib.stat as st
import numpy as np

numeric_cols = [
    'pressor', 'dept', 'operoom', 'register', 'anesthesia', 'ASA',
    'age', 'sex', 'height', 'weight', 'age_cat', 'time_cat',
    'ane_start', 'ope_time', 'ope_portion', 'position',
]

# Convert each Row to a plain list so colStats can consume the RDD.
numeric_rdd = (
    pressor
    .select(numeric_cols)
    .rdd
    .map(list)
)

mllib_stats = st.Statistics.colStats(numeric_rdd)

for name, mean, var in zip(numeric_cols,
                           mllib_stats.mean(),
                           mllib_stats.variance()):
    print('{0}: \t{1:.2f} \t {2:.2f}'.format(name, mean, np.sqrt(var)))
pressor: 0.54 0.50
dept: 13.81 6.57
operoom: 6.96 3.92
register: 1.34 0.82
anesthesia: 2.97 0.78
ASA: 2.07 1.53
age: 48.53 27.04
sex: 0.50 0.50
height: 149.28 35.97
weight: 51.32 28.35
age_cat: 5.37 2.67
time_cat: 10.38 2.93
ane_start: 10.46 2.93
ope_time: 199.68 146.16
ope_portion: 23.26 13.05
position: 4.87 0.67
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# Frequency counts for any columns not treated as numeric.
# (With the current schema numeric_cols covers every column, so
#  categorical_cols may well be empty and the loop does nothing.)
categorical_cols = [e for e in pressor.columns if e not in numeric_cols]

categorical_rdd = pressor \
    .select(categorical_cols) \
    .rdd \
    .map(lambda row: [e for e in row])

for i, col in enumerate(categorical_cols):
    # FIX: bind the loop index as a default argument. The original
    # `lambda row: row[i]` captured `i` late; it only worked because
    # collect() runs inside the same iteration, and would break
    # silently if the action were deferred outside the loop.
    agg = categorical_rdd \
        .groupBy(lambda row, i=i: row[i]) \
        .map(lambda kv: (kv[0], len(kv[1])))
    print(col, sorted(agg.collect(), key=lambda el: el[1], reverse=True))
1 2 3 4 5 6 7 8 9 10 11 12 |
# Pearson correlation matrix over the numeric columns; print every
# off-diagonal pair whose correlation exceeds 0.5 (each pair is
# reported in both directions, matching the original output order).
corrs = st.Statistics.corr(numeric_rdd)

for i, mask_row in enumerate(corrs > 0.5):
    for j, flagged in enumerate(mask_row):
        if flagged == 1.0 and j != i:
            print('{0}-to-{1}: {2:.2f}' \
                .format(numeric_cols[i], numeric_cols[j], corrs[i][j]))
pressor-to-age: 0.53
pressor-to-age_cat: 0.52
register-to-ASA: 0.60
ASA-to-register: 0.60
age-to-pressor: 0.53
age-to-height: 0.52
age-to-age_cat: 0.96
height-to-age: 0.52
height-to-weight: 0.84
weight-to-height: 0.84
age_cat-to-pressor: 0.52
age_cat-to-age: 0.96
time_cat-to-ane_start: 0.99
ane_start-to-time_cat: 0.99
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# Drop the redundant highly correlated features found above:
# weight (0.84 with height), age_cat (0.96 with age) and
# ane_start (0.99 with time_cat); keep the remaining 13 columns.
features_to_keep = [
    'pressor', 'dept', 'operoom', 'register', 'anesthesia', 'ASA',
    'age', 'sex', 'height', 'time_cat', 'ope_time', 'ope_portion',
    'position',
]

# select() takes the list directly — the `[e for e in ...]` copy in the
# original was a no-op.
pressor_transformed = pressor.select(features_to_keep)
pressor_transformed.take(5)
[Row(pressor=0, dept=23, operoom=10, register=4, anesthesia=0, ASA=0, age=2, sex=0, height=87.5, time_cat=0, ope_time=90, ope_portion=0, position=0),
Row(pressor=0, dept=24, operoom=13, register=1, anesthesia=0, ASA=0, age=80, sex=0, height=157.0, time_cat=11, ope_time=180, ope_portion=0, position=0),
Row(pressor=0, dept=18, operoom=1, register=1, anesthesia=0, ASA=0, age=64, sex=0, height=157.5, time_cat=12, ope_time=60, ope_portion=0, position=0),
Row(pressor=0, dept=9, operoom=1, register=4, anesthesia=0, ASA=0, age=71, sex=1, height=149.0, time_cat=1, ope_time=60, ope_portion=0, position=0),
Row(pressor=0, dept=22, operoom=14, register=3, anesthesia=0, ASA=0, age=38, sex=0, height=167.9, time_cat=4, ope_time=210, ope_portion=0, position=0)]
1 |
# Show the first 5 raw rows (all 16 columns) for comparison with the
# reduced pressor_transformed DataFrame.
pressor.take(5)
[Row(pressor=0, dept=23, operoom=10, register=4, anesthesia=0, ASA=0, age=2, sex=0, height=87.5, weight=12.8, age_cat=1, time_cat=0, ane_start=0, ope_time=90, ope_portion=0, position=0),
Row(pressor=0, dept=24, operoom=13, register=1, anesthesia=0, ASA=0, age=80, sex=0, height=157.0, weight=57.2, age_cat=9, time_cat=11, ane_start=0, ope_time=180, ope_portion=0, position=0),
Row(pressor=0, dept=18, operoom=1, register=1, anesthesia=0, ASA=0, age=64, sex=0, height=157.5, weight=52.2, age_cat=7, time_cat=12, ane_start=0, ope_time=60, ope_portion=0, position=0),
Row(pressor=0, dept=9, operoom=1, register=4, anesthesia=0, ASA=0, age=71, sex=1, height=149.0, weight=48.0, age_cat=8, time_cat=1, ane_start=2, ope_time=60, ope_portion=0, position=0),
Row(pressor=0, dept=22, operoom=14, register=3, anesthesia=0, ASA=0, age=38, sex=0, height=167.9, weight=62.8, age_cat=4, time_cat=4, ane_start=4, ope_time=210, ope_portion=0, position=0)]
1 |
import pyspark.mllib.linalg as ln |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# Convert each row into an MLlib LabeledPoint:
#   label    = row[0] (pressor: whether a vasopressor was used)
#   features = the remaining 12 columns as a dense vector.
# The dead commented-out HashingTF line was removed.
import pyspark.mllib.feature as ft   # ft is used below (ChiSqSelector)
import pyspark.mllib.regression as reg

pressor_hashed = pressor_transformed \
    .rdd \
    .map(lambda row: reg.LabeledPoint(
        row[0],
        ln.Vectors.dense(row[1:])))

pressor_hashed.take(5)
[LabeledPoint(0.0, [23.0,10.0,4.0,0.0,0.0,2.0,0.0,87.5,0.0,90.0,0.0,0.0]),
LabeledPoint(0.0, [24.0,13.0,1.0,0.0,0.0,80.0,0.0,157.0,11.0,180.0,0.0,0.0]),
LabeledPoint(0.0, [18.0,1.0,1.0,0.0,0.0,64.0,0.0,157.5,12.0,60.0,0.0,0.0]),
LabeledPoint(0.0, [9.0,1.0,4.0,0.0,0.0,71.0,1.0,149.0,1.0,60.0,0.0,0.0]),
LabeledPoint(0.0, [22.0,14.0,3.0,0.0,0.0,38.0,0.0,167.9,4.0,210.0,0.0,0.0])]
1 2 3 |
# 60/40 train/test split. Fix the seed so the split — and therefore all
# the evaluation numbers below — is reproducible across runs (same seed
# the random forest uses later).
pressor_train, pressor_test = pressor_hashed.randomSplit([0.6, 0.4],
                                                         seed=666)
pressor_train.take(5)
[LabeledPoint(0.0, [24.0,13.0,1.0,0.0,0.0,80.0,0.0,157.0,11.0,180.0,0.0,0.0]),
LabeledPoint(0.0, [18.0,1.0,1.0,0.0,0.0,64.0,0.0,157.5,12.0,60.0,0.0,0.0]),
LabeledPoint(0.0, [13.0,3.0,1.0,0.0,0.0,7.0,1.0,130.5,8.0,60.0,0.0,0.0]),
LabeledPoint(0.0, [3.0,13.0,1.0,0.0,0.0,5.0,1.0,106.4,8.0,180.0,0.0,0.0]),
LabeledPoint(0.0, [9.0,5.0,1.0,0.0,0.0,9.0,0.0,132.0,8.0,150.0,0.0,0.0])]
1 2 3 4 5 6 7 |
# Fit a logistic-regression baseline with the L-BFGS optimiser
# (10 iterations) on the training split.
from pyspark.mllib.classification \
    import LogisticRegressionWithLBFGS

LR_Model = LogisticRegressionWithLBFGS.train(
    pressor_train,
    iterations=10,
)

# Display the fitted weights and intercept.
LR_Model
(weights=[-0.0192025792795,-0.0044390822126,-0.052370072204,-0.401262031975,0.11966806915,0.0474597210393,0.316300662306,-0.00267376233066,-0.0613497401323,0.00509484568777,-0.000437665331756,-0.176261999147], intercept=0.0)
1 2 3 4 5 6 7 8 |
# Pair each test label with the model's predicted class, casting the
# prediction to float for the evaluation metrics below.
true_labels = pressor_test.map(lambda row: row.label)
predicted = LR_Model.predict(
    pressor_test.map(lambda row: row.features))

LR_results = true_labels \
    .zip(predicted) \
    .map(lambda pair: (pair[0], pair[1] * 1.0))

LR_results.take(5)
[(0.0, 1.0), (0.0, 1.0), (0.0, 1.0), (0.0, 1.0), (0.0, 0.0)]
1 2 3 4 5 6 7 8 |
# Binary-classification quality (PR / ROC areas) of the logistic model
# on the held-out test set.
import pyspark.mllib.evaluation as ev

LR_evaluation = ev.BinaryClassificationMetrics(LR_results)

print('Area under PR: {0:.2f}' \
    .format(LR_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
    .format(LR_evaluation.areaUnderROC))

# Release the cached intermediate RDDs held by the evaluator.
LR_evaluation.unpersist()
Area under PR: 0.87
Area under ROC: 0.78
1 2 3 4 5 |
# Accuracy and weighted false-positive rate for the same
# label/prediction pairs (binary case handled by MulticlassMetrics).
from pyspark.mllib.evaluation import MulticlassMetrics

mc_metrics = MulticlassMetrics(LR_results)

print('accuracy: {0:.2f}'.format(mc_metrics.accuracy))
print('weightedFalsePositiveRate: {0:.2f}'.format(mc_metrics.weightedFalsePositiveRate))
accuracy: 0.77
weightedFalsePositiveRate: 0.22
Selecting only the most predictable features
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
# Keep only the 4 most predictive features (chi-squared test) and
# rebuild LabeledPoint RDDs for both splits via a shared helper.
selector = ft.ChiSqSelector(4).fit(pressor_train)

def _select_top(dataset):
    # Re-zip labels with the chi-square-selected feature vectors.
    labels = dataset.map(lambda row: row.label)
    reduced = selector.transform(
        dataset.map(lambda row: row.features))
    return labels.zip(reduced).map(
        lambda pair: reg.LabeledPoint(pair[0], pair[1]))

topFeatures_train = _select_top(pressor_train)
topFeatures_test = _select_top(pressor_test)
Random Forest in Spark
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# Random forest (6 trees, depth 4, gini) on the 4 chi-square-selected
# features, evaluated like the logistic model. The unused imports of
# SparkContext, RandomForestModel and MLUtils were dropped.
from pyspark.mllib.tree import RandomForest

RF_model = RandomForest.trainClassifier(
    data=topFeatures_train,
    numClasses=2,
    categoricalFeaturesInfo={},   # all features treated as continuous
    numTrees=6,
    featureSubsetStrategy='all',
    impurity='gini',
    maxDepth=4,
    maxBins=32,
    seed=666,
)

# RandomForest.predict already returns floats, so the label/prediction
# pairs can go straight into the evaluator without casting.
RF_results = (
    topFeatures_test.map(lambda row: row.label)
    .zip(RF_model.predict(
        topFeatures_test.map(lambda row: row.features)))
)

RF_evaluation = ev.BinaryClassificationMetrics(RF_results)
print('Area under PR: {0:.2f}' \
    .format(RF_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
    .format(RF_evaluation.areaUnderROC))
RF_evaluation.unpersist()
Area under PR: 0.86
Area under ROC: 0.71
1 |
# Dump the learned ensemble: one if/else decision tree per forest member.
print(RF_model.toDebugString())
TreeEnsembleModel classifier with 6 trees
Tree 0:
If (feature 3 <= 1.0)
If (feature 2 <= 2.0)
If (feature 2 <= 0.0)
If (feature 1 <= 11.0)
Predict: 0.0
Else (feature 1 > 11.0)
Predict: 0.0
Else (feature 2 > 0.0)
If (feature 0 <= 15.0)
Predict: 1.0
Else (feature 0 > 15.0)
Predict: 1.0
Else (feature 2 > 2.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
Predict: 0.0
Else (feature 0 > 11.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 1 <= 8.0)
Predict: 0.0
Else (feature 1 > 8.0)
Predict: 1.0
Else (feature 3 > 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 11.0)
If (feature 0 <= 13.0)
Predict: 0.0
Else (feature 0 > 13.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 2 <= 5.0)
Predict: 1.0
Else (feature 2 > 5.0)
Predict: 0.0
Tree 1:
If (feature 3 <= 1.0)
If (feature 2 <= 2.0)
If (feature 2 <= 0.0)
If (feature 0 <= 3.0)
Predict: 0.0
Else (feature 0 > 3.0)
Predict: 1.0
Else (feature 2 > 0.0)
If (feature 0 <= 15.0)
Predict: 1.0
Else (feature 0 > 15.0)
Predict: 1.0
Else (feature 2 > 2.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
Predict: 0.0
Else (feature 0 > 11.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 1 <= 8.0)
Predict: 0.0
Else (feature 1 > 8.0)
Predict: 1.0
Else (feature 3 > 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 11.0)
If (feature 0 <= 13.0)
Predict: 0.0
Else (feature 0 > 13.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 2 <= 5.0)
Predict: 1.0
Else (feature 2 > 5.0)
Predict: 0.0
Tree 2:
If (feature 3 <= 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 0 <= 3.0)
Predict: 0.0
Else (feature 0 > 3.0)
Predict: 0.0
Else (feature 0 > 11.0)
If (feature 2 <= 2.0)
Predict: 0.0
Else (feature 2 > 2.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 1 <= 8.0)
Predict: 0.0
Else (feature 1 > 8.0)
Predict: 1.0
Else (feature 3 > 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 11.0)
If (feature 0 <= 13.0)
Predict: 0.0
Else (feature 0 > 13.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 3 <= 4.0)
Predict: 1.0
Else (feature 3 > 4.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 2 <= 5.0)
Predict: 1.0
Else (feature 2 > 5.0)
Predict: 0.0
Tree 3:
If (feature 3 <= 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 0.0
Else (feature 0 > 11.0)
If (feature 0 <= 13.0)
Predict: 0.0
Else (feature 0 > 13.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 1 <= 8.0)
Predict: 0.0
Else (feature 1 > 8.0)
Predict: 1.0
Else (feature 3 > 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 11.0)
If (feature 0 <= 13.0)
Predict: 0.0
Else (feature 0 > 13.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 3 <= 5.0)
Predict: 1.0
Else (feature 3 > 5.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 2 <= 5.0)
Predict: 1.0
Else (feature 2 > 5.0)
Predict: 0.0
Tree 4:
If (feature 3 <= 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 0.0
Else (feature 0 > 11.0)
If (feature 1 <= 1.0)
Predict: 0.0
Else (feature 1 > 1.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 1 <= 8.0)
Predict: 0.0
Else (feature 1 > 8.0)
Predict: 1.0
Else (feature 3 > 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 11.0)
If (feature 0 <= 13.0)
Predict: 0.0
Else (feature 0 > 13.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 2 <= 5.0)
Predict: 1.0
Else (feature 2 > 5.0)
Predict: 0.0
Tree 5:
If (feature 3 <= 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 0.0
Else (feature 0 > 11.0)
If (feature 0 <= 13.0)
Predict: 0.0
Else (feature 0 > 13.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 0 <= 17.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 17.0)
If (feature 1 <= 8.0)
Predict: 0.0
Else (feature 1 > 8.0)
Predict: 0.0
Else (feature 3 > 1.0)
If (feature 0 <= 14.0)
If (feature 0 <= 11.0)
If (feature 2 <= 2.0)
Predict: 1.0
Else (feature 2 > 2.0)
Predict: 1.0
Else (feature 0 > 11.0)
If (feature 0 <= 13.0)
Predict: 0.0
Else (feature 0 > 13.0)
Predict: 0.0
Else (feature 0 > 14.0)
If (feature 2 <= 5.0)
If (feature 0 <= 17.0)
Predict: 1.0
Else (feature 0 > 17.0)
Predict: 1.0
Else (feature 2 > 5.0)
If (feature 1 <= 8.0)
Predict: 0.0
Else (feature 1 > 8.0)
Predict: 0.0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# Re-train logistic regression on the reduced 4-feature set and
# evaluate it the same way as the full-feature model.
LR_Model_2 = LogisticRegressionWithLBFGS.train(
    topFeatures_train,
    iterations=10,
)

labels_2 = topFeatures_test.map(lambda row: row.label)
preds_2 = LR_Model_2.predict(
    topFeatures_test.map(lambda row: row.features))

LR_results_2 = labels_2 \
    .zip(preds_2) \
    .map(lambda pair: (pair[0], pair[1] * 1.0))

LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)
print('Area under PR: {0:.2f}' \
    .format(LR_evaluation_2.areaUnderPR))
print('Area under ROC: {0:.2f}' \
    .format(LR_evaluation_2.areaUnderROC))
LR_evaluation_2.unpersist()
Area under PR: 0.79
Area under ROC: 0.61
1 |
# Show the 4 weights (one per chi-square-selected feature) and intercept.
LR_Model_2
(weights=[0.0539572066071,-0.0139919738754,-0.27993351311,0.162644911917], intercept=0.0)