分散並行処理Sparkについての学習を「分散平行処理」として少しずつ開始
ーーーーーーーーーーーーーーーーーーーーーーーーーーー
まずは、http://spark.apache.orgから、spark-2.3.1-bin-hadoop2.7.tgzをダウンロード。conf/log4j.propertiesのtemplateから、”log4j.rootCategory=WARN, console”として、WARNレベル以上のメッセージ制限を入れて、Spark-Shellを起動してみる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
MacBook-Pro-5:spark-2.3.1-bin-hadoop2.7 $ bin/spark-shell 18/09/17 17:48:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://**.**.**.***:4040 Spark context available as 'sc' (master = local[*], app id = local-1537174143588). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131) Type in expressions to have them evaluated. Type :help for more information. scala> |
なんだか、WARNといっぱい出現したが、無視して、はじめのScalaプログラムで、README.mdの行数カウンターを動かしてみる。
1 2 3 4 5 6 7 8 |
scala> val lines = sc.textFile("README.md") lines: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> lines.count() res0: Long = 103 scala> lines.first() res1: String = # Apache Spark |
ここでは、linesというRDD(Resilient Distributed Datasets)耐障害性分散データセットが生成されて、それに対して色々な並行処理を行うことができるそうな。ついでに立ち上がったときに表示されるように、Spark context Web UI available at http://**.**.**.***:4040のSpark UIにアクセスすれば、タスクやクラスタに対する情報が見れる。
Sparkの中核となっている概念:
クラスタ上で複数の並列処理を起動するのがドライバプログラムであり、spark-shellそのものである。ドライバプログラムは、シェルを通じてscという変数で自動生成されるSparkContextオブジェクトを通じてSparkにアクセスする。
1 2 |
scala> sc res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@593354fa |
さきの例では、scにtextFile()を送って、テキストの行を表すRDDを生成した。ドライパプログラムは、エクゼキュータと呼ばれるノードの数を管理している。ここでは、ローカルマシンで単一の処理で行われたが、同じシェルをクラスタ接続することによって、データを並行分析できるそうな。先程のプログラムを少し修正して、”Python”という単語を含む最初の行の表示を行う。
1 2 3 4 5 6 7 8 |
scala> val lines = sc.textFile("README.md") lines: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[3] at textFile at <console>:24 scala> val pythonlines = lines.filter(line => line.contains("Python")) pythonlines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:25 scala> pythonlines.first() res3: String = high-level APIs in Scala, Java, Python, and R, and an optimized engine that |
ここでfilter()というような関数ベースの操作も、クラスタに渡って並行処理される。単一のドライバプログラムにコードを書けば、その一部が自動で複数のノード上で実行される。
難解!