Moving on to K-Means Clustering.
————————————————————-
k-means
http://tech.nitoyon.com/ja/blog/2009/04/09/kmeans-visualise/
https://dev.classmethod.jp/machine-learning/k-means-impl/
1) Decide k, the number of clusters to partition the data into
2) Place k points (centroids) at random in the space containing the data; each becomes the center of a cluster
3) For each data point, find the nearest centroid and assign the point to that centroid's cluster
4) Move each centroid to the mean (center of mass) of the data points in its cluster
Repeat steps 3 and 4 until the centroids no longer move (a plain-Scala sketch of this loop follows after the divider)
————————————————————-
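For reference, the loop above can be written out in a few lines of plain Scala. This is only an illustrative sketch (the names kMeans and distSq are mine, not from any library, and the initialization samples k data points rather than arbitrary points in space, a common variant); everything that follows in this post uses Spark MLlib's KMeans instead:

import scala.util.Random

// Squared Euclidean distance between two points.
def distSq(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

// Lloyd's algorithm: steps 2)-4) above, repeated until the centroids stop moving.
def kMeans(points: Seq[Array[Double]], k: Int, maxIter: Int = 100): Array[Array[Double]] = {
  // 2) pick k initial centroids at random from the data points
  var centroids = Random.shuffle(points).take(k).map(_.clone).toArray
  var moved = true
  var iter = 0
  while (moved && iter < maxIter) {
    // 3) assign every point to its nearest centroid
    val clusters = points.groupBy(p => centroids.indices.minBy(i => distSq(p, centroids(i))))
    // 4) move each centroid to the mean of the points assigned to it
    val updated = centroids.indices.map { i =>
      clusters.get(i) match {
        case Some(ps) => ps.transpose.map(col => col.sum / ps.size).toArray
        case None     => centroids(i) // empty cluster: keep the old centroid
      }
    }.toArray
    moved = centroids.zip(updated).exists { case (c, u) => distSq(c, u) > 1e-9 }
    centroids = updated
    iter += 1
  }
  centroids
}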
Downloading the data:
http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
The data layout is as follows: each line is one network connection record, consisting of 38 numeric features, three categorical features (protocol, service, and TCP state), and a label in the last column.
0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,162,4528,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,236,1228,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,2,2,1.00,0.00,0.50,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,233,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,3,3,1.00,0.00,0.33,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,3,3,0.00,0.00,0.00,0.00,1.00,0.00,0.00,4,4,1.00,0.00,0.25,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,238,1282,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,4,4,0.00,0.00,0.00,0.00,1.00,0.00,0.00,5,5,1.00,0.00,0.20,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,5,5,0.00,0.00,0.00,0.00,1.00,0.00,0.00,6,6,1.00,0.00,0.17,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,234,1364,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,7,7,1.00,0.00,0.14,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,1295,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,7,7,0.00,0.00,0.00,0.00,1.00,0.00,0.00,8,8,1.00,0.00,0.12,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
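The claimed layout can be checked directly in the Scala shell by splitting the first record (a throwaway snippet; sample is just the first line above pasted as a string):

val sample = "0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal."
val fields = sample.split(',')
println(fields.length)                             // 42 fields: 41 features plus the label
println(s"${fields(1)} ${fields(2)} ${fields(3)}") // the three categorical features: tcp http SF
println(fields.last)                               // the label: normal.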
Extract the data file and copy it to HDFS:
MacBook-Pro-5:bin *******$ ./hadoop fs -ls
Found 2 items
drwxr-xr-x - ******* supergroup 0 2018-10-08 08:35 ds
drwxr-xr-x - ******* supergroup 0 2018-09-23 17:46 output
MacBook-Pro-5:bin *******$ ./hadoop fs -put /Users/*******/Desktop/kddcup.data /user/*******/ds
MacBook-Pro-5:bin *******$ ./hadoop fs -ls
Found 2 items
drwxr-xr-x - ******* supergroup 0 2018-10-10 23:27 ds
drwxr-xr-x - ******* supergroup 0 2018-09-23 17:46 output
MacBook-Pro-5:bin *******$ ./hadoop fs -ls ds/
Found 5 items
-rw-r--r-- 3 ******* supergroup 2932731 2018-09-29 15:02 ds/artist_alias.txt
-rw-r--r-- 3 ******* supergroup 55963575 2018-09-29 15:02 ds/artist_data.txt
-rw-r--r-- 3 ******* supergroup 75169317 2018-10-08 08:35 ds/covtype.data
-rw-r--r-- 3 ******* supergroup 742579829 2018-10-10 23:27 ds/kddcup.data
-rw-r--r-- 3 ******* supergroup 426761761 2018-09-29 15:02 ds/user_artist_data.txt
scala> val rawData = sc.textFile("hdfs://localhost/user/*******/ds/kddcup.data")
rawData: org.apache.spark.rdd.RDD[String] = hdfs://localhost/user/*******/ds/kddcup.data MapPartitionsRDD[153] at textFile at <console>:66

scala> rawData.first
res22: String = 0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.

scala> rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)
(smurf.,2807886)
(neptune.,1072017)
(normal.,972781)
(satan.,15892)
(ipsweep.,12481)
(portsweep.,10413)
(nmap.,2316)
(back.,2203)
(warezclient.,1020)
(teardrop.,979)
(pod.,264)
(guess_passwd.,53)
(buffer_overflow.,30)
(land.,21)
(warezmaster.,20)
(imap.,12)
(rootkit.,10)
(loadmodule.,9)
(ftp_write.,8)
(multihop.,7)
(phf.,4)
(perl.,3)
(spy.,2)
scala> import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg._

scala> val labelsAndData = rawData.map { line =>
     |   val buffer = line.split(',').toBuffer
     |   buffer.remove(1,3)
     |   val label = buffer.remove(buffer.length-1)
     |   val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
     |   (label,vector)
     | }
labelsAndData: org.apache.spark.rdd.RDD[(String, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[158] at map at <console>:72

scala> val data = labelsAndData.values.cache()
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[159] at values at <console>:70

scala> import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.clustering._

scala> val kmeans = new KMeans()
kmeans: org.apache.spark.mllib.clustering.KMeans = org.apache.spark.mllib.clustering.KMeans@72175eb2

scala> val model = kmeans.run(data)
2018-10-11 22:35:24 WARN BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2018-10-11 22:35:24 WARN BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
model: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@1a9c895e

scala> model.clusterCenters.foreach(println)
[48.34019491959669,1834.6215497618625,826.2031900016945,5.7161172049003456E-6,6.487793027561892E-4,7.961734678254053E-6,0.012437658596734055,3.205108575604837E-5,0.14352904910348827,0.00808830584493399,6.818511237273984E-5,3.6746467745787934E-5,0.012934960793560386,0.0011887482315762398,7.430952366370449E-5,0.0010211435092468404,0.0,4.082940860643104E-7,8.351655530445469E-4,334.9735084506668,295.26714620807076,0.17797031701994342,0.1780369894027253,0.05766489875327374,0.05772990937912739,0.7898841322630883,0.021179610609908736,0.02826081009629284,232.98107822302248,189.21428335201279,0.7537133898006421,0.030710978823798966,0.6050519309248854,0.006464107887636004,0.1780911843182601,0.17788589813474293,0.05792761150001131,0.05765922142400886]
[10999.0,0.0,1.309937401E9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,255.0,1.0,0.0,0.65,1.0,0.0,0.0,0.0,1.0,1.0]

scala> val clusterLabelCount = labelsAndData.map { case (label, datum) =>
     |   val cluster = model.predict(datum)
     |   (cluster,label)
     | }.countByValue
clusterLabelCount: scala.collection.Map[(Int, String),Long] = Map((0,portsweep.) -> 10412, (0,rootkit.) -> 10, (0,buffer_overflow.) -> 30, (0,phf.) -> 4, (0,pod.) -> 264, (0,perl.) -> 3, (0,spy.) -> 2, (0,ftp_write.) -> 8, (0,nmap.) -> 2316, (0,ipsweep.) -> 12481, (0,imap.) -> 12, (0,warezmaster.) -> 20, (0,satan.) -> 15892, (0,teardrop.) -> 979, (0,smurf.) -> 2807886, (0,neptune.) -> 1072017, (0,loadmodule.) -> 9, (0,guess_passwd.) -> 53, (0,normal.) -> 972781, (0,land.) -> 21, (0,multihop.) -> 7, (1,portsweep.) -> 1, (0,warezclient.) -> 1020, (0,back.) -> 2203)

scala> clusterLabelCount.toSeq.sorted.foreach {
     |   case ((cluster,label),count) =>
     |     println(f"$cluster%1s$label%18s$count%8s")
     | }
0 back. 2203
0 buffer_overflow. 30
0 ftp_write. 8
0 guess_passwd. 53
0 imap. 12
0 ipsweep. 12481
0 land. 21
0 loadmodule. 9
0 multihop. 7
0 neptune. 1072017
0 nmap. 2316
0 normal. 972781
0 perl. 3
0 phf. 4
0 pod. 264
0 portsweep. 10412
0 rootkit. 10
0 satan. 15892
0 smurf. 2807886
0 spy. 2
0 teardrop. 979
0 warezclient. 1020
0 warezmaster. 20
1 portsweep. 1
scala> def distance(a: Vector, b: Vector) =
     |   math.sqrt(a.toArray.zip(b.toArray).
     |     map(p => p._1 - p._2).map(d => d * d).sum)
distance: (a: org.apache.spark.mllib.linalg.Vector, b: org.apache.spark.mllib.linalg.Vector)Double

scala> def distToCentroid(datum: Vector, model: KMeansModel) = {
     |   val cluster = model.predict(datum)
     |   val centroid = model.clusterCenters(cluster)
     |   distance(centroid, datum)
     | }
distToCentroid: (datum: org.apache.spark.mllib.linalg.Vector, model: org.apache.spark.mllib.clustering.KMeansModel)Double

scala> import org.apache.spark.rdd._
import org.apache.spark.rdd._

scala> def clusteringScore(data: RDD[Vector], k: Int) = {
     |   val kmeans = new KMeans()
     |   kmeans.setK(k)
     |   val model = kmeans.run(data)
     |   data.map(datum => distToCentroid(datum, model)).mean()
     | }
clusteringScore: (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector], k: Int)Double

scala> (5 to 40 by 5).map(k => (k, clusteringScore(data, k))).
     |   foreach(println)
(5,1938.8583418059188)
(10,1722.0822302073261)
(15,1408.0323192835147)
(20,1320.8639631824994)
(25,1398.286324551)
(30,1361.1805159713338)
(35,1391.254669856399)
(40,1317.702661115164)

scala> kmeans.setRuns(10)
warning: there was one deprecation warning; re-run with -deprecation for details
2018-10-12 07:19:06 WARN KMeans:66 - Setting number of runs has no effect since Spark 2.0.0.
res27: kmeans.type = org.apache.spark.mllib.clustering.KMeans@72175eb2

scala> kmeans.setEpsilon(1.0e-6)
res29: kmeans.type = org.apache.spark.mllib.clustering.KMeans@72175eb2

scala> val model = kmeans.run(data)
model: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@2699fdc6

scala> model.clusterCenters.foreach(println)
[48.34019491959669,1834.6215497618625,826.2031900016945,5.7161172049003456E-6,6.487793027561892E-4,7.961734678254053E-6,0.012437658596734055,3.205108575604837E-5,0.14352904910348827,0.00808830584493399,6.818511237273984E-5,3.6746467745787934E-5,0.012934960793560386,0.0011887482315762398,7.430952366370449E-5,0.0010211435092468404,0.0,4.082940860643104E-7,8.351655530445469E-4,334.9735084506668,295.26714620807076,0.17797031701994342,0.1780369894027253,0.05766489875327374,0.05772990937912739,0.7898841322630883,0.021179610609908736,0.02826081009629284,232.98107822302248,189.21428335201279,0.7537133898006421,0.030710978823798966,0.6050519309248854,0.006464107887636004,0.1780911843182601,0.17788589813474293,0.05792761150001131,0.05765922142400886]
[10999.0,0.0,1.309937401E9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,255.0,1.0,0.0,0.65,1.0,0.0,0.0,0.0,1.0,1.0]

scala> (30 to 100 by 10).map(k => (k, clusteringScore(data, k))).
     |   toList.foreach(println)
(30,1371.5245809694686)
(40,616.3543918300082)
(50,1371.5003729493899)
(60,1231.7358181447594)
(70,1033.4866135659229)
(80,1037.6127995247475)
(90,1027.3537463530504)
(100,1122.627708161601)
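As an aside, MLlib's KMeansModel also provides computeCost, the sum of squared distances from each point to its nearest centroid (often called WSSSE), which can be used in place of the hand-rolled mean-distance score above. A minimal sketch, assuming data: RDD[Vector] is the parsed numeric data from earlier (the helper name wssse is mine):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sum of squared distances to the nearest centroid for a given k.
def wssse(data: RDD[Vector], k: Int): Double = {
  val kmeans = new KMeans().setK(k).setEpsilon(1.0e-6)
  val model = kmeans.run(data)
  model.computeCost(data)
}

(5 to 40 by 5).foreach(k => println((k, wssse(data, k))))

Both metrics generally decrease as k grows, so the interesting signal is where the improvement levels off, not the absolute value.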
scala> val dataAsArray = data.map(_.toArray)
dataAsArray: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[1120] at map at <console>:76

scala> val numCols = dataAsArray.first().length
numCols: Int = 38

scala> val n = dataAsArray.count()
n: Long = 4898431

scala> val sums = dataAsArray.reduce(
     |   (a,b) => a.zip(b).map(t => t._1 + t._2))
sums: Array[Double] = Array(2.3680206E8, 8.986765238E9, 5.357035893E9, 28.0, 3178.0, 39.0, 60925.0, 157.0, 703067.0, 39620.0, 334.0, 180.0, 63361.0, 5823.0, 364.0, 5002.0, 0.0, 2.0, 4091.0, 1.640844284E9, 1.446345448E9, 871775.1400000013, 872101.7299999918, 282468.4699999987, 282786.919999999, 3869193.1300014798, 103746.83999989525, 138433.59999998374, 1.141241758E9, 9.26852923E8, 3692012.2800011584, 150436.22999986156, 2963805.5300003863, 31663.98000003283, 872367.200000095, 871361.6200001689, 283755.3500000004, 282440.6600000077)

scala> val sumSquares = dataAsArray.fold(
     |   new Array[Double](numCols)
     | )(
     |   (a,b) => a.zip(b).map(t => t._1 + t._2 * t._2)
     | )
sumSquares: Array[Double] = Array(2.463849195674467E24, 1.5029566894234366E37, 4.1480721617281846E36, 348.0, 2.0716658E7, 41157.0, 4.05409351029E11, 40315.0, 2.67804240743E11, 3.87620239932607E15, 57514.0, 32330.0, 3.973113289930345E15, 1.910147959E9, 40730.0, 1.3449708E7, 0.0, 2.0, 6009507.0, 1.3240355904052093E23, 1.2303279161186096E23, 2.2930630690647998E11, 2.2997396165149054E11, 4.76244909831684E10, 4.7649954379826195E10, 2.6532680322640913E12, 3.892803952005269E8, 3.9456167844662333E9, 1.4456076820536306E22, 1.0060527982319084E22, 2.45669433155672E12, 1.185875219465042E9, 1.9054814751375215E12, 2.4872169439699925E7, 2.2942432803497485E11, 2.2980600871673083E11, 4.725207007332573E10, 4.683466205162662E10)

scala> val stdevs = sumSquares.zip(sums).map {
     |   case(sumSq,sum) => math.sqrt(n*sumSq - sum*sum)/n
     | }
stdevs: Array[Double] = Array(7.092160637568064E8, 1.751639521459706E15, 9.202263271748564E14, 0.0084287083113581, 2.056512384574052, 0.09166285000710928, 287.68577159744495, 0.09072036889476061, 233.81918676097567, 28130.323433474063, 0.1083573070299824, 0.08124082227268468, 28479.8027413705, 19.74717642001751, 0.09118608360562963, 1.6570203724101165, 0.0, 6.3897874456691E-4, 1.1076200324017111, 1.644073889668342E8, 1.5848272926710385E8, 216.36118707751112, 216.67593997485977, 98.6021992658583, 98.62855554368699, 735.9728858210965, 8.914593473131658, 28.381068274048623, 5.432462364556549E7, 4.5319163933310404E7, 708.1851544280705, 15.559303546893325, 623.6970123393882, 2.253339083152612, 216.41685907071218, 216.5968049851283, 98.2159104102263, 97.78114589612957)

scala> val means = sums.map(_ / n)
means: Array[Double] = Array(48.34243046395876, 1834.6211752293746, 1093.6228137132073, 5.716116037972159E-6, 6.487791703098401E-4, 7.961733052889793E-6, 0.012437656057623349, 3.205107921291532E-5, 0.14352901980246327, 0.008088304193730605, 6.81850984529536E-5, 3.674646024410674E-5, 0.012934958152926926, 0.0011887479888968528, 7.430950849363806E-5, 0.001021143300783455, 0.0, 4.082940027122971E-7, 8.351653825480036E-4, 334.97344027097654, 295.26708613431526, 0.17797028068783685, 0.1780369530570078, 0.057665091128158937, 0.05773010174074086, 0.7898841751576127, 0.02117960628615474, 0.028260804326933203, 232.98108271811932, 189.21424492863122, 0.7537132359323135, 0.03071110524979561, 0.6050520115523493, 0.006464106568007762, 0.17809114796147887, 0.17788586181987026, 0.05792780382126448, 0....

scala> def normalize(datum: Vector) = {
     |   val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
     |     (value, mean, stdev) =>
     |       if (stdev <= 0) (value - mean) else (value - mean) / stdev
     |   )
     |   Vectors.dense(normalizedArray)
     | }
normalize: (datum: org.apache.spark.mllib.linalg.Vector)org.apache.spark.mllib.linalg.Vector

scala> val normalizedData = data.map(normalize).cache()
normalizedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[1121] at map at <console>:78

scala> (60 to 120 by 10).par.map(k =>
     |   (k, clusteringScore(normalizedData, k))).toList.foreach(println)
(60,0.002515205989657537)
(70,0.0074734204141181405)
(80,0.009216201322302978)
(90,0.00850656316132561)
(100,0.0037743584717983615)
(110,0.004191318459031246)
(120,0.002252777771221129)

scala> def entropy(counts: Iterable[Int]) = {
     |   val values = counts.filter(_ > 0)
     |   val n: Double = values.sum
     |   values.map { v =>
     |     val p = v / n
     |     -p * math.log(p)
     |   }.sum
     | }
entropy: (counts: Iterable[Int])Double

scala> def clusteringScore3(
     |     normalizedLabelsAndData: RDD[(String,Vector)],
     |     k: Int) = {
     |   val kmeans = new KMeans()
     |   kmeans.setK(k)
     |   val model = kmeans.run(normalizedLabelsAndData.values)
     |   val labelsAndClusters =
     |     normalizedLabelsAndData.mapValues(model.predict)
     |   val clustersAndLabels = labelsAndClusters.map(_.swap)
     |   val labelsInCluster = clustersAndLabels.groupByKey().values
     |   val labelCounts = labelsInCluster.map(
     |     _.groupBy(l => l).map(_._2.size))
     |   val n = normalizedLabelsAndData.count()
     |   labelCounts.map(m => m.sum * entropy(m)).sum / n
     | }
clusteringScore3: (normalizedLabelsAndData: org.apache.spark.rdd.RDD[(String, org.apache.spark.mllib.linalg.Vector)], k: Int)Double

scala> val normalizedData = data.map(normalize).cache()
normalizedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[1591] at map at <console>:78

scala> val dataAsArray = data.map(_.toArray)
dataAsArray: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[1594] at map at <console>:76
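The hand-rolled normalization above (subtract the column mean, divide by the column standard deviation) can also be expressed with MLlib's StandardScaler. A minimal sketch, assuming data: RDD[Vector] as above (normalizeWithScaler and normalizedData2 are my names); note that StandardScaler uses the sample standard deviation (n - 1 in the denominator), so the values differ very slightly from the version computed by hand:

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Fit column means and standard deviations once, then apply them to every vector.
def normalizeWithScaler(data: RDD[Vector]): RDD[Vector] = {
  val scaler = new StandardScaler(withMean = true, withStd = true).fit(data)
  data.map(scaler.transform)
}

val normalizedData2 = normalizeWithScaler(data).cache()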
scala> def buildNormalizationFunction(data: RDD[Vector]): (Vector => Vector) = {
     |   val dataAsArray = data.map(_.toArray)
     |   val numCols = dataAsArray.first().length
     |   val n = dataAsArray.count()
     |   val sums = dataAsArray.reduce(
     |     (a, b) => a.zip(b).map(t => t._1 + t._2))
     |   val sumSquares = dataAsArray.fold(
     |     new Array[Double](numCols)
     |   )(
     |     (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2)
     |   )
     |   val stdevs = sumSquares.zip(sums).map {
     |     case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
     |   }
     |   val means = sums.map(_ / n)
     |
     |   (datum: Vector) => {
     |     val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
     |       (value, mean, stdev) =>
     |         if (stdev <= 0) (value - mean) else (value - mean) / stdev
     |     )
     |     Vectors.dense(normalizedArray)
     |   }
     | }
buildNormalizationFunction: (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector])org.apache.spark.mllib.linalg.Vector => org.apache.spark.mllib.linalg.Vector

scala> val data = rawData.map { line =>
     |   val buffer = line.split(',').toBuffer
     |   buffer.remove(1, 3)
     |   buffer.remove(buffer.length - 1)
     |   Vectors.dense(buffer.map(_.toDouble).toArray)
     | }
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[1598] at map at <console>:82

scala> def buildCategoricalAndLabelFunction(rawData: RDD[String]): (String => (String,Vector)) = {
     |   val splitData = rawData.map(_.split(','))
     |   val protocols = splitData.map(_(1)).distinct().collect().zipWithIndex.toMap
     |   val services = splitData.map(_(2)).distinct().collect().zipWithIndex.toMap
     |   val tcpStates = splitData.map(_(3)).distinct().collect().zipWithIndex.toMap
     |   (line: String) => {
     |     val buffer = line.split(',').toBuffer
     |     val protocol = buffer.remove(1)
     |     val service = buffer.remove(1)
     |     val tcpState = buffer.remove(1)
     |     val label = buffer.remove(buffer.length - 1)
     |     val vector = buffer.map(_.toDouble)
     |
     |     val newProtocolFeatures = new Array[Double](protocols.size)
     |     newProtocolFeatures(protocols(protocol)) = 1.0
     |     val newServiceFeatures = new Array[Double](services.size)
     |     newServiceFeatures(services(service)) = 1.0
     |     val newTcpStateFeatures = new Array[Double](tcpStates.size)
     |     newTcpStateFeatures(tcpStates(tcpState)) = 1.0
     |
     |     vector.insertAll(1, newTcpStateFeatures)
     |     vector.insertAll(1, newServiceFeatures)
     |     vector.insertAll(1, newProtocolFeatures)
     |
     |     (label, Vectors.dense(vector.toArray))
     |   }
     | }
buildCategoricalAndLabelFunction: (rawData: org.apache.spark.rdd.RDD[String])String => (String, org.apache.spark.mllib.linalg.Vector)

scala> val parseFunction = buildCategoricalAndLabelFunction(rawData)
parseFunction: String => (String, org.apache.spark.mllib.linalg.Vector) = <function1>

scala> val labelsAndData = rawData.map(parseFunction)
labelsAndData: org.apache.spark.rdd.RDD[(String, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[1614] at map at <console>:84

scala> val normalizedLabelsAndData =
     |   labelsAndData.mapValues(buildNormalizationFunction(labelsAndData.values)).cache()
normalizedLabelsAndData: org.apache.spark.rdd.RDD[(String, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[1617] at mapValues at <console>:85

scala> (80 to 160 by 10).map(k =>
     |   (k, clusteringScore3(normalizedLabelsAndData, k))).toList.foreach(println)
.............
(80,1.012046519872184)
(90,0.9743177501437411)
(100,0.9913653857398761)
(110,0.5303639330366472)
(120,0.9893296160212628)
(130,1.006420963739569)
(140,0.9943646103936579)
(150,0.9422446404832385)
(160,0.9822526994639119)
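For reference, the entropy-based score evaluated here is the count-weighted average entropy of the label mix inside each cluster, i.e. a restatement of the clusteringScore3 definition above (the symbols below are mine):

\mathrm{score}(k) = \frac{1}{n} \sum_{c=1}^{k} n_c \, H(c), \qquad
H(c) = -\sum_{\ell} p_{c,\ell} \log p_{c,\ell}, \qquad
p_{c,\ell} = \frac{n_{c,\ell}}{n_c}

where n is the total number of points, n_c the number of points in cluster c, and n_{c,\ell} the number of those carrying label \ell. Lower values mean the clusters are purer with respect to the known labels.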