Moving on to Advanced Analytics with Spark #7: building a co-occurrence network, which involves handling XML with Scala.
———————————————
The "co-occurrence" in a co-occurrence network means that when a given word appears in a document (or sentence), a limited set of other words frequently appears in that same document (sentence). In natural language processing terms, co-occurrence simply refers to two strings appearing together in the same document or sentence; the small sketch below illustrates how such pairs are counted.
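As a minimal illustration (a toy sketch in plain Scala, not the Medline data handled below), counting co-occurrences amounts to enumerating the unordered pairs within each record and tallying them:

// Toy records: each inner Seq is the set of terms found in one document.
val records = Seq(Seq("Research", "Disease"), Seq("Research", "Disease", "Neoplasms"))

// Sort before taking combinations so ("A", "B") and ("B", "A") count as the same pair.
val pairCounts = records
  .flatMap(_.sorted.combinations(2))
  .groupBy(identity)
  .mapValues(_.size)
// Map(List(Disease, Research) -> 2, List(Disease, Neoplasms) -> 1, List(Neoplasms, Research) -> 1)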
———————————————
Fetch the Medline sample data from the NIH FTP server.
ftp://ftp.nlm.nih.gov/nlmdata/sample/medline/*.gz
Decompress the archives with gunzip and inspect the resulting XML files.
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE MedlineCitationSet PUBLIC "-//NLM//DTD Medline Citation, 1st January, 2016//EN" "https://www.nlm.nih.gov/databases/dtd/nlmmedlinecitationset_160101.dtd">
<MedlineCitationSet>
<MedlineCitation Owner="PIP" Status="MEDLINE">
  <PMID Version="1">12254773</PMID>
  <DateCreated>
    <Year>1980</Year>
    <Month>01</Month>
    <Day>03</Day>
  </DateCreated>
  <DateCompleted>
    <Year>1980</Year>
    <Month>01</Month>
    <Day>03</Day>
  </DateCompleted>
  <DateRevised>
    <Year>2010</Year>
    <Month>12</Month>
    <Day>10</Day>
  </DateRevised>
  <Article PubModel="Print">
    <Journal>
      <ISSN IssnType="Print">0972-2068</ISSN>
      <JournalIssue CitedMedium="Print">
        <Volume>27</Volume>
        <PubDate>
          <Year>1965</Year>
        </PubDate>
      </JournalIssue>
      <Title>The Indian journal of surgery</Title>
      <ISOAbbreviation>Indian J Surg</ISOAbbreviation>
    </Journal>
    <ArticleTitle>Vasectomy-behind testis.</ArticleTitle>
    <Pagination>
      <MedlinePgn>485-7</MedlinePgn>
    </Pagination>
    <AuthorList CompleteYN="Y">
      <Author ValidYN="Y">
        <LastName>Shahade</LastName>
        <ForeName>M G</ForeName>
        <Initials>MG</Initials>
      </Author>
      <Author ValidYN="Y">
        <LastName>Shah</LastName>
        <ForeName>V C</ForeName>
        <Initials>VC</Initials>
      </Author>
      <Author ValidYN="Y">
        <LastName>Boradkar</LastName>
        <ForeName>R V</ForeName>
        <Initials>RV</Initials>
      </Author>
    </AuthorList>
    <Language>eng</Language>
    <PublicationTypeList>
      <PublicationType UI="D016428">Journal Article</PublicationType>
    </PublicationTypeList>
  </Article>
  <MedlineJournalInfo>
    <Country>United States</Country>
    <MedlineTA>Indian J Surg</MedlineTA>
    <NlmUniqueID>0373026</NlmUniqueID>
    <ISSNLinking>0973-9793</ISSNLinking>
  </MedlineJournalInfo>
  <CitationSubset>J</CitationSubset>
  <MeshHeadingList>
    <MeshHeading>
      <DescriptorName MajorTopicYN="N" UI="D005193">Family Planning Services</DescriptorName>
    </MeshHeading>
    <MeshHeading>
      <DescriptorName MajorTopicYN="Y" UI="D013502">General Surgery</DescriptorName>
    </MeshHeading>
    <MeshHeading>
      <DescriptorName MajorTopicYN="N" UI="D013245">Sterilization, Reproductive</DescriptorName>
    </MeshHeading>
    <MeshHeading>
      <DescriptorName MajorTopicYN="N" UI="D013812">Therapeutics</DescriptorName>
    </MeshHeading>
    <MeshHeading>
      <DescriptorName MajorTopicYN="N" UI="D013521">Urologic Surgical Procedures, Male</DescriptorName>
    </MeshHeading>
    <MeshHeading>
      <DescriptorName MajorTopicYN="Y" UI="D014659">Vasectomy</DescriptorName>
    </MeshHeading>
  </MeshHeadingList>
  <OtherID Source="PIP">650212</OtherID>
  <OtherID Source="POP">00005390</OtherID>
  <OtherAbstract Type="PIP" Language="eng">
    <AbstractText>The findings in a series of 100 scrotal dissections seem to indicate that the conventional method of vasectomy may be partly responsible for the persistent failures of vaso-vasostomy. It is suggested that the selection of a different site for vasectomy is necessary in order to minimize the percentage of failures attending the operation of vaso-vasostomy for refertility. Anatomically, vasectomy performed above the level of the upper pole of the testis carries some disadvantages. There is a greater possibility of damaging the vessels. Defining the ends of vas in relatively loose tissue calls for a wider dissection and possibly may prove more damaging when done for the subsequent operation of anastomosis. Thus, the blood supply to both the testis and vas may be jeopardized. The resultant scar tissue is also significantly large. When the vas is exposed at the back of the upper 1/3 of the testis, the testicular artery has already divided into branches and left the vas. The venous plexus is yet to form a veil for it. The vas is comparatively better anchored in the surrounding areolar tissue. Since in most of the cases the vas is not coiled in this region, no difficulty should be encountered, and the difficulty of anastomozing the ends of the coiled portion of the vas deferens is generally accepted when the patients comes for an anastomotic operation at a later date. Also the scar tissue will be less. It is recommended that the method be used on a larger scale.</AbstractText>
  </OtherAbstract>
  <KeywordList Owner="PIP">
    <Keyword MajorTopicYN="N">Family Planning</Keyword>
    <Keyword MajorTopicYN="N">Male Sterilization</Keyword>
    <Keyword MajorTopicYN="N">Male Urologic Surgery</Keyword>
    <Keyword MajorTopicYN="N">Sterilization, Sexual</Keyword>
    <Keyword MajorTopicYN="Y">Surgery</Keyword>
    <Keyword MajorTopicYN="N">Treatment</Keyword>
    <Keyword MajorTopicYN="Y">Vasectomy</Keyword>
  </KeywordList>
</MedlineCitation>
<MedlineCitation Owner="PIP" Status="MEDLINE">
.........
Within each record, the MeSH keywords carry a major-topic attribute (MajorTopicYN); a quick parsing check follows the listing below.
<MeshHeadingList>
  <MeshHeading>
    <DescriptorName MajorTopicYN="N" UI="D005193">Family Planning Services</DescriptorName>
  </MeshHeading>
  <MeshHeading>
    <DescriptorName MajorTopicYN="Y" UI="D013502">General Surgery</DescriptorName>
  </MeshHeading>
  <MeshHeading>
    <DescriptorName MajorTopicYN="N" UI="D013245">Sterilization, Reproductive</DescriptorName>
  </MeshHeading>
  <MeshHeading>
    <DescriptorName MajorTopicYN="N" UI="D013812">Therapeutics</DescriptorName>
  </MeshHeading>
  <MeshHeading>
    <DescriptorName MajorTopicYN="N" UI="D013521">Urologic Surgical Procedures, Male</DescriptorName>
  </MeshHeading>
  <MeshHeading>
    <DescriptorName MajorTopicYN="Y" UI="D014659">Vasectomy</DescriptorName>
  </MeshHeading>
</MeshHeadingList>
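As a quick standalone check (a sketch, not part of the spark-shell session later in this post; meshXml is assumed to hold the fragment above as a String), scala.xml can pull out just the major-topic descriptors:

import scala.xml.XML

// meshXml: the <MeshHeadingList> fragment above, held as a String (assumed).
val mesh = XML.loadString(meshXml)
val majorTopicNames = (mesh \\ "DescriptorName")
  .filter(n => (n \ "@MajorTopicYN").text == "Y")
  .map(_.text)
// For this record: List(General Surgery, Vasectomy)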
Load the XML files into HDFS (a small cross-check from Scala follows the listing).
MacBook-Pro-5:bin $ ./hadoop fs -mkdir medline
MacBook-Pro-5:bin $ ./hadoop fs -mkdir medline
MacBook-Pro-5:bin $ ./hadoop fs -put /Users/*******/Desktop/medline/*.xml /user/*******/medline
MacBook-Pro-5:bin $ ./hadoop fs -ls medline
Found 8 items
-rw-r--r--   3  supergroup   71147066 2018-10-13 20:40 medline/medline16n0033.xml
-rw-r--r--   3  supergroup   72705330 2018-10-13 20:40 medline/medline16n0074.xml
-rw-r--r--   3  supergroup  105189622 2018-10-13 20:40 medline/medline16n0144.xml
-rw-r--r--   3  supergroup  112711106 2018-10-13 20:40 medline/medline16n0237.xml
-rw-r--r--   3  supergroup  156910066 2018-10-13 20:40 medline/medline16n0365.xml
-rw-r--r--   3  supergroup  131298588 2018-10-13 20:41 medline/medline16n0439.xml
-rw-r--r--   3  supergroup  133663105 2018-10-13 20:41 medline/medline16n0576.xml
-rw-r--r--   3  supergroup  145188012 2018-10-13 20:41 medline/medline16n0736.xml
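The same listing can be cross-checked from Scala with the Hadoop FileSystem API (a sketch; the NameNode URI is assumed to match the one used later, and the user name is a placeholder for the elided one above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://localhost")   // assumption: same NameNode as below
val fs = FileSystem.get(conf)
val user = "..."                               // placeholder for the elided user name
fs.listStatus(new Path(s"/user/$user/medline"))
  .foreach(st => println(s"${st.getLen}\t${st.getPath}"))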
https://github.com/sryza/aas/tree/1st-edition
Download the code from the URL above, unzip it, move into the common directory, and build it with Maven.
$ cd common/
$ mvn package
$ spark-shell --jars target/common-1.0.2-jar-with-dependencies.jar
MacBook-Pro-5:common $ ${SPARK_HOME}/bin/spark-shell --jars target/common-1.0.2-jar-with-dependencies.jar
2018-10-19 20:03:24 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://macbook-pro-5:4040
Spark context available as 'sc' (master = local[*], app id = local-1539947013911).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import com.cloudera.datascience.common.XmlInputFormat
import com.cloudera.datascience.common.XmlInputFormat

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.io.{Text, LongWritable}

scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration

scala> def loadMedline(sc: SparkContext, path: String) = {
     |   @transient val conf = new Configuration()
     |   conf.set(XmlInputFormat.START_TAG_KEY, "<MedlineCitation ")
     |   conf.set(XmlInputFormat.END_TAG_KEY, "</MedlineCitation>")
     |   val in = sc.newAPIHadoopFile(path, classOf[XmlInputFormat],
     |     classOf[LongWritable], classOf[Text], conf)
     |   in.map(line => line._2.toString)
     | }
loadMedline: (sc: org.apache.spark.SparkContext, path: String)org.apache.spark.rdd.RDD[String]

scala> val medline_raw = loadMedline(sc, "hdfs://localhost/user/*******/medline")
medline_raw: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:35
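One detail worth calling out in loadMedline (a commented excerpt of the two lines above, not a new definition): the start tag deliberately ends with a space.

// The trailing space means only <MedlineCitation ...> record elements are matched,
// and the enclosing <MedlineCitationSet> element is not (its name merely shares the prefix).
conf.set(XmlInputFormat.START_TAG_KEY, "<MedlineCitation ")
conf.set(XmlInputFormat.END_TAG_KEY, "</MedlineCitation>")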
scala> import scala.xml._
import scala.xml._

scala> val raw_xml = medline_raw.take(1)(0)
raw_xml: String =
<MedlineCitation Owner="PIP" Status="MEDLINE">
<PMID Version="1">12255379</PMID>
<DateCreated>
<Year>1980</Year>
<Month>01</Month>
<Day>03</Day>
</DateCreated>
<DateCompleted>
<Year>1980</Year>
<Month>01</Month>
<Day>03</Day>
</DateCompleted>
<DateRevised>
<Year>2013</Year>
<Month>02</Month>
<Day>19</Day>
</DateRevised>
<Article PubModel="Print">
<Journal>
<ISSN IssnType="Print">0002-9955</ISSN>
<JournalIssue CitedMedium="Print">
<Volume>159</Volume>
<Issue>3</Issue>
<PubDate>
<Year>1955</Year>
<Month>Sep</Month>
<Day>17</Day>
</PubDate>
</JournalIssue>
<Title>Journal of the American Medical Association</Title>
<ISOAbbreviation>J Am Med Assoc</ISOAbbreviation>
</Journal>
<ArticleTitle>Association of maternal and fetal factors with development of mental deficiency. 1. ...

scala> val elem = XML.loadString(raw_xml)
elem: scala.xml.Elem =
<MedlineCitation Status="MEDLINE" Owner="PIP">
<PMID Version="1">12255379</PMID>
<DateCreated>
<Year>1980</Year>
<Month>01</Month>
<Day>03</Day>
</DateCreated>
<DateCompleted>
<Year>1980</Year>
<Month>01</Month>
<Day>03</Day>
</DateCompleted>
<DateRevised>
<Year>2013</Year>
<Month>02</Month>
<Day>19</Day>
</DateRevised>
<Article PubModel="Print">
<Journal>
<ISSN IssnType="Print">0002-9955</ISSN>
<JournalIssue CitedMedium="Print">
<Volume>159</Volume>
<Issue>3</Issue>
<PubDate>
<Year>1955</Year>
<Month>Sep</Month>
<Day>17</Day>
</PubDate>
</JournalIssue>
<Title>Journal of the American Medical Association</Title>
<ISOAbbreviation>J Am Med Assoc</ISOAbbreviation>
</Journal>
<ArticleTitle>Association of maternal and fetal factors with development of mental deficiency....
scala> elem.label
res0: String = MedlineCitation

scala> elem.attributes
res1: scala.xml.MetaData = Status="MEDLINE" Owner="PIP"

scala> elem \ "MeshHeadingList"
res2: scala.xml.NodeSeq = NodeSeq(<MeshHeadingList>
<MeshHeading>
<DescriptorName UI="D001519" MajorTopicYN="N">Behavior</DescriptorName>
</MeshHeading>
<MeshHeading>
<DescriptorName UI="D000013" MajorTopicYN="N">Congenital Abnormalities</DescriptorName>
</MeshHeading>
<MeshHeading>
<DescriptorName UI="D006233" MajorTopicYN="N">Disabled Persons</DescriptorName>
</MeshHeading>
<MeshHeading>
<DescriptorName UI="D004194" MajorTopicYN="N">Disease</DescriptorName>
</MeshHeading>
<MeshHeading>
<DescriptorName UI="D008607" MajorTopicYN="Y">Intellectual Disability</DescriptorName>
</MeshHeading>
<MeshHeading>
<DescriptorName UI="D007360" MajorTopicYN="N">Intelligence</DescriptorName>
</MeshHeading>
<MeshHeading>
<DescriptorName UI="D008431" MajorTopicYN="Y">Maternal-Fetal Exchange</DescriptorNa...

scala> elem \\ "MeshHeading"
res3: scala.xml.NodeSeq = NodeSeq(<MeshHeading>
<DescriptorName UI="D001519" MajorTopicYN="N">Behavior</DescriptorName>
</MeshHeading>, <MeshHeading>
<DescriptorName UI="D000013" MajorTopicYN="N">Congenital Abnormalities</DescriptorName>
</MeshHeading>, <MeshHeading>
<DescriptorName UI="D006233" MajorTopicYN="N">Disabled Persons</DescriptorName>
</MeshHeading>, <MeshHeading>
<DescriptorName UI="D004194" MajorTopicYN="N">Disease</DescriptorName>
</MeshHeading>, <MeshHeading>
<DescriptorName UI="D008607" MajorTopicYN="Y">Intellectual Disability</DescriptorName>
</MeshHeading>, <MeshHeading>
<DescriptorName UI="D007360" MajorTopicYN="N">Intelligence</DescriptorName>
</MeshHeading>, <MeshHeading>
<DescriptorName UI="D008431" MajorTopicYN="Y">Maternal-Fetal Exchange</DescriptorName>
</MeshHe...
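For reference, a minimal sketch (toy XML, not Medline) of the three selectors used here: \ picks direct children, \\ picks descendants at any depth, and a name prefixed with "@" picks an attribute.

import scala.xml._

val doc = <a><b flag="Y"><c>one</c></b><b flag="N"><c>two</c></b></a>

doc \ "b"                                           // the two <b> elements (direct children of <a>)
doc \ "c"                                           // empty: <c> is not a direct child of <a>
doc \\ "c"                                          // both <c> elements, found at any depth
(doc \ "b").filter(n => (n \ "@flag").text == "Y")  // attribute test, same pattern as majorTopics below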
scala> def majorTopics(elem: Elem): Seq[String] = {
     |   val dn = elem \\ "DescriptorName"
     |   val mt = dn.filter(n => (n \ "@MajorTopicYN").text == "Y")
     |   mt.map(n => n.text)
     | }
majorTopics: (elem: scala.xml.Elem)Seq[String]

scala> majorTopics(elem)
res4: Seq[String] = List(Intellectual Disability, Maternal-Fetal Exchange, Pregnancy Complications)

scala> val mxml: RDD[Elem] = medline_raw.map(XML.loadString)
mxml: org.apache.spark.rdd.RDD[scala.xml.Elem] = MapPartitionsRDD[2] at map at <console>:33

scala> val medline: RDD[Seq[String]] = mxml.map(majorTopics).cache()
medline: org.apache.spark.rdd.RDD[Seq[String]] = MapPartitionsRDD[3] at map at <console>:35

scala> medline.take(1)(0)
res5: Seq[String] = List(Intellectual Disability, Maternal-Fetal Exchange, Pregnancy Complications)
The MeSH major-topic tags have now been extracted from the Medline data as the medline RDD.
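Not every citation carries a major topic; a quick sanity check (a sketch, not part of the original session) counts the records that came back empty:

// Records whose MeshHeadingList had no MajorTopicYN="Y" descriptors yield an empty Seq.
val emptyCount = medline.filter(_.isEmpty).count()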
scala> medline.count()
res6: Long = 240000

scala> val topics: RDD[String] = medline.flatMap(mesh => mesh)
topics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:33

scala> val topicCounts = topics.countByValue()
topicCounts: scala.collection.Map[String,Long] = Map(Dental Implantation, Subperiosteal -> 3, Odontogenic Tumors -> 10, Spinal Cord Diseases -> 25, Family Leave -> 2, Crystallography, X-Ray -> 2, Ethylene Glycol -> 2, Cisplatin -> 1, Type A Personality -> 3, G(M1) Ganglioside -> 4, 2-Naphthylamine -> 1, Pancreatic Diseases -> 42, Lung Compliance -> 6, Streptolysins -> 2, Vicia faba -> 1, Direct Service Costs -> 1, Lichen Planus -> 11, Case-Control Studies -> 11, Superoxide Dismutase -> 3, Verbal Behavior -> 37, Phantom Limb -> 4, Leeches -> 18, Leisure Activities -> 25, Collective Bargaining -> 19, Limulus Test -> 1, Dentition, Mixed -> 1, Emergency Medical Service Communication Systems -> 4, Psychopharmacology -> 32, Crying -> 4, Leishmania tropica -> 2, Microscopy, Confocal -> 6, Benz...

scala> topicCounts.size
res7: Int = 14548

scala> val tcSeq = topicCounts.toSeq
tcSeq: Seq[(String, Long)] = ArrayBuffer((Dental Implantation, Subperiosteal,3), (Odontogenic Tumors,10), (Spinal Cord Diseases,25), (Family Leave,2), (Crystallography, X-Ray,2), (Ethylene Glycol,2), (Cisplatin,1), (Type A Personality,3), (G(M1) Ganglioside,4), (2-Naphthylamine,1), (Pancreatic Diseases,42), (Lung Compliance,6), (Streptolysins,2), (Vicia faba,1), (Direct Service Costs,1), (Lichen Planus,11), (Case-Control Studies,11), (Superoxide Dismutase,3), (Verbal Behavior,37), (Phantom Limb,4), (Leeches,18), (Leisure Activities,25), (Collective Bargaining,19), (Limulus Test,1), (Dentition, Mixed,1), (Emergency Medical Service Communication Systems,4), (Psychopharmacology,32), (Crying,4), (Leishmania tropica,2), (Microscopy, Confocal,6), (Benzoates,14), (Filoviridae,1), (Peritonsilla...

scala> tcSeq.sortBy(_._2).reverse.take(10).foreach(println)
(Research,1649)
(Disease,1349)
(Neoplasms,1123)
(Tuberculosis,1066)
(Public Policy,816)
(Jurisprudence,796)
(Demography,763)
(Population Dynamics,753)
(Economics,690)
(Medicine,682)

scala> val valueDist = topicCounts.groupBy(_._2).mapValues(_.size)
valueDist: scala.collection.immutable.Map[Long,Int] = Map(69 -> 12, 365 -> 1, 138 -> 5, 101 -> 9, 479 -> 1, 249 -> 2, 234 -> 1, 88 -> 13, 170 -> 2, 115 -> 5, 276 -> 1, 5 -> 680, 120 -> 1, 655 -> 1, 269 -> 1, 202 -> 2, 10 -> 296, 385 -> 1, 56 -> 20, 142 -> 4, 153 -> 3, 174 -> 1, 185 -> 2, 42 -> 38, 24 -> 87, 320 -> 2, 37 -> 71, 25 -> 79, 257 -> 1, 52 -> 24, 14 -> 203, 184 -> 2, 110 -> 6, 125 -> 5, 357 -> 1, 196 -> 1, 542 -> 1, 157 -> 9, 189 -> 2, 20 -> 134, 421 -> 1, 46 -> 34, 93 -> 14, 325 -> 3, 152 -> 4, 228 -> 2, 57 -> 20, 316 -> 2, 78 -> 16, 29 -> 59, 216 -> 1, 164 -> 3, 179 -> 5, 211 -> 4, 253 -> 2, 106 -> 8, 121 -> 7, 348 -> 1, 84 -> 16, 353 -> 1, 147 -> 4, 280 -> 3, 61 -> 13, 221 -> 1, 132 -> 6, 89 -> 9, 133 -> 2, 116 -> 3, 1 -> 3106, 312 -> 1, 74 -> 8, 206 -> 3, 307 -> 1, 233 -> ...

scala> valueDist.toSeq.sorted.take(10).foreach(println)
(1,3106)
(2,1699)
(3,1207)
(4,902)
(5,680)
(6,571)
(7,490)
(8,380)
(9,356)
(10,296)
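Note that countByValue() returns the whole map to the driver, which is fine for the roughly 14,548 distinct topics here; for a much larger key space, an equivalent distributed version (a sketch, not from the session above) keeps the counts in an RDD:

val topicCountsRdd = topics.map(t => (t, 1)).reduceByKey(_ + _)
topicCountsRdd.top(10)(Ordering.by[(String, Int), Int](_._2)).foreach(println)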
scala> val list = List(1, 2, 3)
list: List[Int] = List(1, 2, 3)

scala> val combs = list.combbinations(2)
<console>:33: error: value combbinations is not a member of List[Int]
       val combs = list.combbinations(2)
                        ^

scala> val combs = list.combinations(2)
combs: Iterator[List[Int]] = non-empty iterator

scala> combs.foreach(println)
List(1, 2)
List(1, 3)
List(2, 3)

scala> val combbs = list.reverse.combinations(2)
combbs: Iterator[List[Int]] = non-empty iterator

scala> combs.foreach(println)

scala> List(3, 2) == List(2, 3)
res15: Boolean = false

scala> val topicPairs = medline.flatMap(t => t.sorted.combinations(2))
topicPairs: org.apache.spark.rdd.RDD[Seq[String]] = MapPartitionsRDD[8] at flatMap at <console>:33

scala> val cooccurs = topicPairs.map(p => (p, 1)).reduceByKey(_+_)
cooccurs: org.apache.spark.rdd.RDD[(Seq[String], Int)] = ShuffledRDD[10] at reduceByKey at <console>:33

scala> cooccurs.cache()
res16: cooccurs.type = ShuffledRDD[10] at reduceByKey at <console>:33

scala> cooccurs.count()
res17: Long = 213745

scala> val ord = Ordering.by[(Seq[String], Int), Int](_._2)
ord: scala.math.Ordering[(Seq[String], Int)] = scala.math.Ordering$$anon$9@3b5dd166

scala> cooccurs.top(10)(ord).foreach(println)
(List(Demography, Population Dynamics),288)
(List(Government Regulation, Social Control, Formal),254)
(List(Emigration and Immigration, Population Dynamics),230)
(List(Acquired Immunodeficiency Syndrome, HIV Infections),220)
(List(Antibiotics, Antitubercular, Dermatologic Agents),205)
(List(Analgesia, Anesthesia),183)
(List(Economics, Population Dynamics),181)
(List(Analgesia, Anesthesia and Analgesia),179)
(List(Anesthesia, Anesthesia and Analgesia),177)
(List(Population Dynamics, Population Growth),174)