Taking Spark for a spin on my own data.
The data set is 24,010 coded anesthesia records from the past four years, used to look at the correlation between vasopressor use and other preoperative factors.
—————————————————–
First, blank out the #N/A cells in the Excel file, save it as CSV (UTF-8), and move it to HDFS.
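Moving the file into HDFS can be done from the shell with hdfs dfs -put; a Scala sketch of the same step with the Hadoop FileSystem API looks like this (the local path and the target path here are placeholders, not the actual ones):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// A sketch, assuming the Hadoop configuration on the classpath points at the
// local HDFS (fs.defaultFS); both paths below are placeholders.
val fs = FileSystem.get(new Configuration())
fs.copyFromLocalFile(new Path("/path/to/stats_codes4.csv"), new Path("ds/stats_codes4.csv"))

With the file in place, fire up Spark: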
MacBook-Pro-5:~ $ ${SPARK_HOME}/bin/spark-shell --master local
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/11/14 10:17:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/11/14 10:17:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
18/11/14 10:17:15 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://***.***.***.***:4041
Spark context available as 'sc' (master = local, app id = local-1542158227418).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@20e9c165

scala> val rawblocks = sc.textFile("hdfs://localhost/user/*******/ds/stats_codes4.csv")
rawblocks: org.apache.spark.rdd.RDD[String] = hdfs://localhost/user/********/ds/stats_codes4.csv MapPartitionsRDD[1] at textFile at <console>:24

scala> rawblocks.first
res1: String = case_no,診療科コード,手術室コード,申し込み区分コード,麻酔方法コード,ASA分類コード,年齢月,年齢日,性別コード,身長,体重,年齢年,BMI,年齢区分コード,入室時刻コード,麻酔開始時刻コード,予定手術時間,手術部位コード,体位コード,塩酸エフェドリン注40mgの投与回数,エホチール注10mgの投与回数,ネオシネジン注1mgの投与回数,昇圧剤,昇圧剤BL

scala> val head = rawblocks.take(10)
head: Array[String] = Array(case_no,診療科コード,手術室コード,申し込み区分コード,麻酔方法コード,ASA分類コード,年齢月,年齢日,性別コード,身長,体重,年齢年,BMI,年齢区分コード,入室時刻コード,麻酔開始時刻コード,予定手術時間,手術部位コード,体位コード,塩酸エフェドリン注40mgの投与回数,エホチール注10mgの投与回数,ネオシネジン注1mgの投与回数,昇圧剤,昇圧剤BL, 1,15,11,1,2,2,6,29,0,170.5,63.6,60,21.88,7,8,9,720 ,24,5,0,0,8,1,TRUE, 2,19,6,1,2,1,9,2,1,155.2,60.8,45,25.24,5,8,9,300 ,26,5,1,0,0,1,TRUE, 3,15,1,4,3,7,2,17,1,160,52.3,40,20.43,5,0,0,,37,5,0,0,0,0,FALSE, 4,15,11,1,2,2,1,14,0,162,46.6,78,17.76,8,8,8,840 ,22,5,7,0,0,1,TRUE, 5,17,6,1,0,0,2,4,0,173,69.7,71,23.29,8,8,9,45 ,0,5,0,0,0,0,FALSE, 6,8,13,1,3,2,6,11,0,166.7,61.3,82,22.06,9,10,10,60 ,5,5,0,0,7,1,TRUE, 7,9,5,1,5,2,3,24,0,167.3,51.65,81,18.45,9,8,8,450 ,5,5,1,0,0,1,TRUE, 8,14,11,2,3,3,0,27,1,,3.43,0,,1,9,9,450 ,14,5,0,0,3,1,TRUE, 9,19,7,1,3,2,11,5,0,168,78.1,74,27.67,8,8,8,...
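An aside: on Spark 2.x the same file can also be loaded through the DataFrame CSV reader, which consumes the header row automatically. A minimal sketch with the same HDFS path:

val df = spark.read.option("header", "true").csv("hdfs://localhost/user/*******/ds/stats_codes4.csv")

This session sticks to the bare RDD API on purpose, though, so the header handling and parsing below are done by hand.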
scala> head.length
res2: Int = 10

scala> head.foreach(println)
case_no,診療科コード,手術室コード,申し込み区分コード,麻酔方法コード,ASA分類コード,年齢月,年齢日,性別コード,身長,体重,年齢年,BMI,年齢区分コード,入室時刻コード,麻酔開始時刻コード,予定手術時間,手術部位コード,体位コード,塩酸エフェドリン注40mgの投与回数,エホチール注10mgの投与回数,ネオシネジン注1mgの投与回数,昇圧剤,昇圧剤BL
1,15,11,1,2,2,6,29,0,170.5,63.6,60,21.88,7,8,9,720 ,24,5,0,0,8,1,TRUE
2,19,6,1,2,1,9,2,1,155.2,60.8,45,25.24,5,8,9,300 ,26,5,1,0,0,1,TRUE
3,15,1,4,3,7,2,17,1,160,52.3,40,20.43,5,0,0,,37,5,0,0,0,0,FALSE
4,15,11,1,2,2,1,14,0,162,46.6,78,17.76,8,8,8,840 ,22,5,7,0,0,1,TRUE
5,17,6,1,0,0,2,4,0,173,69.7,71,23.29,8,8,9,45 ,0,5,0,0,0,0,FALSE
6,8,13,1,3,2,6,11,0,166.7,61.3,82,22.06,9,10,10,60 ,5,5,0,0,7,1,TRUE
7,9,5,1,5,2,3,24,0,167.3,51.65,81,18.45,9,8,8,450 ,5,5,1,0,0,1,TRUE
8,14,11,2,3,3,0,27,1,,3.43,0,,1,9,9,450 ,14,5,0,0,3,1,TRUE
9,19,7,1,3,2,11,5,0,168,78.1,74,27.67,8,8,8,180 ,7,5,0,0,0,0,FALSE

scala> def isHeader(line: String) = line.contains("case_no")
isHeader: (line: String)Boolean

scala> def isHeader(line: String): Boolean = {
     |   line.contains("case_no")
     | }
isHeader: (line: String)Boolean

scala> head.filter(isHeader).foreach(println)
case_no,診療科コード,手術室コード,申し込み区分コード,麻酔方法コード,ASA分類コード,年齢月,年齢日,性別コード,身長,体重,年齢年,BMI,年齢区分コード,入室時刻コード,麻酔開始時刻コード,予定手術時間,手術部位コード,体位コード,塩酸エフェドリン注40mgの投与回数,エホチール注10mgの投与回数,ネオシネジン注1mgの投与回数,昇圧剤,昇圧剤BL

scala> head.filterNot(isHeader).length
res5: Int = 9

scala> head.filter(x => !isHeader(x)).length
res6: Int = 9

scala> head.filter(!isHeader(_)).length
res7: Int = 9

scala> val noheader = rawblocks.filter(x => !isHeader(x))
noheader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:28

scala> noheader.first
res8: String = 1,15,11,1,2,2,6,29,0,170.5,63.6,60,21.88,7,8,9,720 ,24,5,0,0,8,1,TRUE

scala> head(3)
res9: String = 3,15,1,4,3,7,2,17,1,160,52.3,40,20.43,5,0,0,,37,5,0,0,0,0,FALSE

scala> val line = head(3)
line: String = 3,15,1,4,3,7,2,17,1,160,52.3,40,20.43,5,0,0,,37,5,0,0,0,0,FALSE

scala> val pieces = line.split(',')
pieces: Array[String] = Array(3, 15, 1, 4, 3, 7, 2, 17, 1, 160, 52.3, 40, 20.43, 5, 0, 0, "", 37, 5, 0, 0, 0, 0, FALSE)

scala> val id1 = pieces(0).toInt
id1: Int = 3

scala> val matched = pieces(23).toBoolean
matched: Boolean = false

scala> val rowscores = pieces.slice(1,23)
rowscores: Array[String] = Array(15, 1, 4, 3, 7, 2, 17, 1, 160, 52.3, 40, 20.43, 5, 0, 0, "", 37, 5, 0, 0, 0, 0)

scala> rawscores.map(s => s.toDouble)
<console>:24: error: not found: value rawscores
       rawscores.map(s => s.toDouble)
       ^

scala> val rawscores = pieces.slice(1,23)
rawscores: Array[String] = Array(15, 1, 4, 3, 7, 2, 17, 1, 160, 52.3, 40, 20.43, 5, 0, 0, "", 37, 5, 0, 0, 0, 0)

scala> rawscores.map(s => s.toDouble)
java.lang.NumberFormatException: empty String
  at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
  at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
  at java.lang.Double.parseDouble(Double.java:538)
  at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:284)
  at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29)
  at $anonfun$1.apply(<console>:35)
  at $anonfun$1.apply(<console>:35)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  ... 48 elided
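So it is the empty strings left behind by the #N/A cells that break the conversion; the fix is a small helper that maps "" to NaN: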
scala> def toDouble(s: String) = {
     |   if ("".equals(s)) Double.NaN else s.toDouble
     | }
toDouble: (s: String)Double

scala> val scores = rawscores.map(toDouble)
scores: Array[Double] = Array(15.0, 1.0, 4.0, 3.0, 7.0, 2.0, 17.0, 1.0, 160.0, 52.3, 40.0, 20.43, 5.0, 0.0, 0.0, NaN, 37.0, 5.0, 0.0, 0.0, 0.0, 0.0)

scala> def parse(line: String) = {
     |   val pieces = line.split(',')
     |   val id1 = pieces(0).toInt
     |   val scores = pieces.slice(1,23).map(toDouble)
     |   val matched = pieces(23).toBoolean
     |   (id1, scores, matched)
     | }
parse: (line: String)(Int, Array[Double], Boolean)

scala> val tup = parse(line)
tup: (Int, Array[Double], Boolean) = (3,Array(15.0, 1.0, 4.0, 3.0, 7.0, 2.0, 17.0, 1.0, 160.0, 52.3, 40.0, 20.43, 5.0, 0.0, 0.0, NaN, 37.0, 5.0, 0.0, 0.0, 0.0, 0.0),false)

scala> tup._1
res12: Int = 3

scala> tup.productElement(0)
res13: Any = 3

scala> tup.productArity
res14: Int = 3

scala> case class MatchData(id1: Int, scores: Array[Double], matched: Boolean)
defined class MatchData

scala> def parse(line: String) = {
     |   val pieces = line.split(',')
     |   val id1 = pieces(0).toInt
     |   val scores = pieces.slice(1,23).map(toDouble)
     |   val matched = pieces(23).toBoolean
     |   MatchData(id1, scores, matched)
     | }
parse: (line: String)MatchData

scala> val md = parse(line)
md: MatchData = MatchData(3,[D@74121a9,false)

scala> md.matched
res15: Boolean = false

scala> md.id1
res16: Int = 3

scala> val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
mds: Array[MatchData] = Array(MatchData(1,[D@1d7dca19,true), MatchData(2,[D@5bc776e2,true), MatchData(3,[D@5bf54c2c,false), MatchData(4,[D@7d3abf88,true), MatchData(5,[D@ee49f34,false), MatchData(6,[D@6e74986c,true), MatchData(7,[D@1f112da2,true), MatchData(8,[D@377f0737,true), MatchData(9,[D@1dc823d3,false))

scala> val parsed = noheader.map(line => parse(line))
parsed: org.apache.spark.rdd.RDD[MatchData] = MapPartitionsRDD[3] at map at <console>:50

scala> parsed.cache()
res17: parsed.type = MapPartitionsRDD[3] at map at <console>:50

scala> val grouped = mds.groupBy(md => md.matched)
grouped: scala.collection.immutable.Map[Boolean,Array[MatchData]] = Map(false -> Array(MatchData(3,[D@5bf54c2c,false), MatchData(5,[D@ee49f34,false), MatchData(9,[D@1dc823d3,false)), true -> Array(MatchData(1,[D@1d7dca19,true), MatchData(2,[D@5bc776e2,true), MatchData(4,[D@7d3abf88,true), MatchData(6,[D@6e74986c,true), MatchData(7,[D@1f112da2,true), MatchData(8,[D@377f0737,true)))

scala> grouped.mapValues(x => x.size).foreach(println)
(false,3)
(true,6)

scala> val parsed = noheader.map(line => parse(line))
parsed: org.apache.spark.rdd.RDD[MatchData] = MapPartitionsRDD[22] at map at <console>:36

scala> parsed.first
res82: MatchData = MatchData(1,[D@a946954,true)

scala> parsed.cache()
res83: parsed.type = MapPartitionsRDD[22] at map at <console>:36

scala> val matchCounts = parsed.map(md => md.matched).countByValue()
matchCounts: scala.collection.Map[Boolean,Long] = Map(false -> 11136, true -> 12874)

scala> val matchCountsSeq = matchCounts.toSeq
matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((false,11136), (true,12874))

scala> matchCountsSeq.sortBy(_._1).foreach(println)
(false,11136)
(true,12874)

scala> matchCountsSeq.sortBy(_._2).foreach(println)
(false,11136)
(true,12874)

scala> matchCountsSeq.sortBy(_._2).reverse.foreach(println)
(true,12874)
(false,11136)
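parse assumes every line splits into exactly 24 well-formed fields, which happens to hold for this file. As a defensive variant (a sketch, not part of the session above; parseOpt and parsedSafe are names I made up), the conversion can be wrapped in Try so malformed lines are dropped instead of aborting the job:

import scala.util.Try

// Hypothetical helper: None for short or unparsable lines, so they can be
// discarded with flatMap instead of throwing in the middle of a job.
def parseOpt(line: String): Option[MatchData] = {
  val pieces = line.split(',')
  if (pieces.length < 24) None
  else Try(MatchData(pieces(0).toInt, pieces.slice(1, 23).map(toDouble), pieces(23).toBoolean)).toOption
}

val parsedSafe = noheader.flatMap(line => parseOpt(line))

Calling parsed.cache() above matters for the same reason: every statistic below re-walks the whole RDD, and caching keeps the once-parsed records in memory instead of re-splitting 24,010 lines each time.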
scala> parsed.map(md => md.scores(0)).stats()
res88: org.apache.spark.util.StatCounter = (count: 24010, mean: 13.743982, stdev: 6.592062, max: 26.000000, min: 0.000000)

scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN

scala> parsed.map(md => md.scores(1)).stats()
res89: org.apache.spark.util.StatCounter = (count: 24010, mean: NaN, stdev: NaN, max: NaN, min: NaN)

scala> parsed.map(md => md.scores(1)).filter(!isNaN(_)).stats()
res90: org.apache.spark.util.StatCounter = (count: 24000, mean: 6.987208, stdev: 3.946544, max: 14.000000, min: 0.000000)

scala> val stats = (0 until 22).map(i => {
     |   parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
     | })
stats: scala.collection.immutable.IndexedSeq[org.apache.spark.util.StatCounter] = Vector((count: 24010, mean: 13.743982, stdev: 6.592062, max: 26.000000, min: 0.000000), (count: 24000, mean: 6.987208, stdev: 3.946544, max: 14.000000, min: 0.000000), (count: 24010, mean: 1.378176, stdev: 0.873797, max: 4.000000, min: 1.000000), (count: 24010, mean: 2.965889, stdev: 0.791539, max: 9.000000, min: 0.000000), (count: 24009, mean: 2.127161, stdev: 1.637918, max: 12.000000, min: 0.000000), (count: 24010, mean: 5.421824, stdev: 3.477432, max: 11.000000, min: 0.000000), (count: 24010, mean: 14.662432, stdev: 8.833673, max: 30.000000, min: 0.000000), (count: 24010, mean: 0.502291, stdev: 0.499995, max: 1.000000, min: 0.000000), (count: 23641, mean: 149.161181, stdev: 36.153062, max: 999.000000, m...

scala> stats.foreach(println)
(count: 24010, mean: 13.743982, stdev: 6.592062, max: 26.000000, min: 0.000000)
(count: 24000, mean: 6.987208, stdev: 3.946544, max: 14.000000, min: 0.000000)
(count: 24010, mean: 1.378176, stdev: 0.873797, max: 4.000000, min: 1.000000)
(count: 24010, mean: 2.965889, stdev: 0.791539, max: 9.000000, min: 0.000000)
(count: 24009, mean: 2.127161, stdev: 1.637918, max: 12.000000, min: 0.000000)
(count: 24010, mean: 5.421824, stdev: 3.477432, max: 11.000000, min: 0.000000)
(count: 24010, mean: 14.662432, stdev: 8.833673, max: 30.000000, min: 0.000000)
(count: 24010, mean: 0.502291, stdev: 0.499995, max: 1.000000, min: 0.000000)
(count: 23641, mean: 149.161181, stdev: 36.153062, max: 999.000000, min: 0.000000)
(count: 23746, mean: 51.093369, stdev: 28.436763, max: 999.000000, min: 0.000000)
(count: 24010, mean: 48.042357, stdev: 27.279266, max: 100.000000, min: 0.000000)
(count: 23637, mean: 55.244923, stdev: 2936.249415, max: 324000.000000, min: 0.600000)
(count: 24010, mean: 5.325823, stdev: 2.692657, max: 9.000000, min: 0.000000)
(count: 24010, mean: 10.458309, stdev: 3.053219, max: 23.000000, min: 0.000000)
(count: 24010, mean: 10.532778, stdev: 3.057840, max: 23.000000, min: 0.000000)
(count: 23907, mean: 198.328105, stdev: 145.685223, max: 915.000000, min: 5.000000)
(count: 24010, mean: 23.174177, stdev: 13.090623, max: 41.000000, min: 0.000000)
(count: 24010, mean: 4.863848, stdev: 0.697348, max: 6.000000, min: 0.000000)
(count: 23989, mean: 1.233315, stdev: 2.014880, max: 24.000000, min: 0.000000)
(count: 23989, mean: 0.041352, stdev: 0.465715, max: 28.000000, min: 0.000000)
(count: 23989, mean: 1.413731, stdev: 3.443031, max: 46.000000, min: 0.000000)
(count: 24010, mean: 0.536193, stdev: 0.498688, max: 1.000000, min: 0.000000)
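Each of those 22 summaries is a separate pass over the cached RDD. A single-pass alternative (a sketch under the same MatchData definition; statsOnePass is my name, not from the session) is to let aggregate carry one StatCounter per column and skip NaN fields as it folds:

import java.lang.Double.isNaN
import org.apache.spark.util.StatCounter

// One StatCounter per score column, all filled in a single pass over `parsed`.
val statsOnePass = parsed.aggregate(Array.fill(22)(new StatCounter))(
  (counters, md) => {                                    // fold one record into the counters
    for ((c, v) <- counters.zip(md.scores) if !isNaN(v)) c.merge(v)
    counters
  },
  (a, b) => a.zip(b).map { case (x, y) => x.merge(y) }   // merge per-partition counter arrays
)
statsOnePass.foreach(println)

With 24,010 rows either way is quick; the one-pass version only starts to matter when the data no longer fits comfortably in the cache.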