Continuing from the post before last, this time I compute the AUC from spark-shell.
ーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーー
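One caveat about Advanced Analytics with Spark: the Scala code for RunRecommender differs between the first edition and the revised second edition (the Japanese edition is a translation of the first edition).
Just to be safe, explicitly import ALS, MLlib's alternating least squares algorithm: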
scala> import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.ALS
Next, import the RDD classes:
scala> import org.apache.spark.rdd._
import org.apache.spark.rdd._
https://github.com/sryza/aas/blob/1st-edition/ch03-recommender/src/main/scala/com/cloudera/datascience/recommender/RunRecommender.scala
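From the first-edition source linked above, first enter the buildRatings() function (Rating and Broadcast must already be imported in the session):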
scala> def buildRatings(
     |     rawUserArtistData: RDD[String],
     |     bArtistAlias: Broadcast[Map[Int,Int]]) = {
     |   rawUserArtistData.map { line =>
     |     val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
     |     val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
     |     Rating(userID, finalArtistID, count)
     |   }
     | }
buildRatings: (rawUserArtistData: org.apache.spark.rdd.RDD[String], bArtistAlias: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,Int]])org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]
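The parsing and alias lookup inside buildRatings() can be tried without Spark. The following is only a minimal sketch: PlayCount is a hypothetical stand-in for MLlib's Rating, a plain Map replaces the broadcast variable, and the sample lines and alias table are made up for illustration.

```scala
object BuildRatingsSketch {
  // Hypothetical stand-in for MLlib's Rating, so the sketch runs without Spark.
  final case class PlayCount(user: Int, artist: Int, count: Double)

  // Parse one "userID artistID count" line and map the artist ID through
  // the alias table, keeping the ID unchanged when it has no alias.
  def parseLine(line: String, artistAlias: Map[Int, Int]): PlayCount = {
    val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
    val finalArtistID = artistAlias.getOrElse(artistID, artistID)
    PlayCount(userID, finalArtistID, count.toDouble)
  }

  def main(args: Array[String]): Unit = {
    // Made-up alias table: artist 1002 is treated as an alias of 1001.
    val alias = Map(1002 -> 1001)
    // Made-up input lines in the same space-separated layout.
    val lines = Seq("1 1002 5", "1 2000 3")
    lines.map(parseLine(_, alias)).foreach(println)
  }
}
```

The getOrElse(artistID, artistID) call is what makes the alias table optional per artist: an ID with no entry simply maps to itself.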
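Then enter the areaUnderCurve() function (it also needs scala.util.Random and scala.collection.mutable.ArrayBuffer in scope):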
scala> def areaUnderCurve(
     |     positiveData: RDD[Rating],
     |     bAllItemIDs: Broadcast[Array[Int]],
     |     predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = {
     |   // What this actually computes is AUC, per user. The result is actually something
     |   // that might be called "mean AUC".
     |
     |   // Take held-out data as the "positive", and map to tuples
     |   val positiveUserProducts = positiveData.map(r => (r.user, r.product))
     |   // Make predictions for each of them, including a numeric score, and gather by user
     |   val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)
     |
     |   // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
     |   // small AUC problems, and it would be inefficient, when a direct computation is available.
     |
     |   // Create a set of "negative" products for each user. These are randomly chosen
     |   // from among all of the other items, excluding those that are "positive" for the user.
     |   val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
     |     // mapPartitions operates on many (user,positive-items) pairs at once
     |     userIDAndPosItemIDs => {
     |       // Init an RNG and the item IDs set once for partition
     |       val random = new Random()
     |       val allItemIDs = bAllItemIDs.value
     |       userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
     |         val posItemIDSet = posItemIDs.toSet
     |         val negative = new ArrayBuffer[Int]()
     |         var i = 0
     |         // Keep about as many negative examples per user as positive.
     |         // Duplicates are OK
     |         while (i < allItemIDs.size && negative.size < posItemIDSet.size) {
     |           val itemID = allItemIDs(random.nextInt(allItemIDs.size))
     |           if (!posItemIDSet.contains(itemID)) {
     |             negative += itemID
     |           }
     |           i += 1
     |         }
     |         // Result is a collection of (user,negative-item) tuples
     |         negative.map(itemID => (userID, itemID))
     |       }
     |     }
     |   }.flatMap(t => t)
     |   // flatMap breaks the collections above down into one big set of tuples
     |
     |   // Make predictions on the rest:
     |   val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)
     |
     |   // Join positive and negative by user
     |   positivePredictions.join(negativePredictions).values.map {
     |     case (positiveRatings, negativeRatings) =>
     |       // AUC may be viewed as the probability that a random positive item scores
     |       // higher than a random negative one. Here the proportion of all positive-negative
     |       // pairs that are correctly ranked is computed. The result is equal to the AUC metric.
     |       var correct = 0L
     |       var total = 0L
     |       // For each pairing,
     |       for (positive <- positiveRatings;
     |            negative <- negativeRatings) {
     |         // Count the correctly-ranked pairs
     |         if (positive.rating > negative.rating) {
     |           correct += 1
     |         }
     |         total += 1
     |       }
     |       // Return AUC: fraction of pairs ranked correctly
     |       correct.toDouble / total
     |   }.mean() // Return mean AUC over users
     | }
areaUnderCurve: (positiveData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], bAllItemIDs: org.apache.spark.broadcast.Broadcast[Array[Int]], predictFunction: org.apache.spark.rdd.RDD[(Int, Int)] => org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating])Double
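The core of areaUnderCurve() is the pairwise count at the end: for each user, the fraction of (positive, negative) pairs in which the positive item gets the higher score, averaged over users. As a rough illustration, here is the same arithmetic on plain Scala collections. The names AucSketch, pairwiseAuc and meanAuc and the scores are invented for this sketch; negative sampling and Spark are left out entirely.

```scala
object AucSketch {
  // Fraction of (positive, negative) score pairs in which the positive
  // score is strictly higher: the AUC for a single user.
  // Returns NaN when either list is empty, because total stays 0.
  def pairwiseAuc(positiveScores: Seq[Double], negativeScores: Seq[Double]): Double = {
    var correct = 0L
    var total = 0L
    for (p <- positiveScores; n <- negativeScores) {
      if (p > n) correct += 1
      total += 1
    }
    correct.toDouble / total
  }

  // Mean of the per-user AUC values ("mean AUC").
  def meanAuc(perUser: Seq[(Seq[Double], Seq[Double])]): Double = {
    val aucs = perUser.map { case (pos, neg) => pairwiseAuc(pos, neg) }
    aucs.sum / aucs.size
  }

  def main(args: Array[String]): Unit = {
    // Made-up scores. User A: 3 of the 4 pairs are ranked correctly.
    val userA = (Seq(0.9, 0.4), Seq(0.5, 0.1))
    // User B: both pairs are ranked correctly.
    val userB = (Seq(0.8), Seq(0.2, 0.3))
    println(pairwiseAuc(userA._1, userA._2)) // 0.75
    println(meanAuc(Seq(userA, userB)))      // 0.875
  }
}
```

Because the comparison is strict, a tie between a positive and a negative score counts as an incorrectly ranked pair, just as in the function above.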
Next, build the ratings, split them 90/10 into training and cross-validation sets, and cache both:
scala> val allData = buildRatings(rawUserArtistData, bArtistAlias)
allData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[176] at map at <console>:56

scala> val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[177] at randomSplit at <console>:57
cvData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[178] at randomSplit at <console>:57

scala> trainData.cache()
res30: trainData.type = MapPartitionsRDD[177] at randomSplit at <console>:57

scala> cvData.cache()
res31: cvData.type = MapPartitionsRDD[178] at randomSplit at <console>:57
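Since randomSplit() is random, the AUC values in this post will differ slightly from run to run; RDD.randomSplit also takes an optional seed as a second argument for a repeatable split. The idea of a seeded 90/10 split can be sketched on a local collection as follows. This is a simplified stand-in, not Spark's implementation, and SplitSketch and its parameter names are made up.

```scala
import scala.util.Random

object SplitSketch {
  // Send each element to the first group with probability trainFraction,
  // otherwise to the second. A fixed seed makes the split repeatable.
  def randomSplit[A](data: Vector[A], trainFraction: Double, seed: Long): (Vector[A], Vector[A]) = {
    val random = new Random(seed)
    data.partition(_ => random.nextDouble() < trainFraction)
  }

  def main(args: Array[String]): Unit = {
    val (train, cv) = randomSplit((1 to 1000).toVector, 0.9, 42L)
    // The sizes are only roughly 900 and 100, since each element is drawn independently.
    println(s"train=${train.size} cv=${cv.size}")
  }
}
```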
Then collect the distinct item IDs, broadcast them, train a model, and compute its AUC:
scala> val allItemIDs = allData.map(_.product).distinct().collect()
allItemIDs: Array[Int] = Array(1080592, 10018048, 10525020, 1056524, 10619288, 2292732, 6780292, 10263176, 2157804, 10589952, 6934196, 10678216, 6996068, 10049204, 6971832, 2077700, 6613612, 6783116, 9999308, 10205264, 10678004, 9985320, 1247824, 6959324, 10090912, 10255740, 9961692, 1199844, 1005400, 7012524, 10076880, 10124800, 10311216, 10325196, 10555588, 2290348, 10036736, 10650956, 2291648, 2169600, 2159020, 10159884, 10667052, 10456872, 10278380, 10209852, 10005972, 10454268, 10424208, 6841520, 10501616, 1296256, 6998068, 10497112, 10278964, 10034920, 6918716, 2026604, 1259928, 10661936, 10054332, 10270552, 10292828, 9967084, 10425360, 10607200, 10367540, 10615396, 6950220, 10461484, 10463692, 1096820, 9952444, 10744988, 1279340, 10009896, 1323760, 6831708, 10602496, 10750156, 10...

scala> val bAllItemIDs = sc.broadcast(allItemIDs)
bAllItemIDs: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(70)

scala> val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@25b61308

scala> val auc = areaUnderCurve(cvData, bAllItemIDs, model.predict)
auc: Double = 0.9635602721348083
So the AUC came out at about 0.96!
Next, as a non-personalized baseline that recommends the artists most played across all users, enter the predictMostListened() function and compute its AUC:
scala> def predictMostListened(
     |     sc: SparkContext,
     |     train: RDD[Rating])(allData: RDD[(Int, Int)]) = {
     |
     |   val bListenCount = sc.broadcast(
     |     train.map(r => (r.product, r.rating)).
     |       reduceByKey(_ + _).collectAsMap()
     |   )
     |   allData.map { case (user, product) =>
     |     Rating(
     |       user,
     |       product,
     |       bListenCount.value.getOrElse(product, 0.0)
     |     )
     |   }
     | }
predictMostListened: (sc: org.apache.spark.SparkContext, train: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating])(allData: org.apache.spark.rdd.RDD[(Int, Int)])org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]

scala> val auc = areaUnderCurve(
     |   cvData, bAllItemIDs, predictMostListened(sc, trainData))
auc: Double = 0.9396235286692161
This came out at about 0.94.
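What predictMostListened() does is small enough to mimic on local collections: sum the play counts per artist over the training data, then give every (user, artist) pair that artist's total as its score, regardless of the user. The sketch below uses plain tuples and made-up data in place of RDD[Rating]; MostListenedSketch and its method names are hypothetical.

```scala
object MostListenedSketch {
  // Sum play counts per artist over (user, artist, count) training triples.
  def listenCounts(train: Seq[(Int, Int, Double)]): Map[Int, Double] =
    train.groupBy(_._2).map { case (artist, rows) => artist -> rows.map(_._3).sum }

  // Score every (user, artist) pair by the artist's total play count,
  // ignoring the user. Artists unseen in training score 0.0.
  def predict(counts: Map[Int, Double])(pairs: Seq[(Int, Int)]): Seq[(Int, Int, Double)] =
    pairs.map { case (user, artist) => (user, artist, counts.getOrElse(artist, 0.0)) }

  def main(args: Array[String]): Unit = {
    // Made-up training data: artist 10 was played 5 times in total, artist 20 once.
    val train = Seq((1, 10, 3.0), (2, 10, 2.0), (1, 20, 1.0))
    val scorer = predict(listenCounts(train)) _
    // User 3 gets the same scores as anyone else would.
    scorer(Seq((3, 10), (3, 20), (3, 99))).foreach(println)
  }
}
```

The curried second parameter list mirrors the original: fixing the training data first leaves a function of the (user, artist) pairs alone, which is the shape areaUnderCurve() expects for predictFunction.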
One more push: trying out hyperparameter selection over a small grid of rank, lambda, and alpha values,
scala> val evaluations =
     |   for (rank   <- Array(10,  50);
     |        lambda <- Array(1.0, 0.0001);
     |        alpha  <- Array(1.0, 40.0))
     |   yield {
     |     val model = ALS.trainImplicit(trainData, rank, 10, lambda, alpha)
     |     val auc = areaUnderCurve(cvData, bAllItemIDs, model.predict)
     |     ((rank, lambda, alpha), auc)
     |   }
This took a very long time to run, but it eventually finished:
evaluations: Array[((Int, Double, Double), Double)] = Array(((10,1.0,1.0),0.9686171840536397), ((10,1.0,40.0),0.9772468567451441), ((10,1.0E-4,1.0),0.9648577864850243), ((10,1.0E-4,40.0),0.9767269334034012), ((50,1.0,1.0),0.9671881706203465), ((50,1.0,40.0),0.9774044747907816), ((50,1.0E-4,1.0),0.9549044403996609), ((50,1.0E-4,40.0),0.9765901640521224))

scala> evaluations.sortBy(_._2).reverse.foreach(println)
((50,1.0,40.0),0.9774044747907816)
((10,1.0,40.0),0.9772468567451441)
((10,1.0E-4,40.0),0.9767269334034012)
((50,1.0E-4,40.0),0.9765901640521224)
((10,1.0,1.0),0.9686171840536397)
((50,1.0,1.0),0.9671881706203465)
((10,1.0E-4,1.0),0.9648577864850243)
((50,1.0E-4,1.0),0.9549044403996609)
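That is the output I got. The best AUC, about 0.977, came from rank = 50, lambda = 1.0, alpha = 40.0, and all four runs with alpha = 40.0 scored higher than all four with alpha = 1.0.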
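The grid search itself is just a for-comprehension over the parameter lists followed by a sort. As a rough sketch of that pattern without Spark, the version below takes the scoring step as a function argument. GridSearchSketch, gridSearch and toyScore are invented names, and toyScore is a made-up formula that stands in for "train a model and compute its AUC"; it is not an AUC.

```scala
object GridSearchSketch {
  type Params = (Int, Double, Double) // (rank, lambda, alpha)

  // Evaluate every combination in the grid with the supplied scoring
  // function and return the results sorted from best to worst score.
  def gridSearch(
      ranks: Seq[Int],
      lambdas: Seq[Double],
      alphas: Seq[Double])(score: Params => Double): Seq[(Params, Double)] = {
    val evaluations =
      for (rank <- ranks; lambda <- lambdas; alpha <- alphas)
        yield ((rank, lambda, alpha), score((rank, lambda, alpha)))
    evaluations.sortBy(_._2).reverse
  }

  // Made-up score used only to exercise the search. In the session above,
  // this step is ALS.trainImplicit followed by areaUnderCurve.
  val toyScore: Params => Double = {
    case (rank, lambda, alpha) => alpha + lambda + rank / 1000.0
  }

  def main(args: Array[String]): Unit = {
    val results = gridSearch(Seq(10, 50), Seq(1.0, 0.0001), Seq(1.0, 40.0))(toyScore)
    results.foreach(println)
  }
}
```

With 2 x 2 x 2 values the grid has 8 combinations, and each one costs a full model training in the real session, which is why the run above took so long.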
Finally, here are the recommendations for 100 users:
scala> val someUsers = allData.map(_.user).distinct().take(100)
someUsers: Array[Int] = Array(1000536, 1054536, 1055300, 2377612, 2252008, 2311888, 2070920, 2405248, 2310772, 2005368, 2090648, 2403060, 2294288, 2321808, 1052120, 1073180, 2142096, 2309472, 2328592, 2097060, 2442036, 2103600, 2344500, 2381892, 2193052, 2434716, 2431784, 2432512, 1064252, 2349132, 1005400, 2289460, 2015584, 2426576, 2334316, 1055540, 2290348, 2351700, 2431812, 1039724, 2150588, 1000268, 2360472, 2130636, 9336, 2394728, 1065208, 2293400, 2199036, 2088736, 2344148, 2141020, 2158180, 2429052, 1038816, 2075792, 2187896, 2063160, 2442920, 2402200, 2181924, 1056692, 2334088, 2047224, 2398336, 1073884, 2168512, 2361124, 2350016, 2172448, 2165792, 2211764, 2110860, 2291444, 2169840, 2228280, 2334288, 2303208, 2375692, 2195536, 2037904, 2367844, 2142924, 2209660, 2403684, 10262...
scala> val someRecommendations =
     |   someUsers.map(userID => model.recommendProducts(userID, 5))
someRecommendations: Array[Array[org.apache.spark.mllib.recommendation.Rating]] = Array(Array(Rating(1000536,1000591,0.23488420865315707), Rating(1000536,2,0.23194148292017544), Rating(1000536,15,0.2316448559704606), Rating(1000536,530,0.22932569339683917), Rating(1000536,979,0.22826476334846676)), Array(Rating(1054536,478,1.1689171803448288), Rating(1054536,441,1.1644528910011611), Rating(1054536,118,1.1359750064173522), Rating(1054536,1,1.1065668632862296), Rating(1054536,15,1.1063510245381283)), Array(Rating(1055300,5705,1.2553594813590623), Rating(1055300,5810,1.2185943167157451), Rating(1055300,1006016,1.151232770302419), Rating(1055300,1854,1.1483776203654905), Rating(1055300,1003282,1.1278787157205392)), Array(Rating(2377612,979,0.5671701177824222), Rating(2377612,1000113,0.55503...

scala> someRecommendations.map(
     |   recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ")
     | ).foreach(println)
1000536 -> 1000591, 2, 15, 530, 979
1054536 -> 478, 441, 118, 1, 15
1055300 -> 5705, 5810, 1006016, 1854, 1003282
2377612 -> 979, 1000113, 1034635, 976, 82
2252008 -> 1250079, 1013656, 6684730, 6621492, 1308328
2311888 -> 1009156, 1014187, 1014716, 1238230, 1088164
2070920 -> 1008419, 5705, 1000024, 4209, 1011083
2405248 -> 4267, 1274, 976, 1205, 1000113
2310772 -> 1890, 2, 15, 1105360, 1319
2005368 -> 1011083, 1009291, 1008419, 3533, 1002725
2090648 -> 6694932, 1285410, 1005820, 420, 1233342
2403060 -> 4468, 1000024, 1000481, 2003588, 4267
2294288 -> 1007903, 1854, 1001048, 1002326, 1006016
2321808 -> 1000113, 979, 1270639, 1177, 3327
1052120 -> 1134859, 1305866, 1516, 1278093, 1002530
1073180 -> 1000481, 1004162, 4468, 5409, 2003588
2142096 -> 1000251, 1183, 1002575, 1399, 3909
2309472 -> 1001531, 1298659, 1285410, 1001909, 1193
2328592 -> 1298659, 352, 1233196, 5270, 1001909
2097060 -> 1000113, 1177, 979, 1205, 4267
2442036 -> 4267, 1854, 1274, 1001412, 1000123
2103600 -> 1854, 4465, 1001412, 4267, 1002326
2344500 -> 1825, 1000107, 4209, 1000024, 1008419
2381892 -> 1119903, 1000241, 1008627, 1011967, 1002287
2193052 -> 1000427, 1001066, 1004484, 1002128, 1004226
2434716 -> 793, 1000113, 606, 1394, 313
2431784 -> 5705, 1000024, 4468, 1000107, 2003588
2432512 -> 1025225, 1002128, 1004278, 1004294, 1016435
1064252 -> 2, 15, 441, 59, 979
2349132 -> 441, 478, 221, 530, 1000591
1005400 -> 1037970, 1003673, 250, 860, 1007614
2289460 -> 1019694, 1265562, 107, 1006427, 58
2015584 -> 1270, 1205, 1034635, 1428, 1274
2426576 -> 1000113, 1205, 1177, 979, 82
2334316 -> 1002465, 621, 5837, 478, 5210
1055540 -> 1090387, 1033714, 1233782, 1006626, 1006594
2290348 -> 1025225, 1002128, 1004278, 1006957, 1026440
2351700 -> 1205, 1000113, 1275996, 1177, 1890
2431812 -> 1000239, 1005176, 1011083, 1008419, 1011299
1039724 -> 5837, 1000557, 1000591, 1000781, 1251
2150588 -> 1002326, 3111, 1000445, 1006016, 1001048
1000268 -> 15, 1000113, 979, 754, 189
2360472 -> 1000113, 979, 4267, 1177, 1270639
2130636 -> 5705, 5810, 4468, 1000024, 2003588
9336 -> 1000113, 352, 979, 1177, 1231740
2394728 -> 1008419, 4209, 1000495, 1011083, 1001582
1065208 -> 930, 1000113, 4267, 1274, 1177
2293400 -> 1002840, 6694932, 1004983, 1233712, 736
2199036 -> 1205, 4267, 1000113, 1274, 1270
2088736 -> 1000113, 300, 1177, 1233770, 352
2344148 -> 4267, 1270639, 1274, 1177, 1000139
2141020 -> 1000113, 1205, 1275996, 1177, 82
2158180 -> 1177, 1205, 1000113, 1145952, 1270639
2429052 -> 979, 1000113, 976, 4267, 1000139
1038816 -> 1003729, 1119903, 1008851, 1012445, 1013203
2075792 -> 1854, 4267, 1006016, 1274, 930
2187896 -> 2439, 5705, 742, 1014536, 4468
2063160 -> 979, 976, 4267, 1000113, 82
2442920 -> 1008984, 1007491, 1022843, 1020405, 1016791
2402200 -> 1001909, 1298659, 1000130, 1001779, 1193
2181924 -> 1000113, 1205, 1177, 1275996, 793
1056692 -> 1000113, 1177, 979, 1205, 1231740
2334088 -> 1394, 1223, 82, 606, 1000113
2047224 -> 4192, 1003176, 1004278, 1001523, 1007735
2398336 -> 930, 4267, 1854, 1274, 1205
1073884 -> 1233342, 1001907, 1001531, 1285410, 420
2168512 -> 5687, 1000055, 1001708, 2897, 1006612
2361124 -> 1205, 1270, 1274, 4267, 1000113
2350016 -> 15, 242, 1495, 2, 59
2172448 -> 1205, 1000113, 1177, 1275996, 979
2165792 -> 1285410, 1233342, 1250233, 1298659, 1008445
2211764 -> 478, 5837, 118, 221, 1002465
2110860 -> 1002328, 1000442, 1066440, 1003258, 1000136
2291444 -> 1205, 1177, 2132, 1000113, 1890
2169840 -> 15, 2, 1000113, 979, 189
2228280 -> 4267, 1274, 1270639, 1259, 1001412
2334288 -> 92, 478, 221, 57, 441
2303208 -> 1000024, 5705, 4468, 4267, 976
2375692 -> 1000113, 979, 82, 1394, 313
2195536 -> 5810, 1000591, 5705, 1034635, 4241
2037904 -> 153, 4250, 1844, 411, 1303
2367844 -> 1205, 1177, 1000113, 1259, 1000088
2142924 -> 2, 1000113, 15, 979, 352
2209660 -> 1022166, 1004584, 1005896, 1005143, 1025389
2403684 -> 4192, 1000130, 1007735, 1233196, 1006234
1026276 -> 1034635, 1295531, 1247272, 1031984, 1256375
2056508 -> 1000113, 1205, 82, 1177, 979
2120144 -> 1001534, 176, 1012167, 2636, 1000570
2212208 -> 1003284, 1001848, 1002344, 1015706, 1164352
2026648 -> 1032595, 1257225, 1268623, 1000505, 4153
2141916 -> 969, 1854, 5705, 2003588, 1000183
2272856 -> 1300642, 1004028, 1034510, 1003249, 829
2273244 -> 1034635, 930, 1274, 4267, 5810
2336012 -> 4267, 1274, 1000024, 5705, 976
2302280 -> 1000113, 313, 352, 979, 793
2403720 -> 1784, 930, 1238013, 6914803, 1002270
2227004 -> 4192, 1233196, 1000130, 1001779, 1270639
2279152 -> 250, 1037970, 1007614, 930, 78
2434668 -> 1007286, 1105069, 1003278, 1023455, 1012630
2271688 -> 1001531, 1298659, 1285410, 1193, 1235