環境
OS: Ubuntu 18.04
- hadoop-2.6.4
- scala 2.10.6
- spark 1.6.2
作業要求
- 必須使用到 Hadoop、Spark、Scala,使用其他的語言(如:Python)一律 0 分
作業內容
我從 Kaggle 上找到一份 COVID-19 Dataset,主要是以患者目前狀況來預測是否為高風險。
請先將資料集集上傳至 hdfs 後,進入 spark-shell
本篇 hdfs 中資料集路徑為 CovidData/CovidData.csv
步驟
- 將文字讀入,刪除空列及第一列(欄位名稱)並以逗號分割
1 | val lines = sc.textFile("hdfs://master:9000/CovidData/CovidData.csv").filter(x=> !x.isEmpty).map(x=>x.split(",")).mapPartitionsWithIndex { (index,lines) => if (index==0) lines.drop(1) else lines } |
- 執行 count function 確認資料筆數正確,再確認第一筆資料非欄位名稱
1 | lines.count() |
- 建立 CovidData class
1 |
|
- 將讀入的資料轉成 CovidData DataFrame
1 | val CovidDataDF = lines.map(x=> CovidData(x(0).toInt,x(1).toInt,x(2).toInt,x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt.toInt,x(8).toInt,x(9).toInt,x(10).toInt,x(11).toInt,x(12).toInt,x(13).toInt,x(14).toInt,x(15).toInt,x(16).toInt,x(17).toInt,x(18).toInt,x(19).toInt,x(20).toInt)).toDF() |
- 顯示 CovidDataDF 的資料框綱要
1 | CovidDataDF.printSchema |
- 顯示前 20 筆資料
1 | CovidDataDF.show |
- 列印筆數
1 | CovidDataDF.count |
- 匯入字串索引器
1 | import org.apache.spark.ml.feature.StringIndexer |
- 重編 ICU 欄位為數值索引
1 | val strInder = new StringIndexer().setInputCol("ICU").setOutputCol("label").fit(CovidDataDF) |
- 放入 CovidDataDF 資料集並顯示出來
1 | strInder.transform(CovidDataDF).show |
- 統計各嚴重程度筆數
1 | strInder.transform(CovidDataDF).orderBy("label").groupBy("label").count.show |
- 查看原來新冠肺炎資料(按索引順序)
1 | strInder.labels |
- 將所有懷孕、慢性阻塞性肺疾病和高血壓資料併入向量欄位
1 | import org.apache.spark.ml.feature.VectorAssembler // 匯入向量轉換器 |
- 先用 CovidDataDF 資料集來轉換並查看
1 | vecAssemer.transform(CovidDataDF).show |
- 重症程度索引數值轉換回字串重症程度名稱
1 | import org.apache.spark.ml.feature.IndexToString |
- 切割訓練和測試資料集
1 | val splits = CovidDataDF.randomSplit(Array(0.7, 0.3)) // 將 訓練集:測試集 切割為 7:3 |
- 查看分割後資料集
1 | CovidDataTrainSet.count |
- 匯入隨機森林分類器及模型
1 | import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel} |
- 建立隨機森林分類器物件
1 | val rfc = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(5).setMaxBins(5) |
- 建立 pipeline
1 | import org.apache.spark.ml.Pipeline |
- 開始訓練
1 | val pipeLineModel = pipeline.fit(CovidDataTrainSet) |
- 計算預測值
1 | val trainPred = pipeLineModel.transform(CovidDataTrainSet) |
- 查看實際值和預測值
1 | trainPred.select("label", "features", "prediction", "predRisk").show(10) |
- 計算準確度的物件
1 | import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator |
- 計算準確度
1 | evaluator.evaluate(trainPred) |
- 參數網格物件(各種超參數組合)
1 | import org.apache.spark.ml.tuning.ParamGridBuilder |
- 交叉驗證器物件
1 | import org.apache.spark.ml.tuning.CrossValidator |
- 進行交叉驗證
1 | val crossValidatorModel = crossValidator.fit(CovidDataTrainSet) |
- 計算預測值
1 | val bestModel = crossValidatorModel.bestModel |
- 計算準確度
1 | evaluator.evaluate(bestTrainPred) |
- 找出最佳組合超參數
1 | crossValidatorModel.getEstimatorParamMaps |
- 計算資料集統計值
1 | CovidDataDF.describe("USMER", "MEDICAL_UNIT", "SEX", "PATIENT_TYPE", "INTUBED", "PNEUMONIA", "AGE", "PREGNANT", "DIABETES", "COPD", "ASTHMA", "INMSUPR", "HIPERTENSION", "OTHER_DISEASE", "CARDIOVASCULAR", "OBESITY", "RENAL_CHRONIC", "TOBACCO", "CLASIFFICATION_FINAL").show |
- 建立最大和最小新冠肺炎數值資料框
1 | val newFeatures=sc.makeRDD(Array(Array(2,13,2,2,0,99,99,121,98,98,98,98,98,98,98,98,98,98,98,7,99),Array(1,1,1,1,0,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1))) |
- 進行預測
1 | val newPred = bestModel.transform(newCovidDataDF) |
- 顯示出預測的重症程度
1 | newPred.select("features","predRisk").show |