From 0121a26bd150e5f76d950e08cf4d536fad635a40 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 28 Sep 2012 16:14:05 -0700 Subject: Changed the way tasks' dependency files are sent to workers so that custom serializers or Kryo registrators can be loaded. --- .../scala/spark/bagel/examples/WikipediaPageRankStandalone.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'bagel/src') diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala index ed8ace3a57..8ced0f9c73 100644 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala +++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala @@ -142,7 +142,7 @@ class WPRSerializerInstance extends SerializerInstance { class WPRSerializationStream(os: OutputStream) extends SerializationStream { val dos = new DataOutputStream(os) - def writeObject[T](t: T): Unit = t match { + def writeObject[T](t: T): SerializationStream = t match { case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match { case links: Array[String] => { dos.writeInt(0) // links @@ -151,17 +151,20 @@ class WPRSerializationStream(os: OutputStream) extends SerializationStream { for (link <- links) { dos.writeUTF(link) } + this } case rank: Double => { dos.writeInt(1) // rank dos.writeUTF(id) dos.writeDouble(rank) + this } } case (id: String, rank: Double) => { dos.writeInt(2) // rank without wrapper dos.writeUTF(id) dos.writeDouble(rank) + this } } -- cgit v1.2.3