aboutsummaryrefslogtreecommitdiff
path: root/graphx/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-10 18:00:54 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-10 18:00:54 -0800
commit41d6586e8e0df94fee66a386c967b56c535e3c28 (patch)
tree21cf315a3a3e64b6bd3586c3b9230a173686645f /graphx/src
parent85a6645d318e728454e81096ca8140b5f640e782 (diff)
downloadspark-41d6586e8e0df94fee66a386c967b56c535e3c28.tar.gz
spark-41d6586e8e0df94fee66a386c967b56c535e3c28.tar.bz2
spark-41d6586e8e0df94fee66a386c967b56c535e3c28.zip
Revert changes to Spark's (PrimitiveKey)OpenHashMap; copy PKOHM to graphx
Diffstat (limited to 'graphx/src')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala3
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala3
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala5
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala153
6 files changed, 159 insertions, 9 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 4176563d22..a03e73ee79 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -3,7 +3,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
import org.apache.spark.graphx._
-import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index ca64e9af66..fbc29409b5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -4,7 +4,8 @@ import scala.reflect.ClassTag
import scala.util.Sorting
import org.apache.spark.graphx._
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.PrimitiveVector
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
var edges = new PrimitiveVector[Edge[ED]](size)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index c5258360da..bad840f1cd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -3,7 +3,7 @@ package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
import org.apache.spark.graphx._
-import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
* The Iterator type returned when constructing edge triplets. This class technically could be
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 987a646c0c..c66b8c804f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -173,9 +173,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def mapTriplets[ED2: ClassTag](
f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
- // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
- // manifest from GraphImpl (which would require serializing GraphImpl).
- val vdTag = classTag[VD]
val newEdgePartitions =
edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
(ePartIter, vTableReplicatedIter) =>
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index 7c83497ca9..f97ff75fb2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -2,11 +2,10 @@ package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
-import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap}
-
import org.apache.spark.Logging
import org.apache.spark.graphx._
-
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.BitSet
private[graphx] object VertexPartition {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
new file mode 100644
index 0000000000..1088944cd3
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.util.collection
+
+import org.apache.spark.util.collection.OpenHashSet
+
+import scala.reflect._
+
+/**
+ * A fast hash map implementation for primitive, non-null keys. This hash map supports
+ * insertions and updates, but not deletions. This map is about an order of magnitude
+ * faster than java.util.HashMap, while using much less space overhead.
+ *
+ * Under the hood, it uses our OpenHashSet implementation.
+ */
+private[spark]
+class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+ @specialized(Long, Int, Double) V: ClassTag](
+ val keySet: OpenHashSet[K], var _values: Array[V])
+ extends Iterable[(K, V)]
+ with Serializable {
+
+ /**
+ * Allocate an OpenHashMap with a fixed initial capacity
+ */
+ def this(initialCapacity: Int) =
+ this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
+
+ /**
+ * Allocate an OpenHashMap with a default initial capacity, providing a true
+ * no-argument constructor.
+ */
+ def this() = this(64)
+
+ /**
+ * Allocate an OpenHashMap with a fixed initial capacity
+ */
+ def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
+
+ require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
+
+ private var _oldValues: Array[V] = null
+
+ override def size = keySet.size
+
+ /** Get the value for a given key */
+ def apply(k: K): V = {
+ val pos = keySet.getPos(k)
+ _values(pos)
+ }
+
+ /** Get the value for a given key, or returns elseValue if it doesn't exist. */
+ def getOrElse(k: K, elseValue: V): V = {
+ val pos = keySet.getPos(k)
+ if (pos >= 0) _values(pos) else elseValue
+ }
+
+ /** Set the value for a key */
+ def update(k: K, v: V) {
+ val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+ _values(pos) = v
+ keySet.rehashIfNeeded(k, grow, move)
+ _oldValues = null
+ }
+
+
+ /** Set the value for a key */
+ def setMerge(k: K, v: V, mergeF: (V, V) => V) {
+ val pos = keySet.addWithoutResize(k)
+ val ind = pos & OpenHashSet.POSITION_MASK
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add
+ _values(ind) = v
+ } else {
+ _values(ind) = mergeF(_values(ind), v)
+ }
+ keySet.rehashIfNeeded(k, grow, move)
+ _oldValues = null
+ }
+
+
+ /**
+ * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+ * set its value to mergeValue(oldValue).
+ *
+ * @return the newly updated value.
+ */
+ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+ val pos = keySet.addWithoutResize(k)
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
+ val newValue = defaultValue
+ _values(pos & OpenHashSet.POSITION_MASK) = newValue
+ keySet.rehashIfNeeded(k, grow, move)
+ newValue
+ } else {
+ _values(pos) = mergeValue(_values(pos))
+ _values(pos)
+ }
+ }
+
+ override def iterator = new Iterator[(K, V)] {
+ var pos = 0
+ var nextPair: (K, V) = computeNextPair()
+
+ /** Get the next value we should return from next(), or null if we're finished iterating */
+ def computeNextPair(): (K, V) = {
+ pos = keySet.nextPos(pos)
+ if (pos >= 0) {
+ val ret = (keySet.getValue(pos), _values(pos))
+ pos += 1
+ ret
+ } else {
+ null
+ }
+ }
+
+ def hasNext = nextPair != null
+
+ def next() = {
+ val pair = nextPair
+ nextPair = computeNextPair()
+ pair
+ }
+ }
+
+ // The following member variables are declared as protected instead of private for the
+ // specialization to work (specialized class extends the unspecialized one and needs access
+ // to the "private" variables).
+ // They also should have been val's. We use var's because there is a Scala compiler bug that
+ // would throw illegal access error at runtime if they are declared as val's.
+ protected var grow = (newCapacity: Int) => {
+ _oldValues = _values
+ _values = new Array[V](newCapacity)
+ }
+
+ protected var move = (oldPos: Int, newPos: Int) => {
+ _values(newPos) = _oldValues(oldPos)
+ }
+}