aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala45
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala5
2 files changed, 27 insertions, 23 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index 2f0531ee5f..1de42eeca1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -17,20 +17,22 @@
package org.apache.spark.graphx.impl
+import scala.language.existentials
+
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
+import scala.reflect.ClassTag
+
import org.apache.spark.graphx._
import org.apache.spark.serializer._
-import scala.language.existentials
-
private[graphx]
class VertexIdMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[(VertexId, _)]
writeVarLong(msg._1, optimizePositive = false)
this
@@ -38,7 +40,7 @@ class VertexIdMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
(readVarLong(optimizePositive = false), null).asInstanceOf[T]
}
}
@@ -51,7 +53,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Int]]
writeVarLong(msg.vid, optimizePositive = false)
writeInt(msg.data)
@@ -60,7 +62,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readInt()
new VertexBroadcastMsg[Int](0, a, b).asInstanceOf[T]
@@ -75,7 +77,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Long]]
writeVarLong(msg.vid, optimizePositive = false)
writeLong(msg.data)
@@ -84,7 +86,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readLong()
new VertexBroadcastMsg[Long](0, a, b).asInstanceOf[T]
@@ -99,7 +101,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Double]]
writeVarLong(msg.vid, optimizePositive = false)
writeDouble(msg.data)
@@ -108,7 +110,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- def readObject[T](): T = {
+ def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readDouble()
new VertexBroadcastMsg[Double](0, a, b).asInstanceOf[T]
@@ -123,7 +125,7 @@ class IntAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[(VertexId, Int)]
writeVarLong(msg._1, optimizePositive = false)
writeUnsignedVarInt(msg._2)
@@ -132,7 +134,7 @@ class IntAggMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readUnsignedVarInt()
(a, b).asInstanceOf[T]
@@ -147,7 +149,7 @@ class LongAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[(VertexId, Long)]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg._2, optimizePositive = true)
@@ -156,7 +158,7 @@ class LongAggMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- override def readObject[T](): T = {
+ override def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readVarLong(optimizePositive = true)
(a, b).asInstanceOf[T]
@@ -171,7 +173,7 @@ class DoubleAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
- def writeObject[T](t: T) = {
+ def writeObject[T: ClassTag](t: T) = {
val msg = t.asInstanceOf[(VertexId, Double)]
writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg._2)
@@ -180,7 +182,7 @@ class DoubleAggMsgSerializer extends Serializer with Serializable {
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
- def readObject[T](): T = {
+ def readObject[T: ClassTag](): T = {
val a = readVarLong(optimizePositive = false)
val b = readDouble()
(a, b).asInstanceOf[T]
@@ -196,7 +198,7 @@ class DoubleAggMsgSerializer extends Serializer with Serializable {
private[graphx]
abstract class ShuffleSerializationStream(s: OutputStream) extends SerializationStream {
// The implementation should override this one.
- def writeObject[T](t: T): SerializationStream
+ def writeObject[T: ClassTag](t: T): SerializationStream
def writeInt(v: Int) {
s.write(v >> 24)
@@ -309,7 +311,7 @@ abstract class ShuffleSerializationStream(s: OutputStream) extends Serialization
private[graphx]
abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream {
// The implementation should override this one.
- def readObject[T](): T
+ def readObject[T: ClassTag](): T
def readInt(): Int = {
val first = s.read()
@@ -398,11 +400,12 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat
private[graphx] sealed trait ShuffleSerializerInstance extends SerializerInstance {
- override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException
+ override def serialize[T: ClassTag](t: T): ByteBuffer = throw new UnsupportedOperationException
- override def deserialize[T](bytes: ByteBuffer): T = throw new UnsupportedOperationException
+ override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
+ throw new UnsupportedOperationException
- override def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T =
+ override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
throw new UnsupportedOperationException
// The implementation should override the following two.
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
index 73438d9535..91caa6b605 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
import scala.util.Random
+import scala.reflect.ClassTag
import org.scalatest.FunSuite
@@ -164,7 +165,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
def testVarLongEncoding(v: Long, optimizePositive: Boolean) {
val bout = new ByteArrayOutputStream
val stream = new ShuffleSerializationStream(bout) {
- def writeObject[T](t: T): SerializationStream = {
+ def writeObject[T: ClassTag](t: T): SerializationStream = {
writeVarLong(t.asInstanceOf[Long], optimizePositive = optimizePositive)
this
}
@@ -173,7 +174,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
val bin = new ByteArrayInputStream(bout.toByteArray)
val dstream = new ShuffleDeserializationStream(bin) {
- def readObject[T](): T = {
+ def readObject[T: ClassTag](): T = {
readVarLong(optimizePositive).asInstanceOf[T]
}
}