author     Patrick Wendell <pwendell@gmail.com>   2014-01-13 22:58:38 -0800
committer  Patrick Wendell <pwendell@gmail.com>   2014-01-13 22:58:38 -0800
commit     4a805aff5e381752afb2bfd579af908d623743ed (patch)
tree       78f81dfdf6bcaa47ff87f8c882f829eae59c2bdb /core
parent     945fe7a37ea3189b5a9f8a74e5c2fa9c1088ebfc (diff)
parent     80e73ed0004cceb47a450c79aa4faa598502fa45 (diff)
Merge pull request #367 from ankurdave/graphx
GraphX: Unifying Graphs and Tables

GraphX extends Spark's distributed fault-tolerant collections API and interactive console with a new graph API which leverages recent advances in graph systems (e.g., [GraphLab](http://graphlab.org)) to enable users to easily and interactively build, transform, and reason about graph-structured data at scale. See http://amplab.github.io/graphx/.

Thanks to @jegonzal, @rxin, @ankurdave, @dcrankshaw, @jianpingjwang, @amatsukawa, @kellrott, and @adamnovak.

Tasks left:
- [x] Graph-level uncache
- [x] Uncache previous iterations in Pregel
- [x] ~~Uncache previous iterations in GraphLab~~ (postponed to post-release)
  - [x] Describe GC issue with GraphLab
- [ ] Write `docs/graphx-programming-guide.md`
  - [x] Mention future Bagel support in docs
  - [ ] Section on caching/uncaching in docs: As with Spark, cache something that is used more than once. In an iterative algorithm, try to cache and force (i.e., materialize) something every iteration, then uncache the cached things that depended on the newly materialized RDD but that won't be referenced again (a sketch of this pattern follows the list below).
- [x] Undo modifications to core collections and instead copy them to org.apache.spark.graphx
- [x] Make Graph serializable to work around capture in the Spark shell
- [x] Rename graph -> graphx in package name and subproject
- [x] Remove standalone PageRank
- [x] ~~Fix amplab/graphx#52 by checking `iter.hasNext`~~
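The caching/uncaching item above can be made concrete with a minimal sketch. None of this code is part of the PR; `step`, the element types, and the use of `count()` to force materialization are illustrative assumptions.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical per-iteration cache/materialize/unpersist loop, following the
// caching/uncaching note above. `step` stands in for one iteration of an
// iterative algorithm (e.g., a Pregel superstep); none of this is PR code.
def iterate(initial: RDD[(Long, Double)], numIters: Int)
           (step: RDD[(Long, Double)] => RDD[(Long, Double)]): RDD[(Long, Double)] = {
  var current = initial.cache()
  current.count()                // force (materialize) the starting RDD
  for (_ <- 1 to numIters) {
    val next = step(current).cache()
    next.count()                 // materialize before dropping its parent
    current.unpersist()          // the previous iteration won't be referenced again
    current = next
  }
  current
}
```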
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala          2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                       5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/BitSet.scala       87
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala  23
4 files changed, 111 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index dd25d0c6ed..4148581f52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -288,7 +288,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- new ShuffledRDD[K, V, (K, V)](self, partitioner)
+ if (self.partitioner == partitioner) self else new ShuffledRDD[K, V, (K, V)](self, partitioner)
}
/**
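The PairRDDFunctions change above is meant to skip the shuffle when an RDD is re-partitioned with a partitioner it already has. A minimal sketch of the intended effect, assuming an existing SparkContext `sc` (not part of this diff):

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD implicit conversions

// Illustration only: once an RDD is hash-partitioned, re-partitioning it with
// the same partitioner is intended to return the RDD itself rather than
// scheduling another shuffle through a new ShuffledRDD.
val part = new HashPartitioner(8)
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).partitionBy(part)
val again = pairs.partitionBy(part)   // no second shuffle when the partitioner matches
assert(again.partitioner == Some(part))
```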
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index edd4f381db..cd90a1561a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -549,6 +549,11 @@ abstract class RDD[T: ClassTag](
* of elements in each partition.
*/
def zipPartitions[B: ClassTag, V: ClassTag]
+ (rdd2: RDD[B], preservesPartitioning: Boolean)
+ (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
+ new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
+
+ def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
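The new zipPartitions overload above lets the caller declare that the zipped output preserves the parents' partitioning. A minimal usage sketch, assuming an existing SparkContext `sc`:

```scala
// Illustration only; both parents must have the same number of partitions.
val a = sc.parallelize(1 to 100, 4)
val b = sc.parallelize(101 to 200, 4)
val sums = a.zipPartitions(b, true) { (xs, ys) =>   // preservesPartitioning = true
  xs.zip(ys).map { case (x, y) => x + y }
}
sums.count()   // 100
```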
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index a1a452315d..856eb772a1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -22,10 +22,72 @@ package org.apache.spark.util.collection
* A simple, fixed-size bit set implementation. This implementation is fast because it avoids
* safety/bound checking.
*/
-class BitSet(numBits: Int) {
+class BitSet(numBits: Int) extends Serializable {
- private[this] val words = new Array[Long](bit2words(numBits))
- private[this] val numWords = words.length
+ private val words = new Array[Long](bit2words(numBits))
+ private val numWords = words.length
+
+ /**
+ * Compute the capacity (number of bits) that can be represented
+ * by this bitset.
+ */
+ def capacity: Int = numWords * 64
+
+ /**
+ * Set all the bits up to a given index
+ */
+ def setUntil(bitIndex: Int) {
+ val wordIndex = bitIndex >> 6 // divide by 64
+ var i = 0
+ while(i < wordIndex) { words(i) = -1; i += 1 }
+ if(wordIndex < words.size) {
+ // Set the remaining bits (note that the mask could still be zero)
+ val mask = ~(-1L << (bitIndex & 0x3f))
+ words(wordIndex) |= mask
+ }
+ }
+
+ /**
+ * Compute the bit-wise AND of the two sets returning the
+ * result.
+ */
+ def &(other: BitSet): BitSet = {
+ val newBS = new BitSet(math.max(capacity, other.capacity))
+ val smaller = math.min(numWords, other.numWords)
+ assert(newBS.numWords >= numWords)
+ assert(newBS.numWords >= other.numWords)
+ var ind = 0
+ while( ind < smaller ) {
+ newBS.words(ind) = words(ind) & other.words(ind)
+ ind += 1
+ }
+ newBS
+ }
+
+ /**
+ * Compute the bit-wise OR of the two sets returning the
+ * result.
+ */
+ def |(other: BitSet): BitSet = {
+ val newBS = new BitSet(math.max(capacity, other.capacity))
+ assert(newBS.numWords >= numWords)
+ assert(newBS.numWords >= other.numWords)
+ val smaller = math.min(numWords, other.numWords)
+ var ind = 0
+ while( ind < smaller ) {
+ newBS.words(ind) = words(ind) | other.words(ind)
+ ind += 1
+ }
+ while( ind < numWords ) {
+ newBS.words(ind) = words(ind)
+ ind += 1
+ }
+ while( ind < other.numWords ) {
+ newBS.words(ind) = other.words(ind)
+ ind += 1
+ }
+ newBS
+ }
/**
* Sets the bit at the specified index to true.
@@ -36,6 +98,11 @@ class BitSet(numBits: Int) {
words(index >> 6) |= bitmask // div by 64 and mask
}
+ def unset(index: Int) {
+ val bitmask = 1L << (index & 0x3f) // mod 64 and shift
+ words(index >> 6) &= ~bitmask // div by 64 and mask
+ }
+
/**
* Return the value of the bit with the specified index. The value is true if the bit with
* the index is currently set in this BitSet; otherwise, the result is false.
@@ -48,6 +115,20 @@ class BitSet(numBits: Int) {
(words(index >> 6) & bitmask) != 0 // div by 64 and mask
}
+ /**
+ * Get an iterator over the set bits.
+ */
+ def iterator = new Iterator[Int] {
+ var ind = nextSetBit(0)
+ override def hasNext: Boolean = ind >= 0
+ override def next() = {
+ val tmp = ind
+ ind = nextSetBit(ind+1)
+ tmp
+ }
+ }
+
+
/** Return the number of bits set to true in this BitSet. */
def cardinality(): Int = {
var sum = 0
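The BitSet additions above (capacity, setUntil, &, |, unset, and the set-bit iterator) can be exercised roughly as follows; BitSet is an internal Spark utility, so this is an illustration rather than test code from the PR:

```scala
import org.apache.spark.util.collection.BitSet

// Illustration of the new BitSet operations added in this diff.
val a = new BitSet(128)
val b = new BitSet(128)
a.setUntil(10)                 // bits 0..9
b.set(5)
b.set(64)

val and = a & b                // only bit 5 is set in both
val or  = a | b                // bits 0..9 plus bit 64
assert(and.cardinality() == 1)
assert(or.iterator.toList == ((0 until 10).toList :+ 64))

and.unset(5)
assert(and.cardinality() == 0)
```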
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 87e009a4de..5ded5d0b6d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -84,6 +84,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
protected var _bitset = new BitSet(_capacity)
+ def getBitSet = _bitset
+
// Init of the array in constructor (instead of in declaration) to work around a Scala compiler
// specialization bug that would generate two arrays (one for Object and one for specialized T).
protected var _data: Array[T] = _
@@ -161,7 +163,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
def getPos(k: T): Int = {
var pos = hashcode(hasher.hash(k)) & _mask
var i = 1
- while (true) {
+ val maxProbe = _data.size
+ while (i < maxProbe) {
if (!_bitset.get(pos)) {
return INVALID_POS
} else if (k == _data(pos)) {
@@ -179,6 +182,22 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
/** Return the value at the specified position. */
def getValue(pos: Int): T = _data(pos)
+ def iterator = new Iterator[T] {
+ var pos = nextPos(0)
+ override def hasNext: Boolean = pos != INVALID_POS
+ override def next(): T = {
+ val tmp = getValue(pos)
+ pos = nextPos(pos+1)
+ tmp
+ }
+ }
+
+ /** Return the value at the specified position. */
+ def getValueSafe(pos: Int): T = {
+ assert(_bitset.get(pos))
+ _data(pos)
+ }
+
/**
* Return the next position with an element stored, starting from the given position inclusively.
*/
@@ -259,7 +278,7 @@ object OpenHashSet {
* A set of specialized hash function implementation to avoid boxing hash code computation
* in the specialized implementation of OpenHashSet.
*/
- sealed class Hasher[@specialized(Long, Int) T] {
+ sealed class Hasher[@specialized(Long, Int) T] extends Serializable {
def hash(o: T): Int = o.hashCode()
}
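Taken together, the OpenHashSet changes expose the backing BitSet, bound the probe loop in getPos, and add an iterator plus a checked accessor. A rough usage sketch (OpenHashSet is an internal Spark collection; this is illustrative only):

```scala
import org.apache.spark.util.collection.OpenHashSet

// Illustration of the new OpenHashSet accessors added in this diff.
val set = new OpenHashSet[Long]()
set.add(1L)
set.add(42L)

val pos = set.getPos(42L)              // probing is now bounded by the array size
if (pos != OpenHashSet.INVALID_POS) {
  assert(set.getValueSafe(pos) == 42L) // getValueSafe also checks the slot is occupied
}

assert(set.iterator.toSet == Set(1L, 42L))  // iterator walks only occupied slots
assert(set.getBitSet.cardinality() == 2)    // the backing BitSet is now reachable
```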