Through studying PySpark, we learn that Spark's analysis workflow has shifted from RDDs to DataFrames, and that the RDD-based MLlib is no longer under active development, with work having moved to the DataFrame-based ML.
This time we analyze births_train.csv, 7.99 million US birth records from 2014 and 2015, with PySpark and MLlib (a study of 入門PySpark Ch-05).
———————————————
Let's take a look at the structure of births_train.csv.
INFANT_ALIVE_AT_REPORT,BIRTH_YEAR,BIRTH_MONTH,BIRTH_PLACE,MOTHER_AGE_YEARS,MOTHER_RACE_6CODE,MOTHER_EDUCATION,FATHER_COMBINED_AGE,FATHER_EDUCATION,MONTH_PRECARE_RECODE,CIG_BEFORE,CIG_1_TRI,CIG_2_TRI,CIG_3_TRI,MOTHER_HEIGHT_IN,MOTHER_BMI_RECODE,MOTHER_PRE_WEIGHT,MOTHER_DELIVERY_WEIGHT,MOTHER_WEIGHT_GAIN,DIABETES_PRE,DIABETES_GEST,HYP_TENS_PRE,HYP_TENS_GEST,PREV_BIRTH_PRETERM,NO_RISK,NO_INFECTIONS_REPORTED,LABOR_IND,LABOR_AUGM,STEROIDS,ANTIBIOTICS,ANESTHESIA,DELIV_METHOD_RECODE_COMB,ATTENDANT_BIRTH,APGAR_5,APGAR_5_RECODE,APGAR_10,APGAR_10_RECODE,INFANT_SEX,OBSTETRIC_GESTATION_WEEKS,INFANT_WEIGHT_GRAMS,INFANT_ASSIST_VENTI,INFANT_ASSIST_VENTI_6HRS,INFANT_NICU_ADMISSION,INFANT_SURFACANT,INFANT_ANTIBIOTICS,INFANT_SEIZURES,INFANT_NO_ABNORMALITIES,INFANT_ANCEPHALY,INFANT_MENINGOMYELOCELE,INFANT_LIMB_REDUCTION,INFANT_DOWN_SYNDROME,INFANT_SUSPECTED_CHROMOSOMAL_DISORDER,INFANT_NO_CONGENITAL_ANOMALIES_CHECKED,INFANT_BREASTFED
N,2015,2,1,29,3,9,99,9,4,99,99,99,99,99,9,999,999,99,N,N,N,N,N,1,1,N,N,N,Y,N,2,1,4,2,3,1,F,35,2770,N,N,Y,N,N,N,0,N,N,N,N,N,0,N
N,2015,2,1,22,1,3,29,4,1,0,0,0,0,65,4,180,198,18,N,N,N,N,N,1,1,N,N,N,N,N,2,1,5,2,7,3,F,35,1191,N,N,Y,N,N,N,0,N,N,N,N,C,0,N
N,2015,2,1,38,1,4,40,3,1,0,0,0,0,63,3,155,167,12,N,N,N,N,N,1,1,N,N,N,Y,N,1,1,1,1,1,1,F,18,9999,U,U,U,U,U,U,9,N,N,N,N,N,1,N
N,2015,4,1,39,2,7,42,6,1,0,0,0,0,60,3,128,152,24,N,N,N,N,Y,0,1,N,N,N,Y,Y,1,1,8,3,88,5,F,37,1925,N,N,N,N,N,Y,0,Y,N,N,N,N,0,N
N,2015,4,1,18,3,2,99,9,2,6,4,2,2,61,2,110,130,20,N,N,N,N,N,1,1,N,N,N,N,N,1,1,3,1,3,1,M,32,670,U,U,U,U,U,U,9,N,N,N,N,N,1,N
N,2015,4,1,32,1,4,37,4,1,0,0,0,0,66,2,150,162,12,N,N,N,N,N,0,1,N,N,N,Y,Y,1,1,1,1,99,5,F,21,434,N,N,N,N,N,N,1,N,N,N,N,N,1,N
N,2015,5,1,22,3,3,25,2,3,0,0,0,0,68,2,155,191,36,N,N,N,N,N,0,1,N,N,N,N,N,2,1,8,3,88,5,M,35,3810,Y,N,Y,N,N,N,0,N,N,N,N,N,0,N
.......
The first row holds the field names; the fields contain integers as well as Y/N flags.
1: INFANT_ALIVE_AT_REPORT, 2: BIRTH_YEAR, 3: BIRTH_MONTH, 4: BIRTH_PLACE, 5: MOTHER_AGE_YEARS, 6: MOTHER_RACE_6CODE,
7: MOTHER_EDUCATION, 8: FATHER_COMBINED_AGE, 9: FATHER_EDUCATION, 10: MONTH_PRECARE_RECODE, 11: CIG_BEFORE, 12: CIG_1_TRI,
13: CIG_2_TRI, 14: CIG_3_TRI, 15: MOTHER_HEIGHT_IN, 16: MOTHER_BMI_RECODE, 17: MOTHER_PRE_WEIGHT, 18: MOTHER_DELIVERY_WEIGHT,
19: MOTHER_WEIGHT_GAIN, 20: DIABETES_PRE, 21: DIABETES_GEST, 22: HYP_TENS_PRE, 23: HYP_TENS_GEST, 24: PREV_BIRTH_PRETERM,
25: NO_RISK, 26: NO_INFECTIONS_REPORTED, 27: LABOR_IND, 28: LABOR_AUGM, 29: STEROIDS, 30: ANTIBIOTICS,
31: ANESTHESIA, 32: DELIV_METHOD_RECODE_COMB, 33: ATTENDANT_BIRTH, 34: APGAR_5, 35: APGAR_5_RECODE, 36: APGAR_10,
37: APGAR_10_RECODE, 38: INFANT_SEX, 39: OBSTETRIC_GESTATION_WEEKS, 40: INFANT_WEIGHT_GRAMS, 41: INFANT_ASSIST_VENTI, 42: INFANT_ASSIST_VENTI_6HRS,
43: INFANT_NICU_ADMISSION, 44: INFANT_SURFACANT, 45: INFANT_ANTIBIOTICS, 46: INFANT_SEIZURES, 47: INFANT_NO_ABNORMALITIES, 48: INFANT_ANCEPHALY,
49: INFANT_MENINGOMYELOCELE, 50: INFANT_LIMB_REDUCTION, 51: INFANT_DOWN_SYNDROME, 52: INFANT_SUSPECTED_CHROMOSOMAL_DISORDER, 53: INFANT_NO_CONGENITAL_ANOMALIES_CHECKED, 54: INFANT_BREASTFED
Register a schema and load the data into the DataFrame births.
import pyspark.sql.types as typ

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv('/Users/*******/Documents/PySpark_data/births_train.csv.gz',
                        header=True,
                        schema=schema)
Create a dictionary that recodes the categorical values (Y, N, U) to numbers.
recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
    }
}
The goal of the analysis is to predict whether INFANT_ALIVE_AT_REPORT is 1 or 0. To that end, the fields describing the infant itself are dropped, and only the 17 fields (including the label) about the parents and the place of birth are used to predict the probability of survival.
selected_features = [
    'INFANT_ALIVE_AT_REPORT',
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE',
    'CIG_BEFORE',
    'CIG_1_TRI',
    'CIG_2_TRI',
    'CIG_3_TRI',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'MOTHER_DELIVERY_WEIGHT',
    'MOTHER_WEIGHT_GAIN',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM'
]

births_trimmed = births.select(selected_features)
In the mother's smoking-history fields and the like, 1-97 is the number of cigarettes, 98 means 98 or more, and 99 means unknown; here, unknown (99) is treated as 0.
import pyspark.sql.functions as func

def recode(col, key):
    return recode_dictionary[key][col]

def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat))\
        .otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType())

births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

cols = [(col.name, col.dataType) for col in births_trimmed.schema]
YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        dis = births.select(s[0]) \
            .distinct() \
            .rdd \
            .map(lambda row: row[0]) \
            .collect()
        if 'Y' in dis:
            YNU_cols.append(s[0])

births.select([
    'INFANT_NICU_ADMISSION',
    rec_integer(
        'INFANT_NICU_ADMISSION', func.lit('YNU')
    ) \
    .alias('INFANT_NICU_ADMISSION_RECODE')]
).take(5)

[Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='N', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0)]
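The two recoding rules can also be sketched in plain Python, independent of Spark (illustration only; the actual pipeline uses the Spark when/otherwise expression and a UDF, and the function bodies here are my own):

```python
# Pure-Python sketch of the two recoding rules (not the Spark UDF itself).
recode_dictionary = {'YNU': {'Y': 1, 'N': 0, 'U': 0}}

def recode(value, key):
    # Map a categorical code ('Y' / 'N' / 'U') to an integer.
    return recode_dictionary[key][value]

def correct_cig(count):
    # Cigarette counts: 1-97 is the actual count, 98 means 98 or more,
    # and 99 means unknown; unknown is treated as 0.
    return 0 if count == 99 else count
```

So `correct_cig(99)` gives 0 while `correct_cig(98)` stays 98, and `recode('U', 'YNU')` collapses "unknown" to 0 just like "N".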
exprs_YNU = [
    rec_integer(x, func.lit('YNU')).alias(x)
    if x in YNU_cols
    else x
    for x in births_transformed.columns
]

births_transformed = births_transformed.select(exprs_YNU)

births_transformed.select(YNU_cols[-5:]).show(5)

+------------+-------------+------------+-------------+------------------+
|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+------------+-------------+------------+-------------+------------------+
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 1|
|           0|            0|           0|            0|                 0|
+------------+-------------+------------+-------------+------------------+
only showing top 5 rows
Compute descriptive statistics.
import pyspark.mllib.stat as st
import numpy as np

numeric_cols = ['MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE',
                'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI',
                'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
                'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN'
               ]

numeric_rdd = births_transformed\
    .select(numeric_cols)\
    .rdd \
    .map(lambda row: [e for e in row])

mllib_stats = st.Statistics.colStats(numeric_rdd)

for col, m, v in zip(numeric_cols,
                     mllib_stats.mean(),
                     mllib_stats.variance()):
    print('{0}: \t{1:.2f} \t {2:.2f}'.format(col, m, np.sqrt(v)))

MOTHER_AGE_YEARS:       28.30    6.08
FATHER_COMBINED_AGE:    44.55    27.55
CIG_BEFORE:             1.43     5.18
CIG_1_TRI:              0.91     3.83
CIG_2_TRI:              0.70     3.31
CIG_3_TRI:              0.58     3.11
MOTHER_HEIGHT_IN:       65.12    6.45
MOTHER_PRE_WEIGHT:      214.50   210.21
MOTHER_DELIVERY_WEIGHT: 223.63   180.01
MOTHER_WEIGHT_GAIN:     30.74    26.23

categorical_cols = [e for e in births_transformed.columns
                    if e not in numeric_cols]

categorical_rdd = births_transformed\
    .select(categorical_cols)\
    .rdd \
    .map(lambda row: [e for e in row])

for i, col in enumerate(categorical_cols):
    agg = categorical_rdd \
        .groupBy(lambda row: row[i]) \
        .map(lambda row: (row[0], len(row[1])))
    print(col, sorted(agg.collect(),
                      key=lambda el: el[1],
                      reverse=True))

INFANT_ALIVE_AT_REPORT [(1, 23349), (0, 22080)]
BIRTH_PLACE [('1', 44558), ('4', 327), ('3', 224), ('2', 136), ('7', 91), ('5', 74), ('6', 11), ('9', 8)]
DIABETES_PRE [(0, 44881), (1, 548)]
DIABETES_GEST [(0, 43451), (1, 1978)]
HYP_TENS_PRE [(0, 44348), (1, 1081)]
HYP_TENS_GEST [(0, 43302), (1, 2127)]
PREV_BIRTH_PRETERM [(0, 43088), (1, 2341)]
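As a sanity check on what colStats reports per column, the same mean and standard deviation can be reproduced in plain Python on a toy column (made-up values; this assumes colStats returns the unbiased sample variance, which is what `statistics.stdev` also uses):

```python
import statistics

# Toy stand-in for one numeric column (values are made up):
ages = [22, 29, 38, 39, 18, 32]

mean = statistics.mean(ages)
std = statistics.stdev(ages)  # sample standard deviation (n - 1 denominator)

print('MOTHER_AGE_YEARS: \t{0:.2f} \t {1:.2f}'.format(mean, std))
```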
Compute the correlations between the numeric fields.
corrs = st.Statistics.corr(numeric_rdd)

for i, el in enumerate(corrs > 0.5):
    correlated = [
        (numeric_cols[j], corrs[i][j])
        for j, e in enumerate(el)
        if e == 1.0 and j != i]
    if len(correlated) > 0:
        for e in correlated:
            print('{0}-to-{1}: {2:.2f}' \
                  .format(numeric_cols[i], e[0], e[1]))

CIG_BEFORE-to-CIG_1_TRI: 0.83
CIG_BEFORE-to-CIG_2_TRI: 0.72
CIG_BEFORE-to-CIG_3_TRI: 0.62
CIG_1_TRI-to-CIG_BEFORE: 0.83
CIG_1_TRI-to-CIG_2_TRI: 0.87
CIG_1_TRI-to-CIG_3_TRI: 0.76
CIG_2_TRI-to-CIG_BEFORE: 0.72
CIG_2_TRI-to-CIG_1_TRI: 0.87
CIG_2_TRI-to-CIG_3_TRI: 0.89
CIG_3_TRI-to-CIG_BEFORE: 0.62
CIG_3_TRI-to-CIG_1_TRI: 0.76
CIG_3_TRI-to-CIG_2_TRI: 0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT: 0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT: 0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT: 0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT: 0.60
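Statistics.corr computes the Pearson correlation coefficient for every pair of columns. The underlying formula, sketched in plain Python on toy data (the function name is my own):

```python
import math

def pearson(xs, ys):
    # Pearson correlation: covariance divided by the product of the
    # standard deviations (the same quantity Statistics.corr computes
    # pairwise for the numeric columns).
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

# Perfectly correlated toy columns give a coefficient of (close to) 1.0:
print(pearson([1, 2, 3, 4], [2, 4, 6, 8]))
```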
After consolidating the strongly correlated factors, the following 12 fields are selected.
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT',
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE',
    'CIG_1_TRI',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM'
]

births_transformed = births_transformed.select([e for e in features_to_keep])
Run statistical tests (chi-square tests of association with the label).
import pyspark.mllib.linalg as ln

for cat in categorical_cols[1:]:
    agg = births_transformed \
        .groupby('INFANT_ALIVE_AT_REPORT') \
        .pivot(cat) \
        .count()

    agg_rdd = agg \
        .rdd\
        .map(lambda row: (row[1:])) \
        .flatMap(lambda row:
                 [0 if e == None else e for e in row]) \
        .collect()

    row_length = len(agg.collect()[0]) - 1
    agg = ln.Matrices.dense(row_length, 2, agg_rdd)

    test = st.Statistics.chiSqTest(agg)
    print(cat, round(test.pValue, 4))

BIRTH_PLACE 0.0
DIABETES_PRE 0.0
DIABETES_GEST 0.0
HYP_TENS_PRE 0.0
HYP_TENS_GEST 0.0
PREV_BIRTH_PRETERM 0.0
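chiSqTest turns a contingency matrix into a p-value; the Pearson chi-square statistic behind it can be sketched in plain Python (illustration only, the function name is my own; chiSqTest then converts this statistic to a p-value via the chi-square distribution):

```python
def chi_square(table):
    # Pearson chi-square statistic for a contingency table
    # (rows = outcome classes, columns = category levels):
    # sum over cells of (observed - expected)^2 / expected,
    # where expected = row_total * col_total / grand_total.
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = sum(row_totals)
    stat = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            expected = row_totals[i] * col_totals[j] / total
            stat += (observed - expected) ** 2 / expected
    return stat

# Identical rows mean no association, so the statistic is 0:
print(chi_square([[10, 20], [10, 20]]))
```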
With this, the final analysis data set is complete.
Convert the DataFrame to an RDD of LabeledPoint objects.
Since BIRTH_PLACE is still a string, apply the hashing trick to it.
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg

hashing = ft.HashingTF(7)

births_hashed = births_transformed \
    .rdd \
    .map(lambda row: [
        list(hashing.transform(row[1]).toArray())
        if col == 'BIRTH_PLACE'
        else row[i]
        for i, col in enumerate(features_to_keep)]) \
    .map(lambda row: [[e] if type(e) == int else e
                      for e in row]) \
    .map(lambda row: [item for sublist in row
                      for item in sublist]) \
    .map(lambda row: reg.LabeledPoint(
        row[0],
        ln.Vectors.dense(row[1:]))
    )
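HashingTF(7) maps each input term into one of 7 buckets via a hash function, so a categorical code becomes a fixed-length numeric vector without keeping a lookup table. The idea, sketched in plain Python (the function name is my own, and Python's built-in hash stands in for Spark's, so the bucket indices will not match MLlib's):

```python
def hashing_trick(terms, num_features=7):
    # Map each term to bucket hash(term) % num_features and count
    # occurrences -- a stand-in for MLlib's HashingTF.
    vec = [0.0] * num_features
    for term in terms:
        vec[hash(term) % num_features] += 1.0
    return vec

# A single BIRTH_PLACE code becomes a 7-element vector with one nonzero entry:
vec = hashing_trick(['1'])
print(vec)
```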
Now, on to the machine learning.
births_hashed.take(10)

[LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,29.0,99.0,0.0,99.0,999.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,22.0,29.0,0.0,65.0,180.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,38.0,40.0,0.0,63.0,155.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,39.0,42.0,0.0,60.0,128.0,0.0,0.0,0.0,0.0,1.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,18.0,99.0,4.0,61.0,110.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,32.0,37.0,0.0,66.0,150.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,22.0,25.0,0.0,68.0,155.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,25.0,26.0,0.0,64.0,136.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,26.0,32.0,0.0,64.0,140.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,39.0,66.0,0.0,65.0,140.0,0.0,0.0,0.0,0.0,0.0])]
Split the data into 60% for training and 40% for testing and evaluation.
births_train, births_test = births_hashed.randomSplit([0.6, 0.4])
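randomSplit assigns each row independently according to the weights, so the resulting split is only approximately 60/40. A plain-Python sketch of the same idea (the function name and seed are my own):

```python
import random

def random_split(rows, train_fraction=0.6, seed=42):
    # Assign each row independently to train or test, as RDD.randomSplit
    # does: the resulting sizes are only approximately 60/40.
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

train, test = random_split(list(range(1000)))
print(len(train), len(test))  # roughly 600 / 400, not exactly
```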
Import LogisticRegressionWithLBFGS from pyspark.mllib.classification and train it on the training data with 10 iterations.
from pyspark.mllib.classification \
    import LogisticRegressionWithLBFGS

LR_Model = LogisticRegressionWithLBFGS \
    .train(births_train, iterations=10)
Next, apply the model to the test data.
LR_results = (
    births_test.map(lambda row: row.label) \
    .zip(LR_Model \
         .predict(births_test\
                  .map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0))

LR_results.take(5)

[(0.0, 0.0), (0.0, 1.0), (0.0, 1.0), (0.0, 1.0), (0.0, 1.0)]
import pyspark.mllib.evaluation as ev

LR_evaluation = ev.BinaryClassificationMetrics(LR_results)

print('Area under PR: {0:.2f}' \
      .format(LR_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(LR_evaluation.areaUnderROC))
LR_evaluation.unpersist()

Area under PR: 0.85
Area under ROC: 0.63
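areaUnderROC can be read as the probability that a randomly chosen positive example is ranked above a randomly chosen negative one. That rank-based view, sketched in plain Python on made-up (label, score) pairs (the function name is my own):

```python
def area_under_roc(labeled_scores):
    # AUC as the Mann-Whitney statistic: the probability that a random
    # positive example scores higher than a random negative one
    # (ties count as half a win).
    pos = [s for label, s in labeled_scores if label == 1.0]
    neg = [s for label, s in labeled_scores if label == 0.0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# A perfect ranking gives AUC 1.0; all-ties gives 0.5.
pairs = [(1.0, 0.9), (1.0, 0.8), (0.0, 0.3), (0.0, 0.2)]
print(area_under_roc(pairs))
```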
Selecting only the most predictable features
selector = ft.ChiSqSelector(4).fit(births_train)

topFeatures_train = (
    births_train.map(lambda row: row.label) \
    .zip(selector \
         .transform(births_train \
                    .map(lambda row: row.features)))
).map(lambda row: reg.LabeledPoint(row[0], row[1]))

topFeatures_test = (
    births_test.map(lambda row: row.label) \
    .zip(selector \
         .transform(births_test \
                    .map(lambda row: row.features)))
).map(lambda row: reg.LabeledPoint(row[0], row[1]))
Random Forest in Spark
from pyspark.mllib.tree import RandomForest

RF_model = RandomForest \
    .trainClassifier(data=topFeatures_train,
                     numClasses=2,
                     categoricalFeaturesInfo={},
                     numTrees=6,
                     featureSubsetStrategy='all',
                     seed=666)

RF_results = (
    topFeatures_test.map(lambda row: row.label) \
    .zip(RF_model \
         .predict(topFeatures_test \
                  .map(lambda row: row.features)))
)

RF_evaluation = ev.BinaryClassificationMetrics(RF_results)
print('Area under PR: {0:.2f}' \
      .format(RF_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(RF_evaluation.areaUnderROC))
RF_evaluation.unpersist()

Area under PR: 0.83
Area under ROC: 0.62

LR_Model_2 = LogisticRegressionWithLBFGS \
    .train(topFeatures_train, iterations=10)

LR_results_2 = (
    topFeatures_test.map(lambda row: row.label) \
    .zip(LR_Model_2 \
         .predict(topFeatures_test \
                  .map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0))

LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)

print('Area under PR: {0:.2f}' \
      .format(LR_evaluation_2.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(LR_evaluation_2.areaUnderROC))
LR_evaluation_2.unpersist()

Area under PR: 0.87
Area under ROC: 0.63