Co-occurrence network: a feature that draws a diagram in which extracted words or codes with similar patterns of appearance are joined by lines, i.e., a network whose edges represent co-occurrence relations.
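As a minimal illustration of the idea in plain Scala (toy data, not KH Coder itself): count how often two terms appear in the same document; each pair becomes an edge and the count its weight.

// Toy sketch: co-occurrence counts over tokenized documents (hypothetical data).
val docs = Seq(
  Seq("cancer", "therapy", "drug"),
  Seq("cancer", "drug"),
  Seq("therapy", "blood"))
val pairCounts = docs.
  flatMap(tokens => tokens.distinct.sorted.combinations(2)).
  groupBy(identity).
  map { case (pair, occs) => (pair, occs.size) }
pairCounts.foreach { case (pair, n) => println(pair.mkString(" -- ") + ": " + n) }
// e.g. "cancer -- drug: 2" would be drawn as an edge with weight 2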
ーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーーー
GraphX: built on specialized RDD implementations that are optimized for graphs.
VertexRDD[VD] extends RDD[(VertexId, VD)]: VertexId is of type Long and is required for every vertex.
VD is the vertex attribute associated with the vertex; it may be data of any type.
EdgeRDD[ED] extends RDD[Edge[ED]]: Edge is a case class holding two VertexId values and an edge attribute of type ED.
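A minimal sketch of these types on toy data (assuming a live SparkContext sc; names and values are made up):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// VD = String (vertex attribute), ED = Int (edge attribute).
val toyVertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "Disease"), (2L, "Drug Therapy"), (3L, "Blood")))
val toyEdges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 5), Edge(1L, 3L, 2)))
val toy: Graph[String, Int] = Graph(toyVertices, toyEdges)
// Graph(...) wraps these into VertexRDD[String] and EdgeRDD[Int] internally.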
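The session below assumes several RDDs built earlier from the MEDLINE data: medline (one Seq[String] of MeSH topics per citation), topics, topicPairs, and cooccurs. A plausible reconstruction of how they are derived (an assumption, not the verbatim upstream code):

import org.apache.spark.rdd.RDD

// Assumed: medline is an RDD[Seq[String]], one topic list per citation.
val topics: RDD[String] = medline.flatMap(mesh => mesh)            // all topic occurrences
val topicPairs = medline.flatMap(t => t.sorted.combinations(2))    // topic pairs per citation
val cooccurs = topicPairs.map(p => (p, 1)).reduceByKey(_ + _)      // pair -> co-occurrence count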
scala> import java.nio.charset.StandardCharsets
import java.nio.charset.StandardCharsets

scala> import java.security.MessageDigest
import java.security.MessageDigest

scala> def hashId(str: String): Long = {
     |   val bytes = MessageDigest.getInstance("MD5").
     |     digest(str.getBytes(StandardCharsets.UTF_8))
     |   (bytes(0) & 0xFFL) |
     |   ((bytes(1) & 0xFFL) << 8) |
     |   ((bytes(2) & 0xFFL) << 16) |
     |   ((bytes(3) & 0xFFL) << 24) |
     |   ((bytes(4) & 0xFFL) << 32) |
     |   ((bytes(5) & 0xFFL) << 40) |
     |   ((bytes(6) & 0xFFL) << 48) |
     |   ((bytes(7) & 0xFFL) << 56)
     | }
hashId: (str: String)Long

scala> val vertices = topics.map(topic => (hashId(topic), topic))
vertices: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[12] at map at <console>:37

scala> val uniqueHashes = vertices.map(_._1).countByValue()
uniqueHashes: scala.collection.Map[Long,Long] = Map(-5463505990747276935 -> 129, -5648550135738908125 -> 80, -4338122482909018135 -> 86, 814828650435338689 -> 11, 8616370848682321939 -> 1, -3061924423727164275 -> 11, -4761753283958666425 -> 2, 8518496914599387445 -> 10, -519918301923265192 -> 29, -909116023300148938 -> 2, -8609663971641751753 -> 17, 398581641960663275 -> 115, -7797988349527728970 -> 3, -8861979347488278804 -> 337, 1640973796745058702 -> 2, 7544981738786029967 -> 1, 5468248999204594527 -> 5, -1540174592956162871 -> 1, 4888490987316767907 -> 8, 3788497069650193252 -> 1, 3609967523423816492 -> 1, -5754327275894548688 -> 1, 1795002388729510994 -> 2, 6785823199518695772 -> 134, -1533871353730553556 -> 14, -2718961283740799767 -> 7, 2893062267821501710 -> 183, -25366619520260...

scala> val uniqueTopics = vertices.map(_._2).countByValue()
uniqueTopics: scala.collection.Map[String,Long] = Map(Dental Implantation, Subperiosteal -> 3, Odontogenic Tumors -> 10, Spinal Cord Diseases -> 25, Family Leave -> 2, Crystallography, X-Ray -> 2, Ethylene Glycol -> 2, Cisplatin -> 1, Type A Personality -> 3, G(M1) Ganglioside -> 4, 2-Naphthylamine -> 1, Pancreatic Diseases -> 42, Lung Compliance -> 6, Streptolysins -> 2, Vicia faba -> 1, Direct Service Costs -> 1, Lichen Planus -> 11, Case-Control Studies -> 11, Superoxide Dismutase -> 3, Verbal Behavior -> 37, Phantom Limb -> 4, Leeches -> 18, Leisure Activities -> 25, Collective Bargaining -> 19, Limulus Test -> 1, Dentition, Mixed -> 1, Emergency Medical Service Communication Systems -> 4, Psychopharmacology -> 32, Crying -> 4, Leishmania tropica -> 2, Microscopy, Confocal -> 6, Ben...
scala> uniqueHashes.size == uniqueTopics.size
res11: Boolean = true

scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._

scala> val edges = cooccurs.map(p => {
     |   val (topics, cnt) = p
     |   val ids = topics.map(hashId).sorted
     |   Edge(ids(0), ids(1), cnt)
     | })
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[21] at map at <console>:42

scala> val topicGraph = Graph(vertices, edges)
topicGraph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@3bd7ff1f

scala> topicGraph.cache()
res12: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@3bd7ff1f

scala> vertices.count()
res13: Long = 280464

scala> topicGraph.vertices.count()
res14: Long = 14548

Graph() keeps a single entry per distinct VertexId, so the 280,464 (hash, topic) pairs collapse into 14,548 distinct topic vertices.

scala> val connectedComponentGraph: Graph[VertexId, Int] =
     |   topicGraph.connectedComponents()
connectedComponentGraph: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = org.apache.spark.graphx.impl.GraphImpl@577226c0

scala> def sortedConnectedComponents(
     |     connectedComponents: Graph[VertexId, _])
     |   : Seq[(VertexId, Long)] = {
     |   val componentCounts = connectedComponents.vertices.map(_._2).
     |     countByValue
     |   componentCounts.toSeq.sortBy(_._2).reverse
     | }
sortedConnectedComponents: (connectedComponents: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId, _])Seq[(org.apache.spark.graphx.VertexId, Long)]

scala> val componentCounts = sortedConnectedComponents(
     |   connectedComponentGraph)
componentCounts: Seq[(org.apache.spark.graphx.VertexId, Long)] = ArrayBuffer((-9218306090261648869,13610), (-8193948242717911820,5), (-2062883918534425492,4), (1765411469112156596,3), (-784187332742198415,3), (2742772755763603550,3), (-8679136035911620397,3), (-7016546051037489808,3), (-7685954109876710390,3), (-3299226677350014771,2), (-5362458719777034637,2), (-7057556716085932818,2), (-5688848791152897676,2), (4990772389918494402,2), (2026738476704047088,2), (-2317423407077322989,2), (-5295884525273097033,2), (-1570577283199790912,2), (-1046815223728304871,2), (-2387762220049741428,2), (-2512898474094185390,2), (1928754713977428085,2), (4303892666114347427,2), (-2565470797001804556,2), (-4349348356289090846,2), (-4717785562675251817,2), (-3467839743215210439,2), (-3247250938470812857...

scala> componentCounts.size
res15: Int = 878

scala> componentCounts.take(10).foreach(println)
(-9218306090261648869,13610)
(-8193948242717911820,5)
(-2062883918534425492,4)
(1765411469112156596,3)
(-784187332742198415,3)
(2742772755763603550,3)
(-8679136035911620397,3)
(-7016546051037489808,3)
(-7685954109876710390,3)
(-3299226677350014771,2)
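connectedComponents() labels every vertex with the smallest VertexId in its component, and the dominant component here holds 13,610 of the 14,548 vertices. To keep working with only that component, one option is to carve it out as a subgraph; a sketch reusing the session's variables, not part of the original transcript:

// Keep only vertices (and incident edges) in the largest connected component.
val topComponentId = componentCounts.head._1
val largestComponent = topicGraph.
  outerJoinVertices(connectedComponentGraph.vertices) {
    (vid, name, cidOpt) => (name, cidOpt.getOrElse(-1L))
  }.
  subgraph(vpred = (vid, attr) => attr._2 == topComponentId)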
scala> val nameCID = topicGraph.vertices.
     |   innerJoin(connectedComponentGraph.vertices) {
     |     (topicId, name, componentId) => (name, componentId)
     |   }
nameCID: org.apache.spark.graphx.VertexRDD[(String, org.apache.spark.graphx.VertexId)] = VertexRDDImpl[324] at RDD at VertexRDD.scala:57

scala> val c1 = nameCID.filter(x => x._2._2 == componentCounts(1)._1)
c1: org.apache.spark.graphx.VertexRDD[(String, org.apache.spark.graphx.VertexId)] = VertexRDDImpl[326] at RDD at VertexRDD.scala:57

(The filter key is componentCounts(1)._1, the component's VertexId label, not ._2, which is its size.)

scala> c1.collect().foreach(x => println(x._2._1))
3-Hydroxyacyl CoA Dehydrogenases
Carbon-Carbon Double Bond Isomerases
Acetyl-CoA C-Acyltransferase
Racemases and Epimerases
Enoyl-CoA Hydratase

scala> val hiv = topics.filter(_.contains("HIV")).countByValue()
hiv: scala.collection.Map[String,Long] = Map(HIV -> 13, HIV Integrase Inhibitors -> 1, HIV Antigens -> 1, HIV Seroprevalence -> 4, HIV Long Terminal Repeat -> 4, HIV-1 -> 62, HIV Seronegativity -> 5, HIV Antibodies -> 1, HIV Seropositivity -> 42, HIV Infections -> 325, HIV-2 -> 2)

scala> hiv.foreach(println)
(HIV,13)
(HIV Integrase Inhibitors,1)
(HIV Antigens,1)
(HIV Seroprevalence,4)
(HIV Long Terminal Repeat,4)
(HIV-1,62)
(HIV Seronegativity,5)
(HIV Antibodies,1)
(HIV Seropositivity,42)
(HIV Infections,325)
(HIV-2,2)

scala> val degrees: VertexRDD[Int] = topicGraph.degrees.cache()
degrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[334] at RDD at VertexRDD.scala:57

scala> degrees.map(_._2).stats()
res19: org.apache.spark.util.StatCounter = (count: 13721, mean: 31.155892, stdev: 65.497591, max: 2596.000000, min: 1.000000)

scala> val sing = medline.filter(x => x.size == 1)
sing: org.apache.spark.rdd.RDD[Seq[String]] = MapPartitionsRDD[338] at filter at <console>:38

scala> sing.count()
res20: Long = 44509

scala> val singTopic = sing.flatMap(topic => topic).distinct()
singTopic: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[342] at distinct at <console>:38

scala> singTopic.count()
res22: Long = 8243

scala> val topic2 = topicPairs.flatMap(p => p)
topic2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[348] at flatMap at <console>:38

scala> singTopic.subtract(topic2).count()
res26: Long = 827

These 827 topics occur only in single-topic citations and so have no edges, which accounts for the gap between the 14,548 vertices in topicGraph and the 13,721 entries in degrees.

scala> def topNamesAndDegrees(degrees: VertexRDD[Int],
     |     topicGraph: Graph[String, Int]): Array[(String, Int)] = {
     |   val namesAndDegrees = degrees.innerJoin(topicGraph.vertices) {
     |     (topicId, degree, name) => (name, degree)
     |   }
     |   val ord = Ordering.by[(String, Int), Int](_._2)
     |   namesAndDegrees.map(_._2).top(19)(ord)
     | }
topNamesAndDegrees: (degrees: org.apache.spark.graphx.VertexRDD[Int], topicGraph: org.apache.spark.graphx.Graph[String,Int])Array[(String, Int)]

scala> topNamesAndDegrees(degrees, topicGraph).foreach(println)
(Research,2596)
(Disease,1746)
(Neoplasms,1202)
(Blood,914)
(Pharmacology,882)
(Tuberculosis,815)
(Toxicology,694)
(Drug Therapy,678)
(Jurisprudence,661)
(Biomedical Research,633)
(Physicians,625)
(Public Policy,601)
(Medicine,590)
(Metabolism,578)
(Social Change,570)
(Wounds and Injuries,570)
(Brain,569)
(Hospitals,557)
(Urine,551)
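The stats above (mean around 31 but max 2,596) point to a heavily skewed degree distribution. A quick way to see its shape is a log-scale histogram; the following is a sketch reusing the session's degrees RDD, not part of the original transcript:

// Bucket vertices by floor(log2(degree)) and count each bucket.
val degreeHistogram = degrees.
  map { case (_, d) => ((math.log(d) / math.log(2)).toInt, 1) }.
  reduceByKey(_ + _).
  sortByKey().
  collect()
degreeHistogram.foreach { case (bucket, n) =>
  println(s"degree in [2^$bucket, 2^${bucket + 1}): $n vertices")
}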