Launching the shell as ${SPARK_HOME}/bin/spark-shell --master local[*] (note the [*]) reportedly makes the local cluster use as many threads as the machine has CPU cores. My MacBook Pro has a 2-core Core i7.
    MacBook-Pro-5:spark-2.3.1-bin-hadoop2.7 $ ${SPARK_HOME}/bin/spark-shell --master local[*]
    2018-09-24 16:42:48 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Spark context Web UI available at http://macbook-pro-5:4040
    Spark context available as 'sc' (master = local[*], app id = local-1537774978329).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
          /_/

    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
    Type in expressions to have them evaluated.
    Type :help for more information.

    scala>
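The thread count that local[*] resolves to can be checked from the JVM itself. The sketch below assumes, as the Spark documentation describes, that local[*] uses one worker thread per logical core; LocalThreads and its members are illustrative names, not part of Spark.

```scala
object LocalThreads {
  // Number of logical cores the JVM reports (hyper-threaded cores count separately).
  def logicalCores: Int = Runtime.getRuntime.availableProcessors()

  // Hypothetical helper: the master string for a fixed number of worker threads.
  def master(n: Int): String = s"local[$n]"
}
```

On a hyper-threaded 2-core CPU, logicalCores is typically 4, which would be consistent with the four concurrently running tasks shown in progress bars such as (0 + 4) / 10 later in this post.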
    scala> sc
    res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@531ec2ca

    scala> sc.
    accumulable             getCheckpointDir           parallelize
    accumulableCollection   getConf                    range
    accumulator             getExecutorMemoryStatus    register
    addFile                 getExecutorStorageStatus   removeSparkListener
    addJar                  getLocalProperty           requestExecutors
    addSparkListener        getPersistentRDDs          requestTotalExecutors
    appName                 getPoolForName             runApproximateJob
    applicationAttemptId    getRDDStorageInfo          runJob
    applicationId           getSchedulingMode          sequenceFile
    binaryFiles             hadoopConfiguration        setCallSite
    binaryRecords           hadoopFile                 setCheckpointDir
    broadcast               hadoopRDD                  setJobDescription
    cancelAllJobs           isLocal                    setJobGroup
    cancelJob               isStopped                  setLocalProperty
    cancelJobGroup          jars                       setLogLevel
    cancelStage             killExecutor               sparkUser
    clearCallSite           killExecutors              startTime
    clearJobGroup           killTaskAttempt            statusTracker
    collectionAccumulator   listFiles                  stop
    defaultMinPartitions    listJars                   submitJob
    defaultParallelism      longAccumulator            textFile
    deployMode              makeRDD                    uiWebUrl
    doubleAccumulator       master                     union
    emptyRDD                newAPIHadoopFile           version
    files                   newAPIHadoopRDD            wholeTextFiles
    getAllPools             objectFile
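Next, among the SparkContext sc methods that create an RDD (Resilient Distributed Dataset, a fault-tolerant distributed dataset), I use textFile, which returns an RDD referring to a text file that holds one record per line. I point it at the UCI data block_*.csv that I had earlier put into the linkage folder on HDFS with Hadoop, and assign the result to the variable rawblocks.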
    scala> val rawblocks = sc.textFile("hdfs://localhost/linkage")
    rawblocks: org.apache.spark.rdd.RDD[String] = hdfs://localhost/linkage MapPartitionsRDD[1] at textFile at <console>:24
    scala> rawblocks.first
    res1: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
    scala> val head = rawblocks.take(10)
    head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)

    scala> head.foreach(println)
    "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
    37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
    39086,47614,1,?,1,?,1,1,1,1,1,TRUE
    70031,70237,1,?,1,?,1,1,1,1,1,TRUE
    84795,97439,1,?,1,?,1,1,1,1,1,TRUE
    36950,42116,1,?,1,1,1,1,1,1,1,TRUE
    42413,48491,1,?,1,?,1,1,1,1,1,TRUE
    25965,64753,1,?,1,?,1,1,1,1,1,TRUE
    49451,90407,1,?,1,?,1,1,1,1,0,TRUE
    39932,40902,1,?,1,?,1,1,1,1,1,TRUE
    scala> def isHeader(line: String): Boolean = {
         |   line.contains("id_1")
         | }
    isHeader: (line: String)Boolean

    scala> head.filter(isHeader).foreach(println)
    "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
    scala> head.filterNot(isHeader).length
    res5: Int = 9

    scala> head.filter(x => !isHeader(x)).length
    res6: Int = 9

    scala> head.filter(!isHeader(_)).length
    res7: Int = 9
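The three filter calls above are equivalent ways of writing the same predicate. A minimal sketch on a small local array, where sample is made-up data standing in for head:

```scala
object HeaderFilter {
  def isHeader(line: String): Boolean = line.contains("id_1")

  // Made-up sample standing in for the first lines of the file.
  val sample: Array[String] = Array("\"id_1\",\"id_2\"", "37291,53113", "39086,47614")

  val a: Array[String] = sample.filterNot(isHeader)        // method passed as a function
  val b: Array[String] = sample.filter(x => !isHeader(x))  // explicit anonymous function
  val c: Array[String] = sample.filter(!isHeader(_))       // placeholder syntax
}
```

All three produce the two data lines and drop the header line.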
Applying the header-removing filter to the whole dataset on the cluster creates the noheader RDD. Checking its first line:
    scala> val noheader = rawblocks.filter(x => !isHeader(x))
    noheader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27

    scala> noheader.first
    res8: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
    scala> val line = head(5)
    line: String = 36950,42116,1,?,1,1,1,1,1,1,1,TRUE
    scala> val pieces = line.split(',')
    pieces: Array[String] = Array(36950, 42116, 1, ?, 1, 1, 1, 1, 1, 1, 1, TRUE)

    scala> val id1 = pieces(0).toInt
    id1: Int = 36950

    scala> val id2 = pieces(1).toInt
    id2: Int = 42116

    scala> val matched = pieces(11).toBoolean
    matched: Boolean = true
    scala> val rawscores = pieces.slice(2, 11)
    rawscores: Array[String] = Array(1, ?, 1, 1, 1, 1, 1, 1, 1)
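The indices used above follow from the 12-column layout: two ids, nine scores, one match flag. A small self-contained sketch of the same split and slice, where SplitSlice is just an illustrative wrapper name:

```scala
object SplitSlice {
  val line: String = "36950,42116,1,?,1,1,1,1,1,1,1,TRUE"

  // 12 fields: id_1, id_2, nine comparison scores, is_match.
  val pieces: Array[String] = line.split(',')

  // slice(from, until) excludes `until`, so this takes indices 2 through 10.
  val rawscores: Array[String] = pieces.slice(2, 11)

  // String.toBoolean in Scala ignores case, so "TRUE" parses to true.
  val matched: Boolean = pieces(11).toBoolean
}
```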
    scala> rawscores.map(s => s.toDouble)
    java.lang.NumberFormatException: For input string: "?"
      at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
      at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
      at java.lang.Double.parseDouble(Double.java:538)
      at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:284)
      at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29)
      at $anonfun$1.apply(<console>:26)
      at $anonfun$1.apply(<console>:26)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
      ... 49 elided
    scala> def toDouble(s: String) = {
         |   if ("?".equals(s)) Double.NaN else s.toDouble
         | }
    toDouble: (s: String)Double

    scala> val scores = rawscores.map(toDouble)
    scores: Array[Double] = Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
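The toDouble above encodes a missing score ("?") as NaN. The sketch below repeats that rule and, as an alternative that is not in the original session, shows an Option-based variant that makes missingness explicit in the type. MissingValues and toOption are illustrative names.

```scala
object MissingValues {
  // Same rule as the session's toDouble: "?" marks a missing score.
  def toDouble(s: String): Double =
    if (s == "?") Double.NaN else s.toDouble

  // Alternative sketch (an assumption, not the original approach):
  // represent a missing score as None instead of NaN.
  def toOption(s: String): Option[Double] =
    if (s == "?") None else Some(s.toDouble)
}
```

Note that NaN never compares equal to anything, including itself, so missing values have to be detected with isNaN rather than ==.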
    scala> def parse(line: String) = {
         |   val pieces = line.split(',')
         |   val id1 = pieces(0).toInt
         |   val id2 = pieces(1).toInt
         |   val scores = pieces.slice(2, 11).map(toDouble)
         |   val matched = pieces(11).toBoolean
         |   (id1, id2, scores, matched)
         | }
    parse: (line: String)(Int, Int, Array[Double], Boolean)

    scala> val tup = parse(line)
    tup: (Int, Int, Array[Double], Boolean) = (36950,42116,Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0),true)

    scala> tup._1
    res11: Int = 36950

    scala> tup.productElement(0)
    res12: Any = 36950

    scala> tup.productArity
    res13: Int = 4
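Besides positional access with _1 or productElement, a tuple can be unpacked by pattern matching, which gives each field a name in one step. A minimal sketch with a shortened scores array; TupleAccess is an illustrative name:

```scala
object TupleAccess {
  // Shortened stand-in for the tuple returned by parse(line).
  val tup: (Int, Int, Array[Double], Boolean) =
    (36950, 42116, Array(1.0, Double.NaN, 1.0), true)

  // Pattern matching binds all four fields by name at once.
  val (id1, id2, scores, matched) = tup
}
```

Unlike productElement, which returns Any, the names bound this way keep their static types.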
    scala> case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)
    defined class MatchData

    scala> def parse(line: String) = {
         |   val pieces = line.split(',')
         |   val id1 = pieces(0).toInt
         |   val id2 = pieces(1).toInt
         |   val scores = pieces.slice(2, 11).map(toDouble)
         |   val matched = pieces(11).toBoolean
         |   MatchData(id1, id2, scores, matched)
         | }
    parse: (line: String)MatchData

    scala> val md = parse(line)
    md: MatchData = MatchData(36950,42116,[D@33cd31cf,true)
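The [D@33cd31cf in the output above is the JVM's default rendering of an Array[Double], not the scores themselves. One way to get a readable rendering is a small helper on the case class; the show method below is a hypothetical addition for illustration and is not part of the original MatchData.

```scala
object MatchDataDisplay {
  case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean) {
    // Hypothetical helper: print the array contents instead of its JVM identity.
    def show: String =
      s"MatchData($id1,$id2,[${scores.mkString(", ")}],$matched)"
  }

  // Shortened scores array for the example.
  val md: MatchData = MatchData(36950, 42116, Array(1.0, Double.NaN, 1.0), true)
}
```

Here md.show yields MatchData(36950,42116,[1.0, NaN, 1.0],true), while md.toString still contains the [D@... form.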
    scala> val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
    mds: Array[MatchData] = Array(MatchData(37291,53113,[D@578ce232,true), MatchData(39086,47614,[D@3bfd73e7,true), MatchData(70031,70237,[D@33225b82,true), MatchData(84795,97439,[D@10370297,true), MatchData(36950,42116,[D@209aefd1,true), MatchData(42413,48491,[D@70a8c26e,true), MatchData(25965,64753,[D@51f779b2,true), MatchData(49451,90407,[D@797cfaa1,true), MatchData(39932,40902,[D@e6f7638,true))
    scala> md.matched
    res15: Boolean = true

    scala> md.id1
    res16: Int = 36950

    scala> val parsed = noheader.map(line => parse(line))
    parsed: org.apache.spark.rdd.RDD[MatchData] = MapPartitionsRDD[3] at map at <console>:29

    scala> parsed.cache()
    res14: parsed.type = MapPartitionsRDD[3] at map at <console>:29
    scala> val grouped = mds.groupBy(md => md.matched)
    grouped: scala.collection.immutable.Map[Boolean,Array[MatchData]] = Map(true -> Array(MatchData(37291,53113,[D@578ce232,true), MatchData(39086,47614,[D@3bfd73e7,true), MatchData(70031,70237,[D@33225b82,true), MatchData(84795,97439,[D@10370297,true), MatchData(36950,42116,[D@209aefd1,true), MatchData(42413,48491,[D@70a8c26e,true), MatchData(25965,64753,[D@51f779b2,true), MatchData(49451,90407,[D@797cfaa1,true), MatchData(39932,40902,[D@e6f7638,true)))

    scala> grouped.mapValues(x => x.size).foreach(println)
    (true,9)
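The groupBy above runs on the local array mds, so every group is materialized in memory before it is counted. A minimal sketch of the same count-per-key idea on a local collection, using a simplified record type (Rec is a made-up stand-in for MatchData):

```scala
object GroupCount {
  // Simplified stand-in for MatchData, for illustration only.
  case class Rec(id: Int, matched: Boolean)

  val recs: Seq[Rec] = Seq(Rec(1, true), Rec(2, false), Rec(3, true))

  // groupBy builds Map[Boolean, Seq[Rec]]; each group is then reduced to its size.
  val counts: Map[Boolean, Int] =
    recs.groupBy(_.matched).map { case (k, v) => k -> v.size }
}
```

This is fine for ten sampled lines, but holding whole groups in memory does not suit the full dataset, which is why the session switches to countByValue on the RDD next.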
    scala> val matchCounts = parsed.map(md => md.matched).countByValue()
    [Stage 3:> (0 + 4) / 10]2018-09-24 18:01:19 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 70.0 MB so far)
    2018-09-24 18:01:19 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:01:19 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:01:19 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 70.0 MB so far)
    2018-09-24 18:01:19 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:01:19 WARN BlockManager:66 - Putting block rdd_3_1 failed
    [Stage 3:=======================> (4 + 4) / 10]2018-09-24 18:01:23 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 44.9 MB so far)
    2018-09-24 18:01:23 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:01:23 WARN BlockManager:66 - Putting block rdd_3_5 failed
    2018-09-24 18:01:23 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 44.9 MB so far)
    2018-09-24 18:01:23 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:01:23 WARN BlockManager:66 - Putting block rdd_3_6 failed
    [Stage 3:==============================================> (8 + 2) / 10]2018-09-24 18:01:25 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:01:25 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:01:25 WARN BlockManager:66 - Putting block rdd_3_9 failed
    2018-09-24 18:01:25 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:01:25 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:01:25 WARN BlockManager:66 - Putting block rdd_3_8 failed
    matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)
    scala> val matchCountSeq = matchCounts.toSeq
    matchCountSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))

    scala> matchCountSeq.sortBy(_._1).foreach(println)
    (false,5728201)
    (true,20931)

    scala> matchCountSeq.sortBy(_._2).foreach(println)
    (true,20931)
    (false,5728201)

    scala> matchCountSeq.sortBy(_._2).reverse.foreach(println)
    (false,5728201)
    (true,20931)
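The descending sort above is done with sortBy followed by reverse. As a small variation that is not in the original session, the ordering itself can be reversed so that a single sortBy call suffices; SortCounts is an illustrative name and the counts are copied from the output above.

```scala
object SortCounts {
  val matchCountSeq: Seq[(Boolean, Long)] =
    Seq((true, 20931L), (false, 5728201L))

  // Descending by count: pass a reversed Ordering instead of calling .reverse afterwards.
  val byCountDesc: Seq[(Boolean, Long)] =
    matchCountSeq.sortBy(_._2)(Ordering[Long].reverse)
}
```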
    scala> parsed.map(md => md.scores(0)).stats()
    [Stage 5:=======================> (4 + 4) / 10]2018-09-24 18:10:03 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 19.7 MB so far)
    2018-09-24 18:10:03 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:10:03 WARN BlockManager:66 - Putting block rdd_3_5 failed
    2018-09-24 18:10:03 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 19.7 MB so far)
    2018-09-24 18:10:03 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:10:03 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:10:03 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 19.7 MB so far)
    2018-09-24 18:10:03 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:10:03 WARN BlockManager:66 - Putting block rdd_3_6 failed
    2018-09-24 18:10:05 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 70.0 MB so far)
    2018-09-24 18:10:05 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:10:05 WARN BlockManager:66 - Putting block rdd_3_0 failed
    [Stage 5:==============================================> (8 + 2) / 10]2018-09-24 18:10:07 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:10:07 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:10:07 WARN BlockManager:66 - Putting block rdd_3_9 failed
    2018-09-24 18:10:07 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 70.0 MB so far)
    2018-09-24 18:10:07 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:10:07 WARN BlockManager:66 - Putting block rdd_3_8 failed
    res21: org.apache.spark.util.StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN)
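The all-NaN result above comes from NaN propagating through floating-point arithmetic: a single missing score is enough to turn the sum, and therefore the mean, into NaN. A plain-Scala sketch on three made-up values:

```scala
object NaNMean {
  val scores: Seq[Double] = Seq(1.0, Double.NaN, 0.5)

  // Any arithmetic involving NaN yields NaN, so the naive mean is NaN.
  val naiveMean: Double = scores.sum / scores.size

  // Dropping NaN first gives the mean over the observed values only.
  val observed: Seq[Double] = scores.filterNot(_.isNaN)
  val cleanMean: Double = observed.sum / observed.size
}
```

Here naiveMean is NaN and cleanMean is 0.75, which is the same effect the isNaN filter has on the RDD in the next step.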
    scala> import java.lang.Double.isNaN
    import java.lang.Double.isNaN

    scala> parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
    [Stage 6:=======================> (4 + 4) / 10]2018-09-24 18:12:52 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 19.7 MB so far)
    2018-09-24 18:12:52 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:12:52 WARN BlockManager:66 - Putting block rdd_3_5 failed
    2018-09-24 18:12:52 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 19.7 MB so far)
    2018-09-24 18:12:52 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:12:52 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:12:52 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 44.9 MB so far)
    2018-09-24 18:12:52 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:12:52 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:12:52 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 44.9 MB so far)
    2018-09-24 18:12:52 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:12:52 WARN BlockManager:66 - Putting block rdd_3_6 failed
    [Stage 6:==============================================> (8 + 2) / 10]2018-09-24 18:12:55 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:12:55 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:12:55 WARN BlockManager:66 - Putting block rdd_3_8 failed
    2018-09-24 18:12:55 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 70.0 MB so far)
    2018-09-24 18:12:55 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:12:55 WARN BlockManager:66 - Putting block rdd_3_9 failed
    res22: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)
    scala> val stats = (0 until 9).map(i => {
         |   parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
         | })
    [Stage 7:=======================> (4 + 4) / 10]2018-09-24 18:16:08 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:08 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:08 WARN BlockManager:66 - Putting block rdd_3_6 failed
    2018-09-24 18:16:08 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:08 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:08 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:08 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:08 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:08 WARN BlockManager:66 - Putting block rdd_3_5 failed
    2018-09-24 18:16:10 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 70.0 MB so far)
    2018-09-24 18:16:10 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:10 WARN BlockManager:66 - Putting block rdd_3_1 failed
    [Stage 7:==============================================> (8 + 2) / 10]2018-09-24 18:16:11 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:11 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:11 WARN BlockManager:66 - Putting block rdd_3_8 failed
    2018-09-24 18:16:11 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:11 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:11 WARN BlockManager:66 - Putting block rdd_3_9 failed
    [Stage 8:=======================> (4 + 4) / 10]2018-09-24 18:16:14 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:14 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:14 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:14 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 29.5 MB so far)
    2018-09-24 18:16:14 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:14 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:16:15 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:15 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:15 WARN BlockManager:66 - Putting block rdd_3_6 failed
    2018-09-24 18:16:15 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:15 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:15 WARN BlockManager:66 - Putting block rdd_3_5 failed
    [Stage 8:==============================================> (8 + 2) / 10]2018-09-24 18:16:16 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:16 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:16 WARN BlockManager:66 - Putting block rdd_3_8 failed
    2018-09-24 18:16:17 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 70.0 MB so far)
    2018-09-24 18:16:17 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:17 WARN BlockManager:66 - Putting block rdd_3_9 failed
    [Stage 9:=======================> (4 + 4) / 10]2018-09-24 18:16:18 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:18 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:18 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:16:19 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 29.5 MB so far)
    2018-09-24 18:16:19 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:19 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:19 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 29.5 MB so far)
    2018-09-24 18:16:19 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:19 WARN BlockManager:66 - Putting block rdd_3_6 failed
    2018-09-24 18:16:20 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 70.0 MB so far)
    2018-09-24 18:16:20 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:20 WARN BlockManager:66 - Putting block rdd_3_5 failed
    [Stage 9:==============================================> (8 + 2) / 10]2018-09-24 18:16:21 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:21 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:21 WARN BlockManager:66 - Putting block rdd_3_8 failed
    2018-09-24 18:16:21 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:21 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:21 WARN BlockManager:66 - Putting block rdd_3_9 failed
    [Stage 10:======================> (4 + 4) / 10]2018-09-24 18:16:23 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:23 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:23 WARN BlockManager:66 - Putting block rdd_3_6 failed
    2018-09-24 18:16:23 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:23 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:23 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:16:24 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:24 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:24 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:25 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 70.0 MB so far)
    2018-09-24 18:16:25 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:25 WARN BlockManager:66 - Putting block rdd_3_5 failed
    [Stage 10:=============================================> (8 + 2) / 10]2018-09-24 18:16:26 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:26 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:26 WARN BlockManager:66 - Putting block rdd_3_9 failed
    2018-09-24 18:16:26 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 70.0 MB so far)
    2018-09-24 18:16:26 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:26 WARN BlockManager:66 - Putting block rdd_3_8 failed
    [Stage 11:======================> (4 + 4) / 10]2018-09-24 18:16:28 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:28 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:28 WARN BlockManager:66 - Putting block rdd_3_6 failed
    2018-09-24 18:16:28 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 29.5 MB so far)
    2018-09-24 18:16:28 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:28 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:29 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:29 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:29 WARN BlockManager:66 - Putting block rdd_3_5 failed
    2018-09-24 18:16:29 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:29 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:29 WARN BlockManager:66 - Putting block rdd_3_1 failed
    [Stage 11:=============================================> (8 + 2) / 10]2018-09-24 18:16:31 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:31 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:31 WARN BlockManager:66 - Putting block rdd_3_8 failed
    2018-09-24 18:16:31 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:31 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:31 WARN BlockManager:66 - Putting block rdd_3_9 failed
    [Stage 12:======================> (4 + 4) / 10]2018-09-24 18:16:33 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:33 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:33 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:16:33 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 29.5 MB so far)
    2018-09-24 18:16:33 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:33 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:34 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:34 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:34 WARN BlockManager:66 - Putting block rdd_3_5 failed
    2018-09-24 18:16:34 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:34 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:34 WARN BlockManager:66 - Putting block rdd_3_6 failed
    [Stage 12:=============================================> (8 + 2) / 10]2018-09-24 18:16:36 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:36 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:36 WARN BlockManager:66 - Putting block rdd_3_9 failed
    2018-09-24 18:16:36 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:36 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:36 WARN BlockManager:66 - Putting block rdd_3_8 failed
    [Stage 13:======================> (4 + 4) / 10]2018-09-24 18:16:38 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:38 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:38 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:16:38 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 29.5 MB so far)
    2018-09-24 18:16:38 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:38 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:39 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:39 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:39 WARN BlockManager:66 - Putting block rdd_3_5 failed
    2018-09-24 18:16:39 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:39 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:39 WARN BlockManager:66 - Putting block rdd_3_6 failed
    [Stage 13:=============================================> (8 + 2) / 10]2018-09-24 18:16:41 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:41 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:41 WARN BlockManager:66 - Putting block rdd_3_8 failed
    2018-09-24 18:16:41 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:41 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:41 WARN BlockManager:66 - Putting block rdd_3_9 failed
    [Stage 14:======================> (4 + 4) / 10]2018-09-24 18:16:43 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:43 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:43 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:16:43 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 29.5 MB so far)
    2018-09-24 18:16:43 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:43 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:44 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:44 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:44 WARN BlockManager:66 - Putting block rdd_3_5 failed
    2018-09-24 18:16:44 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:44 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:44 WARN BlockManager:66 - Putting block rdd_3_6 failed
    [Stage 14:=============================================> (8 + 2) / 10]2018-09-24 18:16:46 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:46 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:46 WARN BlockManager:66 - Putting block rdd_3_8 failed
    2018-09-24 18:16:46 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:46 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:46 WARN BlockManager:66 - Putting block rdd_3_9 failed
    [Stage 15:======================> (4 + 4) / 10]2018-09-24 18:16:48 WARN MemoryStore:66 - Not enough space to cache rdd_3_1 in memory! (computed 19.7 MB so far)
    2018-09-24 18:16:48 WARN BlockManager:66 - Block rdd_3_1 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:48 WARN BlockManager:66 - Putting block rdd_3_1 failed
    2018-09-24 18:16:48 WARN MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 29.5 MB so far)
    2018-09-24 18:16:48 WARN BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:48 WARN BlockManager:66 - Putting block rdd_3_0 failed
    2018-09-24 18:16:49 WARN MemoryStore:66 - Not enough space to cache rdd_3_6 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:49 WARN BlockManager:66 - Block rdd_3_6 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:49 WARN BlockManager:66 - Putting block rdd_3_6 failed
    2018-09-24 18:16:49 WARN MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 70.0 MB so far)
    2018-09-24 18:16:49 WARN BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:49 WARN BlockManager:66 - Putting block rdd_3_5 failed
    [Stage 15:=============================================> (8 + 2) / 10]2018-09-24 18:16:52 WARN MemoryStore:66 - Not enough space to cache rdd_3_8 in memory! (computed 44.9 MB so far)
    2018-09-24 18:16:52 WARN BlockManager:66 - Block rdd_3_8 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:52 WARN BlockManager:66 - Putting block rdd_3_8 failed
    2018-09-24 18:16:53 WARN MemoryStore:66 - Not enough space to cache rdd_3_9 in memory! (computed 70.0 MB so far)
    2018-09-24 18:16:53 WARN BlockManager:66 - Block rdd_3_9 could not be removed as it was not found on disk or in memory
    2018-09-24 18:16:53 WARN BlockManager:66 - Putting block rdd_3_9 failed
    stats: scala.collection.immutable.IndexedSeq[org.apache.spark.util.StatCounter] = Vector((count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000), (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000), (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000), (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000,...

    scala>
To stop the "Not enough space to cache" warnings, restart the shell with more driver memory: ${SPARK_HOME}/bin/spark-shell --master local[*] --driver-memory 2g
MacBook-Pro-5:spark-2.3.1-bin-hadoop2.7 $ ${SPARK_HOME}/bin/spark-shell --master local[*] --driver-memory 2g
2018-09-24 18:21:12 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-09-24 18:21:20 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://macbook-pro-5:4041
Spark context available as 'sc' (master = local[*], app id = local-1537780881289).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val matchCounts = parsed.map(md => md.matched).countByValue()
matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)

scala> val matchCountSeq = matchCounts.toSeq
matchCountSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))

scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN

scala> parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
res7: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)

scala> val stats = (0 until 9).map(i => {
     |   parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
     | })
stats: scala.collection.immutable.IndexedSeq[org.apache.spark.util.StatCounter] = Vector((count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000), (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000), (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000), (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000,...
scala>

scala> stats(1)
res8: org.apache.spark.util.StatCounter = (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000)

scala> stats(8)
res9: org.apache.spark.util.StatCounter = (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000, min: 0.000000)
import org.apache.spark.util.StatCounter

// Wraps a StatCounter and additionally counts missing (NaN) values.
class NAStatCounter extends Serializable {
  val stats: StatCounter = new StatCounter()
  var missing: Long = 0

  // Add one value: NaN increments the missing count, anything else goes into stats.
  def add(x: Double): NAStatCounter = {
    if (java.lang.Double.isNaN(x)) {
      missing += 1
    } else {
      stats.merge(x)
    }
    this
  }

  // Merge another counter into this one. Note that this mutates and returns this.
  def merge(other: NAStatCounter): NAStatCounter = {
    stats.merge(other.stats)
    missing += other.missing
    this
  }

  override def toString = {
    "stats: " + stats.toString + " NaN: " + missing
  }
}

// Companion object so that NAStatCounter(x) builds a counter holding a single value.
object NAStatCounter extends Serializable {
  def apply(x: Double) = new NAStatCounter().add(x)
}
scala> :load StatsWithMissing.scala
Loading StatsWithMissing.scala...
import org.apache.spark.util.StatCounter
import org.apache.spark.rdd.RDD
defined class NAStatCounter
defined object NAStatCounter
warning: previously defined class NAStatCounter is not a companion to object NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.
scala> val nas1 = NAStatCounter(10.0)
nas1: NAStatCounter = stats: (count: 1, mean: 10.000000, stdev: 0.000000, max: 10.000000, min: 10.000000) NaN: 0

scala> nas1.add(2.1)
res10: NAStatCounter = stats: (count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000) NaN: 0

scala> val nas2 = NAStatCounter(Double.NaN)
nas2: NAStatCounter = stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1

scala> nas1.merge(nas2)
res11: NAStatCounter = stats: (count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000) NaN: 1
scala> val arr = Array(1.0, Double.NaN, 17.29)
arr: Array[Double] = Array(1.0, NaN, 17.29)

scala> val nas = arr.map(d => NAStatCounter(d))
nas: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 17.290000, stdev: 0.000000, max: 17.290000, min: 17.290000) NaN: 0)
scala> val parsed = noheader.map(line => parse(line))
The parsed RDD, created as above by converting every record in noheader to MatchData, is then mapped through NAStatCounter to generate nasRDD.
scala> val nasRDD = parsed.map(md => {
     |   md.scores.map(d => NAStatCounter(d))
     | })
nasRDD: org.apache.spark.rdd.RDD[Array[NAStatCounter]] = MapPartitionsRDD[38] at map at <console>:32
Here, given two Arrays of NAStatCounter instances, nas1 and nas2, the zip() function pairs up the elements at the same index so that each pair can be merged.
scala> val nas1 = Array(1.0, Double.NaN).map(d => NAStatCounter(d))
nas1: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1)

scala> val nas2 = Array(Double.NaN, 2.0).map(d => NAStatCounter(d))
nas2: Array[NAStatCounter] = Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0)

scala> val merged = nas1.zip(nas2).map(p => p._1.merge(p._2))
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)

scala> val merged = nas1.zip(nas2).map { case (a, b) => a.merge(b) }
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 2, stats: (count: 2, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
If the arrays are put into a List[Array[NAStatCounter]] and passed to reduce(), with each pair of arrays handed on to merge(), then merge() is carried out element by element across every Array[NAStatCounter].
scala> val nas = List(nas1, nas2)
nas: List[Array[NAStatCounter]] = List(Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 2, stats: (count: 2, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1), Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0))

scala> val merged = nas.reduce((n1, n2) => {
     |   n1.zip(n2).map { case (a, b) => a.merge(b) }
     | })
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 3, stats: (count: 3, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
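Note that in the transcript above the NaN and count values keep growing (NaN: 1, then 2, then 3) because merge() mutates nas1 in place every time the same arrays are merged again. The following is a minimal sketch of the same zip-and-reduce pattern with an immutable counter on plain Scala collections. MiniCounter and ZipMergeSketch are hypothetical names introduced only for this illustration; they are not part of Spark or of StatsWithMissing.scala, and the counter tracks only count, sum and missing.

```scala
object ZipMergeSketch {
  // Hypothetical immutable stand-in for NAStatCounter:
  // add and merge return new values instead of mutating this.
  final case class MiniCounter(count: Long = 0L, sum: Double = 0.0, missing: Long = 0L) {
    def add(x: Double): MiniCounter =
      if (x.isNaN) copy(missing = missing + 1)
      else copy(count = count + 1, sum = sum + x)

    def merge(other: MiniCounter): MiniCounter =
      MiniCounter(count + other.count, sum + other.sum, missing + other.missing)
  }

  // One counter per value, like arr.map(d => NAStatCounter(d)).
  def counters(xs: Array[Double]): Array[MiniCounter] =
    xs.map(x => MiniCounter().add(x))

  // Element-wise merge of all arrays, same shape as the reduce() in the transcript.
  def mergeAll(rows: List[Array[MiniCounter]]): Array[MiniCounter] =
    rows.reduce((n1, n2) => n1.zip(n2).map { case (a, b) => a.merge(b) })

  val nas1: Array[MiniCounter] = counters(Array(1.0, Double.NaN))
  val nas2: Array[MiniCounter] = counters(Array(Double.NaN, 2.0))

  // Merging twice gives the same answer, because nas1 is never modified.
  val once: Array[MiniCounter] = mergeAll(List(nas1, nas2))
  val twice: Array[MiniCounter] = mergeAll(List(nas1, nas2))
}
```

The trade-off is that the immutable version allocates a new counter on every add and merge, which is presumably why the mutable style is used for the large RDD.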
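Applying this reduce() => merge() strategy to nasRDD (the RDD generated by applying NAStatCounter to parsed, which holds every noheader record converted to MatchData) merges the counters of each MatchData in turn, so a statistical summary of the whole data set comes back in one go as a single Array[NAStatCounter].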
scala> val reduced = nasRDD.reduce((n1, n2) => {
     |   n1.zip(n2).map { case (a, b) => a.merge(b) }
     | })
reduced: Array[NAStatCounter] = Array(stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007, stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434, stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668, stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0....

scala> reduced.foreach(println)
stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007
stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434
stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0
stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668
stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0
stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000, min: 0.000000) NaN: 12843
So I improved StatsWithMissing.scala by building this reduce() => merge() mechanism into it as a statsWithMissing() function:
import org.apache.spark.util.StatCounter
import org.apache.spark.rdd.RDD

// Wraps a StatCounter and additionally counts missing (NaN) values.
class NAStatCounter extends Serializable {
  val stats: StatCounter = new StatCounter()
  var missing: Long = 0

  def add(x: Double): NAStatCounter = {
    if (java.lang.Double.isNaN(x)) {
      missing += 1
    } else {
      stats.merge(x)
    }
    this
  }

  def merge(other: NAStatCounter): NAStatCounter = {
    stats.merge(other.stats)
    missing += other.missing
    this
  }

  override def toString = {
    "stats: " + stats.toString + " NaN: " + missing
  }
}

object NAStatCounter extends Serializable {
  def apply(x: Double) = new NAStatCounter().add(x)
}

// Per-column statistics (including NaN counts) for an RDD of score arrays.
def statsWithMissing(rdd: RDD[Array[Double]]): Array[NAStatCounter] = {
  // Build one Array[NAStatCounter] per partition instead of one per record.
  val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {
    // Note: iter.next() assumes the partition is not empty.
    val nas: Array[NAStatCounter] = iter.next().map(d => NAStatCounter(d))
    iter.foreach(arr => {
      nas.zip(arr).foreach { case (n, d) => n.add(d) }
    })
    Iterator(nas)
  })
  // Merge the per-partition arrays element by element.
  nastats.reduce((n1, n2) => {
    n1.zip(n2).map { case (a, b) => a.merge(b) }
  })
}
scala> :load StatsWithMissing.scala
Loading StatsWithMissing.scala...
import org.apache.spark.util.StatCounter
import org.apache.spark.rdd.RDD
defined class NAStatCounter
warning: previously defined object NAStatCounter is not a companion to class NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.
defined object NAStatCounter
warning: previously defined class NAStatCounter is not a companion to object NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.
statsWithMissing: (rdd: org.apache.spark.rdd.RDD[Array[Double]])Array[NAStatCounter]
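The point of mapPartitions in statsWithMissing() is that only one Array[NAStatCounter] is created per partition and every further record is added into it, whereas nasRDD created one array per record. Below is a rough plain-Scala analogy that can be tried without Spark. It assumes that partitions can be imitated by grouped() chunks; MiniCounter, PartitionSketch, perPartition and statsWithMissingLocal are hypothetical names for this sketch only, and the counter keeps just count, sum and missing rather than a full StatCounter.

```scala
object PartitionSketch {
  // Hypothetical mutable stand-in for NAStatCounter (count, sum and missing only).
  final class MiniCounter extends Serializable {
    var count: Long = 0L
    var sum: Double = 0.0
    var missing: Long = 0L

    def add(x: Double): MiniCounter = {
      if (x.isNaN) missing += 1 else { count += 1; sum += x }
      this
    }

    def merge(other: MiniCounter): MiniCounter = {
      count += other.count
      sum += other.sum
      missing += other.missing
      this
    }
  }

  // Fold one "partition" (an iterator of rows) into a single array of counters.
  def perPartition(iter: Iterator[Array[Double]]): Iterator[Array[MiniCounter]] =
    if (!iter.hasNext) Iterator.empty // guard added here: an empty chunk yields nothing
    else {
      val nas = iter.next().map(d => new MiniCounter().add(d))
      iter.foreach(arr => nas.zip(arr).foreach { case (n, d) => n.add(d) })
      Iterator(nas)
    }

  // rows must be non-empty, otherwise reduce has nothing to combine.
  def statsWithMissingLocal(rows: Seq[Array[Double]], chunkSize: Int): Array[MiniCounter] =
    rows.grouped(chunkSize) // imitate partitions with fixed-size chunks
      .flatMap(chunk => perPartition(chunk.iterator))
      .reduce((n1, n2) => n1.zip(n2).map { case (a, b) => a.merge(b) })

  // Made-up rows with two columns each.
  val rows: Seq[Array[Double]] = Seq(
    Array(1.0, Double.NaN),
    Array(0.5, 1.0),
    Array(Double.NaN, 0.0),
    Array(0.0, 1.0),
    Array(1.0, Double.NaN)
  )

  val result: Array[MiniCounter] = statsWithMissingLocal(rows, chunkSize = 2)
}
```

With five rows and a chunk size of 2, three counter arrays are built (one per chunk) instead of five.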
scala> val statsm = statsWithMissing(parsed.filter(_.matched).map(_.scores))
statsm: Array[NAStatCounter] = Array(stats: (count: 20922, mean: 0.997316, stdev: 0.036506, max: 1.000000, min: 0.000000) NaN: 9, stats: (count: 1333, mean: 0.989890, stdev: 0.082489, max: 1.000000, min: 0.000000) NaN: 19598, stats: (count: 20931, mean: 0.997015, stdev: 0.043118, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 475, mean: 0.969370, stdev: 0.153291, max: 1.000000, min: 0.000000) NaN: 20456, stats: (count: 20931, mean: 0.987292, stdev: 0.112013, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 20925, mean: 0.997085, stdev: 0.053914, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.997945, stdev: 0.045285, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.996129, stdev: 0.062097, max: 1.000000, min: 0.000000) NaN: 6, stats: (cou...

scala> val statsn = statsWithMissing(parsed.filter(!_.matched).map(_.scores))
statsn: Array[NAStatCounter] = Array(stats: (count: 5727203, mean: 0.711863, stdev: 0.389081, max: 1.000000, min: 0.000000) NaN: 998, stats: (count: 102365, mean: 0.898847, stdev: 0.272720, max: 1.000000, min: 0.000000) NaN: 5625836, stats: (count: 5728201, mean: 0.313138, stdev: 0.332281, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 1989, mean: 0.162955, stdev: 0.192975, max: 1.000000, min: 0.000000) NaN: 5726212, stats: (count: 5728201, mean: 0.954883, stdev: 0.207560, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5727412, mean: 0.221643, stdev: 0.415352, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.486995, stdev: 0.499831, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.219923, stdev: 0.414194, max: 1.000000, min: 0.00...
scala> statsm.zip(statsn).map { case(m, n) =>
     |   (m.missing + n.missing, m.stats.mean - n.stats.mean)
     | }.foreach(println)
(1007,0.285452905746686)
(5645434,0.09104268062279897)
(0,0.6838772482597568)
(5746668,0.8064147192926266)
(0,0.03240818525033473)
(795,0.7754423117834044)
(795,0.5109496938298719)
(795,0.7762059675300521)
(12843,0.9563812499852178)
3. cmp_fname_c1: agreement of first name, first component
4. cmp_fname_c2: agreement of first name, second component
5. cmp_lname_c1: agreement of family name, first component
6. cmp_lname_c2: agreement of family name, second component
7. cmp_sex: agreement sex
8. cmp_bd: agreement of date of birth, day component
9. cmp_bm: agreement of date of birth, month component
10. cmp_by: agreement of date of birth, year component
11. cmp_plz: agreement of postal code
Missing Attribute Values:

cmp_fname_c1: 1007
cmp_fname_c2: 5645434
cmp_lname_c1: 0
cmp_lname_c2: 5746668
cmp_sex: 0
cmp_bd: 795
cmp_bm: 795
cmp_by: 795
cmp_plz: 12843
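Looking at the results, cmp_fname_c2 and cmp_lname_c2 have far too many missing values to be of any use, and cmp_sex has no missing values at all but the difference in means between the two groups is only 0.03, which is next to nothing. This shows that the following five fields are the appropriate ones for deciding whether two patient records refer to the same patient.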
(index in scores, then field number and field name)
scores(2)   5. cmp_lname_c1: agreement of family name, first component
scores(5)   8. cmp_bd: agreement of date of birth, day component
scores(6)   9. cmp_bm: agreement of date of birth, month component
scores(7)  10. cmp_by: agreement of date of birth, year component
scores(8)  11. cmp_plz: agreement of postal code
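The same selection can be reproduced mechanically from the (missing, mean difference) pairs printed earlier. This is only a sketch: the two cutoffs (fewer than 100,000 missing values, mean difference above 0.5) are illustrative assumptions chosen for this example and are not stated anywhere in the data set description, and FeatureSelectionSketch is a hypothetical name.

```scala
object FeatureSelectionSketch {
  // (field name, total missing, mean difference matched minus unmatched),
  // copied from the zip(statsn) output and the field list above.
  val summary: Seq[(String, Long, Double)] = Seq(
    ("cmp_fname_c1", 1007L, 0.285452905746686),
    ("cmp_fname_c2", 5645434L, 0.09104268062279897),
    ("cmp_lname_c1", 0L, 0.6838772482597568),
    ("cmp_lname_c2", 5746668L, 0.8064147192926266),
    ("cmp_sex", 0L, 0.03240818525033473),
    ("cmp_bd", 795L, 0.7754423117834044),
    ("cmp_bm", 795L, 0.5109496938298719),
    ("cmp_by", 795L, 0.7762059675300521),
    ("cmp_plz", 12843L, 0.9563812499852178)
  )

  // Assumed cutoffs, for illustration only.
  val maxMissing: Long = 100000L
  val minMeanDiff: Double = 0.5

  // Keep (index in scores, field name) for fields that are rarely missing
  // and whose means differ clearly between the two groups.
  val selected: Seq[(Int, String)] = summary.zipWithIndex.collect {
    case ((name, missing, diff), i) if missing < maxMissing && diff > minMeanDiff => (i, name)
  }
}
```

Under these assumed cutoffs, selected contains the indices 2, 5, 6, 7 and 8, which are the five fields listed above. cmp_fname_c1 drops out because its mean difference is only about 0.29.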
scala> def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else d
naz: (d: Double)Double

scala> case class Scored(md: MatchData, score:Double)
defined class Scored
Apply this to parsed, the RDD of MatchData for the whole data set.
scala> val ct = parsed.map(md => {
     |   val score = Array(2, 5, 6, 7, 8).map(i => naz(md.scores(i))).sum
     |   Scored(md, score)
     | })
ct: org.apache.spark.rdd.RDD[Scored] = MapPartitionsRDD[45] at map at <console>:46
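To see what this score means for a single record, here is a small sketch on plain arrays that runs without Spark. The two score vectors are made up for illustration and are not taken from the data set, ScoreSketch is a hypothetical name, and naz is written with isNaN instead of Double.NaN.equals(d), which should give the same result for this purpose.

```scala
object ScoreSketch {
  // Treat a missing (NaN) score as 0.0, like naz in the transcript.
  def naz(d: Double): Double = if (d.isNaN) 0.0 else d

  // Indices of the five selected fields within the scores array.
  val selected: Array[Int] = Array(2, 5, 6, 7, 8)

  // Sum of the selected scores, with NaN counted as 0.0.
  def score(scores: Array[Double]): Double =
    selected.map(i => naz(scores(i))).sum

  // Made-up score vectors with nine entries each.
  val likelyMatch: Array[Double] =
    Array(1.0, Double.NaN, 1.0, Double.NaN, 1.0, 1.0, 1.0, 1.0, 1.0)
  val likelyNoMatch: Array[Double] =
    Array(0.5, Double.NaN, 0.0, Double.NaN, 1.0, 0.0, 1.0, 0.0, Double.NaN)
}
```

Here score(likelyMatch) is 5.0, the maximum possible, while score(likelyNoMatch) is 1.0: only the birth month agrees, and the missing postal code contributes 0.0.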
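For the records whose score is at least 4, and then for those whose score is at least 2, count with countByValue() how many have md.matched set to true and how many to false.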
scala> ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue()
res17: scala.collection.Map[Boolean,Long] = Map(true -> 20871, false -> 637)

scala> ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue()
res18: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 596414)
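One way to read these two results is as precision and recall against the 20,931 true matches counted in matchCounts. The sketch below only redoes the arithmetic on the numbers printed above. Framing the result as precision and recall is an interpretation added here, and ThresholdSketch and Outcome are hypothetical names.

```scala
object ThresholdSketch {
  // Total number of true matches, from matchCounts in the earlier transcript.
  val totalTrueMatches: Long = 20931L

  // Counts of matched = true / false among the records that pass a threshold.
  final case class Outcome(truePos: Long, falsePos: Long) {
    // Share of the selected pairs that really are matches.
    def precision: Double = truePos.toDouble / (truePos + falsePos)
    // Share of all true matches that survive the threshold.
    def recall: Double = truePos.toDouble / totalTrueMatches
  }

  val atFour: Outcome = Outcome(20871L, 637L)   // score >= 4.0
  val atTwo: Outcome = Outcome(20931L, 596414L) // score >= 2.0
}
```

At 4.0 roughly 97% of the selected pairs are true matches and about 99.7% of all true matches are kept. At 2.0 every true match is kept, but only about 3% of the selected pairs are real matches.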