aboutsummaryrefslogtreecommitdiff
path: root/bagel/src
diff options
context:
space:
mode:
Diffstat (limited to 'bagel/src')
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala11
1 files changed, 6 insertions, 5 deletions
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index 8ced0f9c73..06cc8c748b 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -1,6 +1,7 @@
package spark.bagel.examples
import spark._
+import serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import spark.SparkContext._
import spark.bagel._
@@ -33,10 +34,10 @@ object WikipediaPageRankStandalone {
val partitioner = new HashPartitioner(sc.defaultParallelism)
val links =
if (usePartitioner)
- input.map(parseArticle _).partitionBy(partitioner).cache
+ input.map(parseArticle _).partitionBy(partitioner).cache()
else
- input.map(parseArticle _).cache
- val n = links.count
+ input.map(parseArticle _).cache()
+ val n = links.count()
val defaultRank = 1.0 / n
val a = 0.15
@@ -51,7 +52,7 @@ object WikipediaPageRankStandalone {
(ranks
.filter { case (id, rank) => rank >= threshold }
.map { case (id, rank) => "%s\t%s\n".format(id, rank) }
- .collect.mkString)
+ .collect().mkString)
println(top)
val time = (System.currentTimeMillis - startTime) / 1000.0
@@ -113,7 +114,7 @@ object WikipediaPageRankStandalone {
}
}
-class WPRSerializer extends spark.Serializer {
+class WPRSerializer extends spark.serializer.Serializer {
def newInstance(): SerializerInstance = new WPRSerializerInstance()
}