Continuing from last time:
Specifying [*] in ${SPARK_HOME}/bin/spark-shell --master local[*] makes the local cluster use one worker thread per CPU core on your machine. My MacBook Pro has a 2-core Core i7.
MacBook-Pro-5:spark-2.3.1-bin-hadoop2.7 $ ${SPARK_HOME}/bin/spark-shell --master local[*]
2018-09-24 16:42:48 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://macbook-pro-5:4040
Spark context available as 'sc' (master = local[*], app id = local-1537774978329).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
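As a quick sanity check (a sketch of mine, not from the original session), the running shell can report how many task slots local[*] actually gave it:

// Default number of parallel tasks under local[*] (typically the logical core count)
sc.defaultParallelism
// Logical processors visible to the JVM; with hyper-threading this can be twice the physical core count
Runtime.getRuntime.availableProcessors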
First, let's look at the SparkContext sc and list its methods:
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@531ec2ca

scala> sc.
accumulable             getCheckpointDir            parallelize
accumulableCollection   getConf                     range
accumulator             getExecutorMemoryStatus     register
addFile                 getExecutorStorageStatus    removeSparkListener
addJar                  getLocalProperty            requestExecutors
addSparkListener        getPersistentRDDs           requestTotalExecutors
appName                 getPoolForName              runApproximateJob
applicationAttemptId    getRDDStorageInfo           runJob
applicationId           getSchedulingMode           sequenceFile
binaryFiles             hadoopConfiguration         setCallSite
binaryRecords           hadoopFile                  setCheckpointDir
broadcast               hadoopRDD                   setJobDescription
cancelAllJobs           isLocal                     setJobGroup
cancelJob               isStopped                   setLocalProperty
cancelJobGroup          jars                        setLogLevel
cancelStage             killExecutor                sparkUser
clearCallSite           killExecutors               startTime
clearJobGroup           killTaskAttempt             statusTracker
collectionAccumulator   listFiles                   stop
defaultMinPartitions    listJars                    submitJob
defaultParallelism      longAccumulator             textFile
deployMode              makeRDD                     uiWebUrl
doubleAccumulator       master                      union
emptyRDD                newAPIHadoopFile            version
files                   newAPIHadoopRDD             wholeTextFiles
getAllPools             objectFile
Next, among the SparkContext methods that create a Resilient Distributed Dataset (RDD), we use textFile, which yields an RDD referencing a text file with one record per line. Pointing it at the UCI data files block_*.csv placed in the HDFS linkage directory with Hadoop earlier, we bind the result to the variable rawblocks:
scala> val rawblocks = sc.textFile("hdfs://localhost/linkage")
rawblocks: org.apache.spark.rdd.RDD[String] = hdfs://localhost/linkage MapPartitionsRDD[1] at textFile at <console>:24
Returning the first element of the RDD with the first method:
scala> rawblocks.first
res1: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
Reading the first 10 lines of the RDD:
scala> val head = rawblocks.take(10)
head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)

scala> head.foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE
Identifying the header line with a function:
scala> def isHeader(line: String): Boolean = {
     |   line.contains("id_1")
     | }
isHeader: (line: String)Boolean

scala> head.filter(isHeader).foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
Various ways to express counting the non-header lines in Scala:
scala> head.filterNot(isHeader).length
res5: Int = 9

scala> head.filter(x => !isHeader(x)).length
res6: Int = 9

scala> head.filter(!isHeader(_)).length
res7: Int = 9
Create a noheader RDD by filtering the header out of the full dataset on the cluster, and check its first line:
scala> val noheader = rawblocks.filter(x => !isHeader(x))
noheader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27

scala> noheader.first
res8: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
Next, the task of extracting the fields of a line into an array. Take element 5 of head:
scala> val line = head(5)
line: String = 36950,42116,1,?,1,1,1,1,1,1,1,TRUE
Split the line on commas:
scala> val pieces = line.split(',')
pieces: Array[String] = Array(36950, 42116, 1, ?, 1, 1, 1, 1, 1, 1, 1, TRUE)

scala> val id1 = pieces(0).toInt
id1: Int = 36950

scala> val id2 = pieces(1).toInt
id2: Int = 42116

scala> val matched = pieces(11).toBoolean
matched: Boolean = true
scala> val rawscores = pieces.slice(2, 11)
rawscores: Array[String] = Array(1, ?, 1, 1, 1, 1, 1, 1, 1)
Because a "?" is present, converting as-is raises an error:
scala> rawscores.map(s => s.toDouble)
java.lang.NumberFormatException: For input string: "?"
  at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
  at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
  at java.lang.Double.parseDouble(Double.java:538)
  at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:284)
  at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29)
  at $anonfun$1.apply(<console>:26)
  at $anonfun$1.apply(<console>:26)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  ... 49 elided
A function that maps "?" to NaN and otherwise parses the number as a Double:
scala> def toDouble(s: String) = {
     |   if ("?".equals(s)) Double.NaN else s.toDouble
     | }
toDouble: (s: String)Double

scala> val scores = rawscores.map(toDouble)
scores: Array[Double] = Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
Putting it all together in one function:
scala> def parse(line: String) = {
     |   val pieces = line.split(',')
     |   val id1 = pieces(0).toInt
     |   val id2 = pieces(1).toInt
     |   val scores = pieces.slice(2, 11).map(toDouble)
     |   val matched = pieces(11).toBoolean
     |   (id1, id2, scores, matched)
     | }
parse: (line: String)(Int, Int, Array[Double], Boolean)

scala> val tup = parse(line)
tup: (Int, Int, Array[Double], Boolean) = (36950,42116,Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0),true)

scala> tup._1
res11: Int = 36950

scala> tup.productElement(0)
res12: Any = 36950

scala> tup.productArity
res13: Int = 4
Managing the fields by name with a case class:
scala> case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)
defined class MatchData

scala> def parse(line: String) = {
     |   val pieces = line.split(',')
     |   val id1 = pieces(0).toInt
     |   val id2 = pieces(1).toInt
     |   val scores = pieces.slice(2, 11).map(toDouble)
     |   val matched = pieces(11).toBoolean
     |   MatchData(id1, id2, scores, matched)
     | }
parse: (line: String)MatchData

scala> val md = parse(line)
md: MatchData = MatchData(36950,42116,[D@33cd31cf,true)
Apply parse to the first 9 non-header lines to get MatchData values:
scala> val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
mds: Array[MatchData] = Array(MatchData(37291,53113,[D@578ce232,true), MatchData(39086,47614,[D@3bfd73e7,true), MatchData(70031,70237,[D@33225b82,true), MatchData(84795,97439,[D@10370297,true), MatchData(36950,42116,[D@209aefd1,true), MatchData(42413,48491,[D@70a8c26e,true), MatchData(25965,64753,[D@51f779b2,true), MatchData(49451,90407,[D@797cfaa1,true), MatchData(39932,40902,[D@e6f7638,true))
scala> md.matched
res15: Boolean = true

scala> md.id1
res16: Int = 36950

scala> val parsed = noheader.map(line => parse(line))
parsed: org.apache.spark.rdd.RDD[MatchData] = MapPartitionsRDD[3] at map at <console>:29

scala> parsed.cache()
res14: parsed.type = MapPartitionsRDD[3] at map at <console>:29
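A note on parsed.cache() (my annotation, not from the original session): it only marks the RDD to be kept in memory once it is first computed, and it is shorthand for persist at the default storage level:

import org.apache.spark.storage.StorageLevel

// cache() is equivalent to persisting at the default MEMORY_ONLY level
parsed.persist(StorageLevel.MEMORY_ONLY)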
Grouping the first 9 non-header lines:
scala> val grouped = mds.groupBy(md => md.matched)
grouped: scala.collection.immutable.Map[Boolean,Array[MatchData]] = Map(true -> Array(MatchData(37291,53113,[D@578ce232,true), MatchData(39086,47614,[D@3bfd73e7,true), MatchData(70031,70237,[D@33225b82,true), MatchData(84795,97439,[D@10370297,true), MatchData(36950,42116,[D@209aefd1,true), MatchData(42413,48491,[D@70a8c26e,true), MatchData(25965,64753,[D@51f779b2,true), MatchData(49451,90407,[D@797cfaa1,true), MatchData(39932,40902,[D@e6f7638,true)))

scala> grouped.mapValues(x => x.size).foreach(println)
(true,9)
Now for the real large-scale aggregation: building a histogram.
scala> val matchCounts = parsed.map(md => md.matched).countByValue()
[Stage 3:>                                                          (0 + 4) / 10]2018-09-24 18:01:19 WARN  MemoryStore:66 - Not enough space to cache rdd_3_0 in memory! (computed 70.0 MB so far)
2018-09-24 18:01:19 WARN  BlockManager:66 - Block rdd_3_0 could not be removed as it was not found on disk or in memory
2018-09-24 18:01:19 WARN  BlockManager:66 - Putting block rdd_3_0 failed
... (the same three WARN lines repeat for rdd_3_1, rdd_3_5, rdd_3_6, rdd_3_8, and rdd_3_9) ...
matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)
Lots of WARNs, but the result came out: 20931 true and 5728201 false.
Converting the result to a Seq and sorting it:
scala> val matchCountSeq = matchCounts.toSeq
matchCountSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))

scala> matchCountSeq.sortBy(_._1).foreach(println)
(false,5728201)
(true,20931)

scala> matchCountSeq.sortBy(_._2).foreach(println)
(true,20931)
(false,5728201)

scala> matchCountSeq.sortBy(_._2).reverse.foreach(println)
(false,5728201)
(true,20931)
Computing summary statistics over the first score column of parsed:
scala> parsed.map(md => md.scores(0)).stats()
2018-09-24 18:10:03 WARN  MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 19.7 MB so far)
2018-09-24 18:10:03 WARN  BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
2018-09-24 18:10:03 WARN  BlockManager:66 - Putting block rdd_3_5 failed
... (the same caching WARNs repeat for the other partitions) ...
res21: org.apache.spark.util.StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN)
The NaNs break the statistics, so remove them using Java's isNaN function:
scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN

scala> parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
2018-09-24 18:12:52 WARN  MemoryStore:66 - Not enough space to cache rdd_3_5 in memory! (computed 19.7 MB so far)
2018-09-24 18:12:52 WARN  BlockManager:66 - Block rdd_3_5 could not be removed as it was not found on disk or in memory
2018-09-24 18:12:52 WARN  BlockManager:66 - Putting block rdd_3_5 failed
... (the same caching WARNs repeat for the other partitions) ...
res22: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)
Computing statistics for all the variables in a loop:
scala> val stats = (0 until 9).map(i => {
     |   parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
     | })
... (Stages 7 through 15 each emit the same "Not enough space to cache rdd_3_*" / "could not be removed" / "Putting block failed" WARN triplets for partitions 0, 1, 5, 6, 8, and 9) ...
stats: scala.collection.immutable.IndexedSeq[org.apache.spark.util.StatCounter] = Vector((count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000), (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000), (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000), (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000,...

scala>
Looks like we ran out of memory! Restart, this time specifying 2g at launch so the local driver process can use more memory:
${SPARK_HOME}/bin/spark-shell --master local[*] --driver-memory 2g
MacBook-Pro-5:spark-2.3.1-bin-hadoop2.7 $ ${SPARK_HOME}/bin/spark-shell --master local[*] --driver-memory 2g
2018-09-24 18:21:12 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-09-24 18:21:20 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://macbook-pro-5:4041
Spark context available as 'sc' (master = local[*], app id = local-1537780881289).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
Recording from partway through the rerun (note the Web UI moved to port 4041, presumably because the earlier shell still held 4040):
scala> val matchCounts = parsed.map(md => md.matched).countByValue()
matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)

scala> val matchCountSeq = matchCounts.toSeq
matchCountSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))

scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN

scala> parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
res7: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)

scala> val stats = (0 until 9).map(i => {
     |   parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
     | })
stats: scala.collection.immutable.IndexedSeq[org.apache.spark.util.StatCounter] = Vector((count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000), (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000), (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000), (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000), (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000), (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000,...

scala> stats(1)
res8: org.apache.spark.util.StatCounter = (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000)

scala> stats(8)
res9: org.apache.spark.util.StatCounter = (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000, min: 0.000000)
This time, the errors are gone.
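As an aside (a sketch of mine, not from the original session): instead of enlarging the driver heap, one could have persisted parsed with an explicit storage level in the first place, letting partitions that don't fit in memory spill to local disk. That would silence the "Not enough space to cache" warnings at the cost of some disk I/O:

import org.apache.spark.storage.StorageLevel

// Used instead of cache(): partitions that do not fit in memory are
// written to local disk rather than being dropped and recomputed
parsed.persist(StorageLevel.MEMORY_AND_DISK)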
The code above was quite inefficient, though: the data was processed nine separate times, once per column.
So let's write reusable summary-statistics code:
StatsWithMissing.scala
import org.apache.spark.util.StatCounter

class NAStatCounter extends Serializable {
  val stats: StatCounter = new StatCounter()
  var missing: Long = 0

  def add(x: Double): NAStatCounter = {
    if (java.lang.Double.isNaN(x)) {
      missing += 1
    } else {
      stats.merge(x)
    }
    this
  }

  def merge(other: NAStatCounter): NAStatCounter = {
    stats.merge(other.stats)
    missing += other.missing
    this
  }

  override def toString = {
    "stats: " + stats.toString + " NaN: " + missing
  }
}

object NAStatCounter extends Serializable {
  def apply(x: Double) = new NAStatCounter().add(x)
}
Place StatsWithMissing.scala in /opt/spark-2.3.1-bin-hadoop2.7, then load it:
scala> :load StatsWithMissing.scala
Loading StatsWithMissing.scala...
import org.apache.spark.util.StatCounter
import org.apache.spark.rdd.RDD
defined class NAStatCounter
defined object NAStatCounter
warning: previously defined class NAStatCounter is not a companion to object NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.
An example with NAStatCounter:
scala> val nas1 = NAStatCounter(10.0)
nas1: NAStatCounter = stats: (count: 1, mean: 10.000000, stdev: 0.000000, max: 10.000000, min: 10.000000) NaN: 0

scala> nas1.add(2.1)
res10: NAStatCounter = stats: (count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000) NaN: 0

scala> val nas2 = NAStatCounter(Double.NaN)
nas2: NAStatCounter = stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1

scala> nas1.merge(nas2)
res11: NAStatCounter = stats: (count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000) NaN: 1
In short, an NAStatCounter instance holds (the count of valid values added, their mean, standard deviation, maximum, and minimum) plus the number of NaNs, and two NAStatCounter instances can be combined with merge.
Continuing: mapping the numeric elements of an Array through NAStatCounter produces one NAStatCounter instance per element:
scala> val arr = Array(1.0, Double.NaN, 17.29)
arr: Array[Double] = Array(1.0, NaN, 17.29)

scala> val nas = arr.map(d => NAStatCounter(d))
nas: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 17.290000, stdev: 0.000000, max: 17.290000, min: 17.290000) NaN: 0)
So, for the parsed RDD created earlier with

scala> val parsed = noheader.map(line => parse(line))

which converts every line of noheader into a MatchData via the parse() function, apply NAStatCounter to generate nasRDD:
scala> val nasRDD = parsed.map(md => {
     |   md.scores.map(d => NAStatCounter(d))
     | })
nasRDD: org.apache.spark.rdd.RDD[Array[NAStatCounter]] = MapPartitionsRDD[38] at map at <console>:32
Here, given two arrays nas1 and nas2 of NAStatCounter instances, we can zip() them pairwise and then merge() each pair to combine them (note that merge mutates its receiver in place, which is why the second merged computation below shows accumulated counts):
scala> val nas1 = Array(1.0, Double.NaN).map(d => NAStatCounter(d))
nas1: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1)

scala> val nas2 = Array(Double.NaN, 2.0).map(d => NAStatCounter(d))
nas2: Array[NAStatCounter] = Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0)

scala> val merged = nas1.zip(nas2).map(p => p._1.merge(p._2))
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)

scala> val merged = nas1.zip(nas2).map { case (a, b) => a.merge(b) }
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 2, stats: (count: 2, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
Wrapping several Array[NAStatCounter] instances in a List to get a List[Array[NAStatCounter]] and handing it to reduce(), which forwards each pair to merge(), performs the merge element-wise across the arrays:
scala> val nas = List(nas1, nas2)
nas: List[Array[NAStatCounter]] = List(Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 2, stats: (count: 2, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1), Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0))

scala> val merged = nas.reduce((n1, n2) => {
     |   n1.zip(n2).map { case (a, b) => a.merge(b) }
     | })
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 3, stats: (count: 3, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)
Applying this reduce() => merge() strategy to nasRDD, which we built by applying NAStatCounter to the parsed RDD of MatchData covering all of noheader, merges across every record at once and yields summary statistics for the entire dataset as a single Array[NAStatCounter]:
scala> val reduced = nasRDD.reduce((n1, n2) => {
     |   n1.zip(n2).map { case (a, b) => a.merge(b) }
     | })
reduced: Array[NAStatCounter] = Array(stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007, stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434, stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668, stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0....

scala> reduced.foreach(println)
stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007
stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434
stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0
stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668
stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0
stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000) NaN: 795
stats: (count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000, min: 0.000000) NaN: 12843
So let's fold this reduce() => merge() mechanism into StatsWithMissing.scala as a statsWithMissing() function:
import org.apache.spark.util.StatCounter
import org.apache.spark.rdd.RDD

class NAStatCounter extends Serializable {
  val stats: StatCounter = new StatCounter()
  var missing: Long = 0

  def add(x: Double): NAStatCounter = {
    if (java.lang.Double.isNaN(x)) {
      missing += 1
    } else {
      stats.merge(x)
    }
    this
  }

  def merge(other: NAStatCounter): NAStatCounter = {
    stats.merge(other.stats)
    missing += other.missing
    this
  }

  override def toString = {
    "stats: " + stats.toString + " NaN: " + missing
  }
}

object NAStatCounter extends Serializable {
  def apply(x: Double) = new NAStatCounter().add(x)
}

def statsWithMissing(rdd: RDD[Array[Double]]): Array[NAStatCounter] = {
  val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {
    // Seed one NAStatCounter per column from the partition's first record,
    // then fold the rest of the partition into those counters
    val nas: Array[NAStatCounter] = iter.next().map(d => NAStatCounter(d))
    iter.foreach(arr => {
      nas.zip(arr).foreach { case (n, d) => n.add(d) }
    })
    Iterator(nas)
  })
  // One Array[NAStatCounter] per partition remains; merge them column-wise
  nastats.reduce((n1, n2) => {
    n1.zip(n2).map { case (a, b) => a.merge(b) }
  })
}
Reload the improved StatsWithMissing.scala:
scala> :load StatsWithMissing.scala
Loading StatsWithMissing.scala...
import org.apache.spark.util.StatCounter
import org.apache.spark.rdd.RDD
defined class NAStatCounter
warning: previously defined object NAStatCounter is not a companion to class NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.
defined object NAStatCounter
warning: previously defined class NAStatCounter is not a companion to object NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.
statsWithMissing: (rdd: org.apache.spark.rdd.RDD[Array[Double]])Array[NAStatCounter]
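With the function loaded, the earlier reduce() => merge() result should be reproducible in a single pass over the data (a usage sketch of mine, output omitted). One caveat worth noting: the mapPartitions body calls iter.next() unconditionally, so it assumes no partition is empty:

// One pass over parsed instead of nine per-column jobs
val reduced = statsWithMissing(parsed.map(_.scores))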
For parsed, the RDD of all MatchData, compute the aggregated per-score statistics statsm for the matched records and statsn for the non-matched records:
scala> val statsm = statsWithMissing(parsed.filter(_.matched).map(_.scores))
statsm: Array[NAStatCounter] = Array(stats: (count: 20922, mean: 0.997316, stdev: 0.036506, max: 1.000000, min: 0.000000) NaN: 9, stats: (count: 1333, mean: 0.989890, stdev: 0.082489, max: 1.000000, min: 0.000000) NaN: 19598, stats: (count: 20931, mean: 0.997015, stdev: 0.043118, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 475, mean: 0.969370, stdev: 0.153291, max: 1.000000, min: 0.000000) NaN: 20456, stats: (count: 20931, mean: 0.987292, stdev: 0.112013, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 20925, mean: 0.997085, stdev: 0.053914, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.997945, stdev: 0.045285, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.996129, stdev: 0.062097, max: 1.000000, min: 0.000000) NaN: 6, stats: (cou...

scala> val statsn = statsWithMissing(parsed.filter(!_.matched).map(_.scores))
statsn: Array[NAStatCounter] = Array(stats: (count: 5727203, mean: 0.711863, stdev: 0.389081, max: 1.000000, min: 0.000000) NaN: 998, stats: (count: 102365, mean: 0.898847, stdev: 0.272720, max: 1.000000, min: 0.000000) NaN: 5625836, stats: (count: 5728201, mean: 0.313138, stdev: 0.332281, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 1989, mean: 0.162955, stdev: 0.192975, max: 1.000000, min: 0.000000) NaN: 5726212, stats: (count: 5728201, mean: 0.954883, stdev: 0.207560, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5727412, mean: 0.221643, stdev: 0.415352, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.486995, stdev: 0.499831, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.219923, stdev: 0.414194, max: 1.000000, min: 0.00...
Totaling the missing counts and taking the difference of the means for each score variable gives:
scala> statsm.zip(statsn).map { case(m, n) =>
     |   (m.missing + n.missing, m.stats.mean - n.stats.mean)
     | }.foreach(println)
(1007,0.285452905746686)
(5645434,0.09104268062279897)
(0,0.6838772482597568)
(5746668,0.8064147192926266)
(0,0.03240818525033473)
(795,0.7754423117834044)
(795,0.5109496938298719)
(795,0.7762059675300521)
(12843,0.9563812499852178)
To recall what these nine variables originally were:
3. cmp_fname_c1: agreement of first name, first component
4. cmp_fname_c2: agreement of first name, second component
5. cmp_lname_c1: agreement of family name, first component
6. cmp_lname_c2: agreement of family name, second component
7. cmp_sex: agreement of sex
8. cmp_bd: agreement of date of birth, day component
9. cmp_bm: agreement of date of birth, month component
10. cmp_by: agreement of date of birth, year component
11. cmp_plz: agreement of postal code
And m_and_m.py.txt, which came with the originally downloaded data, already summarizes the missing-value counts:
Missing Attribute Values:
cmp_fname_c1: 1007
cmp_fname_c2: 5645434
cmp_lname_c1: 0
cmp_lname_c2: 5746668
cmp_sex: 0
cmp_bd: 795
cmp_bm: 795
cmp_by: 795
cmp_plz: 12843
so the numbers we computed ourselves match the documented ones exactly.
Analyzing the results: cmp_fname_c2 and cmp_lname_c2 have far too many missing values to be useful, and cmp_sex, while never missing, shows a mean difference of only about 0.03 between the two groups, so it barely discriminates. For judging whether two patient records belong to the same patient, the following five fields look appropriate:
5. cmp_lname_c1: agreement of family name, first component
8. cmp_bd: agreement of date of birth, day component
9. cmp_bm: agreement of date of birth, month component
10. cmp_by: agreement of date of birth, year component
11. cmp_plz: agreement of postal code
Finally, treating NaN as 0, define an evaluation score as the sum of these five parameter values.
First define a function naz() that replaces NaN with 0, and a Scored case class that takes a MatchData and a score:
scala> def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else d
naz: (d: Double)Double

scala> case class Scored(md: MatchData, score: Double)
defined class Scored
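A side note on why naz() works (my illustration, not from the original session): with the primitive == operator, NaN never equals anything, itself included, but Double.NaN.equals(d) boxes both sides to java.lang.Double, whose equals treats two NaNs as equal:

Double.NaN == Double.NaN             // false: IEEE 754 comparison on primitives
Double.NaN.equals(Double.NaN)        // true: boxed java.lang.Double.equals
java.lang.Double.isNaN(Double.NaN)   // true: the explicit test used earlier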
Apply them to parsed, the MatchData RDD over the full dataset (score indices 2, 5, 6, 7, 8 correspond to cmp_lname_c1, cmp_bd, cmp_bm, cmp_by, and cmp_plz):
scala> val ct = parsed.map(md => {
     |   val score = Array(2, 5, 6, 7, 8).map(i => naz(md.scores(i))).sum
     |   Scored(md, score)
     | })
ct: org.apache.spark.rdd.RDD[Scored] = MapPartitionsRDD[45] at map at <console>:46
For records whose score is at least 4, or at least 2, count with countByValue() whether the underlying MatchData was matched:
scala> ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue()
res17: scala.collection.Map[Boolean,Long] = Map(true -> 20871, false -> 637)

scala> ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue()
res18: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 596414)
So at a threshold of 4 there are only 637 false positives, while lowering the threshold to 2 recovers every true match but, as the counts show, lets the false positives balloon.
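To make the trade-off concrete (a small sketch using the counts above; the precision/recall framing is mine, not the original author's):

// Precision: true matches found / everything flagged.
// Recall: true matches found / all 20931 true matches in the data.
def rates(tp: Long, fp: Long): (Double, Double) =
  (tp.toDouble / (tp + fp), tp.toDouble / 20931)

rates(20871, 637)    // threshold >= 4.0: ~97% precision, ~99.7% recall
rates(20931, 596414) // threshold >= 2.0: ~3.4% precision, 100% recall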
[Summary]
Nearly six million records, but the data structure is simple. The task was to see how well nine comparison fields can detect that two records with different IDs actually belong to the same patient; in the end we chose five discriminating fields and examined what detection power and how many false positives a simple summed score yields. The hard part is that the data contains many missing values, and deciding how to handle them was the main learning point of this exercise. No machine-learning algorithm was used in this project.