感谢宋同学提供的下面内容

基于物品推荐算法

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ItemBasedCF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //读入数据
    val conf = new SparkConf().setAppName("ItemBasedCFModel").setMaster("local")
    val sc = new SparkContext(conf)   
    val data = sc.textFile("/root/data/als/ratingdata.txt") 
    /*MatrixEntry代表一个分布式矩阵中的每一行(Entry)
     * 这里的每一项都是一个(i: Long, j: Long, value: Double) 指示行列值的元组tuple。
     * 其中i是行坐标,j是列坐标,value是值。*/
    val parseData: RDD[MatrixEntry] = data.map(_.split(",") match { case Array(user, item, rate) => MatrixEntry(user.toLong, item.toLong, rate.toDouble)})
    data.map(_.split(",") match { case Array(user, item, rate) => MatrixEntry(user.toLong, item.toLong, rate.toDouble) })
    //CoordinateMatrix是Spark MLLib中专门保存user_item_rating这种数据样本的
    val ratings = new CoordinateMatrix(parseData)

    /* 由于CoordinateMatrix没有columnSimilarities方法,所以我们需要将其转换成RowMatrix矩阵,调用他的columnSimilarities计算其相似性
     * RowMatrix的方法columnSimilarities是计算,列与列的相似度,现在是user_item_rating,与基于用户的CF不同的是,这里不需要进行矩阵的转置,直接就是物品的相似*/
    val matrix: RowMatrix = ratings.toRowMatrix()

    //需求:为某一个用户推荐商品。基本的逻辑是:首先得到某个用户评价过(买过)的商品,然后计算其他商品与该商品的相似度,并排序;从高到低,把不在用户评价过
    //商品里的其他商品推荐给用户。
    //例如:为用户2推荐商品

    //第一步:得到用户2评价过(买过)的商品  take(5)表示取出所有的5个用户  2:表示第二个用户
    //解释:SparseVector:稀疏矩阵
    val user2pred = matrix.rows.take(5)(2)
    val prefs: SparseVector = user2pred.asInstanceOf[SparseVector]
    val uitems = prefs.indices //得到了用户2评价过(买过)的商品的ID
    val ipi = (uitems zip prefs.values) //得到了用户2评价过(买过)的商品的ID和评分,即:(物品ID,评分)

    //计算物品的相似性,并输出
    val similarities = matrix.columnSimilarities()
    val indexdsimilar = similarities.toIndexedRowMatrix().rows.map {
      case IndexedRow(idx, vector) => (idx.toInt, vector)
    }

    //ij表示:其他用户购买的商品与用户2购买的该商品的相似度
    val ij = sc.parallelize(ipi).join(indexdsimilar).flatMap {
      case (i, (pi, vector: SparseVector)) => (vector.indices zip vector.values)
    }
    /********** begin **********/
    //ij1表示:其他用户购买过,但不在用户2购买的商品的列表中的商品和评分
    val ij1 = ij.filter { case (item, pref) => !uitems.contains(item) }

    //将这些商品的评分求和,并降序排列,并推荐前两个物品
    val ij2 = ij1.reduceByKey(_ + _).sortBy(_._2, false).take(2)

    /********** end **********/
    // crgjl
    //取消以下1行注释
    for (id <- ij2) print(id._1 + " ")
    sc.stop()
  }
}

基于用户的推荐算法

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object UserBasedCF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // 创建一个SparkContext
    val conf = new SparkConf().setAppName("UserBasedCF").setMaster("local")
    val sc = new SparkContext(conf)
    // 读入数据
    val data = sc.textFile("/root/data/als/ratingdata.txt")
    // 解析出评分矩阵的每一行
    val parseData: RDD[MatrixEntry] = data.map(_.split(",")
    match { case Array(user, item, rate) =>
        MatrixEntry(user.toLong, item.toLong, rate.toDouble)
    })
    // 构建关联矩阵
    val ratings = new CoordinateMatrix(parseData)
    // 转置矩阵以计算列(用户)的相似性
    val matrix: RowMatrix = ratings.transpose().toRowMatrix()
    // 计算得到用户的相似度矩阵
    val similarities = matrix.columnSimilarities()
    // 得到某个用户对所有物品的评分
    val ratingOfUser1 = ratings.entries.filter(_.i == 1).
      map(x => (x.j, x.value)).
      sortBy(_._1).
      map(_._1).
      collect().
      toList.
      toArray
    // 得到用户1相对于其他用户的相似性
    val similarityOfUser1 = similarities.entries.filter(_.i == 1).
      sortBy(_.value, false).
      map(_.value).
      collect
    // 需求:为用户1推荐2个商品
    // 思路:找到与用户1相似性最高的两个用户,将这两个用户评过分的物品,用户1没有评过分的物品推荐给用户1
    val similarityTopUser = similarities.entries.filter(_.i == 1).
      sortBy(_.value, false).
      map(x => (x.j, x.value)).
      collect.
      take(2)
    //println("与用户1最相似的两个用户如下:")
    for (s <- similarityTopUser) print(s._1 + " ")
    for (s <- similarityTopUser) {
      // 找到这两个用户评过分的商品,与用户1没有评过分的物品
      val userId = s._1
      val ratingOfTemp = ratings.entries.filter(_.i == userId).
        map(x => (x.j, x.value)).
        sortBy(_._1).
        map(_._1).
        collect().
        toList.
        toArray
      // 用户1与当前用户求差集
      val dis = ratingOfTemp diff ratingOfUser1
      //println("用户" + userId + "要推荐给用户1的商品id为")
      for (id <- dis) print(id + " ")
    }
    sc.stop()
  }
}

基于ALS的推荐算法

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.{DataFrame, SparkSession}
object ALS {
  case class Rating(userId: Int, movieId: Int, rating: Float)
  def parseRating(str: String): Rating = {
    val fields = str.split(",")
    assert(fields.size == 3)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("ALS")
      .getOrCreate()
    import spark.implicits._
    val ratings = spark.read.textFile("data/als/ratingdata.txt")
      .map(parseRating)
      .toDF()
    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
    // Build the recommendation model using ALS on the training data
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")
    val model = als.fit(training)
    // Evaluate the model by computing the RMSE on the test data
    // Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
    model.setColdStartStrategy("drop")
    val predictions = model.transform(test)
    evaluatingRMSE(predictions)
    spark.stop()
  }
  def evaluatingRMSE(predictions:DataFrame):Unit = {
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    val rmse = evaluator.evaluate(predictions)
    if (rmse < 2){
      print("\n" + "good")
    }else{
      println()
      predictions.show(false)
    }
  }
}

基于随机森林预测贷款风险

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame,SparkSession}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.evaluation.RegressionMetrics
object Credit {
  case class Credit(
                     creditability: Double,
                     balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
                     savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
                     residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
                     credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
                   )
  def parseCredit(line: Array[Double]): Credit = {
    Credit(
      line(0),
      line(1) - 1, line(2), line(3), line(4), line(5),
      line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
      line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
      line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
    )
  }
  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).map(_.map(_.toDouble))
  }
  def evaluatingAUC(predictedResultDF:DataFrame, labelstring:String):Unit = {
    val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelstring)
    val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)
    if(predictionAUC > 0.6){
        print("\n" + "good")
    }else{
        print(s"areaUnderROC: $predictionAUC")
    }
  }
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Credit")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    val creditDF = parseRDD(spark.sparkContext.textFile("/root/data/germancredit.csv")).map(parseCredit).toDF()
    creditDF.createTempView("credit")
    val featureCols = Array("balance", "duration", "history", "purpose", "amount",
      "savings", "employment", "instPercent", "sexMarried", "guarantors",
      "residenceDuration", "assets", "age", "concCredit", "apartment",
      "credits", "occupation", "dependents", "hasPhone", "foreign")
    val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
    val df2 = assembler.transform(creditDF)
    val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
    val df3 = labelIndexer.fit(df2).transform(df2)
    val splitSeed = 5043
    val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
    val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(5).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
    val model = classifier.fit(trainingData)
    val predictions = model.transform(testData)
    evaluatingAUC(predictions,"label")
    spark.stop()
  }
}

基于多层感知器的手机短信分类

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, Word2Vec}
import org.apache.spark.sql.{DataFrame,SparkSession}
object SMSClassifier {
  final val VECTOR_SIZE = 100
  def evaluatingAUC(predictedResultDF:DataFrame, labelcol: String):Unit = {
    val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelcol).setRawPredictionCol("prediction")
    val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)
    if(predictionAUC > 0.8){
      print("\n" + "good")
    }else{
      print(s"areaUnderROC: $predictionAUC")
    }
  }
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("SMS Message Classification (HAM or SPAM)")
      .getOrCreate()
    val parsedRDD = spark.sparkContext.textFile("data/smsspamcollection/SMSSpamCollection").map(_.split("\t")).map(eachRow => {
      (eachRow(0),eachRow(1).split(" "))
    })
    val msgDF = spark.createDataFrame(parsedRDD).toDF("label","message")
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(msgDF)
    val word2Vec = new Word2Vec()
      .setInputCol("message")
      .setOutputCol("features")
      .setVectorSize(VECTOR_SIZE)
      .setMinCount(1)
    val layers = Array[Int](VECTOR_SIZE,6,5,2)
    val mlpc = new MultilayerPerceptronClassifier()
      .setLayers(layers)
      .setBlockSize(512)
      .setSeed(1234L)
      .setMaxIter(128)
      .setFeaturesCol("features")
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)
    val Array(trainingData, testData) = msgDF.randomSplit(Array(0.8, 0.2))
    val pipeline = new Pipeline().setStages(Array(labelIndexer,word2Vec,mlpc,labelConverter))
    val model = pipeline.fit(trainingData)
    val predictionResultDF = model.transform(testData)
    evaluatingAUC(predictionResultDF,"indexedLabel")
    spark.stop()
  }
}
最后修改:2021 年 07 月 01 日
如果觉得我的文章对你有用,请随意赞赏