From b4067cbad4a46cda0799a891ded152531ca83b62 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 12 Oct 2012 18:19:21 -0700 Subject: More doc updates, and moved Serializer to a subpackage. --- .../spark/bagel/examples/WikipediaPageRankStandalone.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'bagel/src/main') diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala index 8ced0f9c73..06cc8c748b 100644 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala +++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala @@ -1,6 +1,7 @@ package spark.bagel.examples import spark._ +import serializer.{DeserializationStream, SerializationStream, SerializerInstance} import spark.SparkContext._ import spark.bagel._ @@ -33,10 +34,10 @@ object WikipediaPageRankStandalone { val partitioner = new HashPartitioner(sc.defaultParallelism) val links = if (usePartitioner) - input.map(parseArticle _).partitionBy(partitioner).cache + input.map(parseArticle _).partitionBy(partitioner).cache() else - input.map(parseArticle _).cache - val n = links.count + input.map(parseArticle _).cache() + val n = links.count() val defaultRank = 1.0 / n val a = 0.15 @@ -51,7 +52,7 @@ object WikipediaPageRankStandalone { (ranks .filter { case (id, rank) => rank >= threshold } .map { case (id, rank) => "%s\t%s\n".format(id, rank) } - .collect.mkString) + .collect().mkString) println(top) val time = (System.currentTimeMillis - startTime) / 1000.0 @@ -113,7 +114,7 @@ object WikipediaPageRankStandalone { } } -class WPRSerializer extends spark.Serializer { +class WPRSerializer extends spark.serializer.Serializer { def newInstance(): SerializerInstance = new WPRSerializerInstance() } -- cgit v1.2.3