aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-05-12 10:49:03 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-05-12 10:49:03 -0700
commitaf15c82bfe2c3f73142b8f310784a0e85841539d (patch)
tree58cd123d34694923220ec0a67c0576496bd1b097 /graphx
parenta6b02fb7486356493474c7f42bb714c9cce215ca (diff)
downloadspark-af15c82bfe2c3f73142b8f310784a0e85841539d.tar.gz
spark-af15c82bfe2c3f73142b8f310784a0e85841539d.tar.bz2
spark-af15c82bfe2c3f73142b8f310784a0e85841539d.zip
Revert "SPARK-1786: Edge Partition Serialization"
This reverts commit a6b02fb7486356493474c7f42bb714c9cce215ca.
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala9
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala14
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala6
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala (renamed from graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala)2
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala18
11 files changed, 23 insertions, 44 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index f97f329c0e..d295d0127a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -24,9 +24,6 @@ import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graphx.impl._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.OpenHashSet
-
/**
* Registers GraphX classes with Kryo for improved performance.
@@ -46,8 +43,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[PartitionStrategy])
kryo.register(classOf[BoundedPriorityQueue[Object]])
kryo.register(classOf[EdgeDirection])
- kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
- kryo.register(classOf[OpenHashSet[Int]])
- kryo.register(classOf[OpenHashSet[Long]])
+
+ // This avoids a large number of hash table lookups.
+ kryo.setReferences(false)
}
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index a5c9cd1f8b..871e81f8d2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* A collection of edges stored in columnar format, along with any vertex attributes referenced. The
@@ -42,12 +42,12 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
private[graphx]
class EdgePartition[
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
- val srcIds: Array[VertexId] = null,
- val dstIds: Array[VertexId] = null,
- val data: Array[ED] = null,
- val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
- val vertices: VertexPartition[VD] = null,
- val activeSet: Option[VertexSet] = None
+ @transient val srcIds: Array[VertexId],
+ @transient val dstIds: Array[VertexId],
+ @transient val data: Array[ED],
+ @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
+ @transient val vertices: VertexPartition[VD],
+ @transient val activeSet: Option[VertexSet] = None
) extends Serializable {
/** Return a new `EdgePartition` with the specified edge data. */
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 4520beb991..ecb49bef42 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -23,7 +23,7 @@ import scala.util.Sorting
import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
private[graphx]
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
@@ -41,7 +41,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
val srcIds = new Array[VertexId](edgeArray.size)
val dstIds = new Array[VertexId](edgeArray.size)
val data = new Array[ED](edgeArray.size)
- val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
+ val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
// adding them to the index
if (edgeArray.length > 0) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 56f79a7097..ebb0b9418d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* The Iterator type returned when constructing edge triplets. This could be an anonymous class in
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index d02e9238ad..927e32ad0f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
@@ -69,7 +69,7 @@ object RoutingTablePartition {
: Iterator[RoutingTableMessage] = {
// Determine which positions each vertex id appears in using a map where the low 2 bits
// represent src and dst
- val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
+ val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
edgePartition.srcIds.iterator.foreach { srcId =>
map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index dca54b8a7d..f4e221d4e0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/** Stores vertex attributes to ship to an edge partition. */
private[graphx]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index 55c7a19d1b..f1d174720a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
private[graphx] object VertexPartition {
/** Construct a `VertexPartition` from the given vertices. */
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
index 34939b2444..8d9e0204d2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
private[graphx] object VertexPartitionBase {
/**
@@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
*/
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
: (VertexIdToIndexMap, Array[VD], BitSet) = {
- val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { pair =>
map(pair._1) = pair._2
}
@@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
*/
def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
: (VertexIdToIndexMap, Array[VD], BitSet) = {
- val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index a4f769b294..21ff615fec 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -25,7 +25,7 @@ import org.apache.spark.Logging
import org.apache.spark.util.collection.BitSet
import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* An class containing additional operations for subclasses of VertexPartitionBase that provide
@@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): Self[VD] = {
- val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
+ val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- self.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
index 57b01b6f2e..7b02e2ed1a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -29,7 +29,7 @@ import scala.reflect._
* Under the hood, it uses our OpenHashSet implementation.
*/
private[graphx]
-class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
@specialized(Long, Int, Double) V: ClassTag](
val keySet: OpenHashSet[K], var _values: Array[V])
extends Iterable[(K, V)]
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index 28fd112f2b..d2e0c01bc3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -22,9 +22,6 @@ import scala.util.Random
import org.scalatest.FunSuite
-import org.apache.spark.SparkConf
-import org.apache.spark.serializer.KryoSerializer
-
import org.apache.spark.graphx._
class EdgePartitionSuite extends FunSuite {
@@ -123,19 +120,4 @@ class EdgePartitionSuite extends FunSuite {
assert(!ep.isActive(-1))
assert(ep.numActives == Some(2))
}
-
- test("Kryo serialization") {
- val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
- val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
- val conf = new SparkConf()
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
- val s = new KryoSerializer(conf).newInstance()
- val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
- assert(aSer.srcIds.toList === a.srcIds.toList)
- assert(aSer.dstIds.toList === a.dstIds.toList)
- assert(aSer.data.toList === a.data.toList)
- assert(aSer.index != null)
- assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
- }
}