Item-based Recommendation Algorithm in Scala
scala, spark, algorithms · 2016-08-03
Please respect copyright; credit this page when reposting.
I've recently been learning Scala. Scala is a functional programming language, which is a big departure from object-oriented Java. The first demo is, of course, the illustrious word count:
```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
conf.setMaster("local[1]").setAppName("word count")
val sc = new SparkContext(conf)
val line = sc.textFile("D:/HDFS/Input/1.txt")
// split each line into words, pair each word with 1, then sum the counts per word
val wordCount = line.flatMap(file => file.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCount.collect().foreach(println)
```
As it happens, I've been lucky enough to work on recommendation system design and development ever since I started my career. Our production recommendation system has gone through three stages:

Stage 1:
The system consisted of two parts, algorithms and serving, written in pure Java. The first cut of the algorithm side took roughly 30 person-days, and it evolved from single-threaded to multi-threaded, and from a single machine to master-slave.

Stage 2:
As Hadoop took off and Mahout appeared, we gradually migrated some of the collaborative filtering algorithms ("people who viewed this also viewed", "people who bought this also bought") onto the Hadoop platform. After all, the cluster has plenty of machines, and distribution is handled for you. I'll just say one thing: awesome!

Stage 3:
Hadoop is fundamentally IO-bound; its offline jobs involve a huge amount of disk reading and writing. The fix? Keep the intermediate results in memory! Enter Spark, which offers very friendly Scala, Java, and Python APIs and has been extremely popular lately.
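To make the in-memory point concrete, here is a minimal sketch (the app name and file path are placeholders borrowed from the example below): a cached RDD is computed once by the first action and then served from memory by later actions, instead of being re-read from disk on every pass.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("cache demo"))
val ratings = sc.textFile("D:/HDFS/Input/recommend.txt").cache()
val users = ratings.map(_.split(",")(0)).distinct().count() // first action materializes the cache
val lines = ratings.count()                                 // second action reads from memory
```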
A while ago I implemented incremental computation for collaborative filtering with the Spark Java API. Taking the chance while learning Scala, I've now reimplemented it.
**Input data** (columns: user, itemcode, ref; the file itself contains only the numeric rows)
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0
**Scala code**
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[1]").setAppName("itemBase")
  val sc = new SparkContext(conf)
  // data cleaning: collect each user's (item, rating) pairs, keep users with more than 2 ratings
  val dataClean = sc.textFile("D:/HDFS/Input/recommend.txt").map { line =>
    val tokens = line.split(",")
    (tokens(0).toLong, (tokens(1).toLong, if (tokens.length > 2) tokens(2).toFloat else 0f))
  }.aggregateByKey(Array[(Long, Float)]())(_ :+ _, _ ++ _)
    .filter(_._2.size > 2).values.persist(StorageLevel.MEMORY_ONLY_SER)
  // global norms: for each item, the sum of its squared ratings
  val norms = dataClean.flatMap(_.map(y => (y._1, y._2 * y._2))).reduceByKey(_ + _)
  // broadcast the (small) norms map to every executor
  val normsMap = sc.broadcast(norms.collectAsMap())
  // co-occurrence matrix: for each item pair, the dot product of the two rating vectors
  // (items sorted ascending by id so each pair is keyed as (smaller, larger), matching the output)
  val matrix = dataClean.map(list => list.sortWith(_._1 < _._1)).flatMap(occMatrix).reduceByKey(_ + _)
  // similarity = 1 / (1 + Euclidean distance), with dist² = norm(i) + norm(j) - 2 * dot(i, j)
  val similarity = matrix.map(a => (a._1._1, (a._1._2, 1 / (1 + Math.sqrt(normsMap.value(a._1._1)
    + normsMap.value(a._1._2) - 2 * a._2)))))
  similarity.collect().foreach(println)
  sc.stop()
}

def occMatrix(a: Array[(Long, Float)]): ArrayBuffer[((Long, Long), Float)] = {
  val array = ArrayBuffer[((Long, Long), Float)]()
  // emit every unordered item pair the user has rated, weighted by the product of the two ratings
  for (i <- 0 until a.size; j <- (i + 1) until a.size) {
    array += (((a(i)._1, a(j)._1), a(i)._2 * a(j)._2))
  }
  array
}
```
If the `array` accumulator in `occMatrix` is declared as a `List` or `Array`, nothing gets added and the result stays empty, which stumped me for quite a while. After digging around, it turns out Scala collections are nothing like Java's: `List` is immutable and `Array` is fixed-length, so appending with `:+` returns a new collection instead of modifying the original. A mutable, growable collection like `ArrayBuffer` is what's needed here. Scala really takes some getting used to.
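A minimal illustration of the difference:

```scala
import scala.collection.mutable.ArrayBuffer

val xs = List(1, 2)
val ys = xs :+ 3        // ys is List(1, 2, 3); xs is still List(1, 2)

val buf = ArrayBuffer(1, 2)
buf += 3                // buf itself is now ArrayBuffer(1, 2, 3)
```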
`StorageLevel.MEMORY_ONLY_SER` means serialized, memory-only storage. The enum has several other values; see the official Spark documentation for the details. I recommend using the levels with the `SER` suffix.
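For reference, a sketch of the commonly used levels (an RDD can hold only one storage level; `dataClean` is the RDD from the code above):

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY         - deserialized objects in memory; fastest access, largest footprint
// MEMORY_ONLY_SER     - serialized bytes in memory; more CPU per access, far less RAM
// MEMORY_AND_DISK_SER - serialized in memory, spilling to disk when memory runs out
// DISK_ONLY           - disk only
dataClean.persist(StorageLevel.MEMORY_ONLY_SER)
```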
**Output**
(103,(104,0.14037600977966974))
(102,(103,0.1975496259559987))
(104,(107,0.13472338607037426))
(101,(107,0.10275248635596666))
(101,(102,0.14201473202245876))
(104,(105,0.16736577623297264))
(102,(104,0.12789210656028413))
(105,(106,0.14201473202245876))
(101,(104,0.16015261286229274))
(103,(105,0.11208890297777215))
(101,(106,0.1424339656566283))
(101,(103,0.15548737703860027))
(102,(106,0.14972506706560876))
(104,(106,0.18181818181818182))
(102,(105,0.14328432723886902))
(101,(105,0.1158457425543559))
(103,(106,0.1424339656566283))
(105,(107,0.2204812092115424))
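As a sanity check, the pair (101, 102) can be verified by hand from the input data. The similarity is 1 / (1 + d), where d is the Euclidean distance between the two items' rating vectors (a missing rating counts as 0), so d² = norm(i) + norm(j) - 2·dot(i, j):

```scala
// norm(101) = 5.0² + 2.0² + 2.5² + 5.0² + 4.0² = 76.25  (rated by users 1, 2, 3, 4, 5)
// norm(102) = 3.0² + 2.5² + 3.0²               = 24.25  (rated by users 1, 2, 5)
// dot(101, 102) = 5.0*3.0 + 2.0*2.5 + 4.0*3.0  = 32.0   (users 1, 2, 5 rated both)
val sim = 1 / (1 + math.sqrt(76.25 + 24.25 - 2 * 32.0))
// sim == 0.14201473202245876, matching the (101,(102,...)) line in the output above
```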
And with that, the Scala implementation of the item-based recommendation algorithm is done. The pure Java implementation took 30 person-days and roughly 15,000 lines of code; the Spark Java version took 3 person-days and about 400 lines; the Spark Scala version took one person-day and about 20 lines... Fellow programmers, Industry 4.0 is going to put us out of work!!!