Working through Mastering Machine Learning with Spark 2.x.
———————————————————
First, download the sample code from the GitHub repository below.
https://github.com/PacktPublishing/Mastering-Machine-Learning-with-Spark-2.x
As described in the README, build it with Gradle:
MacBook-Pro-5:Mastering-Machine-Learning-with-Spark-2.x-master $ ./gradlew projects

> Task :projects

------------------------------------------------------------
Root project
------------------------------------------------------------

Root project 'Mastering-Machine-Learning-with-Spark-2.x'
+--- Project ':mastering-ml-w-spark-chapter02'
+--- Project ':mastering-ml-w-spark-chapter03'
+--- Project ':mastering-ml-w-spark-chapter04'
+--- Project ':mastering-ml-w-spark-chapter05'
+--- Project ':mastering-ml-w-spark-chapter06'
+--- Project ':mastering-ml-w-spark-chapter07'
+--- Project ':mastering-ml-w-spark-chapter08'
\--- Project ':mastering-ml-w-spark-utils'

To see a list of the tasks of a project, run gradlew <project-path>:tasks
For example, try running gradlew :mastering-ml-w-spark-chapter02:tasks

BUILD SUCCESSFUL in 1s
1 actionable task: 1 executed
Ch02: Analysis of the Higgs boson (Higgs-Boson Particle):
First, download the dataset from the UCI Machine Learning Repository:
https://archive.ics.uci.edu/ml/datasets/HIGGS
This gives you the huge 8 GB HIGGS.csv file.
The file holds 11 million records of 29 fields each, recording whether an event is background noise (0) or a Higgs boson signal (1):
Field #1: Class label (0 = background noise, 1 = Higgs boson signal)
Field #2-#22: 21 "low-level" features, i.e. signals from the collision detectors
Field #23-#29: 7 "high-level" features derived by particle physicists to help classify an event as background noise (0) or a Higgs boson signal (1)
A sample record from the file:
1.000000000000000000e+00,8.692932128906250000e-01,-6.350818276405334473e-01,2.256902605295181274e-01,3.274700641632080078e-01,-6.899932026863098145e-01,7.542022466659545898e-01,-2.485731393098831177e-01,-1.092063903808593750e+00,0.000000000000000000e+00,1.374992132186889648e+00,-6.536741852760314941e-01,9.303491115570068359e-01,1.107436060905456543e+00,1.138904333114624023e+00,-1.578198313713073730e+00,-1.046985387802124023e+00,0.000000000000000000e+00,6.579295396804809570e-01,-1.045456994324922562e-02,-4.576716944575309753e-02,3.101961374282836914e+00,1.353760004043579102e+00,9.795631170272827148e-01,9.780761599540710449e-01,9.200048446655273438e-01,7.216574549674987793e-01,9.887509346008300781e-01,8.766783475875854492e-01
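To make the field layout concrete, here is a minimal Scala sketch (my own illustration, not from the book; `row` is a hypothetical Array[Double] holding one parsed line like the sample above) of how the 29 fields split:

// Hypothetical illustration only: splitting one parsed record
// (Array[Double] with 29 entries) according to the field list above.
val label     = row(0)             // field #1: class label, 0.0 or 1.0
val lowLevel  = row.slice(1, 22)   // fields #2-#22: 21 low-level detector features
val highLevel = row.slice(22, 29)  // fields #23-#29: 7 physicist-derived features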
Copy this file to HDFS:
MacBook-Pro-5:~ $ cd /usr/local/Cellar/hadoop/3.1.1/
MacBook-Pro-5:3.1.1 $ ls
INSTALL_RECEIPT.json  README.txt  libexec
LICENSE.txt           bin         sbin
NOTICE.txt            input
MacBook-Pro-5:3.1.1 $ cd bin
MacBook-Pro-5:bin $ ls
container-executor  hdfs    test-container-executor
hadoop              mapred  yarn
MacBook-Pro-5:bin $ ./hadoop fs -put /Users/*******/HIGGS.csv /user/*******/ds
MacBook-Pro-5:bin $ ./hadoop fs -ls /user/*******/ds/
Found 6 items
-rw-r--r--   3 ******* supergroup 8035497980 2018-10-21 22:05 /user/*******/ds/HIGGS.csv
-rw-r--r--   3 ******* supergroup    2932731 2018-09-29 15:02 /user/*******/ds/artist_alias.txt
-rw-r--r--   3 ******* supergroup   55963575 2018-09-29 15:02 /user/*******/ds/artist_data.txt
-rw-r--r--   3 ******* supergroup   75169317 2018-10-08 08:35 /user/*******/ds/covtype.data
-rw-r--r--   3 ******* supergroup  742579829 2018-10-10 23:27 /user/*******/ds/kddcup.data
-rw-r--r--   3 ******* supergroup  426761761 2018-09-29 15:02 /user/*******/ds/user_artist_data.txt
From the built sample code, copy the three utils/build/libs/mastering-ml-w-spark-utils_*.jar files into /opt/spark-2.3.1-bin-hadoop2.7/jars beforehand.
Note: this is needed because the package com.packtpub:mastering-ml-w-spark-utils:1.0.0 cannot be loaded as a Spark package, and import org.apache.spark.utils.Tabulizer._ therefore cannot be used either.
MacBook-Pro-5:~ $ export SPARKLING_WATER_VERSION="2.3.16"
MacBook-Pro-5:~ $ export SPARK_PACKAGES="ai.h2o:sparkling-water-core_2.11:${SPARKLING_WATER_VERSION},\
ai.h2o:sparkling-water-repl_2.11:${SPARKLING_WATER_VERSION},\
ai.h2o:sparkling-water-ml_2.11:${SPARKLING_WATER_VERSION}"
MacBook-Pro-5:bin $ ${SPARK_HOME}/bin/spark-shell --master local[*] --driver-memory 4g --executor-memory 4g --packages "$SPARK_PACKAGES"
Ivy Default Cache set to: /Users/******/.ivy2/cache
The jars for the packages stored in: /Users/*******/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark-2.3.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
ai.h2o#sparkling-water-core_2.11 added as a dependency
ai.h2o#sparkling-water-repl_2.11 added as a dependency
ai.h2o#sparkling-water-ml_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9efb3205-a018-4cdf-9839-c0b520d3ce84;1.0
.........
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.mllib
import org.apache.spark.mllib

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

scala> import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg._

scala> import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix

scala> import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.MLUtils

scala> import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.evaluation._

scala> import org.apache.spark.mllib.tree._
import org.apache.spark.mllib.tree._

scala> import org.apache.spark.mllib.tree.model._
import org.apache.spark.mllib.tree.model._

scala> import org.apache.spark.rdd._
import org.apache.spark.rdd._
Load the HIGGS.csv data stored in HDFS into the RDD rawData:
scala> val rawData = sc.textFile("hdfs://localhost/user/*******/ds/HIGGS.csv")
rawData: org.apache.spark.rdd.RDD[String] = hdfs://localhost/user/*******/ds/HIGGS.csv MapPartitionsRDD[1] at textFile at <console>:43
Count the number of rows in rawData: 11 million!
scala> println(s"Number of rows: ${rawData.count}")
Number of rows: 11000000
Displaying the first two rows of rawData:
scala> println("Rows")
Rows

scala> println(rawData.take(2).mkString("\n"))
1.000000000000000000e+00,8.692932128906250000e-01,-6.350818276405334473e-01,2.256902605295181274e-01,3.274700641632080078e-01,-6.899932026863098145e-01,7.542022466659545898e-01,-2.485731393098831177e-01,-1.092063903808593750e+00,0.000000000000000000e+00,1.374992132186889648e+00,-6.536741852760314941e-01,9.303491115570068359e-01,1.107436060905456543e+00,1.138904333114624023e+00,-1.578198313713073730e+00,-1.046985387802124023e+00,0.000000000000000000e+00,6.579295396804809570e-01,-1.045456994324922562e-02,-4.576716944575309753e-02,3.101961374282836914e+00,1.353760004043579102e+00,9.795631170272827148e-01,9.780761599540710449e-01,9.200048446655273438e-01,7.216574549674987793e-01,9.887509346008300781e-01,8.766783475875854492e-01
1.000000000000000000e+00,9.075421094894409180e-01,3.291472792625427246e-01,3.594118654727935791e-01,1.497969865798950195e+00,-3.130095303058624268e-01,1.095530629158020020e+00,-5.575249195098876953e-01,-1.588229775428771973e+00,2.173076152801513672e+00,8.125811815261840820e-01,-2.136419266462326050e-01,1.271014571189880371e+00,2.214872121810913086e+00,4.999939501285552979e-01,-1.261431813240051270e+00,7.321561574935913086e-01,0.000000000000000000e+00,3.987008929252624512e-01,-1.138930082321166992e+00,-8.191101951524615288e-04,0.000000000000000000e+00,3.022198975086212158e-01,8.330481648445129395e-01,9.856996536254882812e-01,9.780983924865722656e-01,7.797321677207946777e-01,9.923557639122009277e-01,7.983425855636596680e-01
Parse each line into an Array[Double], producing an RDD[Array[Double]]:
scala> val data = rawData.map(line => line.split(',').map(_.toDouble))
data: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[2] at map at <console>:44
Store field #1 as an integer in the RDD response, and fields #2 onward as dense Vectors in the RDD features:
scala> val response: RDD[Int] = data.map(row => row(0).toInt)
response: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:44

scala> response.first
res14: Int = 1

scala> val features: RDD[Vector] = data.map(line => Vectors.dense(line.slice(1, line.size)))
features: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[4] at map at <console>:44

scala> features.first
res13: org.apache.spark.mllib.linalg.Vector = [0.869293212890625,-0.6350818276405334,0.22569026052951813,0.327470064163208,-0.6899932026863098,0.7542022466659546,-0.24857313930988312,-1.0920639038085938,0.0,1.3749921321868896,-0.6536741852760315,0.9303491115570068,1.1074360609054565,1.138904333114624,-1.5781983137130737,-1.046985387802124,0.0,0.657929539680481,-0.010454569943249226,-0.0457671694457531,3.101961374282837,1.353760004043579,0.9795631170272827,0.978076159954071,0.9200048446655273,0.7216574549674988,0.9887509346008301,0.8766783475875854]
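Given the tree-related imports above, a natural next step would be to zip the two RDDs back together into MLlib LabeledPoints; a hedged sketch of my own (not part of the transcript):

// Sketch: combine each label with its feature vector into a LabeledPoint.
// response and features are both simple maps over `data`, so zip lines up here.
val higgs: RDD[LabeledPoint] =
  response.zip(features).map { case (label, featureVector) =>
    LabeledPoint(label.toDouble, featureVector)
  }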
To compute summary statistics for features, wrap it in a RowMatrix and call computeColumnSummaryStatistics():
scala> val featuresMatrix = new RowMatrix(features)
featuresMatrix: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@431782fe

scala> val featuresSummary = featuresMatrix.computeColumnSummaryStatistics()
featuresSummary: org.apache.spark.mllib.stat.MultivariateStatisticalSummary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@7a6ef10b
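Besides the mean and variance used below, the returned MultivariateStatisticalSummary exposes a few other per-column statistics; a quick sketch of what could also be inspected here (not shown in the original session):

// Additional column statistics available on the summary object.
println(s"Column minima: ${featuresSummary.min}")
println(s"Column maxima: ${featuresSummary.max}")
println(s"L1 norms:      ${featuresSummary.normL1}")
println(s"L2 norms:      ${featuresSummary.normL2}")
println(s"Rows observed: ${featuresSummary.count}")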
Import the table formatter from the built utils jar to display results in tabular form:
scala> import com.packtpub.mmlwspark.utils.Tabulizer.table
import com.packtpub.mmlwspark.utils.Tabulizer.table
Compute the mean and variance of the features:
scala> println(s"Higgs Features Mean Values = ${table(featuresSummary.mean, 8)}")
Higgs Features Mean Values =
+-----+------+------+------+-----+-----+------+-----+
|0.991|-0.000|-0.000| 0.999|0.000|0.991|-0.000|0.000|
|1.000| 0.993|-0.000|-0.000|1.000|0.992| 0.000|0.000|
|1.000| 0.986|-0.000| 0.000|1.000|1.034| 1.025|1.051|
|1.010| 0.973| 1.033| 0.960|    -|    -|     -|    -|
+-----+------+------+------+-----+-----+------+-----+

scala> println(s"Higgs Features Variance Values = ${featuresSummary.variance}")
Higgs Features Variance Values = [0.3196519449112381,1.017730961868819,1.0127329351587082,0.3600221904165112,1.0126924404453506,0.22560101074555566,1.0186925432458278,1.0118370903984704,1.056388410032272,0.249993863013093,1.0187480855950728,1.0123467492973912,1.1012362604858514,0.2378145656244775,1.017570015894045,1.0126497344555911,1.4248613805624162,0.2558110682016633,1.0154477450444686,1.0127717879832583,1.9605863249337765,0.4551328799615973,0.14501428530792224,0.02708534249305763,0.15796277985571208,0.2760517515914354,0.13341166899797843,0.09818057123675344]

scala> println(s"Higgs Features Variance Values = ${table(featuresSummary.variance, 8)}")
Higgs Features Variance Values =
+-----+-----+-----+-----+-----+-----+-----+-----+
|0.320|1.018|1.013|0.360|1.013|0.226|1.019|1.012|
|1.056|0.250|1.019|1.012|1.101|0.238|1.018|1.013|
|1.425|0.256|1.015|1.013|1.961|0.455|0.145|0.027|
|0.158|0.276|0.133|0.098|    -|    -|    -|    -|
+-----+-----+-----+-----+-----+-----+-----+-----+

scala> val nonZeros = featuresSummary.numNonzeros
nonZeros: org.apache.spark.mllib.linalg.Vector = [1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,5605389.0,1.1E7,1.1E7,1.1E7,5476088.0,1.1E7,1.1E7,1.1E7,4734760.0,1.1E7,1.1E7,1.1E7,3869383.0,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7]
Count the non-zero values in each column, to see which fields contain zeros:
scala> println(s"Non-zero values count per column: ${table(nonZeros, cols = 8, format = "%.0f")}")
Non-zero values count per column:
+--------+--------+--------+--------+--------+--------+--------+--------+
|11000000|11000000|11000000|11000000|11000000|11000000|11000000|11000000|
| 5605389|11000000|11000000|11000000| 5476088|11000000|11000000|11000000|
| 4734760|11000000|11000000|11000000| 3869383|11000000|11000000|11000000|
|11000000|11000000|11000000|11000000|       -|       -|       -|       -|
+--------+--------+--------+--------+--------+--------+--------+--------+
Check the number of rows and columns, and find the columns that contain zeros:
scala> val numRows = featuresMatrix.numRows
numRows: Long = 11000000

scala> val numCols = featuresMatrix.numCols
numCols: Long = 28

scala> val colsWithZeros = nonZeros
colsWithZeros: org.apache.spark.mllib.linalg.Vector = [1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,5605389.0,1.1E7,1.1E7,1.1E7,5476088.0,1.1E7,1.1E7,1.1E7,4734760.0,1.1E7,1.1E7,1.1E7,3869383.0,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7,1.1E7]

scala> .toArray
res7: Array[Double] = Array(1.1E7, 1.1E7, 1.1E7, 1.1E7, 1.1E7, 1.1E7, 1.1E7, 1.1E7, 5605389.0, 1.1E7, 1.1E7, 1.1E7, 5476088.0, 1.1E7, 1.1E7, 1.1E7, 4734760.0, 1.1E7, 1.1E7, 1.1E7, 3869383.0, 1.1E7, 1.1E7, 1.1E7, 1.1E7, 1.1E7, 1.1E7, 1.1E7)

scala> .zipWithIndex
res8: Array[(Double, Int)] = Array((1.1E7,0), (1.1E7,1), (1.1E7,2), (1.1E7,3), (1.1E7,4), (1.1E7,5), (1.1E7,6), (1.1E7,7), (5605389.0,8), (1.1E7,9), (1.1E7,10), (1.1E7,11), (5476088.0,12), (1.1E7,13), (1.1E7,14), (1.1E7,15), (4734760.0,16), (1.1E7,17), (1.1E7,18), (1.1E7,19), (3869383.0,20), (1.1E7,21), (1.1E7,22), (1.1E7,23), (1.1E7,24), (1.1E7,25), (1.1E7,26), (1.1E7,27))

scala> .filter { case (rows, idx) => rows != numRows }
res9: Array[(Double, Int)] = Array((5605389.0,8), (5476088.0,12), (4734760.0,16), (3869383.0,20))
Evaluate the sparsity, i.e. the fraction of cells that hold non-zero values:
scala> val sparsity = nonZeros.toArray.sum / (numRows * numCols)
sparsity: Double = 0.9210572077922078

scala> println(f"Data sparsity: ${sparsity}%.2f")
Data sparsity: 0.92
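As a quick cross-check of that figure (my own arithmetic, using the non-zero counts printed earlier): 24 columns are fully populated and 4 contain zeros, so

// 24 * 11,000,000 + 5,605,389 + 5,476,088 + 4,734,760 + 3,869,383 = 283,685,620 non-zero cells
// 283,685,620 / (11,000,000 rows * 28 columns) = 283,685,620 / 308,000,000 ≈ 0.9211
val nonZeroCells = 24 * 11000000L + 5605389L + 5476088L + 4734760L + 3869383L
val density      = nonZeroCells.toDouble / (11000000L * 28)   // ≈ 0.92, matching the value above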
Check that response contains only 0 or 1, and look at its frequency distribution:
scala> val responseValues = response.distinct.collect
responseValues: Array[Int] = Array(0, 1)

scala> println(s"Response values: ${responseValues.mkString(", ")}")
Response values: 0, 1

scala> val responseDistribution = response.map(v => (v,1)).countByKey
responseDistribution: scala.collection.Map[Int,Long] = Map(0 -> 5170877, 1 -> 5829123)

scala> println(s"Response distribution:\n${table(responseDistribution)}")
Response distribution:
+-------+-------+
|      0|      1|
+-------+-------+
|5170877|5829123|
+-------+-------+
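The two classes are therefore roughly balanced; a small sketch of my own turning those counts into percentages:

// Relative class frequencies from the counts above.
val total = responseDistribution.values.sum.toDouble
responseDistribution.toSeq.sortBy(_._1).foreach { case (label, count) =>
  println(f"label $label: ${count / total * 100}%.1f %%")
}
// expected output: label 0: 47.0 %, label 1: 53.0 %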