台風24号ヒット! この莫大な破壊的エネルギーをプラス方向に利用できないのだろうか?
こちらはとにかくSparkするということで、Advanced Analytics from Spark第二章:交互最小二乗法を用いた協調フィルタリング:要するにrecommenderのアルゴリズム
—————————————————–
まずは、もととなるlast.fmの音楽情報ファイルのダウンロードと、HDFSへのコピー。
たいていリンクが壊れているのだが、幸い以下のサイトからデータファイルをダウンロード
http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
必要なファイルは以下の3つ:
user_artist_data.txt 426.8M
artist_data.txt     56M
artist_alias.txt  2.9M
巨大なテキストファイル!
ファイルの内容は、README.txtによると
user_artist_data.txt
    3 columns: userid artistid playcount
artist_data.txt
    2 columns: artistid artist_name
artist_alias.txt
    2 columns: badid, goodid
    known incorrectly spelt artists and the correct artist id.
    you can correct errors in user_artist_data as you read it in using this file
    (we’re not yet finished merging this data)
user_artist_data.txtは、UserID ?tab ArtistID ?tab 再生回数
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14  | 
						1000002 1 55 1000002 1000006 33 1000002 1000007 8 1000002 1000009 144 1000002 1000010 314 1000002 1000013 8 1000002 1000014 42 1000002 1000017 69 1000002 1000024 329 1000002 1000025 1 1000002 1000028 17 1000002 1000031 47 1000002 1000033 15 .........  | 
					
artist_data.txtは、ArtistID ?tab Artist名
| 
					 1 2 3 4 5 6 7 8 9 10  | 
						1134999	06Crazy Life 6821360	Pang Nakarin 10113088	Terfel, Bartoli- Mozart: Don 10151459	The Flaming Sidebur 6826647	Bodenstandig 3000 10186265	Jota Quest e Ivete Sangalo 6828986	Toto_XX (1977 10236364	U.S Bombs - 1135000	artist formaly know as Mat ........  | 
					
artisit_alias.txtには、どうやら同じアーティストの異なるIDの接続リストbadid, goodidのようだ。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13  | 
						1092764	1000311 1095122	1000557 6708070	1007267 10088054	1042317 1195917	1042317 1112006	1000557 1187350	1294511 1116694	1327092 6793225	1042317 1079959	1000557 6789612	1000591 1262241	1000591 ..............  | 
					
単純なデータだが、巨大なテキストファイルである。
この3つのテキストファイル、つまりユーザーがどのアーティストの曲を何回選択したかという単純な情報をもとに、機械学習をさせて、あるユーザーに推奨するアーティスト名を返すというコードを構築する。
Sparkの機械学ライブラリMLlibのAlternating Least Squares(ALS)を用いて構築するわけだが、
作業としては、artisit_alias.txtを用いて間違ったIDを正しいID(finalArtistID)に置き換えることと、ALSのアルゴリズムに放り込むために、Ratingオブジェクト(userID, finalArtistID, count)に変換する。かつ、Artist IDリストを放り込めば、アーティスト名が返されるようにしたいという訳。
————————————–
まずは、巨大なテキストファイルを、HadoopのHDFSに移す。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21  | 
						MacBook-Pro-5:spark-2.3.1-bin-hadoop2.7 $ cd /usr/local/Cellar/hadoop/3.1.1/ MacBook-Pro-5:3.1.1 $ cd bin MacBook-Pro-5:bin $ ./hadoop fs -ls / Found 2 items drwxr-xr-x   - ******* supergroup          0 2018-09-24 00:15 /linkage drwxr-xr-x   - ******* supergroup          0 2018-09-23 17:46 /user MacBook-Pro-5:bin $ ./hadoop fs -cd user MacBook-Pro-5:bin $ ./hadoop fs -ls  Found 1 items drwxr-xr-x   - ******* supergroup          0 2018-09-23 17:46 output MacBook-Pro-5:bin $ ./hadoop fs -mkdir ds MacBook-Pro-5:bin $ ./hadoop fs  -ls Found 2 items drwxr-xr-x   - ******* supergroup          0 2018-09-29 14:48 ds drwxr-xr-x   - ******* supergroup          0 2018-09-23 17:46 output MacBook-Pro-5:3.1.1 $ ./bin/hadoop fs -put /Users/*******/Desktop/profiledata_06-May-2005/artist_data.txt /user/*******/ds MacBook-Pro-5:3.1.1 $ ./bin/hadoop fs -put /Users/*******/Desktop/profiledata_06-May-2005/artist_alias.txt /user/*******/ds MacBook-Pro-5:3.1.1 $ ./bin/hadoop fs -put /Users/*******/Desktop/profiledata_06-May-2005/user_artist_data.txt /user/teijisw/ds  | 
					
http://localhost:9870/ でbrowseして転送されたか確かめると、

でOK。
では、Sparkの操作に移る。
大量のメモリが必要ということで、マルチコアモード&6G設定でspark-shellを立ち上げる。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20  | 
						MacBook-Pro-5:3.1.1 $ ${SPARK_HOME}/bin/spark-shell --master local[*] --driver-memory 6g 2018-09-29 15:17:03 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 2018-09-29 15:17:11 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041. Spark context Web UI available at http://macbook-pro-5:4041 Spark context available as 'sc' (master = local[*], app id = local-1538201831639). Spark session available as 'spark'. Welcome to       ____              __      / __/__  ___ _____/ /__     _\ \/ _ \/ _ `/ __/  '_/    /___/ .__/\_,_/_/ /_/\_\   version 2.3.1       /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181) Type in expressions to have them evaluated. Type :help for more information. scala>  | 
					
まずは、データのRDDへの取り込み
| 
					 1 2 3 4 5  | 
						scala> val rawUserArtistData = sc.textFile("hdfs://localhost/user/******/ds/user_artist_data.txt") rawUserArtistData: org.apache.spark.rdd.RDD[String] = hdfs://localhost/user/******/ds/user_artist_data.txt MapPartitionsRDD[3] at textFile at <console>:24 scala> rawUserArtistData.first res2: String = 1000002 1 55  | 
					
UserIDやArtistIDについて統計を見ると、
| 
					 1 2 3 4 5  | 
						scala> rawUserArtistData.map(_.split(' ')(0).toDouble).stats() res3: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000) scala> rawUserArtistData.map(_.split(' ')(1).toDouble).stats() res4: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)  | 
					
UserID最大値は、2,443,548件、Artist ID最大値は10,794,401
次にArtistIDを取り込む。
| 
					 1 2 3 4 5  | 
						scala> val rawArtistData = sc.textFile("hdfs://localhost/user/*******/ds/artist_data.txt") rawArtistData: org.apache.spark.rdd.RDD[String] = hdfs://localhost/user/*******/ds/artist_data.txt MapPartitionsRDD[9] at textFile at <console>:24 scala> rawArtistData.first res5: String = 1134999	06Crazy Life  | 
					
アーティストIDとアーティスト名のリストを作成しておく:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16  | 
						scala> val artistByID = rawArtistData.flatMap { line =>      | val (id, name) = line.span(_ != '\t')      | if (name.isEmpty) {      | None      | } else {      | try {      | Some((id.toInt, name.trim))      | } catch {      | case e: NumberFormatException => None      | }      | }      | } artistByID: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at flatMap at <console>:25 scala> artistByID.first res6: (Int, String) = (1134999,06Crazy Life)  | 
					
重複するアーティストIDについて、”正しい”IDに紐づけするMapを作成する。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15  | 
						scala> val artistAlias = rawArtistAlias.flatMap { line =>      | val tokens = line.split('\t')      | if (tokens(0).isEmpty) {      | None      | } else {      | Some((tokens(0).toInt, tokens(1).toInt))      | }      | }.collectAsMap() artistAlias: scala.collection.Map[Int,Int] = Map(6803336 -> 1000010, 6663187 -> 1992, 2124273 -> 2814, 10412283 -> 1010353, 9969191 -> 1320354, 2024757 -> 1001941, 10208201 -> 4605, 2139121 -> 1011083, 1186393 -> 78, 2094504 -> 1012167, 9931106 -> 1000289, 2167517 -> 2060894, 1351735 -> 1266817, 6943682 -> 1003342, 2027368 -> 1000024, 2056419 -> 1020783, 1214789 -> 1001066, 1022944 -> 1004983, 6640739 -> 1010367, 6902331 -> 411, 10303141 -> 82, 10029249 -> 2070, 7001129 -> 739, 6627784 -> 1046699, 1113560 -> 1275800, 2155414 -> 1000790, 1291139 -> 4163, 10061700 -> 831, 1043158 -> 1301875, 10294241 -> 1234737, 9991298 -> 1001419, 9965450 -> 1016520, 6800447 -> 1078506, 1042440 -> 304, 1068288 -> 1001417, 6729982 -> 1809, 1138035 -> 1406, 1278247 -> 1239248, 1115453 -> 3824, 7035536 -> 3... scala> artistByID.lookup(6803336).head res7: String = Aerosmith (unplugged)                                             scala> artistByID.lookup(1000010).head res8: String = Aerosmith                                                          | 
					
ーーーーーーーーーーーーーーーーーーーーーー
次にモデル構築に先立って、Saprk MLlibのALSの実装での抽象化オブジェクトRatingを作成しておく。Ratingは(userID, finalID, count)という配列で構成される。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16  | 
						scala> import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.recommendation._ scala> val bArtistAlias = sc.broadcast(artistAlias) bArtistAlias: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Int]] = Broadcast(13) scala> val trainData = rawUserArtistData.map { line =>      | val Array(userID, artistID, count) = line.split(' ').map(_.toInt)      | val finalArtistID =      | bArtistAlias.value.getOrElse(artistID, artistID)      | Rating(userID, finalArtistID, count)      | }.cache() trainData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[20] at map at <console>:30 scala> trainData.first res16: org.apache.spark.mllib.recommendation.Rating = Rating(1000002,1,55.0)      | 
					
いよいよALSモデル構築:
| 
					 1 2 3 4 5 6 7 8 9  | 
						scala> val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0) 2018-09-30 20:00:17 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 2018-09-30 20:00:17 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS [Stage 24:>                                                         (0 + 4) / 4]2018-09-30 20:00:19 WARN  LAPACK:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 2018-09-30 20:00:19 WARN  LAPACK:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@332d9d17 scala> model.userFeatures.mapValues(_.mkString(", ")).first() res10: (Int, String) = (116,-0.021858863532543182, -0.014364797621965408, 0.06130968779325485, -0.06215142086148262, -0.004167529288679361, 0.033082250505685806, 0.012420137412846088, 0.007236468605697155, 0.045138705521821976, -0.0030664762016385794)  | 
					
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17  | 
						scala> val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).      | filter { case Array(user,_,_) => user.toInt == 2093760 } rawArtistsForUser: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[150] at filter at <console>:29 scala> val existingProducts =       | rawArtistsForUser.map { case Array(_,artist,_) => artist.toInt }.      | collect().toSet existingProducts: scala.collection.immutable.Set[Int] = Set(1255340, 942, 1180, 813, 378) scala> artistByID.filter { case (id, name) =>      | existingProducts.contains(id)      | }.values.collect().foreach(println) David Gray                                                                       Blackalicious Jurassic 5 The Saw Doctors Xzibit  | 
					
レコメンデーションを動かしてみると、
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18  | 
						scala> recommendations.foreach(println) Rating(2093760,2814,0.028278282551135854) Rating(2093760,1300642,0.02794663060097468) Rating(2093760,1001819,0.027406245396385975) Rating(2093760,1007614,0.027003857902096893) Rating(2093760,4605,0.026823378469111325) scala> val recommendedProductIDs = recommendations.map(_.product).toSet recommendedProductIDs: scala.collection.immutable.Set[Int] = Set(2814, 1001819, 1300642, 4605, 1007614) scala> artistByID.filter { case (id, name) =>      | recommendedProductIDs.contains(id)      | }.values.collect().foreach(println) 50 Cent                                                                          Snoop Dogg Jay-Z 2Pac The Game  |