Hadoop 3.3.1 & HSDF on M1 Macで悪戦苦闘記
Unless run with sudo,
./bin/hdfs namenode -format
no longer goes through. I dug around for a while, but I still can't tell whether I broke some security setting myself or whether the version upgrade changed things when a vulnerability was patched.
sudo ./bin/hdfs namenode -format
Running it with sudo, though, gives an error about localhost and another saying the logs cannot be written:
localhost: ERROR: Unable to write in /opt/homebrew/Cellar/hadoop/3.3.1/libexec/logs. Aborting.
Starting secondary namenodes [MacBook-Pro-7.local]
MacBook-Pro-7.local: Warning: Permanently added 'macbook-pro-7.local' (ECDSA) to the list of known hosts.
MacBook-Pro-7.local: ERROR: Unable to write in /opt/homebrew/Cellar/hadoop/3.3.1/libexec/logs. Aborting.
So I added 127.0.0.1 macbook-pro-7.local to /etc/hosts, then changed the log directory in hadoop-env.sh to /var/logs/hadoop and made that folder writable with chmod 777:
export HADOOP_LOG_DIR=/var/logs/hadoop
Then run the format again, this time as a specific user:

sudo -u ******** ./bin/hdfs namenode -format
MacBook-Pro-7:~ $ cd /opt/homebrew/Cellar/hadoop/3.3.1/
MacBook-Pro-7:3.3.1 $ sudo -u ******* ./sbin/start-dfs.sh
Starting namenodes on [localhost]
localhost: WARNING: /var/logs/hadoop does not exist. Creating.
Starting datanodes
Starting secondary namenodes [MacBook-Pro-7.local]
MacBook-Pro-7:3.3.1 $ jps
737
13140 SecondaryNameNode
13226 Jps
12907 NameNode
13007 DataNode
MacBook-Pro-7:3.3.1 $ sudo -u ******** ./sbin/start-yarn.sh
Starting resourcemanager
Starting nodemanagers
MacBook-Pro-7:3.3.1 $ jps
737
13140 SecondaryNameNode
13334 ResourceManager
13495 Jps
13433 NodeManager
12907 NameNode
13007 DataNode
MacBook-Pro-7:3.3.1 $ cd bin
MacBook-Pro-7:bin $ sudo ./hadoop fs -mkdir /linkage
MacBook-Pro-7:bin $ sudo ./hadoop fs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2021-12-17 09:44 /linkage
MacBook-Pro-7:bin $ sudo ./hadoop fs -put /Users/*******/linkage/data_5.csv /linkage
MacBook-Pro-7:bin ********$ sudo ./hadoop fs -chmod 777 /linkage/data_5.csv
Now start up Apache Spark and check HDFS.
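As a quick sanity check (not part of the original session), the uploaded file can also be inspected from the Spark shell with the Hadoop FileSystem API. This is a minimal sketch, assuming the same hdfs://localhost:9000 namenode and the /linkage/data_5.csv path used above:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Point at the same namenode that the sc.textFile() call below will use.
val fs = FileSystem.get(new URI("hdfs://localhost:9000"), sc.hadoopConfiguration)

// Should return true if the -put above succeeded.
fs.exists(new Path("/linkage/data_5.csv"))

// List everything under /linkage with its size.
fs.listStatus(new Path("/linkage")).foreach(s => println(s"${s.getPath}  ${s.getLen} bytes"))

If exists comes back false here, the textFile() read below will fail, so this is a cheap way to confirm the upload landed where expected.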
MacBook-Pro-7:~ $ cd /opt/spark-3.2.0/
MacBook-Pro-7:spark-3.2.0 $ ./bin/spark-shell
21/12/17 09:45:48 WARN Utils: Your hostname, MacBook-Pro-7.local resolves to a loopback address: 127.0.0.1; using 192.168.43.40 instead (on interface en0)
Spark context Web UI available at http://192.168.43.40:4040
Spark context available as 'sc' (master = local[*], app id = local-1639701952085).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_312)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Pipeline

scala> import org.apache.spark.ml.feature._
import org.apache.spark.ml.feature._

scala> import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassifier

scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

scala> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

scala>

scala> val featureNames = Seq("dept", "room", "entry", "anesth", "AS", "age", "gender", "height", "weight", "BMI", "start_time", "ane_start", "surg_time", "ope_portion", "position", "pressor")
featureNames: Seq[String] = List(dept, room, entry, anesth, AS, age, gender, height, weight, BMI, start_time, ane_start, surg_time, ope_portion, position, pressor)

scala> val rdd = sc.textFile("hdfs://localhost:9000/linkage/data_5.csv").map(line => line.split(",")).map(v => (v(0), v(1), v(2), v(3), v(4), v(5), v(6), v(7), v(8), v(9), v(10), v(11), v(12), v(13), v(14), v(15)))
rdd: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String)] = MapPartitionsRDD[36] at map at <console>:30

scala> val _df = rdd.toDF(featureNames: _*)
_df: org.apache.spark.sql.DataFrame = [dept: string, room: string ... 14 more fields]

scala> _df.first
res7: org.apache.spark.sql.Row = [15,11,1,2,2,60,0,170.5,63.6,21.88,8,9,720,24,5,TRUE]

scala> 21/12/17 11:58:10 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1027945 ms exceeds timeout 120000 ms
21/12/17 11:58:10 WARN SparkContext: Killing executors is not supported by current scheduler.

scala> val df = new StringIndexerModel(Array("FALSE", "TRUE")).setInputCol("pressor").setOutputCol("pressor2").transform(_df).drop("pressor")
df: org.apache.spark.sql.DataFrame = [dept: string, room: string ... 14 more fields]

scala> df.head()
res8: org.apache.spark.sql.Row = [15,11,1,2,2,60,0,170.5,63.6,21.88,8,9,720,24,5,1.0]

scala> val formula = new RFormula().setFeaturesCol("features").setLabelCol("label").setFormula("pressor2 ~ .").fit(df)
formula: org.apache.spark.ml.feature.RFormulaModel = RFormulaModel: uid=rFormula_3af9781efef1, resolvedFormula=ResolvedRFormula(label=pressor2, terms=[dept,room,entry,anesth,AS,age,gender,height,weight,BMI,start_time,ane_start,surg_time,ope_portion,position], hasIntercept=true)

scala> val decisionTree = new DecisionTreeClassifier().setFeaturesCol("features").setLabelCol("pressor2").setMaxDepth(4)
decisionTree: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_f6eed9368330

scala> val pipeline = new Pipeline().setStages(Array(formula, decisionTree))
pipeline: org.apache.spark.ml.Pipeline = pipeline_e54ed73e465c

scala> val trainingAndTest = df.randomSplit(Array(0.5, 0.5))
trainingAndTest: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([dept: string, room: string ... 14 more fields], [dept: string, room: string ... 14 more fields])

scala> val pipelineModel = pipeline.fit(trainingAndTest(0))
21/12/17 14:36:13 WARN DAGScheduler: Broadcasting large task binary with size 1195.2 KiB
21/12/17 14:36:13 WARN DAGScheduler: Broadcasting large task binary with size 1195.4 KiB
21/12/17 14:36:14 WARN DAGScheduler: Broadcasting large task binary with size 1385.0 KiB
21/12/17 14:36:15 WARN DAGScheduler: Broadcasting large task binary with size 1385.9 KiB
21/12/17 14:36:15 WARN DAGScheduler: Broadcasting large task binary with size 1386.7 KiB
21/12/17 14:36:16 WARN DAGScheduler: Broadcasting large task binary with size 1387.7 KiB
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_e54ed73e465c

scala> val prediction = pipelineModel.transform(trainingAndTest(1))
prediction: org.apache.spark.sql.DataFrame = [dept: string, room: string ... 19 more fields]

scala> val auc = new BinaryClassificationEvaluator().evaluate(prediction)
auc: Double = 0.6760108438882367

scala> prediction.select("features", "label", "pressor2", "prediction").show()
21/12/17 14:36:33 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------------------+-----+--------+----------+
|            features|label|pressor2|prediction|
+--------------------+-----+--------+----------+
|(6812,[19,33,38,4...|  0.0|     0.0|       0.0|
|(6812,[19,33,38,4...|  0.0|     0.0|       0.0|
|(6812,[19,27,38,4...|  0.0|     0.0|       0.0|
|(6812,[14,25,38,4...|  0.0|     0.0|       1.0|
|(6812,[14,25,39,4...|  0.0|     0.0|       1.0|
|(6812,[14,25,39,4...|  1.0|     1.0|       0.0|
|(6812,[14,25,41,5...|  0.0|     0.0|       0.0|
|(6812,[14,25,41,5...|  1.0|     1.0|       1.0|
|(6812,[14,25,40,4...|  1.0|     1.0|       1.0|
|(6812,[14,25,40,4...|  1.0|     1.0|       1.0|
|(6812,[14,36,39,4...|  1.0|     1.0|       1.0|
|(6812,[14,36,41,5...|  0.0|     0.0|       0.0|
|(6812,[14,36,40,4...|  1.0|     1.0|       1.0|
|(6812,[14,35,40,4...|  1.0|     1.0|       1.0|
|(6812,[14,33,38,4...|  1.0|     1.0|       1.0|
|(6812,[14,33,39,4...|  1.0|     1.0|       1.0|
|(6812,[14,33,41,5...|  1.0|     1.0|       0.0|
|(6812,[14,33,41,5...|  1.0|     1.0|       1.0|
|(6812,[14,33,40,4...|  1.0|     1.0|       1.0|
|(6812,[14,33,40,4...|  1.0|     1.0|       0.0|
+--------------------+-----+--------+----------+
only showing top 20 rows

scala> val lp = prediction.select("label", "prediction")
lp: org.apache.spark.sql.DataFrame = [label: double, prediction: double]

scala> val counttotal = prediction.count()
counttotal: Long = 11642

scala> val correct = lp.filter($"label" === $"prediction").count()
correct: Long = 8202

scala> val wrong = lp.filter(not($"label" === $"prediction")).count()
wrong: Long = 3440

scala> val ratioCorrect = correct.toDouble / counttotal.toDouble
ratioCorrect: Double = 0.7045181240336712

scala> val ratioWrong = wrong.toDouble / counttotal.toDouble
ratioWrong: Double = 0.2954818759663288

scala> val truep = lp.filter($"prediction" === 0.0).filter($"label" === $"prediction").count() / counttotal.toDouble
truep: Double = 0.2735784229513829

scala> val truen = lp.filter($"prediction" === 1.0).filter($"label" === $"prediction").count() / counttotal.toDouble
truen: Double = 0.43093970108228824

scala> val falsep = lp.filter($"prediction" === 1.0).filter(not($"label" === $"prediction")).count() / counttotal.toDouble
falsep: Double = 0.18476206837313178

scala> val falsen = lp.filter($"prediction" === 0.0).filter(not($"label" === $"prediction")).count() / counttotal.toDouble
falsen: Double = 0.11071980759319705

scala> println("Total Count: " + counttotal)
Total Count: 11642

scala> println("Correct: " + correct)
Correct: 8202

scala> println("Wrong: " + wrong)
Wrong: 3440

scala> println("Ratio wrong: " + ratioWrong)
<console>:30: error: not found: value ratioWrong
       println("Ratio wrong: " + ratioWrong)
                                 ^

scala> println("Ratio correct: " + ratioCorrect)
Ratio correct: 0.7045181240336712

scala> println("Ratio wrong: " + ratioWrong)
Ratio wrong: 0.2954818759663288

scala> println("Ratio true positive: " + truep)
Ratio true positive: 0.2735784229513829

scala> println("Ratio false positive: " + falsep)
Ratio false positive: 0.18476206837313178

scala> println("Ratio true negative: " + truen)
Ratio true negative: 0.43093970108228824

scala> println("Ratio false negative: " + falsen)
Ratio false negative: 0.11071980759319705
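For reference, the interactive steps above can be pasted into spark-shell as one block. The following is only a consolidated sketch of the same pipeline, assuming the same HDFS path and column names; the BinaryClassificationMetrics part at the end is just one possible use of the class that was imported but never exercised in the session, not something taken from the original run.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{RFormula, StringIndexerModel}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import spark.implicits._

// Column names of data_5.csv, exactly as in the session above.
val featureNames = Seq("dept", "room", "entry", "anesth", "AS", "age", "gender", "height",
  "weight", "BMI", "start_time", "ane_start", "surg_time", "ope_portion", "position", "pressor")

// Read the CSV from HDFS and turn each line into a 16-column tuple of strings.
val rdd = sc.textFile("hdfs://localhost:9000/linkage/data_5.csv")
  .map(_.split(","))
  .map(v => (v(0), v(1), v(2), v(3), v(4), v(5), v(6), v(7),
             v(8), v(9), v(10), v(11), v(12), v(13), v(14), v(15)))
val raw = rdd.toDF(featureNames: _*)

// Map the TRUE/FALSE pressor flag to 1.0/0.0 with a pre-built StringIndexerModel.
val df = new StringIndexerModel(Array("FALSE", "TRUE"))
  .setInputCol("pressor").setOutputCol("pressor2")
  .transform(raw).drop("pressor")

// RFormula encodes the remaining columns into a feature vector; a depth-4 decision tree classifies.
val formula = new RFormula()
  .setFeaturesCol("features").setLabelCol("label").setFormula("pressor2 ~ .")
val decisionTree = new DecisionTreeClassifier()
  .setFeaturesCol("features").setLabelCol("pressor2").setMaxDepth(4)
val pipeline = new Pipeline().setStages(Array(formula, decisionTree))

// 50/50 train/test split, as in the session (the split is random, so numbers will differ a little).
val Array(training, test) = df.randomSplit(Array(0.5, 0.5))
val model = pipeline.fit(training)
val prediction = model.transform(test)

// AUC from the ML evaluator (uses the rawPrediction column by default)...
val auc = new BinaryClassificationEvaluator().evaluate(prediction)

// ...and an AUC from the mllib BinaryClassificationMetrics class, which wants
// an RDD of (score, label) pairs.
val scoreAndLabel = prediction.select("probability", "pressor2").rdd
  .map(r => (r.getAs[Vector]("probability")(1), r.getDouble(1)))
val metrics = new BinaryClassificationMetrics(scoreAndLabel)

// Overall accuracy, computed the same way as the step-by-step counts in the session.
val lp = prediction.select("label", "prediction")
val accuracy = lp.filter($"label" === $"prediction").count().toDouble / prediction.count()

println(s"AUC (evaluator): $auc")
println(s"AUC (mllib):     ${metrics.areaUnderROC()}")
println(s"Accuracy:        $accuracy")

One structural difference from the session: here the unfitted RFormula goes into the Pipeline instead of a pre-fitted RFormulaModel, so it is fitted on the training split only; the resulting model is the same kind of decision-tree pipeline, and the numbers should land in the same ballpark as the run above.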