aboutsummaryrefslogtreecommitdiff
path: root/bagel
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-06-07 00:25:47 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-06-07 12:45:38 -0700
commit63051dd2bcc4bf09d413ff7cf89a37967edc33ba (patch)
tree4389cb7212c6c643ed0565551b4ad654d2218767 /bagel
parent7e1c97fc4b5a225e496ebd95c0ef6095dc4aeae9 (diff)
downloadspark-63051dd2bcc4bf09d413ff7cf89a37967edc33ba.tar.gz
spark-63051dd2bcc4bf09d413ff7cf89a37967edc33ba.tar.bz2
spark-63051dd2bcc4bf09d413ff7cf89a37967edc33ba.zip
Merge in engine improvements from the Spark Streaming project, developed
jointly with Tathagata Das and Haoyuan Li. This commit imports the changes and ports them to Mesos 0.9, but does not yet pass unit tests due to various classes not supporting a graceful stop() yet.
Diffstat (limited to 'bagel')
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala11
1 files changed, 6 insertions, 5 deletions
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index 7084ff97d9..4c18cb9134 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -11,6 +11,7 @@ import scala.xml.{XML,NodeSeq}
import scala.collection.mutable.ArrayBuffer
import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
+import java.nio.ByteBuffer
object WikipediaPageRankStandalone {
def main(args: Array[String]) {
@@ -118,23 +119,23 @@ class WPRSerializer extends spark.Serializer {
}
class WPRSerializerInstance extends SerializerInstance {
- def serialize[T](t: T): Array[Byte] = {
+ def serialize[T](t: T): ByteBuffer = {
throw new UnsupportedOperationException()
}
- def deserialize[T](bytes: Array[Byte]): T = {
+ def deserialize[T](bytes: ByteBuffer): T = {
throw new UnsupportedOperationException()
}
- def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+ def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
throw new UnsupportedOperationException()
}
- def outputStream(s: OutputStream): SerializationStream = {
+ def serializeStream(s: OutputStream): SerializationStream = {
new WPRSerializationStream(s)
}
- def inputStream(s: InputStream): DeserializationStream = {
+ def deserializeStream(s: InputStream): DeserializationStream = {
new WPRDeserializationStream(s)
}
}