path: root/graphx
author    Matei Zaharia <matei@databricks.com>  2014-05-10 12:10:24 -0700
committer Patrick Wendell <pwendell@gmail.com>  2014-05-10 12:10:24 -0700
commit 7eefc9d2b3f6ebc0ecb5562da7323f1e06afbb35 (patch)
tree   32a35b0898c5710cdbd73b56ef9dcb9914e1cf02 /graphx
parent 8e94d2721a9d3d36697e13f8cc6567ae8aeee78b (diff)
SPARK-1708. Add a ClassTag on Serializer and things that depend on it
This pull request contains a rebased patch from @heathermiller (https://github.com/heathermiller/spark/pull/1) to add ClassTags on Serializer and types that depend on it (Broadcast and AccumulableCollection). Putting these in the public API signatures now will allow us to use Scala Pickling for serialization down the line without breaking binary compatibility.

One question remaining is whether we also want them on Accumulator -- Accumulator is passed as part of a bigger Task or TaskResult object via the closure serializer so it doesn't seem super useful to add the ClassTag there. Broadcast and AccumulableCollection in contrast were being serialized directly.

CC @rxin, @pwendell, @heathermiller

Author: Matei Zaharia <matei@databricks.com>

Closes #700 from mateiz/spark-1708 and squashes the following commits:

1a3d8b0 [Matei Zaharia] Use fake ClassTag in Java
3b449ed [Matei Zaharia] test fix
2209a27 [Matei Zaharia] Code style fixes
9d48830 [Matei Zaharia] Add a ClassTag on Serializer and things that depend on it
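As a quick illustration of what the new `[T: ClassTag]` context bound changes for implementers and callers, here is a minimal, self-contained Scala sketch. The `TaggedStream` type is invented for this example and is not Spark's real Serializer API:

import scala.reflect.ClassTag

// Standalone sketch -- not Spark's actual Serializer trait. `TaggedStream`
// is a made-up name used only to show what the `[T: ClassTag]` context
// bound added by this patch means for implementers and callers.
abstract class TaggedStream {
  // `[T: ClassTag]` desugars to an extra implicit ClassTag[T] parameter,
  // so implementations can inspect T's runtime class while serializing.
  def writeObject[T: ClassTag](t: T): TaggedStream
}

object TaggedStreamExample {
  private val stream: TaggedStream = new TaggedStream {
    override def writeObject[T: ClassTag](t: T): TaggedStream = {
      // The tag arrives implicitly from the call site.
      println(s"writing a ${implicitly[ClassTag[T]].runtimeClass.getName}")
      this
    }
  }

  def main(args: Array[String]): Unit = {
    // Scala call sites compile unchanged: the compiler materializes the tag.
    stream.writeObject((1L, 2.0))
    // Java callers get no implicit resolution, hence the "Use fake ClassTag
    // in Java" commit above; a tag can also be passed explicitly like this.
    stream.writeObject("hello")(ClassTag(classOf[String]))
  }
}

Nothing about the serialized bytes changes here; the bound only threads runtime type information through the public signatures so that a future serializer (e.g. Scala Pickling) can use it without a binary-incompatible API change.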
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala | 45
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala  |  5
2 files changed, 27 insertions(+), 23 deletions(-)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index 2f0531ee5f..1de42eeca1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -17,20 +17,22 @@
package org.apache.spark.graphx.impl
+import scala.language.existentials
+
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
+import scala.reflect.ClassTag
+
import org.apache.spark.graphx._
import org.apache.spark.serializer._
-import scala.language.existentials
-
private[graphx]
class VertexIdMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[(VertexId, _)]
writeVarLong(msg._1, optimizePositive = false)
this
@@ -38,7 +40,7 @@ class VertexIdMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
(readVarLong(optimizePositive = false), null).asInstanceOf[T]
}
}
@@ -51,7 +53,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Int]]
writeVarLong(msg.vid, optimizePositive = false)
writeInt(msg.data)
@@ -60,7 +62,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readInt()
new VertexBroadcastMsg[Int](0, a, b).asInstanceOf[T]
@@ -75,7 +77,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Long]]
writeVarLong(msg.vid, optimizePositive = false)
writeLong(msg.data)
@@ -84,7 +86,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readLong()
new VertexBroadcastMsg[Long](0, a, b).asInstanceOf[T]
@@ -99,7 +101,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Double]]
writeVarLong(msg.vid, optimizePositive = false)
writeDouble(msg.data)
@@ -108,7 +110,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- def readObject[T](): T = {
+ def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readDouble()
new VertexBroadcastMsg[Double](0, a, b).asInstanceOf[T]
@@ -123,7 +125,7 @@ class IntAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[(VertexId, Int)]
writeVarLong(msg._1, optimizePositive = false)
writeUnsignedVarInt(msg._2)
@@ -132,7 +134,7 @@ class IntAggMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readUnsignedVarInt()
(a, b).asInstanceOf[T]
@@ -147,7 +149,7 @@ class LongAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[(VertexId, Long)]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg._2, optimizePositive = true)
@@ -156,7 +158,7 @@ class LongAggMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readVarLong(optimizePositive = true)
(a, b).asInstanceOf[T]
@@ -171,7 +173,7 @@ class DoubleAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[(VertexId, Double)]
writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg._2)
@@ -180,7 +182,7 @@ class DoubleAggMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- def readObject[T](): T = {
+ def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readDouble()
(a, b).asInstanceOf[T]
@@ -196,7 +198,7 @@ class DoubleAggMsgSerializer extends Serializer with Serializable {
private[graphx]
abstract class ShuffleSerializationStream(s: OutputStream) extends SerializationStream {
// The implementation should override this one.
- def writeObject[T](t: T): SerializationStream
+ def writeObject[T: ClassTag](t: T): SerializationStream
def writeInt(v: Int) {
s.write(v >> 24)
@@ -309,7 +311,7 @@ abstract class ShuffleSerializationStream(s: OutputStream) extends Serialization
private[graphx]
abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream {
// The implementation should override this one.
- def readObject[T](): T
+ def readObject[T: ClassTag](): T
def readInt(): Int = {
val first = s.read()
@@ -398,11 +400,12 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat
private[graphx] sealed trait ShuffleSerializerInstance extends SerializerInstance {
- override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException
+ override def serialize[T: ClassTag](t: T): ByteBuffer = throw new UnsupportedOperationException
- override def deserialize[T](bytes: ByteBuffer): T = throw new UnsupportedOperationException
+ override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
+ throw new UnsupportedOperationException
- override def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T =
+ override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
throw new UnsupportedOperationException
// The implementation should override the following two.
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
index 73438d9535..91caa6b605 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
import scala.util.Random
+import scala.reflect.ClassTag
import org.scalatest.FunSuite
@@ -164,7 +165,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
def testVarLongEncoding(v: Long, optimizePositive: Boolean) {
val bout = new ByteArrayOutputStream
val stream = new ShuffleSerializationStream(bout) {
- def writeObject[T](t: T): SerializationStream = {
+ def writeObject[T: ClassTag](t: T): SerializationStream = {
writeVarLong(t.asInstanceOf[Long], optimizePositive = optimizePositive)
this
}
@@ -173,7 +174,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
val bin = new ByteArrayInputStream(bout.toByteArray)
val dstream = new ShuffleDeserializationStream(bin) {
- def readObject[T](): T = {
+ def readObject[T: ClassTag](): T = {
readVarLong(optimizePositive).asInstanceOf[T]
}
}