Big Data Analytics Final Assignment Notes

Environment

OS: Ubuntu 18.04

  • hadoop-2.6.4
  • scala 2.10.6
  • spark 1.6.2

Assignment Requirements

  1. You must use Hadoop, Spark, and Scala; submissions in any other language (e.g. Python) receive 0 points.

Assignment Description

I found a COVID-19 Dataset on Kaggle; the task is to predict whether a patient is high-risk based on their current condition.

First upload the dataset to HDFS (e.g. with `hdfs dfs -mkdir /CovidData` followed by `hdfs dfs -put CovidData.csv /CovidData/`), then start spark-shell.
Throughout this write-up the dataset path in HDFS is CovidData/CovidData.csv.

Steps

  1. Read the text file in, drop empty lines and the first line (the column headers), and split each line on commas. The mapPartitionsWithIndex trick drops the first line of partition 0 only, which is exactly where the single header line lives.

    val lines = sc.textFile("hdfs://master:9000/CovidData/CovidData.csv").
      filter(x => !x.isEmpty).
      map(x => x.split(",")).
      mapPartitionsWithIndex { (index, it) => if (index == 0) it.drop(1) else it }

  2. Run the count function to confirm the record count, then check that the first record is no longer the header row.

    lines.count()
    lines.first

  3. Define the CovidData case class. (Scala 2.10 caps case classes at 22 fields; with 21 this one just fits.)

    case class CovidData(USMER: Int, MEDICAL_UNIT: Int, SEX: Int, PATIENT_TYPE: Int,
      DATE_DIED: String, INTUBED: Int, PNEUMONIA: Int, AGE: Int, PREGNANT: Int,
      DIABETES: Int, COPD: Int, ASTHMA: Int, INMSUPR: Int, HIPERTENSION: Int,
      OTHER_DISEASE: Int, CARDIOVASCULAR: Int, OBESITY: Int, RENAL_CHRONIC: Int,
      TOBACCO: Int, CLASIFFICATION_FINAL: Int, ICU: Int)

  4. Convert the parsed lines into a CovidData DataFrame. (A note on running this outside spark-shell follows the snippet.)

    val CovidDataDF = lines.map(x => CovidData(x(0).toInt, x(1).toInt, x(2).toInt,
      x(3).toInt, x(4), x(5).toInt, x(6).toInt, x(7).toInt, x(8).toInt, x(9).toInt,
      x(10).toInt, x(11).toInt, x(12).toInt, x(13).toInt, x(14).toInt, x(15).toInt,
      x(16).toInt, x(17).toInt, x(18).toInt, x(19).toInt, x(20).toInt)).toDF()

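The snippet above runs as-is in spark-shell, which already provides sc and sqlContext and pre-imports the implicits behind .toDF(). As a minimal sketch, assuming the Spark 1.6 APIs, this is the extra boilerplate a standalone application would need (the app name is a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // spark-shell builds these for you; a standalone app must create them itself.
    val conf = new SparkConf().setAppName("CovidDataJob") // placeholder name
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // enables rdd.toDF()
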
  5. Print the DataFrame schema.

    CovidDataDF.printSchema

  6. Show the first 20 rows.

    CovidDataDF.show

  7. Print the row count.

    CovidDataDF.count

  8. Import the string indexer.

    import org.apache.spark.ml.feature.StringIndexer

  9. Re-encode the ICU column as a numeric label index. StringIndexer assigns indices by descending frequency, so label 0.0 is the most common ICU value. (An aside on unseen values follows the snippet.)

    val strInder = new StringIndexer().setInputCol("ICU").setOutputCol("label").fit(CovidDataDF)

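Not strictly needed here, because strInder is fit on the full dataset, but worth knowing: an indexer fit on a subset can hit unseen values at transform time. A sketch using the handleInvalid option that Spark 1.6 added (strInderSkip is an invented name):

    // Hypothetical variant: silently drop rows whose ICU value was not seen
    // during fit, instead of throwing at transform time.
    val strInderSkip = new StringIndexer().
      setInputCol("ICU").
      setOutputCol("label").
      setHandleInvalid("skip").
      fit(CovidDataDF)
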
  10. Apply it to the CovidDataDF dataset and show the result.

    strInder.transform(CovidDataDF).show

  11. Count the records per severity level. (groupBy does not preserve row order, so sort after aggregating rather than before.)

    strInder.transform(CovidDataDF).groupBy("label").count.orderBy("label").show

  12. View the original ICU values, listed in index order.

    strInder.labels

  13. Assemble the pregnancy, COPD, and hypertension columns into a single vector column. (A wider hypothetical variant follows the snippet.)

    import org.apache.spark.ml.feature.VectorAssembler // import the vector assembler
    val vecAssemer = new VectorAssembler().setInputCols(Array("PREGNANT", "COPD", "HIPERTENSION")).setOutputCol("features")

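Three columns is a deliberate choice for this assignment, but the same pattern scales. A hypothetical wider feature set, purely as an illustration (vecAssemerAll and its column selection are my own, not part of the original):

    // Hypothetical: let the forest see all the condition columns instead of three.
    val vecAssemerAll = new VectorAssembler().
      setInputCols(Array("PNEUMONIA", "AGE", "PREGNANT", "DIABETES", "COPD",
        "ASTHMA", "INMSUPR", "HIPERTENSION", "OTHER_DISEASE", "CARDIOVASCULAR",
        "OBESITY", "RENAL_CHRONIC", "TOBACCO")).
      setOutputCol("features")
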
  14. Transform CovidDataDF with it first and inspect the output.

    vecAssemer.transform(CovidDataDF).show

  15. Map predicted severity indices back to their string names.

    import org.apache.spark.ml.feature.IndexToString
    val indToRisk = new IndexToString().setInputCol("prediction").setOutputCol("predRisk").setLabels(strInder.labels)

  16. Split into training and test sets. (A reproducibility note follows the snippet.)

    val splits = CovidDataDF.randomSplit(Array(0.7, 0.3)) // 70/30 train/test split

    val CovidDataTrainSet = splits(0)
    val CovidDataTestSet = splits(1)

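One caveat: without a seed, randomSplit produces a different split on every run, so the counts and accuracies below will vary. If you want reproducible numbers, Spark 1.6's overload takes an explicit seed (42 is an arbitrary choice):

    // Reproducible 70/30 split; any fixed seed will do.
    val splits = CovidDataDF.randomSplit(Array(0.7, 0.3), seed = 42L)
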
  17. Inspect the split datasets.

    CovidDataTrainSet.count
    CovidDataTestSet.count

    CovidDataTrainSet.groupBy("ICU").count.show
    CovidDataTestSet.groupBy("ICU").count.show

  18. Import the random forest classifier and its model class.

    import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel}

  19. Create the random forest classifier object.

    val rfc = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(5).setMaxBins(5)

  20. Build the pipeline. Stage order matters: index the label, assemble the features, fit the forest, then map predictions back to names.

    import org.apache.spark.ml.Pipeline
    val pipeline = new Pipeline().setStages(Array(strInder, vecAssemer, rfc, indToRisk))

  21. Train the pipeline. (A sketch for inspecting the fitted forest follows the snippet.)

    val pipeLineModel = pipeline.fit(CovidDataTrainSet)

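To peek inside the trained forest, a sketch along these lines should work on Spark 1.6; stage index 2 simply mirrors the Array(strInder, vecAssemer, rfc, indToRisk) order above, and RandomForestClassificationModel was already imported in step 18:

    // Pull the fitted forest out of the PipelineModel and inspect it.
    val rfModel = pipeLineModel.stages(2).asInstanceOf[RandomForestClassificationModel]
    println(rfModel.featureImportances) // weights for PREGNANT, COPD, HIPERTENSION
    println(rfModel.toDebugString)      // text dump of all five trees
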
  22. Compute predictions on both sets.

    val trainPred = pipeLineModel.transform(CovidDataTrainSet)
    val testPred = pipeLineModel.transform(CovidDataTestSet)

  23. Compare actual and predicted values.

    trainPred.select("label", "features", "prediction", "predRisk").show(10)
    testPred.select("label", "features", "prediction", "predRisk").show(10)

  24. Create the evaluator object for accuracy. In Spark 1.6 the metric name "precision" computes overall accuracy (Spark 2.x later renamed it "accuracy").

    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("precision")

  25. Compute the accuracy. (A confusion-matrix extra follows the snippet.)

    evaluator.evaluate(trainPred)
    evaluator.evaluate(testPred)

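A single accuracy number hides where the model errs. As an optional extra, the RDD-based MulticlassMetrics that also ships with Spark 1.6 prints a confusion matrix; prediction and label are both Double columns at this point:

    // Confusion matrix for the test predictions via the mllib evaluation API.
    import org.apache.spark.mllib.evaluation.MulticlassMetrics
    val predAndLabels = testPred.select("prediction", "label").
      map(r => (r.getDouble(0), r.getDouble(1)))
    println(new MulticlassMetrics(predAndLabels).confusionMatrix)
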
  26. Build the parameter grid (every hyperparameter combination to try): 2 impurities × 2 depths × 2 tree counts × 3 bin counts = 24 combinations.

    import org.apache.spark.ml.tuning.ParamGridBuilder
    val param = new ParamGridBuilder().
      addGrid(rfc.impurity, Array("gini", "entropy")).
      addGrid(rfc.maxDepth, Array(3, 5)).
      addGrid(rfc.numTrees, Array(5, 7)).
      addGrid(rfc.maxBins, Array(5, 10, 20)).
      build()

  27. Create the cross-validator. With 10 folds and 24 combinations it fits 240 models, plus one final refit on the whole training set, so expect this to take a while.

    import org.apache.spark.ml.tuning.CrossValidator
    val crossValidator = new CrossValidator().
      setEstimator(pipeline).
      setEstimatorParamMaps(param).
      setNumFolds(10).
      setEvaluator(evaluator)

  28. Run the cross-validation.

    val crossValidatorModel = crossValidator.fit(CovidDataTrainSet)

  29. Compute predictions with the best model found by cross-validation.

    val bestModel = crossValidatorModel.bestModel
    val bestTrainPred = bestModel.transform(CovidDataTrainSet)
    val bestTestPred = bestModel.transform(CovidDataTestSet)

  30. Compute the accuracy of the best model.

    evaluator.evaluate(bestTrainPred)
    evaluator.evaluate(bestTestPred)

  31. Find the best hyperparameter combination: zip the parameter maps with their average cross-validation metrics and take the maximum. (A cross-check on the fitted forest follows the snippet.)

    crossValidatorModel.getEstimatorParamMaps
    crossValidatorModel.avgMetrics

    val bestParam = crossValidatorModel.getEstimatorParamMaps.zip(crossValidatorModel.avgMetrics).maxBy(_._2)._1
    println("Best combination:\n" + bestParam)


  32. Compute summary statistics for the dataset. (An optional crosstab exploration follows the snippet.)

    CovidDataDF.describe("USMER", "MEDICAL_UNIT", "SEX", "PATIENT_TYPE", "INTUBED",
      "PNEUMONIA", "AGE", "PREGNANT", "DIABETES", "COPD", "ASTHMA", "INMSUPR",
      "HIPERTENSION", "OTHER_DISEASE", "CARDIOVASCULAR", "OBESITY", "RENAL_CHRONIC",
      "TOBACCO", "CLASIFFICATION_FINAL").show

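An optional exploration aside: beyond describe, the DataFrame stat functions (available since Spark 1.4) give quick contingency tables, for example the ICU outcome broken down by sex:

    // Contingency table of SEX against ICU.
    CovidDataDF.stat.crosstab("SEX", "ICU").show
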
  33. Build a DataFrame of two hand-made records holding the maximum and minimum feature values. Index 4 of each array is only a placeholder: DATE_DIED is a String, so the constructor call passes "" and skips a(4).

    val newFeatures = sc.makeRDD(Array(
      Array(2, 13, 2, 2, 0, 99, 99, 121, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 7, 99),
      Array(1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)))

    val newCovidDataDF = newFeatures.map(a => CovidData(a(0), a(1), a(2), a(3), "",
      a(5), a(6), a(7), a(8), a(9), a(10), a(11), a(12), a(13), a(14), a(15), a(16),
      a(17), a(18), a(19), a(20))).toDF()

  34. Run the prediction with the best model.

    val newPred = bestModel.transform(newCovidDataDF)

  35. Show the predicted severity.

    newPred.select("features", "predRisk").show