Advanced Analytics with Spark, collaborative filtering continued:
A problem with the previous recommendations: artists the user has already played can show up in the results, which leaves room for improvement. We want a way to rank latent, not-yet-heard artists higher.
To evaluate this, we compute the area under the receiver operating characteristic (ROC) curve, the AUC. This metric can be read as the probability that a randomly chosen good recommendation is scored higher than a randomly chosen bad one.
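As a minimal, non-Spark illustration of that probability interpretation (the scores below are made up; the real per-user Spark computation is in areaUnderCurve, shown later), count over all (positive, negative) pairs how often the positive item outranks the negative one:

// Toy illustration of AUC as a pairwise-ranking probability (hypothetical scores).
val positiveScores = Seq(0.9, 0.7, 0.4) // model scores for held-out "good" artists
val negativeScores = Seq(0.8, 0.3, 0.1) // model scores for randomly chosen "bad" artists
val pairs = for (p <- positiveScores; n <- negativeScores) yield (p, n)
val auc = pairs.count { case (p, n) => p > n }.toDouble / pairs.size
// Here auc = 7/9 ≈ 0.78; an AUC of 1.0 would mean every good item outranks every bad one.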
———————–
Let's build and run RunRecommender.scala.
Download the 2nd Edition aas-master from O'Reilly's repository at
https://github.com/sryza/aas
and unzip it. (Caution: the code differs from the 1st Edition, and the Japanese translation of the book covers the 1st Edition.)
In a terminal, move to the root folder of the downloaded files and build everything at once with mvn package.
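(If you only need chapter 3, Maven can presumably build just that module plus its dependencies, e.g. mvn -pl ch03-recommender -am package, assuming the module directory name matches; the full build below takes several minutes.)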
MacBook-Pro-5:$ cd ~/Desktop/aas-master/
MacBook-Pro-5:aas-master $ mvn package
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO]
[INFO] Advanced Analytics with Spark                                      [pom]
[INFO] Introduction to Data Analysis with Scala and Spark                 [jar]
[INFO] Recommender Engines with Audioscrobbler data                       [jar]
[INFO] Covtype with Random Decision Forests                               [jar]
[INFO] Anomaly Detection with K-means                                     [jar]
[INFO] Wikipedia Latent Semantic Analysis                                 [jar]
[INFO] Network Analysis with GraphX                                      [jar]
[INFO] Temporal and Geospatial Analysis                                   [jar]
[INFO] Value at Risk through Monte Carlo Simulation                       [jar]
[INFO] Genomics Analysis with ADAM                                        [jar]
[INFO] Simple Spark Project                                               [jar]
[INFO]
[INFO] -------------< com.cloudera.datascience:spark-book-parent >-------------
[INFO] Building Advanced Analytics with Spark 2.0.0                      [1/11]
[INFO] --------------------------------[ pom ]---------------------------------
..........................
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Advanced Analytics with Spark 2.0.0 ................ SUCCESS [ 27.126 s]
[INFO] Introduction to Data Analysis with Scala and Spark . SUCCESS [  7.520 s]
[INFO] Recommender Engines with Audioscrobbler data ....... SUCCESS [  8.289 s]
[INFO] Covtype with Random Decision Forests ............... SUCCESS [  7.380 s]
[INFO] Anomaly Detection with K-means ..................... SUCCESS [  7.744 s]
[INFO] Wikipedia Latent Semantic Analysis ................. SUCCESS [02:57 min]
[INFO] Network Analysis with GraphX ....................... SUCCESS [ 11.606 s]
[INFO] Temporal and Geospatial Analysis ................... SUCCESS [ 38.236 s]
[INFO] Value at Risk through Monte Carlo Simulation ....... SUCCESS [ 39.553 s]
[INFO] Genomics Analysis with ADAM ........................ SUCCESS [01:03 min]
[INFO] Simple Spark Project 0.1.0 ......................... SUCCESS [  5.468 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 06:33 min
[INFO] Finished at: 2018-10-01T21:28:43+09:00
[INFO] ------------------------------------------------------------------------
Next, go back to the Spark root folder and launch the jar produced by the build for chapter 3, ch03-recommender-2.0.0.jar (it contains the RunRecommender object), via spark-submit: ./bin/spark-submit --class <class name> --master local --driver-memory 6g <path to jar>.
Note that at this point the HDFS paths are hardcoded inside the Scala code, so set them there before building.
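Concretely, the values to edit sit near the top of RunRecommender.main (see the full listing further down); a sketch of the two hardcoded locations to adjust before rebuilding:

// In RunRecommender.main: point both at your own HDFS locations, then rebuild
spark.sparkContext.setCheckpointDir("hdfs://localhost/tmp/")  // checkpoint dir for long lineages
val base = "hdfs://localhost/user/******/ds/"                 // directory holding the three data files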
MacBook-Pro-5:opt $ cd spark-2.3.1-bin-hadoop2.7/
MacBook-Pro-5:bin $ ./spark-submit --class com.cloudera.datascience.recommender.RunRecommender --master local --driver-memory 6g /Users/******/Desktop/aas-master/ch03-recommender/target/ch03-recommender-2.0.0.jar "hdfs://localhost/user/******/ds"
2018-10-02 12:34:36 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-10-02 12:34:36 INFO  SparkContext:54 - Running Spark version 2.3.1
2018-10-02 12:34:36 INFO  SparkContext:54 - Submitted application: com.cloudera.datascience.recommender.RunRecommender
2018-10-02 12:34:36 INFO  SecurityManager:54 - Changing view acls to: ******
2018-10-02 12:34:36 INFO  SecurityManager:54 - Changing modify acls to: ******
2018-10-02 12:34:36 INFO  SecurityManager:54 - Changing view acls groups to:
2018-10-02 12:34:36 INFO  SecurityManager:54 - Changing modify acls groups to:
2018-10-02 12:34:36 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(*******); groups with view permissions: Set(); users with modify permissions: Set(******); groups with modify permissions: Set()
................................
2018-10-02 12:35:01 INFO  DAGScheduler:54 - Job 1 finished: show at RunRecommender.scala:54, took 15.299026 s
+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|       90|  2443548|          1|   10794401|
+---------+---------+-----------+-----------+
..............................
2018-10-02 12:35:03 INFO  DAGScheduler:54 - Job 3 finished: show at RunRecommender.scala:60, took 1.395868 s
+-------+----------------+
|     id|            name|
+-------+----------------+
|1208690|Collective Souls|
|1003926| Collective Soul|
+-------+----------------+
..............................................
2018-10-02 12:38:12 INFO  DAGScheduler:54 - Job 20 finished: show at RunRecommender.scala:87, took 0.027555 s
+----------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                      |
+----------------------------------------------------------------------------------------------------------------------------------------------+
|[-0.53730315, -0.16584359, -0.16210097, 0.18933417, 0.23322561, -1.1863623, -0.3488593, -0.69462603, -0.12432492, 0.30754876]                 |
|[-0.10383668, 0.02541644, -0.17977226, -0.20040289, -0.084848225, -0.2579723, 0.12273478, -0.17608017, -0.13086882, -0.11101537]              |
|[-0.0024654828, -0.0022357032, -5.875338E-4, 0.004703222, 0.0029828765, -0.0054704775, 0.0011728706, 0.005223815, -3.3195328E-4, 0.0018437293]|
|[-1.283496, 0.10618588, 0.014418791, -0.24593131, 0.2850579, -1.2994826, -1.1507778, -0.79442567, -0.5174058, -0.07698267]                    |
|[-0.2682497, 0.28311056, -0.6489346, -0.5748753, -0.08043325, -1.8599199, -0.4102788, -1.4428464, 0.07455098, -0.43402758]                    |
|[-0.22923502, 3.1890643, -1.3474056, -2.0670774, 0.109929055, -0.38829893, 0.71637166, 0.47915265, -3.2555096, -1.2660463]                    |
|[-0.94964045, 0.10526103, -0.4038005, -0.49588615, -0.53273034, -1.3643637, -0.3723757, -1.0292097, -0.6975986, -0.53862673]                  |
|[-0.10861254, 0.023622228, -0.05853923, -0.11052456, 0.062839165, -0.042079225, 0.016297048, 0.012557145, -0.031182658, 0.015315899]          |
|[-1.0018965, 1.3609643, 0.48011023, -0.6970923, 1.1940441, -0.65466535, 1.0252241, -0.06693699, -0.29960147, -0.004105416]                    |
|[-0.26099408, 0.22590931, -1.03931, -0.3156938, 0.1327528, -0.74317986, -0.18529738, -0.85464865, 0.18234478, -0.6488315]                     |
|[-0.71419555, 1.0681338, -1.4361836, -0.7710001, 1.2170936, -2.9110541, -0.9240388, -1.6840515, -0.41185093, 0.2699301]                       |
|[0.47626457, 1.2853637, -2.5167668, -1.4255049, 0.06962372, -2.6849105, 0.64177734, -1.1455265, -0.9962598, 0.14937088]                       |
|[-0.09681982, 0.60888803, -1.352679, -0.97277033, 0.15170042, -2.327941, 1.1989579, -1.5569432, -1.0098588, 0.18629323]                       |
|[0.15343769, 1.5712098, -1.3730986, -2.1901414, 1.359406, -2.6616871, 1.0845017, -0.9930877, -1.1129212, 0.362242]                            |
|[0.12544422, -0.14659607, -0.73566866, -0.44283217, -0.71781284, -0.78667325, -0.40465298, -0.5765957, -0.5172876, -0.42280206]               |
|[0.06068485, 0.4695999, -0.8293207, -0.95579416, 0.051268563, -1.0010786, 0.7862446, -0.44474217, -0.7528527, 0.24726658]                     |
|[-0.069934726, 1.6075708, -1.872137, -1.2995775, 0.21003456, -2.6858637, -3.0294926, -0.025330616, -0.12815729, -1.3466915]                   |
|[-0.12813951, 0.23946032, -0.35572857, -0.38602868, -0.121549465, -0.44149035, 0.19253857, -0.3877246, -0.4000775, -0.3083994]                |
|[-0.0035302863, 0.027211253, -0.0068199676, 0.0069124564, 0.019341556, -0.06967176, 0.053023823, -0.041810427, -0.015937444, 0.024241213]     |
|[0.5703832, -4.256954E-4, -1.6173834, -1.6129936, -0.7122008, -1.9598482, 0.2769042, -0.88904965, -0.34728128, -0.37004444]                   |
+----------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 20 rows
..........................................................
2018-10-02 12:38:39 INFO  DAGScheduler:54 - Job 22 finished: show at RunRecommender.scala:97, took 1.116664 s
+-------+---------------+
|     id|           name|
+-------+---------------+
|   1180|     David Gray|
|    378|  Blackalicious|
|    813|     Jurassic 5|
|1255340|The Saw Doctors|
|    942|         Xzibit|
+-------+---------------+
.................................................................
2018-10-02 12:38:39 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2018-10-02 12:38:39 INFO  AbstractConnector:318 - Stopped Spark@22dbe5c9{HTTP/1.1,[http/1.1]}{0.0.0.0:4042}
2018-10-02 12:38:39 INFO  SparkUI:54 - Stopped Spark web UI at http://*********:4042
2018-10-02 12:38:39 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-10-02 12:38:39 INFO  MemoryStore:54 - MemoryStore cleared
2018-10-02 12:38:39 INFO  BlockManager:54 - BlockManager stopped
2018-10-02 12:38:39 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-10-02 12:38:39 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-10-02 12:38:39 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-10-02 12:38:39 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-10-02 12:38:39 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/0j/j_y5y9pj73g9f1r01zqzycxm0000gp/T/spark-54ea2c76-6bfe-41b7-bdf1-3378282beac0
2018-10-02 12:38:39 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/0j/j_y5y9pj73g9f1r01zqzycxm0000gp/T/spark-a47b1d59-135e-48a5-b810-b3e75da8849e
MacBook-Pro-5:bin $
Scattered through the flood of log output, the results appear! The first table shows the min/max user and artist IDs (confirming they all fit in an Int), the pair "Collective Souls"/"Collective Soul" is the head of the artist-alias mapping (a misspelled entry mapped to the canonical one), the features rows are the 10-dimensional user latent-factor vectors from the ALS model, and the final five artists are the top recommendations for user 2093760.
Now let's work through what RunRecommender.scala is actually doing.
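At a high level, main() builds a SparkSession, points at the three Audioscrobbler files on HDFS, and then runs four stages in order: preparation() sanity-checks the raw data and the artist-alias mapping; model() fits a first ALS model and prints sample recommendations for one user; evaluate() grid-searches rank, regParam, and alpha by mean AUC on a 90/10 train/CV split, with a "most listened" baseline for comparison; and recommend() refits on all of the data with the chosen hyperparameters and produces the final top 5.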
/*
 * Copyright 2015 and onwards Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
 *
 * See LICENSE file for further information.
 */

package com.cloudera.datascience.recommender

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

object RunRecommender {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    // Optional, but may help avoid errors due to long lineage
    spark.sparkContext.setCheckpointDir("hdfs://localhost/tmp/")

    val base = "hdfs://localhost/user/******/ds/"
    val rawUserArtistData = spark.read.textFile(base + "user_artist_data.txt")
    val rawArtistData = spark.read.textFile(base + "artist_data.txt")
    val rawArtistAlias = spark.read.textFile(base + "artist_alias.txt")

    val runRecommender = new RunRecommender(spark)
    runRecommender.preparation(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.model(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.evaluate(rawUserArtistData, rawArtistAlias)
    runRecommender.recommend(rawUserArtistData, rawArtistData, rawArtistAlias)
  }

}

class RunRecommender(private val spark: SparkSession) {

  import spark.implicits._

  def preparation(
      rawUserArtistData: Dataset[String],
      rawArtistData: Dataset[String],
      rawArtistAlias: Dataset[String]): Unit = {

    rawUserArtistData.take(5).foreach(println)

    val userArtistDF = rawUserArtistData.map { line =>
      val Array(user, artist, _*) = line.split(' ')
      (user.toInt, artist.toInt)
    }.toDF("user", "artist")

    userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()

    val artistByID = buildArtistByID(rawArtistData)
    val artistAlias = buildArtistAlias(rawArtistAlias)

    val (badID, goodID) = artistAlias.head
    artistByID.filter($"id" isin (badID, goodID)).show()
  }

  def model(
      rawUserArtistData: Dataset[String],
      rawArtistData: Dataset[String],
      rawArtistAlias: Dataset[String]): Unit = {

    val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))

    val trainData = buildCounts(rawUserArtistData, bArtistAlias).cache()

    val model = new ALS().
      setSeed(Random.nextLong()).
      setImplicitPrefs(true).
      setRank(10).
      setRegParam(0.01).
      setAlpha(1.0).
      setMaxIter(5).
      setUserCol("user").
      setItemCol("artist").
      setRatingCol("count").
      setPredictionCol("prediction").
      fit(trainData)

    trainData.unpersist()

    model.userFactors.select("features").show(truncate = false)

    val userID = 2093760

    val existingArtistIDs = trainData.
      filter($"user" === userID).
      select("artist").as[Int].collect()

    val artistByID = buildArtistByID(rawArtistData)

    artistByID.filter($"id" isin (existingArtistIDs:_*)).show()

    val topRecommendations = makeRecommendations(model, userID, 5)
    topRecommendations.show()

    val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect()

    artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()

    model.userFactors.unpersist()
    model.itemFactors.unpersist()
  }

  def evaluate(
      rawUserArtistData: Dataset[String],
      rawArtistAlias: Dataset[String]): Unit = {

    val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))

    val allData = buildCounts(rawUserArtistData, bArtistAlias)
    val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
    trainData.cache()
    cvData.cache()

    val allArtistIDs = allData.select("artist").as[Int].distinct().collect()
    val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)

    val mostListenedAUC = areaUnderCurve(cvData, bAllArtistIDs, predictMostListened(trainData))
    println(mostListenedAUC)

    val evaluations =
      for (rank     <- Seq(5, 30);
           regParam <- Seq(1.0, 0.0001);
           alpha    <- Seq(1.0, 40.0))
      yield {
        val model = new ALS().
          setSeed(Random.nextLong()).
          setImplicitPrefs(true).
          setRank(rank).setRegParam(regParam).
          setAlpha(alpha).setMaxIter(20).
          setUserCol("user").setItemCol("artist").
          setRatingCol("count").setPredictionCol("prediction").
          fit(trainData)

        val auc = areaUnderCurve(cvData, bAllArtistIDs, model.transform)

        model.userFactors.unpersist()
        model.itemFactors.unpersist()

        (auc, (rank, regParam, alpha))
      }

    evaluations.sorted.reverse.foreach(println)

    trainData.unpersist()
    cvData.unpersist()
  }

  def recommend(
      rawUserArtistData: Dataset[String],
      rawArtistData: Dataset[String],
      rawArtistAlias: Dataset[String]): Unit = {

    val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))
    val allData = buildCounts(rawUserArtistData, bArtistAlias).cache()
    val model = new ALS().
      setSeed(Random.nextLong()).
      setImplicitPrefs(true).
      setRank(10).setRegParam(1.0).setAlpha(40.0).setMaxIter(20).
      setUserCol("user").setItemCol("artist").
      setRatingCol("count").setPredictionCol("prediction").
      fit(allData)
    allData.unpersist()

    val userID = 2093760
    val topRecommendations = makeRecommendations(model, userID, 5)

    val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect()
    val artistByID = buildArtistByID(rawArtistData)
    artistByID.join(spark.createDataset(recommendedArtistIDs).toDF("id"), "id").
      select("name").show()

    model.userFactors.unpersist()
    model.itemFactors.unpersist()
  }

  def buildArtistByID(rawArtistData: Dataset[String]): DataFrame = {
    rawArtistData.flatMap { line =>
      val (id, name) = line.span(_ != '\t')
      if (name.isEmpty) {
        None
      } else {
        try {
          Some((id.toInt, name.trim))
        } catch {
          case _: NumberFormatException => None
        }
      }
    }.toDF("id", "name")
  }

  def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = {
    rawArtistAlias.flatMap { line =>
      val Array(artist, alias) = line.split('\t')
      if (artist.isEmpty) {
        None
      } else {
        Some((artist.toInt, alias.toInt))
      }
    }.collect().toMap
  }

  def buildCounts(
      rawUserArtistData: Dataset[String],
      bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = {
    rawUserArtistData.map { line =>
      val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
      val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
      (userID, finalArtistID, count)
    }.toDF("user", "artist", "count")
  }

  def makeRecommendations(model: ALSModel, userID: Int, howMany: Int): DataFrame = {
    val toRecommend = model.itemFactors.
      select($"id".as("artist")).
      withColumn("user", lit(userID))

    model.transform(toRecommend).
      select("artist", "prediction").
      orderBy($"prediction".desc).
      limit(howMany)
  }

  def areaUnderCurve(
      positiveData: DataFrame,
      bAllArtistIDs: Broadcast[Array[Int]],
      predictFunction: (DataFrame => DataFrame)): Double = {

    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive".
    // Make predictions for each of them, including a numeric score
    val positivePredictions = predictFunction(positiveData.select("user", "artist")).
      withColumnRenamed("prediction", "positivePrediction")

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other artists, excluding those that are "positive" for the user.
    val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
        val random = new Random()
        val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
        val negative = new ArrayBuffer[Int]()
        val allArtistIDs = bAllArtistIDs.value
        var i = 0
        // Make at most one pass over all artists to avoid an infinite loop.
        // Also stop when number of negative equals positive set size
        while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
          val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
          // Only add new distinct IDs
          if (!posItemIDSet.contains(artistID)) {
            negative += artistID
          }
          i += 1
        }
        // Return the set with user ID added back
        negative.map(artistID => (userID, artistID))
      }.toDF("user", "artist")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    // Join positive predictions to negative predictions by user, only.
    // This will result in a row for every possible pairing of positive and negative
    // predictions within each user.
    val joinedPredictions = positivePredictions.join(negativePredictions, "user").
      select("user", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("user").agg(count(lit("1")).as("total")).
      select("user", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("user").agg(count("user").as("correct")).
      select("user", "correct")

    // Combine these, compute their ratio, and average over all users
    val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
      select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    joinedPredictions.unpersist()

    meanAUC
  }

  def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = {
    val listenCounts = train.groupBy("artist").
      agg(sum("count").as("prediction")).
      select("artist", "prediction")
    allData.
      join(listenCounts, Seq("artist"), "left_outer").
      select("user", "artist", "prediction")
  }

}