aboutsummaryrefslogtreecommitdiff
path: root/graphx/src
diff options
context:
space:
mode:
Diffstat (limited to 'graphx/src')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala12
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala14
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala30
4 files changed, 26 insertions, 32 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 1d029bf009..5e9be18990 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -391,6 +391,6 @@ object GraphImpl {
// TODO: Consider doing map side distinct before shuffle.
new ShuffledRDD[VertexId, Int, (VertexId, Int)](
edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
- .setSerializer(classOf[VertexIdMsgSerializer].getName)
+ .setSerializer(new VertexIdMsgSerializer)
}
} // end of object GraphImpl
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index e9ee09c361..fe6fe76def 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -65,11 +65,11 @@ class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
- rdd.setSerializer(classOf[IntVertexBroadcastMsgSerializer].getName)
+ rdd.setSerializer(new IntVertexBroadcastMsgSerializer)
} else if (classTag[T] == ClassTag.Long) {
- rdd.setSerializer(classOf[LongVertexBroadcastMsgSerializer].getName)
+ rdd.setSerializer(new LongVertexBroadcastMsgSerializer)
} else if (classTag[T] == ClassTag.Double) {
- rdd.setSerializer(classOf[DoubleVertexBroadcastMsgSerializer].getName)
+ rdd.setSerializer(new DoubleVertexBroadcastMsgSerializer)
}
rdd
}
@@ -104,11 +104,11 @@ object MsgRDDFunctions {
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
- rdd.setSerializer(classOf[IntAggMsgSerializer].getName)
+ rdd.setSerializer(new IntAggMsgSerializer)
} else if (classTag[T] == ClassTag.Long) {
- rdd.setSerializer(classOf[LongAggMsgSerializer].getName)
+ rdd.setSerializer(new LongAggMsgSerializer)
} else if (classTag[T] == ClassTag.Double) {
- rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName)
+ rdd.setSerializer(new DoubleAggMsgSerializer)
}
rdd
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index c74d487e20..34a145e018 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -25,7 +25,7 @@ import org.apache.spark.graphx._
import org.apache.spark.serializer._
private[graphx]
-class VertexIdMsgSerializer(conf: SparkConf) extends Serializer {
+class VertexIdMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -46,7 +46,7 @@ class VertexIdMsgSerializer(conf: SparkConf) extends Serializer {
/** A special shuffle serializer for VertexBroadcastMessage[Int]. */
private[graphx]
-class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
+class IntVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -70,7 +70,7 @@ class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
/** A special shuffle serializer for VertexBroadcastMessage[Long]. */
private[graphx]
-class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
+class LongVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -94,7 +94,7 @@ class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
/** A special shuffle serializer for VertexBroadcastMessage[Double]. */
private[graphx]
-class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
+class DoubleVertexBroadcastMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -118,7 +118,7 @@ class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
/** A special shuffle serializer for AggregationMessage[Int]. */
private[graphx]
-class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
+class IntAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -142,7 +142,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
/** A special shuffle serializer for AggregationMessage[Long]. */
private[graphx]
-class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
+class LongAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -166,7 +166,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
/** A special shuffle serializer for AggregationMessage[Double]. */
private[graphx]
-class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
+class DoubleAggMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
index e5a582b47b..73438d9535 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -32,15 +32,14 @@ import org.apache.spark.serializer.SerializationStream
class SerializerSuite extends FunSuite with LocalSparkContext {
test("IntVertexBroadcastMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -54,15 +53,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("LongVertexBroadcastMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Long](3, 4, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -76,15 +74,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("DoubleVertexBroadcastMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0)
val bout = new ByteArrayOutputStream
- val outStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -98,15 +95,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("IntAggMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = (4: VertexId, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: (VertexId, Int) = inStrm.readObject()
val inMsg2: (VertexId, Int) = inStrm.readObject()
assert(outMsg === inMsg1)
@@ -118,15 +114,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("LongAggMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = (4: VertexId, 1L << 32)
val bout = new ByteArrayOutputStream
- val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: (VertexId, Long) = inStrm.readObject()
val inMsg2: (VertexId, Long) = inStrm.readObject()
assert(outMsg === inMsg1)
@@ -138,15 +133,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("DoubleAggMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = (4: VertexId, 5.0)
val bout = new ByteArrayOutputStream
- val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: (VertexId, Double) = inStrm.readObject()
val inMsg2: (VertexId, Double) = inStrm.readObject()
assert(outMsg === inMsg1)