Learning Big Data Analysis with PySpark
—————————————
First, add Python 3.5 (rather than Python 3.6) to the Anaconda environments by issuing the following command in the Terminal:
conda create -n py35 python=3.5 anaconda
Next, install Jupyter Notebook into the py35 environment from Anaconda Navigator.
Then add the following PATH settings to .bash_profile:
export PATH=/opt/spark-2.4.0-bin-hadoop2.7/bin:$PATH
#export PATH=/opt/spark-1.5.0/bin:$PATH
export PYSPARK_PYTHON=$HOME/anaconda3/envs/py35/bin/python3
export PYSPARK_DRIVER_PYTHON=$HOME/anaconda3/envs/py35/bin/jupyter
#export PYSPARK_DRIVER_PYTHON_OPTS='notebook' pyspark
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.open_browser=False --NotebookApp.ip='*' --NotebookApp.port=8880"
Then, from inside the Spark folder, run the following command:
bin/pyspark
The Jupyter Notebook is running at:
[********* NotebookApp]
http://127.0.0.1:8880/
This launches Jupyter Notebook, from which PySpark can be run.
For the first example, download the KDD Cup 1999 Dataset from the UCI Machine Learning Repository. The compressed kddcup.data.gz is 18.1 MB; uncompressed, it expands to a full 745.5 MB. An overview (in Japanese) is given at
https://ntddk.github.io/2016/11/23/kdd-cup-99-data/
The UCI description of the dataset reads:
“This is the data set used for The Third International Knowledge Discovery and Data Mining Tools Competition, which was held in conjunction with KDD-99, The Fifth International Conference on Knowledge Discovery and Data Mining. The competition task was to build a network intrusion detector, a predictive model capable of distinguishing between ‘bad’ connections, called intrusions or attacks, and ‘good’ normal connections. This database contains a standard set of data to be audited, which includes a wide variety of intrusions simulated in a military network environment.”
Intrusion Detector Learning
Software to detect network intrusions protects a computer network from unauthorized users, perhaps including insiders. The intrusion detector learning task is to build a predictive model (i.e. a classifier) capable of distinguishing between “bad” connections, called intrusions or attacks, and “good” normal connections.
The 1998 DARPA Intrusion Detection Evaluation Program was prepared and managed by MIT Lincoln Labs. Its objective was to survey and evaluate research in intrusion detection. A standard set of data to be audited was provided, which included a wide variety of intrusions simulated in a military network environment. The 1999 KDD intrusion detection contest uses a version of this dataset.
Lincoln Labs set up an environment to acquire nine weeks of raw TCP dump data for a local-area network (LAN) simulating a typical U.S. Air Force LAN. They operated the LAN as if it were a true Air Force environment, but peppered it with multiple attacks.
The raw training data was about four gigabytes of compressed binary TCP dump data from seven weeks of network traffic. This was processed into about five million connection records. Similarly, the two weeks of test data yielded about two million connection records.
A connection is a sequence of TCP packets flowing between a source IP address and a target IP address, starting and ending at well-defined times. Each connection is labeled as either normal or as an attack, with exactly one specific attack type. Each connection record consists of about 100 bytes.
Attacks fall into four main categories:
DOS: denial of service, e.g. a SYN flood.
R2L: unauthorized access from a remote machine, e.g. guessing a password.
U2R: unauthorized access to local superuser (root) privileges, e.g. various “buffer overflow” attacks.
Probing: surveillance and other probing, e.g. port scanning.
It is important to note that the test data is not from the same probability distribution as the training data, and it includes specific attack types not present in the training data. This makes the task more realistic. Some intrusion experts believe that most novel attacks are variants of known attacks, and that the “signature” of known attacks can be sufficient to catch the novel variants. The dataset contains a total of 24 training attack types, with an additional 14 types appearing in the test data only.
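The four categories can be captured in a small lookup table. The sketch below is only an illustration: it maps just the labels that show up later in this walkthrough (smurf and neptune are both well-known DOS attacks), not the full set of 24 training attack types, and the categorize helper is a made-up name.

```python
# Minimal label -> category mapping for the labels seen in this walkthrough.
# The real dataset has 24 training attack types; this is NOT exhaustive.
ATTACK_CATEGORY = {
    "normal.": "normal",
    "smurf.": "DOS",     # ICMP echo-reply flood
    "neptune.": "DOS",   # SYN flood
}

def categorize(label: str) -> str:
    """Map a raw connection label to its broad category."""
    return ATTACK_CATEGORY.get(label, "unknown")

print(categorize("smurf."))  # DOS
```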
Peeking at the contents:
0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,162,4528,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,236,1228,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,2,2,1.00,0.00,0.50,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,233,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,3,3,1.00,0.00,0.33,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,3,3,0.00,0.00,0.00,0.00,1.00,0.00,0.00,4,4,1.00,0.00,0.25,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,238,1282,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,4,4,0.00,0.00,0.00,0.00,1.00,0.00,0.00,5,5,1.00,0.00,0.20,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,5,5,0.00,0.00,0.00,0.00,1.00,0.00,0.00,6,6,1.00,0.00,0.17,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,234,1364,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,7,7,1.00,0.00,0.14,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,1295,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,7,7,0.00,0.00,0.00,0.00,1.00,0.00,0.00,8,8,1.00,0.00,0.12,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,184,124,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,10,10,1.00,0.00,0.10,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,185,9020,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,11,11,1.00,0.00,0.09,0.00,0.00,0.00,0.00,0.00,normal.
.......................
Assigning feature names to the columns:
col_names = ["duration","protocol_type","service","flag","src_bytes",
    "dst_bytes","land","wrong_fragment","urgent","hot","num_failed_logins",
    "logged_in","num_compromised","root_shell","su_attempted","num_root",
    "num_file_creations","num_shells","num_access_files","num_outbound_cmds",
    "is_host_login","is_guest_login","count","srv_count","serror_rate",
    "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
    "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
    "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
    "dst_host_rerror_rate","dst_host_srv_rerror_rate","label"]
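As a quick pure-Python check (no Spark needed), the 41 feature names plus the trailing label line up one-to-one with the 42 comma-separated fields of a record; zipping them gives a readable dict. The record below is the first line from the dump above.

```python
# The 41 features plus "label" pair off against one CSV record.
col_names = ["duration","protocol_type","service","flag","src_bytes",
    "dst_bytes","land","wrong_fragment","urgent","hot","num_failed_logins",
    "logged_in","num_compromised","root_shell","su_attempted","num_root",
    "num_file_creations","num_shells","num_access_files","num_outbound_cmds",
    "is_host_login","is_guest_login","count","srv_count","serror_rate",
    "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
    "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
    "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
    "dst_host_rerror_rate","dst_host_srv_rerror_rate","label"]

# First record shown above.
line = ("0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,"
        "0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,"
        "0.00,0.00,0.00,0.00,normal.")
fields = line.split(",")
assert len(col_names) == len(fields) == 42  # names and fields match up
row = dict(zip(col_names, fields))
print(row["protocol_type"], row["service"], row["label"])  # tcp http normal.
```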
In [1]: sc
Out [1]: SparkContext
         Spark UI
         Version  v2.4.0
         Master   local[*]
         AppName  PySparkShell
In [2]: raw_data = sc.textFile("/Users/********/kddcup.data.gz")
In [3]: from time import time
In [4]: sampled = raw_data.sample(False, 0.1, 42)
        contains_normal_sample = sampled.map(lambda x: x.split(",")).filter(lambda x: "normal." in x)
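sample(withReplacement, fraction, seed) performs Bernoulli sampling: each record is kept independently with probability fraction, so the result holds only approximately 10% of the data, not an exact count. A pure-Python sketch of the same idea (the bernoulli_sample helper is a made-up name for illustration):

```python
import random

def bernoulli_sample(records, fraction, seed):
    """Keep each record independently with probability `fraction`,
    roughly what RDD.sample(False, fraction, seed) does per element."""
    rng = random.Random(seed)
    return [r for r in records if rng.random() < fraction]

data = range(100_000)
kept = bernoulli_sample(data, 0.1, 42)
print(len(kept))  # close to 10,000, but not exactly
```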
In [5]: t0 = time()
        num_sampled = contains_normal_sample.count()
        duration = time() - t0
In [6]: num_sampled
Out [6]: 97404
In [7]: duration
Out [7]: 13.587102890014648
In [8]: contains_normal = raw_data.map(lambda x: x.split(",")).filter(lambda x: "normal." in x)
        t0 = time()
        num_sampled = contains_normal.count()
        duration = time() - t0
In [9]: num_sampled
Out [9]: 972781
In [10]: duration
Out [10]: 28.32080101966858
In [11]: data_in_memory = raw_data.takeSample(False, 10, 42)
         contains_normal_py = [line.split(",") for line in data_in_memory if "normal." in line]
In [12]: data_in_memory
Out [12]: ['0,udp,private,SF,105,147,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,253,0.99,0.01,0.00,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,509,509,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,tcp,private,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,207,12,1.00,1.00,0.00,0.00,0.06,0.06,0.00,255,12,0.05,0.06,0.00,0.00,1.00,1.00,0.00,0.00,neptune.',
 '0,tcp,private,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,192,11,1.00,1.00,0.00,0.00,0.06,0.05,0.00,255,11,0.04,0.05,0.00,0.00,1.00,1.00,0.00,0.00,neptune.',
 '0,tcp,efs,REJ,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,69,1,0.00,0.00,1.00,1.00,0.01,0.07,0.00,255,1,0.00,0.07,0.00,0.00,0.00,0.00,1.00,1.00,neptune.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,tcp,private,RSTO,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,238,15,0.00,0.00,1.00,1.00,0.06,0.07,0.00,255,15,0.06,0.08,0.00,0.00,0.00,0.00,1.00,1.00,neptune.',
 '0,tcp,private,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,138,18,1.00,1.00,0.00,0.00,0.13,0.05,0.00,255,18,0.07,0.05,0.00,0.00,1.00,1.00,0.00,0.00,neptune.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,tcp,private,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,200,11,1.00,1.00,0.00,0.00,0.05,0.05,0.00,255,11,0.04,0.06,0.00,0.00,1.00,1.00,0.00,0.00,neptune.']
In [13]: len(contains_normal_py)
Out [13]: 1
In [14]: normal_sample = sampled.filter(lambda line: "normal." in line)
In [15]: non_normal_sample = sampled.subtract(normal_sample)
In [16]: sampled.count()
Out [16]: 490705
In [17]: normal_sample.count()
Out [17]: 97404
In [18]: non_normal_sample.count()
Out [18]: 393301
In [19]: feature_1 = sampled.map(lambda line: line.split(",")).map(lambda features: features[1]).distinct()
In [20]: feature_2 = sampled.map(lambda line: line.split(",")).map(lambda features: features[2]).distinct()
In [21]: f1 = feature_1.collect()
         f2 = feature_2.collect()
In [22]: f1
Out [22]: ['icmp', 'udp', 'tcp']
In [23]: f2
Out [23]: ['finger', 'http', 'netbios_dgm', 'name', 'hostnames', 'vmnet', 'systat',
 'shell', 'netstat', 'netbios_ssn', 'urh_i', 'smtp', 'ctf', 'domain', 'mtp',
 'remote_job', 'exec', 'supdup', 'http_443', 'urp_i', 'pop_2', 'csnet_ns',
 'klogin', 'whois', 'ldap', 'daytime', 'imap4', 'nntp', 'rje', 'IRC', 'link',
 'uucp', 'tftp_u', 'iso_tsap', 'uucp_path', 'auth', 'ecr_i', 'other', 'domain_u',
 'courier', 'discard', 'red_i', 'tim_i', 'time', 'login', 'ftp', 'pop_3',
 'telnet', 'ntp_u', 'sql_net', 'X11', 'private', 'gopher', 'efs', 'bgp',
 'ftp_data', 'nnsp', 'ssh', 'sunrpc', 'eco_i', 'Z39_50', 'kshell', 'echo',
 'netbios_ns', 'pm_dump', 'printer']
In [24]: len(feature_1.cartesian(feature_2).collect())
Out [24]: 198