感谢宋同学提供的下面内容
基于物品的推荐算法
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Item-based collaborative filtering demo built on Spark MLlib.
 *
 * Reads `user,item,rating` lines, computes the column (item) similarities of
 * the rating matrix and prints the ids of the two items, not yet rated by the
 * target user, whose summed similarity to the user's rated items is highest.
 */
object ItemBasedCF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("ItemBasedCFModel").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Load the raw ratings, one "user,item,rate" triple per line.
      val data = sc.textFile("/root/data/als/ratingdata.txt")

      // A MatrixEntry is one (row, column, value) cell of a distributed matrix;
      // here row = user id, column = item id, value = rating.
      val parseData: RDD[MatrixEntry] = data.map(_.split(",") match {
        case Array(user, item, rate) =>
          MatrixEntry(user.toLong, item.toLong, rate.toDouble)
      })

      // CoordinateMatrix holds the sparse user x item rating matrix.
      val ratings = new CoordinateMatrix(parseData)

      // columnSimilarities lives on RowMatrix. Rows are users and columns are
      // items, so no transpose is needed for item-item similarity.
      val matrix: RowMatrix = ratings.toRowMatrix()

      // Recommend for user 2. The row is looked up by its user id through the
      // indexed row matrix: toRowMatrix() drops the row ids, so picking a row
      // by position would depend on partition order.
      val targetUser = 2L
      val prefs: Option[SparseVector] = ratings.toIndexedRowMatrix().rows
        .filter(_.index == targetUser)
        .map(_.vector.toSparse)
        .collect()
        .headOption

      // Nothing is printed when the target user has no ratings.
      prefs.foreach { userPrefs =>
        // Item ids the user has already rated, and (itemId, rating) pairs.
        val ratedItems: Set[Int] = userPrefs.indices.toSet
        val ipi = (userPrefs.indices zip userPrefs.values).toSeq

        // columnSimilarities returns only the upper triangle (i < j), so each
        // entry is emitted in both directions; otherwise candidates with an id
        // smaller than the rated item would never be scored.
        val similarities = matrix.columnSimilarities()
        val pairSimilarities = similarities.entries.flatMap { e =>
          Seq(
            (e.i.toInt, (e.j.toInt, e.value)),
            (e.j.toInt, (e.i.toInt, e.value))
          )
        }

        // (candidateItem, similarity) for every item similar to a rated item.
        val ij = sc.parallelize(ipi).join(pairSimilarities).map {
          case (_, (_, candidate)) => candidate
        }

        // Keep only items the user has not rated yet.
        val ij1 = ij.filter { case (item, _) => !ratedItems.contains(item) }

        // Sum the similarities per candidate, rank descending, keep the top two.
        val ij2 = ij1.reduceByKey(_ + _).sortBy(_._2, ascending = false).take(2)

        for (id <- ij2) print(s"${id._1} ")
      }
    } finally {
      // Always release the SparkContext, even if a job above failed.
      sc.stop()
    }
  }
}
基于用户的推荐算法
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * User-based collaborative filtering demo built on Spark MLlib.
 *
 * Finds the two users most similar to user 1 and prints their ids, followed by
 * the ids of the items each of them rated that user 1 has not rated.
 */
object UserBasedCF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Create the SparkContext.
    val conf = new SparkConf().setAppName("UserBasedCF").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Load the raw ratings, one "user,item,rate" triple per line.
      val data = sc.textFile("/root/data/als/ratingdata.txt")

      // Parse every line into one cell of the rating matrix.
      val parseData: RDD[MatrixEntry] = data.map(_.split(",") match {
        case Array(user, item, rate) =>
          MatrixEntry(user.toLong, item.toLong, rate.toDouble)
      })

      // user x item rating matrix.
      val ratings = new CoordinateMatrix(parseData)

      // Transpose so that users become columns, then compute column (user)
      // similarities.
      val matrix: RowMatrix = ratings.transpose().toRowMatrix()
      val similarities = matrix.columnSimilarities()

      val targetUser = 1L

      // Sorted ids of the items rated by the given user.
      def ratedItems(userId: Long): Array[Long] =
        ratings.entries
          .filter(_.i == userId)
          .map(_.j)
          .sortBy(itemId => itemId)
          .collect()

      val ratingOfUser1 = ratedItems(targetUser)

      // Goal: recommend items to user 1 based on the two most similar users.
      // columnSimilarities returns only the upper triangle, so the target user
      // may appear as either coordinate of an entry; both are checked.
      // take(2) on the sorted RDD avoids collecting every similarity to the
      // driver just to keep the first two.
      val similarityTopUser = similarities.entries
        .filter(e => e.i != e.j)
        .collect {
          case e if e.i == targetUser => (e.j, e.value)
          case e if e.j == targetUser => (e.i, e.value)
        }
        .sortBy(_._2, ascending = false)
        .take(2)

      // Print the ids of the two most similar users.
      for (s <- similarityTopUser) print(s"${s._1} ")

      for (s <- similarityTopUser) {
        val userId = s._1
        // Items this neighbour rated ...
        val ratingOfTemp = ratedItems(userId)
        // ... minus the items user 1 already rated.
        val dis = ratingOfTemp diff ratingOfUser1
        // Print the item ids recommended to user 1 by this neighbour.
        for (id <- dis) print(s"$id ")
      }
    } finally {
      // Always release the SparkContext, even if a job above failed.
      sc.stop()
    }
  }
}
基于ALS的推荐算法
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * ALS (alternating least squares) recommendation demo on Spark ML.
 *
 * Trains an ALS model on 80% of the ratings, predicts the remaining 20% and
 * reports on the resulting RMSE.
 */
object ALS {

  /** One rating record: the user, the rated movie and the score given. */
  final case class Rating(userId: Int, movieId: Int, rating: Float)

  /**
   * Parses one `userId,movieId,rating` line.
   *
   * @param str a comma-separated line with exactly three fields
   * @return the parsed [[Rating]]
   * @throws IllegalArgumentException if the line does not have exactly three fields
   * @throws NumberFormatException if a field is not numeric
   */
  def parseRating(str: String): Rating = {
    val fields = str.split(",")
    // require (unlike assert) is never elided and reports the offending line.
    require(
      fields.length == 3,
      s"expected 3 comma-separated fields but found ${fields.length}: '$str'"
    )
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("ALS")
      .getOrCreate()
    try {
      import spark.implicits._

      val ratings = spark.read.textFile("data/als/ratingdata.txt")
        .map(parseRating)
        .toDF()

      // NOTE(review): the split is unseeded, so the reported RMSE varies
      // between runs; pass a seed to randomSplit if reproducibility matters.
      val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

      // Build the recommendation model using ALS on the training data.
      // The estimator is fully qualified because this object is also named ALS.
      val als = new org.apache.spark.ml.recommendation.ALS()
        .setMaxIter(5)
        .setRegParam(0.01)
        .setUserCol("userId")
        .setItemCol("movieId")
        .setRatingCol("rating")
      val model = als.fit(training)

      // Evaluate the model by computing the RMSE on the test data.
      // Cold start strategy 'drop' ensures we don't get NaN evaluation metrics.
      model.setColdStartStrategy("drop")
      val predictions = model.transform(test)
      evaluatingRMSE(predictions)
    } finally {
      // Always release the session, even if training or evaluation failed.
      spark.stop()
    }
  }

  /**
   * Computes the RMSE between the `rating` and `prediction` columns.
   * Prints "good" when it is below 2; otherwise prints the predictions.
   *
   * @param predictions DataFrame containing `rating` and `prediction` columns
   */
  def evaluatingRMSE(predictions: DataFrame): Unit = {
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    val rmse = evaluator.evaluate(predictions)
    if (rmse < 2) {
      print("\n" + "good")
    } else {
      println()
      predictions.show(false)
    }
  }
}
基于随机森林预测贷款风险
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame,SparkSession}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.evaluation.RegressionMetrics
/**
 * Loan-risk prediction demo: trains a random forest on a credit CSV file and
 * reports on the area under the ROC curve of its predictions.
 */
object Credit {

  /** One applicant record; `creditability` is the label, the rest are features. */
  final case class Credit(
    creditability: Double,
    balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
    savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
    residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
    credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
  )

  /**
   * Builds a [[Credit]] from one parsed CSV row.
   *
   * Several columns are shifted down by one (presumably to turn 1-based
   * category codes into 0-based ones; confirm against the data set).
   *
   * @param line the numeric columns of one row, at least 21 values
   * @throws IllegalArgumentException if the row has fewer than 21 columns
   */
  def parseCredit(line: Array[Double]): Credit = {
    require(line.length >= 21, s"expected at least 21 columns but found ${line.length}")
    Credit(
      line(0),
      line(1) - 1, line(2), line(3), line(4), line(5),
      line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
      line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
      line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
    )
  }

  /**
   * Splits every non-blank line on commas and converts each field to Double.
   * Blank lines (e.g. a trailing newline at the end of the file) are skipped
   * instead of failing the job with a NumberFormatException.
   */
  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd
      .filter(_.trim.nonEmpty)
      .map(_.split(","))
      .map(_.map(_.toDouble))
  }

  /**
   * Computes the area under the ROC curve for the predictions.
   * Prints "good" when it exceeds 0.6; otherwise prints the metric.
   *
   * @param predictedResultDF DataFrame produced by the fitted model
   * @param labelstring name of the label column
   */
  def evaluatingAUC(predictedResultDF: DataFrame, labelstring: String): Unit = {
    val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelstring)
    val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)
    if (predictionAUC > 0.6) {
      print("\n" + "good")
    } else {
      print(s"areaUnderROC: $predictionAUC")
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Credit")
      .master("local")
      .getOrCreate()
    try {
      import spark.implicits._

      val creditDF = parseRDD(spark.sparkContext.textFile("/root/data/germancredit.csv"))
        .map(parseCredit)
        .toDF()
      // createOrReplaceTempView does not fail when the view already exists,
      // e.g. when getOrCreate() handed back a session that registered it before.
      creditDF.createOrReplaceTempView("credit")

      // Every column except the label is assembled into one feature vector.
      val featureCols = Array("balance", "duration", "history", "purpose", "amount",
        "savings", "employment", "instPercent", "sexMarried", "guarantors",
        "residenceDuration", "assets", "age", "concCredit", "apartment",
        "credits", "occupation", "dependents", "hasPhone", "foreign")
      val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
      val df2 = assembler.transform(creditDF)

      // Index the creditability column into the "label" column.
      val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
      val df3 = labelIndexer.fit(df2).transform(df2)

      // The same seed drives the split and the forest.
      val splitSeed = 5043
      val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)

      val classifier = new RandomForestClassifier()
        .setImpurity("gini")
        .setMaxDepth(5)
        .setNumTrees(20)
        .setFeatureSubsetStrategy("auto")
        .setSeed(splitSeed)
      val model = classifier.fit(trainingData)
      val predictions = model.transform(testData)
      evaluatingAUC(predictions, "label")
    } finally {
      // Always release the session, even if training or evaluation failed.
      spark.stop()
    }
  }
}
基于多层感知器的手机短信分类
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, Word2Vec}
import org.apache.spark.sql.{DataFrame,SparkSession}
/**
 * SMS ham/spam classification demo: Word2Vec features fed into a multilayer
 * perceptron, evaluated by the area under the ROC curve.
 */
object SMSClassifier {

  /** Word2Vec vector size; also the width of the network's input layer. */
  final val VECTOR_SIZE = 100

  /**
   * Computes the area under the ROC curve, using the `prediction` column as
   * the raw prediction. Prints "good" when it exceeds 0.8; otherwise prints
   * the metric.
   *
   * @param predictedResultDF DataFrame produced by the fitted pipeline
   * @param labelcol name of the numeric label column
   */
  def evaluatingAUC(predictedResultDF: DataFrame, labelcol: String): Unit = {
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol(labelcol)
      .setRawPredictionCol("prediction")
    val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)
    if (predictionAUC > 0.8) {
      print("\n" + "good")
    } else {
      print(s"areaUnderROC: $predictionAUC")
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("SMS Message Classification (HAM or SPAM)")
      .getOrCreate()
    try {
      // Each line is "<label>\t<message>". Lines without a tab-separated
      // message (blank or malformed) are skipped instead of failing the job
      // with an ArrayIndexOutOfBoundsException.
      val parsedRDD = spark.sparkContext
        .textFile("data/smsspamcollection/SMSSpamCollection")
        .map(_.split("\t"))
        .collect {
          case fields if fields.length >= 2 => (fields(0), fields(1).split(" "))
        }
      val msgDF = spark.createDataFrame(parsedRDD).toDF("label", "message")

      // Map the string label to a numeric index.
      val labelIndexer = new StringIndexer()
        .setInputCol("label")
        .setOutputCol("indexedLabel")
        .fit(msgDF)

      // Turn each tokenised message into a VECTOR_SIZE-dimensional vector.
      val word2Vec = new Word2Vec()
        .setInputCol("message")
        .setOutputCol("features")
        .setVectorSize(VECTOR_SIZE)
        .setMinCount(1)

      // Input layer matches the feature vector size; output layer has 2 units.
      val layers = Array[Int](VECTOR_SIZE, 6, 5, 2)
      val mlpc = new MultilayerPerceptronClassifier()
        .setLayers(layers)
        .setBlockSize(512)
        .setSeed(1234L)
        .setMaxIter(128)
        .setFeaturesCol("features")
        .setLabelCol("indexedLabel")
        .setPredictionCol("prediction")

      // Convert predicted indices back to the original string labels.
      val labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predictedLabel")
        .setLabels(labelIndexer.labels)

      val Array(trainingData, testData) = msgDF.randomSplit(Array(0.8, 0.2))

      val pipeline = new Pipeline().setStages(Array(labelIndexer, word2Vec, mlpc, labelConverter))
      val model = pipeline.fit(trainingData)
      val predictionResultDF = model.transform(testData)
      evaluatingAUC(predictionResultDF, "indexedLabel")
    } finally {
      // Always release the session, even if training or evaluation failed.
      spark.stop()
    }
  }
}
19 条评论
大大真的太厉害了
没有没有啦
古德脚!!干的漂亮(ฅ´ω`ฅ)
ヾ(≧∇≦*)ゝ
代码,一抗一麻袋
臭杨X
思裹伊 非常的快昂 ୧(๑•̀⌄•́๑)૭
还是要谢谢宋同学,我做的话今天一天都弄不完,哈哈
哇哦 好谦虚哇
没有没有,这个涉及到Spark中的scala语言。当时学过,这个语言是真的难。⌇●﹏●⌇
对于楼主来说那不是轻轻松松 没的难度 不要谦虚了(ノ°ο°)ノ
不要这么说啦,都两年了,这个语言我早就不会了。就是因为你们这些爱分享的小可爱,大家技术才能变得更强。
没有你的博客就不能分享了 所以你最棒|´・ω・)ノ
你也棒棒的,嘿嘿
可以 共同进步
楼主你好 麻烦问一下只有一题吗
更新中,过10分钟再看哦!
还有吗
马上马上,还在更新中(๑•̀ㅁ•́ฅ)