Advanced Analytics from Spark #2-2

前回の続き:
${SPARK_HOME}/bin/spark-shell –master local[*]と[*]指定すれば、自分のマシンのCPUのコア数に合わせた、スレッドがローカルクラスタで使われるとのこと。私のMacBook ProはCore7iの2コア。

まずは、SparkContextのscとそのメソッドをリストしてみる。

次にSparkContext scに対して、耐障害性分散データ・セットRDD (Resilient Distributted Dataset) を生成するメソッドの中から、テキストファイルに保存された1行1データへの参照をRDDにとして得るためのtextFileメソッドを用いて、先にHadoopでHDFSのlinkageフォルダに置いたUCIデータbloc_*.csvを指定し、変数rawblocksへ

RDDの先頭要素をfisrtメソッドで返さすと、

RDDの最初の10行を読み取ると、

ヘッダ行の指定を関数を使って行うと、

ヘッダ以外の行数を求める様々なScalaの記述方法は、

クラスタ上データ全体に対して、ヘッダを除くフィルタリングかけたnoheader RDDを作成し、先頭行を確かめてみると、

次に、行内のデータを取り出して、配列に収める作業:5行目を取り出して、

コンマでラインを分割し、

”?”が含まれているため、そのままではエラーが、

”?”をNaNに変換することと、数字をダブルとして変換する関数:

全部、関数にまとめると、

名前で管理できるようにする:

ヘッド行以外の先頭9行をMatchDataにアプライ:

ヘッド行以外の先頭9行をグループ化

いよいよ大規模集計、ヒストグラムの作成

WARNがいっぱい出たが、trueが20931、falseが5728201、であった。
Seq型を利用して、得られた結果をソートすると、

Scoreの最初の値をparseしてみると、

NaNで失敗しているので、JavaのisNaN関数を使って、NaNを取り除くと、

全部の変数をループで統計してみると、

メモリが足りないよう!ドライバのローカルプロセスがメモリを使えるように起動時に2g指定で再度やり直し:
${SPARK_HOME}/bin/spark-shell –master local[*] –driver-memory 2g

途中から記録、

無事、エラーは消失した。
上記プログラムでは、9回も繰り返してデータが処理されたため大変非効率であった。
そこで再利用可能な要約統計処理のコード作成
StatsWithMissing.scala

StatsWithMissing.scalaを/opt/spark-2.3.1-bin-hadoop2.7に置く。そしてloadする。

NAStatCoutnerの例を示すと、

要するに、NAStatCounterのインスタンスは、(加えられた有効な数値の数、数値の平均値、標準偏差、最大値、最小値)と、NaNの数を保持し、NAStatCounterのインスタンス同士もmergeで合算できるという訳。
続けて、Arrayの数値要素をmap関数でNAStatCounterにアプライすると、それぞれの要素に対するNAStatCounterインスタンスを生成してくれる。

そこで、
各行データに対して、MatchDataを生成するparse()関数を用いて、以前に
scala> val parsed = noheader.map(line => parse(line))
と、noheeader全データにをMatchDataに変換したparsed RDDに対して、NAStatCounterを適応させて、nasRDを生成させる。

ここで、複数のNASataCounterインスタンスで構成されるArray nas1とnas2に対して、zip()関数で
全体を包んだのちに、merge()関数に当てはめ、合体できるが、

複数のNASataCounterインスタンスを持つArray[NAStatCounter]のインスタンスをListに包んで、
List[Array[NAStatCounter]] のインスタンスとして、reduce()関数に当てはめ、merge()関数に送れば、それぞれのArray[NAStatCounter]要素ごとにmerge()を行うことができる。

このreduce() => merge()作戦を、noheeader全データにをMatchDataに変換したparsed RDDに対して、NAStatCounterを適応させて生成したnasRDDに当てはめれば、各MachDataごとに、merge()が適応されて、一気にArray[NAStatCounter]の全データに対する統計サマリーが取得できる。

そこで、このreduce() => merge()のメカニズムをstatsWithMissing()関数としてStatsWithMissing.scalaに組み込む改良して、

改良版StatWithMissing.scalaをロードし直し、

MatchDataクラスの全データparsedに対して、matchedであったものの各スコアの統合統計データstatmと、matchedでなかったものの各スコアの統合統計データstatnを求めて、

データ数値ごとに、missingデータの合算数と、平均値の差を求めると、

となる。
この9つのデータは、もとものとデータがなんであったかを確認しておくと、

であり、
はじめのダウンロードしたデータのm_and_m.py.txtには、

とすでに欠損データ数がまとめられており、自前で計算したものと合致していることがわかる。
結果を分析してみると、 cmp_fname_c2やcmp_lname_c2は、missingが多すぎて使い物にならないし、sexはmissingは皆無だが2群間での平均値の差が0.03と殆ど無い。2つの患者データが同一患者かどうかを判別するものには、以下の5項目を利用が適当ということがわかる。

では、最後に、NaNを0点として、この5つのパラメータ値を合算した値を評価係数として策定する。
まずは、NaNをすべて、0に置き換える関数naz()を定義し、MatchDataとscoreを引数とするScoredクラスとして作成し、

全データのMatchData RDDであるparsedに適応させる。

このスコア値が4以上、あるいは2以上の場合に、md RDDでmatchedかどうかをcountByValue()でカウントさせる。

その結果、4以上では、偽陽性は637件であるが、2まで減じると偽陽性が増えることが理解できる。

【まとめ】
600万近いデータだけど、データ構造は単純で、やっていることもIDが異なる患者が実は同一患者であることを、9つの因子でどの程度まで探索できるのかという課題に取り組み、最終的には5つの鑑別因子を選んで、合計値を識別子とした場合、どの程度の識別能と偽陽性が発生するのかをみる。難しい点は、データには、欠損値が多く含まれ、欠損値の処理を含めてどうしていくのかについてが、学習のポイントであった。このプロジェクトでは、機械学習のアルゴリズムはとくには使っていない。