Published on 2018-10-23. Continuing to work through Mastering Machine Learning with Spark 2.x: on to building a classification model for Higgs boson events.
———————————————————–
Increase the memory allocation to 8 GB and start over again from the beginning.
MacBook-Pro-5:~ $ ${SPARK_HOME}/bin/spark-shell --master local[*] --driver-memory 8g --executor-memory 8g --packages "$SPARK_PACKAGES"
Ivy Default Cache set to: /Users/******/.ivy2/cache
The jars for the packages stored in: /Users/******/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark-2.1.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
ai.h2o#sparkling-water-core_2.11 added as a dependency
ai.h2o#sparkling-water-repl_2.11 added as a dependency
ai.h2o#sparkling-water-ml_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
.......
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   77  |   0   |   0   |   11  ||   62  |   0   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 62 already retrieved (0kB/42ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/10/23 08:46:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/10/23 08:46:52 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://***.***.***.***:4040
Spark context available as 'sc' (master = local[*], app id = local-1540252005964).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

scala> import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg._

scala> import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix

scala> import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.evaluation._

scala> import org.apache.spark.mllib.tree._
import org.apache.spark.mllib.tree._

scala> val rawData = sc.textFile("hdfs://localhost/user/*******/ds/HIGGS.csv")
rawData: org.apache.spark.rdd.RDD[String] = hdfs://localhost/user/*******/ds/HIGGS.csv MapPartitionsRDD[1] at textFile at <console>:36

scala> println(s"Number of rows: ${rawData.count}")
Number of rows: 11000000

scala> println("Rows")
Rows

scala> println(rawData.take(2).mkString("\n"))
1.000000000000000000e+00,8.692932128906250000e-01,-6.350818276405334473e-01,2.256902605295181274e-01,3.274700641632080078e-01,-6.899932026863098145e-01,7.542022466659545898e-01,-2.485731393098831177e-01,-1.092063903808593750e+00,0.000000000000000000e+00,1.374992132186889648e+00,-6.536741852760314941e-01,9.303491115570068359e-01,1.107436060905456543e+00,1.138904333114624023e+00,-1.578198313713073730e+00,-1.046985387802124023e+00,0.000000000000000000e+00,6.579295396804809570e-01,-1.045456994324922562e-02,-4.576716944575309753e-02,3.101961374282836914e+00,1.353760004043579102e+00,9.795631170272827148e-01,9.780761599540710449e-01,9.200048446655273438e-01,7.216574549674987793e-01,9.887509346008300781e-01,8.766783475875854492e-01
1.000000000000000000e+00,9.075421094894409180e-01,3.291472792625427246e-01,3.594118654727935791e-01,1.497969865798950195e+00,-3.130095303058624268e-01,1.095530629158020020e+00,-5.575249195098876953e-01,-1.588229775428771973e+00,2.173076152801513672e+00,8.125811815261840820e-01,-2.136419266462326050e-01,1.271014571189880371e+00,2.214872121810913086e+00,4.999939501285552979e-01,-1.261431813240051270e+00,7.321561574935913086e-01,0.000000000000000000e+00,3.987008929252624512e-01,-1.138930082321166992e+00,-8.191101951524615288e-04,0.000000000000000000e+00,3.022198975086212158e-01,8.330481648445129395e-01,9.856996536254882812e-01,9.780983924865722656e-01,7.797321677207946777e-01,9.923557639122009277e-01,7.983425855636596680e-01

scala> val data = rawData.map(line => line.split(',').map(_.toDouble))
data: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[8] at map at <console>:38

scala> val response: RDD[Int] = data.map(row => row(0).toInt)
response: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:40

scala> val features: RDD[Vector] = data.map(line => Vectors.dense(line.slice(1, line.size)))
features: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[10] at map at <console>:40

scala> val featuresMatrix = new RowMatrix(features)
featuresMatrix: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@10bc326

scala> val featuresSummary = featuresMatrix.computeColumnSummaryStatistics()
featuresSummary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@4f1bfab4

scala> import com.packtpub.mmlwspark.utils.Tabulizer.table
import com.packtpub.mmlwspark.utils.Tabulizer.table

scala> println(s"Higgs Features Mean Values = ${table(featuresSummary.mean, 8)}")
Higgs Features Mean Values =
+-----+------+------+------+-----+-----+------+-----+
|0.991|-0.000|-0.000| 0.999|0.000|0.991|-0.000|0.000|
|1.000| 0.993|-0.000|-0.000|1.000|0.992| 0.000|0.000|
|1.000| 0.986|-0.000| 0.000|1.000|1.034| 1.025|1.051|
|1.010| 0.973| 1.033| 0.960|    -|    -|    -|    -|
+-----+------+------+------+-----+-----+------+-----+

scala> println(s"Higgs Features Variance Values = ${table(featuresSummary.variance, 8)}")
Higgs Features Variance Values =
+-----+-----+-----+-----+-----+-----+-----+-----+
|0.320|1.018|1.013|0.360|1.013|0.226|1.019|1.012|
|1.056|0.250|1.019|1.012|1.101|0.238|1.018|1.013|
|1.425|0.256|1.015|1.013|1.961|0.455|0.145|0.027|
|0.158|0.276|0.133|0.098|    -|    -|    -|    -|
+-----+-----+-----+-----+-----+-----+-----+-----+

scala> val nonZeros = featuresSummary.numNonzeros
nonZeros: org.apache.spark.mllib.linalg.Vector = [1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,5605389.0,1.1E7,1.1E7,1.1E7,5476088.0,1.1E7,1.1E7,1.1E7,4734760.0,1.1E7,1.1E7,1.1E7,3869383.0,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7]

scala> println(s"Non-zero values count per column: ${table(nonZeros, cols = 8, format = "%.0f")}")
Non-zero values count per column:
+--------+--------+--------+--------+--------+--------+--------+--------+
|11000000|11000000|11000000|11000000|11000000|11000000|11000000|11000000|
| 5605389|11000000|11000000|11000000| 5476088|11000000|11000000|11000000|
| 4734760|11000000|11000000|11000000| 3869383|11000000|11000000|11000000|
|11000000|11000000|11000000|11000000|       -|       -|       -|       -|
+--------+--------+--------+--------+--------+--------+--------+--------+

scala> val numRows = featuresMatrix.numRows
numRows: Long = 11000000

scala> val numCols = featuresMatrix.numCols
numCols: Long = 28

scala> val colsWithZeros = nonZeros
colsWithZeros: org.apache.spark.mllib.linalg.Vector = [1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,5605389.0,1.1E7,1.1E7,1.1E7,5476088.0,1.1E7,1.1E7,1.1E7,4734760.0,1.1E7,1.1E7,1.1E7,3869383.0,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7]

scala> val sparsity = nonZeros.toArray.sum / (numRows * numCols)
sparsity: Double = 0.9210572077922078

scala> println(f"Data sparsity: ${sparsity}%.2f")
Data sparsity: 0.92

scala> val responseValues = response.distinct.collect
responseValues: Array[Int] = Array(0, 1)

scala> println(s"Response values: ${responseValues.mkString(", ")}")
Response values: 0, 1

scala> val responseDistribution = response.map(v => (v,1)).countByKey
responseDistribution: scala.collection.Map[Int,Long] = Map(0 -> 5170877, 1 -> 5829123)

scala> println(s"Response distribution:\n${table(responseDistribution)}")
Response distribution:
+-------+-------+
|      0|      1|
+-------+-------+
|5170877|5829123|
+-------+-------+

scala> val higgs = response.zip(features).map { case (response, features) => LabeledPoint(response, features) }
higgs: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[22] at map at <console>:45

scala> higgs.setName("higgs").cache()
res12: higgs.type = higgs MapPartitionsRDD[22] at map at <console>:45

scala> val trainTestSplits = higgs.randomSplit(Array(0.8, 0.2))
trainTestSplits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[23] at randomSplit at <console>:47, MapPartitionsRDD[24] at randomSplit at <console>:47)

scala> val (trainingData, testData) = (trainTestSplits(0), trainTestSplits(1))
trainingData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[23] at randomSplit at <console>:47
testData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[24] at randomSplit at <console>:47
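One caveat: randomSplit without an explicit seed gives a different 80/20 split on every run, so error and AUC numbers will wobble slightly between sessions. If the later model comparisons should be reproducible, the split can be pinned with the optional seed argument. A minimal sketch (the variable names here are only for illustration and are not used elsewhere in this post):

// Same 80/20 split as above, but reproducible across runs thanks to the fixed seed.
val Array(trainingDataR, testDataR) = higgs.randomSplit(Array(0.8, 0.2), seed = 42L)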
Here I set H2O aside for the moment and first build a decision tree model with Spark MLlib.
scala> println("""| | | === Tree Model === | |""".stripMargin) === Tree Model === scala> val dtNumClasses = 2 dtNumClasses: Int = 2 scala> val dtCategoricalFeaturesInfo = Map[Int, Int]() dtCategoricalFeaturesInfo: scala.collection.immutable.Map[Int,Int] = Map() scala> val dtImpurity = "gini" dtImpurity: String = gini scala> val dtMaxDepth = 5 dtMaxDepth: Int = 5 scala> val dtMaxBins = 10 dtMaxBins: Int = 10 scala> val dtreeModel = DecisionTree.trainClassifier(trainingData, | dtNumClasses, | dtCategoricalFeaturesInfo, | dtImpurity, | dtMaxDepth, | dtMaxBins) 18/10/23 08:55:41 WARN Executor: 1 block locks were not released by TID = 369: [rdd_22_0] dtreeModel: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 5 with 63 nodes scala> println(s"Decision Tree Model:\n${dtreeModel.toDebugString}") Decision Tree Model: DecisionTreeModel classifier of depth 5 with 63 nodes If (feature 25 <= 1.0449790954589844) If (feature 25 <= 0.6215437650680542) If (feature 27 <= 0.8684858083724976) If (feature 5 <= 0.8952773809432983) If (feature 26 <= 0.7845427989959717) Predict: 0.0 Else (feature 26 > 0.7845427989959717) Predict: 1.0 Else (feature 5 > 0.8952773809432983) If (feature 5 <= 1.5556738376617432) Predict: 1.0 Else (feature 5 > 1.5556738376617432) Predict: 1.0 Else (feature 27 > 0.8684858083724976) If (feature 22 <= 1.0426501035690308) If (feature 24 <= 1.0874927043914795) Predict: 0.0 Else (feature 24 > 1.0874927043914795) Predict: 0.0 Else (feature 22 > 1.0426501035690308) If (feature 9 <= 1.2842156887054443) Predict: 0.0 Else (feature 9 > 1.2842156887054443) Predict: 1.0 Else (feature 25 > 0.6215437650680542) If (feature 26 <= 0.7845427989959717) If (feature 5 <= 0.8117315769195557) If (feature 22 <= 1.0426501035690308) Predict: 0.0 Else (feature 22 > 1.0426501035690308) Predict: 1.0 Else (feature 5 > 0.8117315769195557) If (feature 27 <= 0.7879425287246704) Predict: 1.0 Else (feature 27 > 0.7879425287246704) Predict: 0.0 Else (feature 26 > 0.7845427989959717) If (feature 27 <= 0.9255462884902954) If (feature 26 <= 0.8456243276596069) Predict: 1.0 Else (feature 26 > 0.8456243276596069) Predict: 1.0 Else (feature 27 > 0.9255462884902954) If (feature 22 <= 1.0426501035690308) Predict: 1.0 Else (feature 22 > 1.0426501035690308) Predict: 1.0 Else (feature 25 > 1.0449790954589844) If (feature 22 <= 1.0426501035690308) If (feature 27 <= 1.0037001371383667) If (feature 5 <= 1.09553062915802) If (feature 21 <= 0.9807248115539551) Predict: 0.0 Else (feature 21 > 0.9807248115539551) Predict: 0.0 Else (feature 5 > 1.09553062915802) If (feature 25 <= 1.5675833225250244) Predict: 1.0 Else (feature 25 > 1.5675833225250244) Predict: 0.0 Else (feature 27 > 1.0037001371383667) If (feature 24 <= 1.2299858331680298) If (feature 24 <= 1.0874927043914795) Predict: 0.0 Else (feature 24 > 1.0874927043914795) Predict: 0.0 Else (feature 24 > 1.2299858331680298) If (feature 25 <= 1.5675833225250244) Predict: 0.0 Else (feature 25 > 1.5675833225250244) Predict: 0.0 Else (feature 22 > 1.0426501035690308) If (feature 25 <= 1.5675833225250244) If (feature 5 <= 1.09553062915802) If (feature 27 <= 1.1181418895721436) Predict: 1.0 Else (feature 27 > 1.1181418895721436) Predict: 0.0 Else (feature 5 > 1.09553062915802) If (feature 3 <= 1.7546443939208984) Predict: 1.0 Else (feature 3 > 1.7546443939208984) Predict: 0.0 Else (feature 25 > 1.5675833225250244) If (feature 0 <= 1.3568294048309326) If (feature 3 <= 1.7546443939208984) Predict: 0.0 Else (feature 3 > 1.7546443939208984) Predict: 0.0 
     Else (feature 0 > 1.3568294048309326)
      If (feature 24 <= 1.0874927043914795)
       Predict: 0.0
      Else (feature 24 > 1.0874927043914795)
       Predict: 0.0

scala> val treeLabelAndPreds = testData.map { point =>
     |   val prediction = dtreeModel.predict(point.features)
     |   (point.label.toInt, prediction.toInt)
     | }
treeLabelAndPreds: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[49] at map at <console>:51

scala> val treeTestErr = treeLabelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
treeTestErr: Double = 0.33241460209783535

scala> println(f"Tree Model: Test Error = ${treeTestErr}%.3f")
Tree Model: Test Error = 0.332

scala> val cm = treeLabelAndPreds.combineByKey(
     |   createCombiner = (label: Int) => if (label == 0) (1,0) else (0,1),
     |   mergeValue = (v: (Int,Int), label: Int) => if (label == 0) (v._1 +1, v._2) else (v._1, v._2 + 1),
     |   mergeCombiners = (v1: (Int,Int), v2: (Int,Int)) => (v1._1 + v2._1, v1._2 + v2._2)).collect
cm: Array[(Int, (Int, Int))] = Array((0,(616950,417940)), (1,(313176,851344)))

scala> val (tn, tp, fn, fp) = (cm(0)._2._1, cm(1)._2._2, cm(1)._2._1, cm(0)._2._2)
tn: Int = 616950
tp: Int = 851344
fn: Int = 313176
fp: Int = 417940

scala> println(f"""Confusion Matrix
     | |       ${0}%5d ${1}%5d ${"Err"}%10s
     | |0 ${tn}%5d ${fp}%5d ${tn+fp}%5d ${fp.toDouble/(tn+fp)}%5.4f
     | |1 ${fn}%5d ${tp}%5d ${fn+tp}%5d ${fn.toDouble/(fn+tp)}%5.4f
     | |  ${tn+fn}%5d ${fp+tp}%5d ${tn+fp+fn+tp}%5d ${(fp+fn).toDouble/(tn+fp+fn+tp)}%5.4f
     | |""".stripMargin)
Confusion Matrix
        0      1             Err
0  616950 417940 1034890 0.4038
1  313176 851344 1164520 0.2689
   930126 1269284 2199410 0.3324

scala> type Predictor = {
     |   def predict(features: Vector): Double
     | }
defined type alias Predictor

scala> def computeMetrics(model: Predictor, data: RDD[LabeledPoint]): BinaryClassificationMetrics = {
     |   val predAndLabels = data.map(newData => (model.predict(newData.features), newData.label))
     |   new BinaryClassificationMetrics(predAndLabels)
     | }
warning: there was one feature warning; re-run with -feature for details
computeMetrics: (model: Predictor, data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

scala> val treeMetrics = computeMetrics(dtreeModel, testData)
treeMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@31092e1f

scala> println(f"Tree Model: AUC on Test Data = ${treeMetrics.areaUnderROC()}%.3f")
Tree Model: AUC on Test Data = 0.664
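As a cross-check on the hand-rolled combineByKey confusion matrix above, MLlib's MulticlassMetrics can build the same matrix directly. A minimal sketch, reusing treeLabelAndPreds from the session above (MulticlassMetrics expects (prediction, label) pairs of Doubles):

import org.apache.spark.mllib.evaluation.MulticlassMetrics

// Swap the (label, prediction) tuples into (prediction, label) and cast to Double.
val predAndLabelPairs = treeLabelAndPreds.map { case (label, pred) => (pred.toDouble, label.toDouble) }
val mcMetrics = new MulticlassMetrics(predAndLabelPairs)

// Rows of the matrix are actual classes, columns are predicted classes.
println(mcMetrics.confusionMatrix)
println(f"Accuracy = ${mcMetrics.accuracy}%.3f")

Since it consumes the same prediction pairs, this should reproduce the 616950 / 417940 / 313176 / 851344 counts computed by hand.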
Next, on to building a Random Forest model.
scala> println("""| | | === Random Forest Model === | |""".stripMargin) === Random Forest Model === scala> val numClasses = 2 numClasses: Int = 2 scala> val categoricalFeaturesInfo = Map[Int, Int]() categoricalFeaturesInfo: scala.collection.immutable.Map[Int,Int] = Map() scala> val numTrees = 10 numTrees: Int = 10 scala> val featureSubsetStrategy = "auto" featureSubsetStrategy: String = auto scala> val impurity = "gini" impurity: String = gini scala> val maxDepth = 5 maxDepth: Int = 5 scala> val maxBins = 10 maxBins: Int = 10 scala> val seed = 42 seed: Int = 42 scala> val rfModel = RandomForest.trainClassifier(trainingData, | numClasses, | categoricalFeaturesInfo, | numTrees, | featureSubsetStrategy, | impurity, | maxDepth, | maxBins, | seed) 18/10/23 09:02:37 WARN Executor: 1 block locks were not released by TID = 1549: [rdd_22_0] rfModel: org.apache.spark.mllib.tree.model.RandomForestModel = TreeEnsembleModel classifier with 10 trees scala> def computeError(model: Predictor, data: RDD[LabeledPoint]): Double = { | val labelAndPreds = data.map { point => | val prediction = model.predict(point.features) | (point.label, prediction) | } | labelAndPreds.filter(r => r._1 != r._2).count.toDouble/data.count | } warning: there was one feature warning; re-run with -feature for details computeError: (model: Predictor, data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])Double scala> val rfTestErr = computeError(rfModel, testData) rfTestErr: Double = 0.33492209274305385 scala> println(f"RF Model: Test Error = ${rfTestErr}%.3f") RF Model: Test Error = 0.335 scala> val rfMetrics = computeMetrics(rfModel, testData) rfMetrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@1af6bfe7 scala> println(f"RF Model: AUC on Test Data = ${rfMetrics.areaUnderROC}%.3f") RF Model: AUC on Test Data = 0.657 scala> val rfGrid = | for ( | gridNumTrees <- Array(15, 20); | gridImpurity <- Array("entropy", "gini"); | gridDepth <- Array(20, 30); | gridBins <- Array(20, 50)) yield { | val gridModel = RandomForest.trainClassifier(trainingData, 2, Map[Int, Int](), gridNumTrees, "auto", gridImpurity, gridDepth, gridBins) | val gridAUC = computeMetrics(gridModel, testData).areaUnderROC | val gridErr = computeError(gridModel, testData) | ((gridNumTrees, gridImpurity, gridDepth, gridBins), gridAUC, gridErr) | } 18/10/23 09:07:53 WARN Executor: 1 block locks were not released by TID = 2609: [rdd_22_0] ........................ |