Environment
OS: Ubuntu 18.04
- hadoop-2.6.4
- scala 2.10.6
- spark 1.6.2
Assignment requirements
- Hadoop, Spark, and Scala must be used; submissions in any other language (e.g., Python) receive 0 points.
Assignment
I found a COVID-19 Dataset on Kaggle; the task is to predict whether a patient is high risk based on their current condition.
First upload the dataset to HDFS, then start spark-shell.
In this write-up, the dataset path in HDFS is CovidData/CovidData.csv.
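As an optional sanity check (a sketch using the Hadoop FileSystem API, run from inside spark-shell), you can confirm the uploaded file is visible before reading it:
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.exists(new Path("/CovidData/CovidData.csv"))   // should return true if the upload succeeded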
Steps
- Read the text file, drop empty lines and the first line (the column names), and split each line on commas
val lines = sc.textFile("hdfs://master:9000/CovidData/CovidData.csv").filter(x=> !x.isEmpty).map(x=>x.split(",")).mapPartitionsWithIndex { (index,lines) => if (index==0) lines.drop(1) else lines }
- Run count to confirm the number of records is correct, then confirm that the first record is not the header row (see the check sketched below)
lines.count()
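A simple way to confirm the header was actually dropped (a small sketch) is to look at the first element of the RDD:
lines.first.mkString(",")   // should be a row of numeric values, not the header "USMER,MEDICAL_UNIT,..."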
Define the CovidData case class
case class CovidData(USMER: Int, MEDICAL_UNIT: Int, SEX: Int, PATIENT_TYPE: Int, DATE_DIED: String, INTUBED: Int, PNEUMONIA: Int, AGE: Int, PREGNANT: Int, DIABETES: Int, COPD: Int, ASTHMA: Int, INMSUPR: Int, HIPERTENSION: Int, OTHER_DISEASE: Int, CARDIOVASCULAR: Int, OBESITY: Int, RENAL_CHRONIC: Int, TOBACCO: Int, CLASIFFICATION_FINAL: Int, ICU: Int)
Convert the loaded data into a CovidData DataFrame
val CovidDataDF = lines.map(x=> CovidData(x(0).toInt,x(1).toInt,x(2).toInt,x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt,x(8).toInt,x(9).toInt,x(10).toInt,x(11).toInt,x(12).toInt,x(13).toInt,x(14).toInt,x(15).toInt,x(16).toInt,x(17).toInt,x(18).toInt,x(19).toInt,x(20).toInt)).toDF()
Show the schema of CovidDataDF
CovidDataDF.printSchema
Show the first 20 records
CovidDataDF.show
Print the record count
CovidDataDF.count
Import the string indexer
import org.apache.spark.ml.feature.StringIndexer
Re-encode the ICU column as a numeric label index
val strInder = new StringIndexer().setInputCol("ICU").setOutputCol("label").fit(CovidDataDF)
Apply it to the CovidDataDF dataset and show the result
strInder.transform(CovidDataDF).show
Count the number of records per severity level
strInder.transform(CovidDataDF).orderBy("label").groupBy("label").count.show
View the original ICU values (in index order)
strInder.labels
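If an explicit index-to-value mapping is easier to read, the labels array can be printed together with its indices (an optional sketch):
strInder.labels.zipWithIndex.foreach { case (icu, idx) => println(s"label $idx -> ICU $icu") }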
Combine the pregnancy (PREGNANT), chronic obstructive pulmonary disease (COPD), and hypertension (HIPERTENSION) columns into a single vector column
import org.apache.spark.ml.feature.VectorAssembler // import the vector assembler
val vecAssemer = new VectorAssembler().setInputCols(Array("PREGNANT", "COPD", "HIPERTENSION")).setOutputCol("features")
First transform the CovidDataDF dataset and inspect the result
vecAssemer.transform(CovidDataDF).show
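show truncates long cells by default; to see the full features vector, truncation can be turned off (a minor optional sketch):
vecAssemer.transform(CovidDataDF).select("PREGNANT", "COPD", "HIPERTENSION", "features").show(5, false)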
Convert the severity index values back to the original string severity names
import org.apache.spark.ml.feature.IndexToString
val indToRisk = new IndexToString().setInputCol("prediction").setOutputCol("predRisk").setLabels(strInder.labels)
Split into training and test datasets
val splits = CovidDataDF.randomSplit(Array(0.7, 0.3)) // split into training:test = 7:3
val CovidDataTrainSet = splits(0)
val CovidDataTestSet = splits(1)
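randomSplit draws a different split on every run; if a reproducible split is wanted, a seed can be passed as the second argument (the seed value below is arbitrary):
val seededSplits = CovidDataDF.randomSplit(Array(0.7, 0.3), seed = 1234L)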
Inspect the split datasets
CovidDataTrainSet.count
CovidDataTestSet.count
CovidDataTrainSet.groupBy("ICU").count.show
CovidDataTestSet.groupBy("ICU").count.show
Import the random forest classifier and classification model
import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}
Create the random forest classifier
val rfc = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(5).setMaxBins(5)
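To list every tunable parameter of the classifier together with its documentation and current value, explainParams can be called (an optional sketch):
println(rfc.explainParams())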
Build the pipeline
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(strInder, vecAssemer, rfc, indToRisk))
Start training
val pipeLineModel = pipeline.fit(CovidDataTrainSet)
Compute predictions
val trainPred = pipeLineModel.transform(CovidDataTrainSet)
val testPred = pipeLineModel.transform(CovidDataTestSet)
Inspect the actual and predicted values
trainPred.select("label", "features", "prediction", "predRisk").show(10)
testPred.select("label", "features", "prediction", "predRisk").show(10)
Create the evaluator for computing accuracy
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("precision")
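In Spark 1.6 the evaluator also accepts other metric names such as "f1", "recall", "weightedPrecision", and "weightedRecall"; for example (a sketch):
val f1Evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")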
Compute accuracy
evaluator.evaluate(trainPred)
evaluator.evaluate(testPred)
Build the parameter grid (all hyperparameter combinations)
import org.apache.spark.ml.tuning.ParamGridBuilder
val param = new ParamGridBuilder().addGrid(rfc.impurity, Array("gini", "entropy")).addGrid(rfc.maxDepth, Array(3,5)).addGrid(rfc.numTrees, Array(5,7)).addGrid(rfc.maxBins, Array(5,10,20)).build()
Create the cross-validator
import org.apache.spark.ml.tuning.CrossValidator
val crossValidator = new CrossValidator().setEstimator(pipeline).setEstimatorParamMaps(param).setNumFolds(10).setEvaluator(evaluator)
Run cross-validation
val crossValidatorModel = crossValidator.fit(CovidDataTrainSet)
Compute predictions with the best model
val bestModel = crossValidatorModel.bestModel
val bestTrainPred = bestModel.transform(CovidDataTrainSet)
val bestTestPred = bestModel.transform(CovidDataTestSet)
Compute accuracy
evaluator.evaluate(bestTrainPred)
evaluator.evaluate(bestTestPred)
Find the best hyperparameter combination
crossValidatorModel.getEstimatorParamMaps
crossValidatorModel.avgMetrics
val bestParam = crossValidatorModel.getEstimatorParamMaps.zip(crossValidatorModel.avgMetrics).maxBy(_._2)._1
println("The best combination is:\n" + bestParam)
Compute dataset statistics
CovidDataDF.describe("USMER", "MEDICAL_UNIT", "SEX", "PATIENT_TYPE", "INTUBED", "PNEUMONIA", "AGE", "PREGNANT", "DIABETES", "COPD", "ASTHMA", "INMSUPR", "HIPERTENSION", "OTHER_DISEASE", "CARDIOVASCULAR", "OBESITY", "RENAL_CHRONIC", "TOBACCO", "CLASIFFICATION_FINAL").show
Create a DataFrame of the maximum and minimum COVID-19 feature values
val newFeatures = sc.makeRDD(Array(Array(2,13,2,2,0,99,99,121,98,98,98,98,98,98,98,98,98,98,98,7,99),Array(1,1,1,1,0,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1)))
val newCovidDataDF = newFeatures.map(a => CovidData(a(0), a(1), a(2), a(3), "", a(5), a(6), a(7), a(8), a(9), a(10), a(11), a(12), a(13), a(14), a(15), a(16), a(17), a(18), a(19), a(20))).toDF()
Make predictions
val newPred = bestModel.transform(newCovidDataDF)
Show the predicted severity level
newPred.select("features","predRisk").show