Back to Decision Trees.
ーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーー
MacBook-Pro-5:spark-2.3.1-bin-hadoop2.7 $ ${SPARK_HOME}/bin/spark-shell --master local[*] --driver-memory 6g
2018-10-09 12:49:43 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://************:4040
Spark context available as 'sc' (master = local[*], app id = local-1539056991319).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg._

scala> import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.regression._

scala> val rawData = sc.textFile("hdfs://localhost/user/*******/ds/covtype.data")
rawData: org.apache.spark.rdd.RDD[String] = hdfs://localhost/user/*******/ds/covtype.data MapPartitionsRDD[7] at textFile at <console>:33

scala> rawData.first
res9: String = 2596,51,3,258,0,510,221,232,148,6279,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5

scala> val data = rawData.map { line =>
     |   val values = line.split(',').map(_.toDouble)
     |   val featureVector = Vectors.dense(values.init)
     |   val label = values.last - 1
     |   LabeledPoint(label, featureVector)
     | }
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[8] at map at <console>:34

scala> data.first
res10: org.apache.spark.mllib.regression.LabeledPoint = (4.0,[2596.0,51.0,3.0,258.0,0.0,510.0,221.0,232.0,148.0,6279.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])

scala> val Array(trainData, cvData, testData) =
     |   data.randomSplit(Array(0.8, 0.1, 0.1))
trainData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[9] at randomSplit at <console>:35
cvData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[10] at randomSplit at <console>:35
testData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[11] at randomSplit at <console>:35

scala> trainData.cache()
res11: trainData.type = MapPartitionsRDD[9] at randomSplit at <console>:35

scala> cvData.cache()
res12: cvData.type = MapPartitionsRDD[10] at randomSplit at <console>:35

scala> testData.cache()
res13: testData.type = MapPartitionsRDD[11] at randomSplit at <console>:35
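The map above keeps the first 54 values as the feature vector (`values.init`) and shifts the trailing cover-type label from 1-7 down to the 0-6 range that MLlib classifiers expect. A small, hypothetical sanity check (not in the original session) that every row really has 55 numeric fields before trusting `toDouble`:

// Rows with a wrong field count would make the map above throw in toDouble;
// expect this count to be 0 for the covtype data.
val malformedRows = rawData.filter(_.split(',').length != 55).count()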
scala> import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.evaluation._

scala> import org.apache.spark.mllib.tree._
import org.apache.spark.mllib.tree._

scala> import org.apache.spark.mllib.tree.model._
import org.apache.spark.mllib.tree.model._

scala> import org.apache.spark.rdd._
import org.apache.spark.rdd._

scala> def getMetrics(model: DecisionTreeModel, data: RDD[LabeledPoint]):
     |     MulticlassMetrics = {
     |   val predictionsAndLabels = data.map(example =>
     |     (model.predict(example.features), example.label)
     |   )
     |   new MulticlassMetrics(predictionsAndLabels)
     | }
getMetrics: (model: org.apache.spark.mllib.tree.model.DecisionTreeModel, data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])org.apache.spark.mllib.evaluation.MulticlassMetrics

scala> val model = DecisionTree.trainClassifier(
     |   trainData, 7, Map[Int,Int](), "gini", 4, 100)
model: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 4 with 31 nodes

scala> val metrics = getMetrics(model, cvData)
metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@39d34c44

scala> metrics.confusionMatrix
res14: org.apache.spark.mllib.linalg.Matrix =
14221.0  6399.0   7.0     0.0    0.0  0.0   367.0
5686.0   22293.0  346.0   18.0   0.0  5.0   38.0
0.0      490.0    3041.0  74.0   0.0  12.0  0.0
0.0      1.0      153.0   130.0  0.0  0.0   0.0
0.0      906.0    31.0    0.0    0.0  0.0   0.0
0.0      483.0    1166.0  33.0   0.0  55.0  0.0
1125.0   34.0     0.0     0.0    0.0  0.0   882.0

scala> metrics.precision
warning: there was one deprecation warning; re-run with -deprecation for details
res15: Double = 0.7004276156976343

scala> (0 until 7).map(
     |   cat => (metrics.precision(cat), metrics.recall(cat))
     | ).foreach(println)
(0.6761601369341955,0.6773840144803277)
(0.7283865908645364,0.7853519340519974)
(0.6410202360876898,0.8407520044235555)
(0.5098039215686274,0.45774647887323944)
(0.0,0.0)
(0.7638888888888888,0.03166378814047208)
(0.6853146853146853,0.43214110730034294)
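The deprecation warning appears because the no-argument `precision` was deprecated in Spark 2.x; the non-deprecated equivalent on the same `metrics` object is:

metrics.accuracy  // overall fraction of correct predictions, same ~0.70 value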
scala> import org.apache.spark.rdd._
import org.apache.spark.rdd._

scala> def classProbabilities(data: RDD[LabeledPoint]): Array[Double] = {
     |   val countsByCategory = data.map(_.label).countByValue()
     |   val counts = countsByCategory.toArray.sortBy(_._1).map(_._2)
     |   counts.map(_.toDouble / counts.sum)
     | }
classProbabilities: (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])Array[Double]

scala> val trainPriorProbabilities = classProbabilities(trainData)
trainPriorProbabilities: Array[Double] = Array(0.3653828791064219, 0.487007347598595, 0.061534525974556414, 0.004690596879527759, 0.016391292945187517, 0.029790234443856867, 0.03520312305185453)

scala> val cvPriorProbabilities = classProbabilities(cvData)
cvPriorProbabilities: Array[Double] = Array(0.36199048210221396, 0.489447548106766, 0.062366370094489275, 0.00489688944065108, 0.016156286640457963, 0.02995034140285537, 0.03519208221256638)

scala> trainPriorProbabilities.zip(cvPriorProbabilities).map {
     |   case (trainProb, cvProb) => trainProb * cvProb
     | }.sum
res19: Double = 0.37688625242646306
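The 0.3769 result is the expected accuracy of a baseline that guesses classes at random in proportion to the training priors: the chance of a correct guess is the sum over classes of P_train(c) × P_cv(c). Any real model has to clear this bar, and the depth-4 tree's ~0.70 does. A related baseline (my sketch, not from the session) is always answering with the most common class:

// Always predicting the majority class (class 1 here) scores that class's
// CV prior, ~0.489 — exactly the figure the depth-1 "entropy" trees hit in
// the hyperparameter grid below.
val majorityClass = trainPriorProbabilities.zipWithIndex.maxBy(_._1)._2  // = 1
val majorityBaseline = cvPriorProbabilities(majorityClass)               // ~0.489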
scala> val evaluations =
     |   for (impurity <- Array("gini", "entropy");
     |        depth <- Array(1, 20);
     |        bins <- Array(10, 300))
     |   yield {
     |     val model = DecisionTree.trainClassifier(
     |       trainData, 7, Map[Int,Int](), impurity, depth, bins)
     |     val predictionsAndLabels = cvData.map(example =>
     |       (model.predict(example.features), example.label)
     |     )
     |     val accuracy =
     |       new MulticlassMetrics(predictionsAndLabels).precision
     |     ((impurity, depth, bins), accuracy)
     |   }
warning: there was one deprecation warning; re-run with -deprecation for details
evaluations: Array[((String, Int, Int), Double)] = Array(((gini,1,10),0.633216083867853), ((gini,1,300),0.6333885095523829), ((gini,20,10),0.8893371956686668), ((gini,20,300),0.9055624525829368), ((entropy,1,10),0.489447548106766), ((entropy,1,300),0.489447548106766), ((entropy,20,10),0.8945961790468309), ((entropy,20,300),0.9120111731843575))

scala> evaluations.sortBy(_._2).reverse.foreach(println)
((entropy,20,300),0.9120111731843575)
((gini,20,300),0.9055624525829368)
((entropy,20,10),0.8945961790468309)
((gini,20,10),0.8893371956686668)
((gini,1,300),0.6333885095523829)
((gini,1,10),0.633216083867853)
((entropy,1,300),0.489447548106766)
((entropy,1,10),0.489447548106766)
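Deeper trees with more bins win for both impurity measures, and the depth-1 entropy stumps land exactly on the 0.4894 majority-class baseline above, which suggests both of their leaves predict class 1. Rather than reading the sorted printout, the best triple can also be extracted programmatically — a one-liner over the same `evaluations` array:

val ((bestImpurity, bestDepth, bestBins), bestCvAccuracy) = evaluations.maxBy(_._2)
// ((entropy,20,300),0.9120111731843575)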
scala> val model = DecisionTree.trainClassifier(
     |   trainData.union(cvData), 7, Map[Int,Int](), "entropy", 20, 300)
model: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 20 with 30175 nodes

scala> val metrics = getMetrics(model, cvData)
metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@742795af

scala> metrics.confusionMatrix
res21: org.apache.spark.mllib.linalg.Matrix =
19704.0  1272.0   0.0     0.0    3.0    1.0     14.0
1143.0   27177.0  16.0    0.0    30.0   17.0    3.0
0.0      26.0     3559.0  0.0    1.0    31.0    0.0
0.0      0.0      3.0     280.0  0.0    1.0     0.0
1.0      102.0    0.0     0.0    834.0  0.0     0.0
1.0      19.0     20.0    0.0    1.0    1696.0  0.0
61.0     3.0      0.0     0.0    0.0    0.0     1977.0

scala> metrics.precision
warning: there was one deprecation warning; re-run with -deprecation for details
res22: Double = 0.952255327953652
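One caveat: `cvData` was folded into the training set in the call above, so re-evaluating on it gives an optimistic 0.952 — the model has already seen those rows. A fairer final estimate (a sketch, not part of the original session) would use the still-untouched `testData`:

val testMetrics = getMetrics(model, testData)
println(testMetrics.accuracy)  // expect somewhat below the 0.952 above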
scala> val data = rawData.map { line =>
     |   val values = line.split(',').map(_.toDouble)
     |   val wilderness = values.slice(10,14).indexOf(1.0).toDouble
     |   val soil = values.slice(14,54).indexOf(1.0).toDouble
     |   val featureVector =
     |     Vectors.dense(values.slice(0,10) :+ wilderness :+ soil)
     |   val label = values.last - 1
     |   LabeledPoint(label, featureVector)
     | }
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[515] at map at <console>:49

scala> val evaluations =
     |   for (impurity <- Array("gini", "entropy");
     |        depth <- Array(10, 20, 30);
     |        bins <- Array(40, 300))
     |   yield {
     |     val model = DecisionTree.trainClassifier(
     |       trainData, 7, Map(10 -> 4, 11 -> 40),
     |       impurity, depth, bins)
     |     val trainAccuracy = getMetrics(model, trainData).precision
     |     val cvAccuracy = getMetrics(model, cvData).precision
     |     ((impurity, depth, bins), (trainAccuracy, cvAccuracy))
     |   }
warning: there were two deprecation warnings; re-run with -deprecation for details
evaluations: Array[((String, Int, Int), (Double, Double))] = Array(
  ((gini,10,40),(0.7802298435464524,0.7775191392509828)),
  ((gini,10,300),(0.780801657831491,0.7759845506586661)),
  ((gini,20,40),(0.9410730331523305,0.9047003241602869)),
  ((gini,20,300),(0.9413309944839269,0.9055624525829368)),
  ((gini,30,40),(0.9968679194988671,0.9313228498517139)),
  ((gini,30,300),(0.9965626652564781,0.9345127250155183)),
  ((entropy,10,40),(0.7762271435511817,0.7731567694323747)),
  ((entropy,10,300),(0.7747524646055556,0.771053176081109)),
  ((entropy,20,40),(0.9522728542991405,0.9103214014759639)),
  ((entropy,20,300),(0.9537991255110859,0.9120111731843575)),
  ((entropy,30,40),(0.998923011440585,0.935736947375681)),
  ((entropy,30,300),(0.9989488075737447,0.9389268225394855)))
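Note: at this point the book re-runs randomSplit on the newly parsed 12-feature `data`. That step does not appear in the transcript above, so `trainData` and `cvData` here still appear to hold the original 54-column vectors — which would also explain the prediction error further below. The missing re-split, following the earlier pattern, would be:

val Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))
trainData.cache(); cvData.cache(); testData.cache()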
Finally, the RandomForest:
scala> val forest = RandomForest.trainClassifier(
     |   trainData, 7, Map(10 -> 4, 11 -> 40), 20,
     |   "auto", "entropy", 30, 300)
forest: org.apache.spark.mllib.tree.model.RandomForestModel = TreeEnsembleModel classifier with 20 trees
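For readability, here is what the positional arguments of that call mean in mllib's RandomForest.trainClassifier (a reading aid, not an alternative API):

// trainClassifier(input, numClasses, categoricalFeaturesInfo,
//                 numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
//   trainData               - the training set
//   7                       - number of target classes
//   Map(10 -> 4, 11 -> 40)  - feature 10 is categorical with 4 values
//                             (wilderness), feature 11 with 40 (soil type)
//   20                      - number of trees in the ensemble
//   "auto"                  - how many features to try per split
//   "entropy", 30, 300      - impurity measure, maxDepth, maxBins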
scala> def getMetrics2(forest: RandomForestModel, data: RDD[LabeledPoint]):
     |     MulticlassMetrics = {
     |   val predictionsAndLabels = data.map(example =>
     |     (forest.predict(example.features), example.label)
     |   )
     |   new MulticlassMetrics(predictionsAndLabels)
     | }
getMetrics2: (forest: org.apache.spark.mllib.tree.model.RandomForestModel, data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])org.apache.spark.mllib.evaluation.MulticlassMetrics

scala> val metrics = getMetrics2(forest, cvData)
metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@4f427f4e

scala> metrics.confusionMatrix
res5: org.apache.spark.mllib.linalg.Matrix =
17330.0  3545.0   5.0     0.0    1.0    1.0    147.0
1655.0   26418.0  192.0   1.0    16.0   39.0   15.0
0.0      187.0    3265.0  14.0   0.0    44.0   0.0
0.0      0.0      63.0    184.0  0.0    3.0    0.0
45.0     516.0    17.0    0.0    356.0  4.0    0.0
7.0      205.0    577.0   6.0    1.0    937.0  0.0
400.0    11.0     0.0     0.0    0.0    0.0    1621.0

scala> metrics.precision
warning: there was one deprecation warning; re-run with -deprecation for details
res6: Double = 0.8665525351041018

scala> (0 until 7).map(
     |   cat => (metrics.precision(cat), metrics.recall(cat))
     | ).foreach(println)
(0.891598497710552,0.8241000523087165)
(0.8554497765688751,0.9323122529644269)
(0.7926681233309055,0.9301994301994302)
(0.8975609756097561,0.736)
(0.9518716577540107,0.3795309168443497)
(0.9114785992217899,0.5406809001731102)
(0.9091418956814358,0.797736220472441)
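Per-class precision improves markedly over the first decision tree, though recall stays low for the rare classes (categories 4 and 5). To combine the two columns into a single per-class score, `MulticlassMetrics` also exposes the F1 measure — a small sketch over the same `metrics` object:

(0 until 7).foreach { cat =>
  println(s"class $cat F1 = ${metrics.fMeasure(cat.toDouble)}")
}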
Now let's use the trained random forest model `forest` to make an actual prediction. Following the book's text as written produces the following error:
scala> val input = "2709, 125, 28, 67, 23, 3224, 253, 207, 61, 6094, 0, 29"
input: String = 2709, 125, 28, 67, 23, 3224, 253, 207, 61, 6094, 0, 29

scala> val vector = Vectors.dense(input.split(',').map(_.toDouble))
vector: org.apache.spark.mllib.linalg.Vector = [2709.0,125.0,28.0,67.0,23.0,3224.0,253.0,207.0,61.0,6094.0,0.0,29.0]

scala> forest.predict(vector)
java.lang.ArrayIndexOutOfBoundsException: 13
  at org.apache.spark.mllib.linalg.DenseVector.apply(Vectors.scala:632)
  at org.apache.spark.mllib.tree.model.Node.predict(Node.scala:70)
  at org.apache.spark.mllib.tree.model.DecisionTreeModel.predict(DecisionTreeModel.
・・・・・・
Thinking it over, the model must be fed every feature value, i.e. everything except the final cover-type label. (The exception shows a tree node reading feature index 13, which only exists in the original 54-column layout — consistent with the note above that `trainData` was never rebuilt from the 12-feature `data`.) So let's try it with all 54 values:
scala> val input = "2709,125,28,67,23,3224,253,207,61,6094,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0"
input: String = 2709,125,28,67,23,3224,253,207,61,6094,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0

scala> val vector = Vectors.dense(input.split(',').map(_.toDouble))
vector: org.apache.spark.mllib.linalg.Vector = [2709.0,125.0,28.0,67.0,23.0,3224.0,253.0,207.0,61.0,6094.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]

scala> forest.predict(vector)
res14: Double = 1.0

scala> val input = "2511,92,7,182,18,722,231,229,131,5494,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
input: String = 2511,92,7,182,18,722,231,229,131,5494,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0

scala> val vector = Vectors.dense(input.split(',').map(_.toDouble))
vector: org.apache.spark.mllib.linalg.Vector = [2511.0,92.0,7.0,182.0,18.0,722.0,231.0,229.0,131.0,5494.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]

scala> forest.predict(vector)
res20: Double = 4.0

scala> val input = "3325,273,5,600,117,864,208,241,172,983,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0"
input: String = 3325,273,5,600,117,864,208,241,172,983,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0

scala> val vector = Vectors.dense(input.split(',').map(_.toDouble))
vector: org.apache.spark.mllib.linalg.Vector = [3325.0,273.0,5.0,600.0,117.0,864.0,208.0,241.0,172.0,983.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]

scala> forest.predict(vector)
res21: Double = 6.0
Except for the first example, each row was classified correctly: the predicted value equals the row's cover-type value minus 1.
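As a convenience, the hand-assembled one-hot rows above can also be generated from the book's compact 12-value form. A hypothetical helper (`expand12to54` is my name, not from the book or the session) that inverts the earlier slice/indexOf parsing:

// Rebuild the 54-column layout this forest was actually trained on:
// 10 numeric columns, one-hot wilderness in columns 10-13,
// one-hot soil type in columns 14-53.
def expand12to54(features12: Array[Double]): org.apache.spark.mllib.linalg.Vector = {
  val numeric = features12.take(10)
  val wilderness = Array.tabulate(4)(i => if (i == features12(10).toInt) 1.0 else 0.0)
  val soil = Array.tabulate(40)(i => if (i == features12(11).toInt) 1.0 else 0.0)
  Vectors.dense(numeric ++ wilderness ++ soil)
}

// The first input above (wilderness 0, soil 29) reproduces the same prediction:
val features12 = "2709, 125, 28, 67, 23, 3224, 253, 207, 61, 6094, 0, 29"
  .split(',').map(_.trim.toDouble)
forest.predict(expand12to54(features12))  // 1.0, as before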