Item-based recommendation algorithm implemented in Scala

scala, spark, algorithms 2016-08-03

Please respect copyright and credit the original source when reposting.

I've recently been learning Scala. Scala is a functional language, which is a big departure from object-oriented Java. The first demo is, of course, the illustrious word count:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
    conf.setMaster("local[1]").setAppName("word count")
    val sc = new SparkContext(conf)
    val line = sc.textFile("D:/HDFS/Input/1.txt")
    val wordCounts = line.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    // transformations are lazy; collect() is the action that actually runs the job
    wordCounts.collect().foreach(println)
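
For example, assuming 1.txt contains the single line `to be or not to be` (a made-up input, purely for illustration), the job prints something like:

    (not,1)
    (be,2)
    (or,1)
    (to,2)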

I've been lucky enough to work on recommendation system design and development ever since I started my career. Our engineered recommendation system has gone through three stages:

Stage one:
The system consisted of an algorithm part and a service part, written in pure Java. The first cut of the algorithm part took about 30 person-days, and it evolved from single-threaded to multi-threaded and from a single machine to a master-slave setup.

Stage two:
As Hadoop matured and Mahout appeared, we gradually migrated some of the collaborative filtering algorithms ("people who viewed this also viewed", "people who bought this also bought") onto the Hadoop platform. After all, the cluster has plenty of machines, and the distribution is handled for you. All I can say is: awesome!

Stage three:
Hadoop is IO-bound, and its offline computations involve heavy disk reads and writes. The fix? Keep intermediate results in memory. Enter Spark, which offers Scala, Java, and Python APIs, is very friendly to use, and has been extremely popular lately.

A while back I implemented incremental computation for collaborative filtering using Spark's Java API; learning Scala gave me an excuse to implement it all over again.

**Input data**

    user,itemcode,ref
    1,101,5.0
    1,102,3.0
    1,103,2.5
    2,101,2.0
    2,102,2.5
    2,103,5.0
    2,104,2.0
    3,101,2.5
    3,104,4.0
    3,105,4.5
    3,107,5.0
    4,101,5.0
    4,103,3.0
    4,104,4.5
    4,106,4.0
    5,101,4.0
    5,102,3.0
    5,103,2.0
    5,104,4.0
    5,105,3.5
    5,106,4.0

**Scala code**

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel
    import scala.collection.mutable.ArrayBuffer

    object ItemBase {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[1]").setAppName("itemBase")
        val sc = new SparkContext(conf)
        // data cleaning: parse "user,item,rating" lines, group the ratings by user,
        // and keep only users with more than two rated items
        val dataClean = sc.textFile("D:/HDFS/Input/recommend.txt").map { line =>
          val tokens = line.split(",")
          (tokens(0).toLong, (tokens(1).toLong, if (tokens.length > 2) tokens(2).toFloat else 0f))
        }.aggregateByKey(Array[(Long, Float)]())(_ :+ _, _ ++ _)
          .filter(_._2.length > 2).values.persist(StorageLevel.MEMORY_ONLY_SER)
        // squared norm of each item's rating vector (sum of squared ratings over users)
        val norms = dataClean.flatMap(_.map(y => (y._1, y._2 * y._2))).reduceByKey(_ + _)
        // broadcast the norms to every executor
        val normsMap = sc.broadcast(norms.collectAsMap())
        // co-occurrence matrix: the dot product of every item pair's rating vectors
        val matrix = dataClean.map(list => list.sortWith(_._1 < _._1)).flatMap(occMatrix).reduceByKey(_ + _)
        // similarity = 1 / (1 + Euclidean distance), using
        // ||x - y||^2 = ||x||^2 + ||y||^2 - 2 * (x . y)
        val similarity = matrix.map(a => (a._1._1, (a._1._2,
          1 / (1 + Math.sqrt(normsMap.value(a._1._1) + normsMap.value(a._1._2) - 2 * a._2)))))
        similarity.collect().foreach(println)
        sc.stop()
      }

      def occMatrix(a: Array[(Long, Float)]): ArrayBuffer[((Long, Long), Float)] = {
        val array = ArrayBuffer[((Long, Long), Float)]()
        // Cartesian co-occurrence: one entry per (item, item) pair within a user's ratings
        for (i <- a.indices; j <- i + 1 until a.length) {
          array += (((a(i)._1, a(j)._1), a(i)._2 * a(j)._2))
        }
        array
      }
    }
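
To make the co-occurrence step concrete: user 1 rated items 101, 102, and 103 with 5.0, 3.0, and 2.5, so after the ascending sort occMatrix emits one weighted pair per item combination:

    occMatrix(Array((101L, 5.0f), (102L, 3.0f), (103L, 2.5f)))
    // => ArrayBuffer(((101,102),15.0), ((101,103),12.5), ((102,103),7.5))

Summing these products across all users in the reduceByKey step yields exactly the dot product of each item pair's rating vectors.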

If the `array` in occMatrix is declared as a List or an Array, elements cannot be appended and the method keeps returning an empty result; this had me stuck for quite a while. It turns out that Scala's List is immutable and Array is fixed-size, a completely different concept from Java, so a mutable ArrayBuffer is required. Scala can feel really hostile at first.
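
A minimal illustration of the difference:

    import scala.collection.mutable.ArrayBuffer

    val xs = List[Int]()
    val ys = xs :+ 1   // :+ builds and returns a NEW list; xs itself never changes
    println(xs)        // List()
    println(ys)        // List(1)

    val buf = ArrayBuffer[Int]()
    buf += 1           // += appends in place
    println(buf)       // ArrayBuffer(1)
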
StorageLevel.MEMORY_ONLY_SER means the cached data is kept in memory only, in serialized form. The enum has several other values; see the official Spark documentation for the details. I recommend the variants with the SER suffix.
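
For instance (a sketch, where `rdd` stands in for any RDD you want to cache):

    // pick one storage level per RDD; the _SER variants store serialized bytes,
    // trading some CPU on access for a much smaller memory footprint
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)         // memory only, serialized
    // rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  // or: spill to disk when memory fills up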

**Output**

    (103,(104,0.14037600977966974))
    (102,(103,0.1975496259559987))
    (104,(107,0.13472338607037426))
    (101,(107,0.10275248635596666))
    (101,(102,0.14201473202245876))
    (104,(105,0.16736577623297264))
    (102,(104,0.12789210656028413))
    (105,(106,0.14201473202245876))
    (101,(104,0.16015261286229274))
    (103,(105,0.11208890297777215))
    (101,(106,0.1424339656566283))
    (101,(103,0.15548737703860027))
    (102,(106,0.14972506706560876))
    (104,(106,0.18181818181818182))
    (102,(105,0.14328432723886902))
    (101,(105,0.1158457425543559))
    (103,(106,0.1424339656566283))
    (105,(107,0.2204812092115424))
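
As a sanity check, the first line can be reproduced by hand from the input data: items 103 and 104 have squared norms 44.25 (users 1, 2, 4, 5) and 56.25 (users 2, 3, 4, 5), and their dot product over the co-rating users 2, 4, and 5 is 5.0*2.0 + 3.0*4.5 + 2.0*4.0 = 31.5:

    println(1 / (1 + Math.sqrt(44.25 + 56.25 - 2 * 31.5)))
    // 0.14037600977966974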

And with that, the Scala implementation of the item-based recommendation algorithm is done. The pure Java implementation took 30 person-days and roughly 15,000 lines of code; the Spark Java implementation took three person-days and roughly 400 lines; the Spark Scala implementation took one person-day and roughly 20 lines... Fellow code monkeys, the Industry 4.0 era is about to put us out of work!!!