Continuing my study of the Spark API. Reference material: Chapter 5 of "Apache Spark入門" (Shoeisha).
ーーーーーーーーーーーーーーーーーーーーーーーーーーー
First, launch spark-shell in local mode.
MacBook-Pro-5:spark-2.3.1-bin-hadoop2.7 $ ${SPARK_HOME}/bin/spark-shell --master local
2018-09-22 13:07:15 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://************:4040
Spark context available as 'sc' (master = local, app id = local-1537589242625).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
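As the startup log shows, the shell pre-creates the SparkContext as sc and the SparkSession as spark. For reference, a standalone application would have to build them itself; a minimal sketch, where the app name "word-count" is just a placeholder:

import org.apache.spark.sql.SparkSession

// Sketch of what spark-shell sets up automatically (hypothetical app name).
val spark = SparkSession.builder()
  .appName("word-count")
  .master("local")
  .getOrCreate()
val sc = spark.sparkContext   // plays the same role as the shell's `sc`

In what follows, the shell's sc is used directly.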
Next, move on to implementing word count.
First, take the following text file, simple-words.txt,
cat
dog
.org
cat
cat
&&
tiger
dog
100
tiger
cat
and generate an RDD from it.
scala> val textRDD = sc.textFile("/Users/******/spark-test/simple-words.txt")
textRDD: org.apache.spark.rdd.RDD[String] = /Users/*****/spark-test/simple-words.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> val textArray = textRDD.collect
textArray: Array[String] = Array(cat, dog, .org, cat, cat, &&, tiger, dog, 100, tiger, cat)

scala> textArray.foreach(println)
cat
dog
.org
cat
cat
&&
tiger
dog
100
tiger
cat

scala>
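Note that collect pulls every element back to the driver, which is fine for a file this small but risky on real data sets. A small sketch of fetching only a few elements instead (not from the book):

// Fetch just the first few elements rather than the whole RDD.
textRDD.take(3).foreach(println)
// cat
// dog
// .org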
Next, filter the elements of the RDD, keeping only tokens made up entirely of alphanumeric characters.
scala> val wordRDD = textRDD.filter(_.matches("""\p{Alnum}+"""))
wordRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:25

scala> val wordArray = wordRDD.collect
wordArray: Array[String] = Array(cat, dog, cat, cat, tiger, dog, 100, tiger, cat)

scala> wordArray.foreach(println)
cat
dog
cat
cat
tiger
dog
100
tiger
cat

scala>
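This explains why ".org" and "&&" are dropped while "100" survives: \p{Alnum} matches only letters and digits. A quick check of the same predicate on plain strings (a sketch, not from the book):

// The filter predicate applied directly to sample tokens.
"cat".matches("""\p{Alnum}+""")    // true  -- letters only
"100".matches("""\p{Alnum}+""")    // true  -- digits count as alphanumeric
".org".matches("""\p{Alnum}+""")   // false -- '.' is not alphanumeric
"&&".matches("""\p{Alnum}+""")     // false -- '&' is not alphanumeric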
Next, transform the elements of the RDD, mapping each word to a (word, 1) pair.
scala> val wordAndOnePairRDD = wordRDD.map(word => (word, 1))
wordAndOnePairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25

scala> val wordAndOnePairArray = wordAndOnePairRDD.collect
wordAndOnePairArray: Array[(String, Int)] = Array((cat,1), (dog,1), (cat,1), (cat,1), (tiger,1), (dog,1), (100,1), (tiger,1), (cat,1))

scala> wordAndOnePairArray.foreach(println)
(cat,1)
(dog,1)
(cat,1)
(cat,1)
(tiger,1)
(dog,1)
(100,1)
(tiger,1)
(cat,1)

scala>
Aggregate the elements of the RDD by key, summing the counts for each word.
scala> val wordAndCountRDD = wordAndOnePairRDD.reduceByKey((result, elem) => result + elem)
wordAndCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25

scala> val wordAndCountArray = wordAndCountRDD.collect
wordAndCountArray: Array[(String, Int)] = Array((tiger,2), (100,1), (dog,2), (cat,4))

scala> wordAndCountArray.foreach(println)
(tiger,2)
(100,1)
(dog,2)
(cat,4)

scala>
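For intuition, the same key-wise aggregation on a plain, non-distributed Scala collection looks like this (a sketch; reduceByKey additionally pre-aggregates within each partition before shuffling):

// Equivalent aggregation on a local Scala collection.
val pairs = Seq(("cat",1), ("dog",1), ("cat",1), ("cat",1), ("tiger",1),
                ("dog",1), ("100",1), ("tiger",1), ("cat",1))
pairs.groupBy(_._1).mapValues(_.map(_._2).sum)
// Map(tiger -> 2, 100 -> 1, dog -> 2, cat -> 4)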
Finally, flatten a nested collection. Splitting each line into words with map would leave an RDD whose elements are arrays; flatMap flattens those arrays into a single RDD of words.
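A minimal sketch of the difference, using a tiny in-memory RDD built with sc.parallelize (an illustration, not taken from the book):

// map keeps the nesting; flatMap flattens it.
val lines = sc.parallelize(Seq("cat dog", "tiger cat"))
lines.map(_.split(" ")).collect      // Array(Array(cat, dog), Array(tiger, cat))
lines.flatMap(_.split(" ")).collect  // Array(cat, dog, tiger, cat)

With that distinction in mind, apply the same word-count steps to Spark's bundled README.md.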
scala> val textRDD = sc.textFile("/opt/spark-2.3.1-bin-hadoop2.7/README.md") textRDD: org.apache.spark.rdd.RDD[String] = /opt/spark-2.3.1-bin-hadoop2.7/README.md MapPartitionsRDD[8] at textFile at <console>:24 scala> val textArray = textRDD.collect textArray: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that, supports general computation graphs for data analysis. It also supports a, rich set of higher-level tools including Spark SQL for SQL and DataFrames,, MLlib for machine learning, GraphX for graph processing,, and Spark Streaming for stream processing., "", <http://spark.apache.org/>, "", "", ## Online Documentation, "", You can find the latest Spark documentation, including a programming, guide, on the [project web page](http://spark.apache.org/documentation.html)., This README file only contains basic setup instructions., "", ## Building Spark, "", Spark is built using [Apache Ma... scala> textArray.foreach(println) # Apache Spark Spark is a fast and general cluster computing system for Big Data. It provides ................ scala> val wordCandidateRDD = textRDD.flatMap(_.split("[ ,.]")) wordCandidateRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:25 scala> val wordCandidateArray = wordCandidateRDD.collect wordCandidateArray: Array[String] = Array(#, Apache, Spark, "", Spark, is, a, fast, and, general, cluster, computing, system, for, Big, Data, "", It, provides, high-level, APIs, in, Scala, "", Java, "", Python, "", and, R, "", and, an, optimized, engine, that, supports, general, computation, graphs, for, data, analysis, "", It, also, supports, a, rich, set, of, higher-level, tools, including, Spark, SQL, for, SQL, and, DataFrames, MLlib, for, machine, learning, "", GraphX, for, graph, processing, and, Spark, Streaming, for, stream, processing, "", <http://spark, apache, org/>, "", "", ##, Online, Documentation, "", You, can, find, the, latest, Spark, documentation, "", including, a, programming, guide, "", on, the, [project, web, page](http://spark, apache, org/documentation, html), Thi... scala> wordCandidateArray.foreach(println) # Apache Spark Spark is a fast .......... 
scala> val wordRDD = wordCandidateRDD.filter(_.matches("""\p{Alnum}+""")) wordRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:25 scala> val wordAndOnePairRDD = wordRDD.map((_, 1)) wordAndOnePairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:25 scala> val wordAndCountRDD = wordAndOnePairRDD.reduceByKey(_ + _) wordAndCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:25 scala> val wordAndCountArray = wordAndCountRDD.collect wordAndCountArray: Array[(String, Int)] = Array((package,3), (For,3), (Programs,1), (Because,1), (The,1), (its,1), (than,1), (guide,1), (APIs,1), (have,1), (Try,1), (computation,1), (through,1), (several,1), (This,2), (graph,1), (Hive,2), (storage,1), (To,2), (Once,1), (prefer,1), (SparkPi,2), (Data,1), (engine,1), (version,1), (file,1), (the,24), (URL,1), (tips,1), (are,1), (params,1), (not,1), (different,1), (refer,2), (Interactive,2), (if,4), (build,4), (given,1), (site,1), (be,2), (Tests,1), (when,1), (Apache,1), (thread,2), (including,4), (Versions,1), (HDFS,1), (Maven,2), (them,1), (Testing,1), (programming,1), (environment,1), (Developer,1), (Streaming,1), (clean,1), (rich,1), (GraphX,1), (Please,4), (is,6), (run,7), (R,1), (same,1), (on,7), (instructions,1), (built,2), (against,... scala> wordAndCountArray.foreach(println) (package,3) (For,3) (Programs,1) (Because,1) (The,1) ................. |
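Putting it together, the whole word count can be written as one chained expression. This is just a recap of the steps above, shown as a single pipeline:

// The full word-count pipeline in one expression.
val wordAndCountRDD = sc.textFile("/opt/spark-2.3.1-bin-hadoop2.7/README.md")
  .flatMap(_.split("[ ,.]"))              // split lines into word candidates
  .filter(_.matches("""\p{Alnum}+"""))    // keep alphanumeric tokens only
  .map((_, 1))                            // pair each word with 1
  .reduceByKey(_ + _)                     // sum the 1s per word
wordAndCountRDD.collect.foreach(println)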