-rw-r--r--  README.md | 4
-rwxr-xr-x  bin/compute-classpath.sh | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 78
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala | 650
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala | 283
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 71
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala | 461
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala | 461
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala | 6
-rw-r--r--  graph/pom.xml | 106
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Analytics.scala | 659
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Edge.scala | 34
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala | 32
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala | 56
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala | 313
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala | 21
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLab.scala | 135
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala | 54
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphOps.scala | 166
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Pregel.scala | 36
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala | 65
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala | 31
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala | 671
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala | 49
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/package.scala | 25
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala | 76
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala | 75
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala | 114
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala | 252
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala | 21
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala | 104
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala | 44
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala | 93
-rw-r--r--  pom.xml | 1
-rw-r--r--  project/SparkBuild.scala | 14
40 files changed, 5248 insertions, 54 deletions
diff --git a/README.md b/README.md
index 28ad1e4604..139bdc070c 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,7 @@
+# GraphX Branch of Spark
+
+This is experimental code for the Apache Spark project.
+
# Apache Spark
Lightning-Fast Cluster Computing - <http://spark.incubator.apache.org/>
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index c7819d4932..6ad1ca6ef8 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -30,6 +30,7 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
+
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"
if [ -f "$FWDIR/RELEASE" ]; then
@@ -45,6 +46,7 @@ if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/graph/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes"
fi
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0aafc0a2fc..b3a2cb39fc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -959,7 +959,7 @@ object SparkContext {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
- new PairRDDFunctions(rdd)
+ rdd.pairRDDFunctions
implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd)
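
The hunk above routes the implicit pair-RDD conversion through RDD.pairRDDFunctions (a method added to RDD.scala later in this patch), so that a subclass such as IndexedRDD can return its own specialized pair functions. A minimal, self-contained sketch of that dispatch shape; the class names below are hypothetical stand-ins, not the real Spark types:

// Hypothetical stand-ins for RDD / IndexedRDD; only the dispatch shape matters.
class BasePairOps { def describe = "generic pair functions" }
class IndexedPairOps extends BasePairOps { override def describe = "index-aware pair functions" }

class BaseRDDLike { def pairRDDFunctions: BasePairOps = new BasePairOps }
class IndexedRDDLike extends BaseRDDLike { override def pairRDDFunctions = new IndexedPairOps }

object DispatchSketch extends App {
  // Analogue of rddToPairRDDFunctions: ask the RDD itself for its pair functions.
  def toPairOps(rdd: BaseRDDLike): BasePairOps = rdd.pairRDDFunctions
  println(toPairOps(new BaseRDDLike).describe)     // generic pair functions
  println(toPairOps(new IndexedRDDLike).describe)  // index-aware pair functions
}
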
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 29968c273c..9b8384bcbb 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,6 +156,7 @@ object SparkEnv extends Logging {
val serializer = serializerManager.setDefault(
System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ logInfo("spark.serializer is " + System.getProperty("spark.serializer"))
val closureSerializer = serializerManager.get(
System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index a6518abf45..2f94ae5fa8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -264,8 +264,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
- def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
+ def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other, partitioner))
+ }
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@@ -275,6 +278,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (V, Optional[W])] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
@@ -287,6 +292,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], W)] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
@@ -325,16 +332,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
- def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
+ def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other))
+ }
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
- def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
+ def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(rdd.join(other, numPartitions))
+ }
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@@ -343,6 +356,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
@@ -354,6 +369,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* into `numPartitions` partitions.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.leftOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
@@ -365,6 +382,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
@@ -376,6 +395,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* RDD into the given number of partitions.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = rdd.rightOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
@@ -412,55 +433,86 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
- : JavaPairRDD[K, (JList[V], JList[W])] =
+ : JavaPairRDD[K, (JList[V], JList[W])] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
+ }
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+ implicit val w1m: ClassManifest[W1] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
+ implicit val w2m: ClassManifest[W2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
+ }
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+ def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.cogroup(other)))
+ }
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+ implicit val w1m: ClassManifest[W1] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
+ implicit val w2m: ClassManifest[W2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
+ }
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
- = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
-
+ def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
+ }
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+ implicit val w1m: ClassManifest[W1] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
+ implicit val w2m: ClassManifest[W2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
+ }
/** Alias for cogroup. */
- def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+ def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
+ implicit val wm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
+ }
/** Alias for cogroup. */
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+ implicit val w1m: ClassManifest[W1] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
+ implicit val w2m: ClassManifest[W2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
+ }
/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
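
Every wrapper above repeats one trick: the Java-facing type parameter W arrives without a ClassManifest, so one is fabricated by casting the AnyRef manifest, which is sound because W is always a boxed reference type on the Java side. A standalone sketch of that trick against Scala 2.9-era manifests; the object and method names are illustrative:

object ManifestTrickSketch extends App {
  // A method that, like the new PairRDDFunctions signatures, demands a ClassManifest.
  def erasureOf[W: ClassManifest](w: W): Class[_] = implicitly[ClassManifest[W]].erasure

  // A caller that has no manifest for W (the situation on the Java API side):
  // cast the AnyRef manifest into the slot the compiler wants.
  def fromJavaSide[W](w: W): Class[_] = {
    implicit val wm: ClassManifest[W] =
      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    erasureOf(w)
  }

  println(fromJavaSide("hello"))  // class java.lang.Object: the manifest only sees AnyRef
}
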
diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
new file mode 100644
index 0000000000..5f95559f15
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.nio.ByteBuffer
+
+
+import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.BitSet
+
+
+import org.apache.spark._
+import org.apache.spark.rdd._
+import org.apache.spark.SparkContext._
+import org.apache.spark.Partitioner._
+
+import org.apache.spark.storage.StorageLevel
+
+
+
+
+
+
+/**
+ * The BlockIndex is the internal map structure used inside the index
+ * of the IndexedRDD.
+ */
+class BlockIndex[@specialized K: ClassManifest] extends JHashMap[K,Int]
+
+
+/**
+ * The RDDIndex is an opaque type used to represent the organization
+ * of values in an RDD
+ */
+class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockIndex[K]]) {
+ def persist(newLevel: StorageLevel): RDDIndex[K] = {
+ rdd.persist(newLevel)
+ return this
+ }
+
+ def partitioner: Partitioner = rdd.partitioner.get
+}
+
+
+
+/**
+ * An IndexedRDD[K,V] extends the RDD[(K,V)] by pre-indexing the keys and
+ * organizing the values to enable faster join operations.
+ *
+ * In addition to providing the basic RDD[(K,V)] functionality, the IndexedRDD
+ * exposes an index member which can be used to "key" other IndexedRDDs.
+ *
+ */
+class IndexedRDD[K: ClassManifest, V: ClassManifest](
+ @transient val index: RDDIndex[K],
+ @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
+ extends RDD[(K, V)](index.rdd.context,
+ List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
+
+
+ /**
+ * An internal representation which joins the block indices with the values
+ */
+ protected[spark] val tuples =
+ new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
+
+
+ /**
+ * The partitioner is defined by the index
+ */
+ override val partitioner = index.rdd.partitioner
+
+
+ /**
+ * The actual partitions are defined by the tuples.
+ */
+ override def getPartitions: Array[Partition] = tuples.getPartitions
+
+
+ /**
+ * The preferred locations are computed based on the preferred locations of the tuples.
+ */
+ override def getPreferredLocations(s: Partition): Seq[String] =
+ tuples.getPreferredLocations(s)
+
+
+ /**
+ * Caching an IndexedRDD causes the index and values to be cached separately.
+ */
+ override def persist(newLevel: StorageLevel): RDD[(K,V)] = {
+ index.persist(newLevel)
+ valuesRDD.persist(newLevel)
+ return this
+ }
+
+
+ /**
+ * Pass each value in the key-value pair RDD through a map function without changing the keys;
+ * this also retains the original RDD's partitioning.
+ */
+ def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = {
+ val cleanF = index.rdd.context.clean(f)
+ val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
+ valuesRDD.mapPartitions(iter => iter.map{
+ case (values, bs) =>
+ val newValues = new Array[U](values.size)
+ for ( ind <- bs ) {
+ newValues(ind) = f(values(ind))
+ }
+ (newValues.toIndexedSeq, bs)
+ }, preservesPartitioning = true)
+ new IndexedRDD[K,U](index, newValuesRDD)
+ }
+
+
+ /**
+ * Pass each value in the key-value pair RDD through a map function without changing the keys;
+ * this also retains the original RDD's partitioning.
+ */
+ def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = {
+ val cleanF = index.rdd.context.clean(f)
+ val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
+ index.rdd.zipPartitions(valuesRDD){
+ (keysIter, valuesIter) =>
+ val index = keysIter.next()
+ assert(keysIter.hasNext() == false)
+ val (oldValues, bs) = valuesIter.next()
+ assert(valuesIter.hasNext() == false)
+ // Allocate the array to store the results into
+ val newValues: Array[U] = new Array[U](oldValues.size)
+ // Populate the new Values
+ for( (k,i) <- index ) {
+ if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
+ }
+ Array((newValues.toIndexedSeq, bs)).iterator
+ }
+ new IndexedRDD[K,U](index, newValues)
+ }
+
+
+ def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = {
+ if(index != other.index) {
+ throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
+ }
+ val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
+ (thisIter, otherIter) =>
+ val (thisValues, thisBS) = thisIter.next()
+ assert(!thisIter.hasNext)
+ val (otherValues, otherBS) = otherIter.next()
+ assert(!otherIter.hasNext)
+ val newBS = thisBS & otherBS
+ val newValues = thisValues.view.zip(otherValues)
+ Iterator((newValues.toIndexedSeq, newBS))
+ }
+ new IndexedRDD(index, newValuesRDD)
+ }
+
+
+ def leftZipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,Option[W])] = {
+ if(index != other.index) {
+ throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
+ }
+ val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
+ (thisIter, otherIter) =>
+ val (thisValues, thisBS) = thisIter.next()
+ assert(!thisIter.hasNext)
+ val (otherValues, otherBS) = otherIter.next()
+ assert(!otherIter.hasNext)
+ val otherOption = otherValues.view.zipWithIndex
+ .map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None }
+ val newValues = thisValues.view.zip(otherOption)
+ Iterator((newValues.toIndexedSeq, thisBS))
+ }
+ new IndexedRDD(index, newValuesRDD)
+ }
+
+
+
+ def leftJoin[W: ClassManifest](
+ other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a):
+ IndexedRDD[K, (V, Option[W]) ] = {
+ val cleanMerge = index.rdd.context.clean(merge)
+
+ other match {
+ case other: IndexedRDD[_, _] if index == other.index => {
+ leftZipJoin(other)
+ }
+ case _ => {
+ // Get the partitioner from the index
+ val partitioner = index.rdd.partitioner match {
+ case Some(p) => p
+ case None => throw new SparkException("An index must have a partitioner.")
+ }
+ // Shuffle the other RDD using the partitioner for this index
+ val otherShuffled =
+ if (other.partitioner == Some(partitioner)) other
+ else other.partitionBy(partitioner)
+ val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
+ index.rdd.zipPartitions(valuesRDD, other) {
+ (thisIndexIter, thisIter, tuplesIter) =>
+ val index = thisIndexIter.next()
+ assert(!thisIndexIter.hasNext)
+ val (thisValues, thisBS) = thisIter.next()
+ assert(!thisIter.hasNext)
+ val newW = new Array[W](thisValues.size)
+ // track which values are matched with values in other
+ val wBS = new BitSet(thisValues.size)
+ for( (k, w) <- tuplesIter if index.contains(k) ) {
+ val ind = index.get(k)
+ if(thisBS(ind)) {
+ if(wBS(ind)) {
+ newW(ind) = cleanMerge(newW(ind), w)
+ } else {
+ newW(ind) = w
+ wBS(ind) = true
+ }
+ }
+ }
+
+ val otherOption = newW.view.zipWithIndex
+ .map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None }
+ val newValues = thisValues.view.zip(otherOption)
+
+ Iterator((newValues.toIndexedSeq, thisBS))
+ } // end of newValues
+ new IndexedRDD(index, newValues)
+ }
+ }
+ }
+
+
+
+ //
+ // def zipJoinToRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = {
+ // if(index != other.index) {
+ // throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!")
+ // }
+ // index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
+ // (thisIndexIter, thisIter, otherIter) =>
+ // val index = thisIndexIter.next()
+ // assert(!thisIndexIter.hasNext)
+ // val (thisValues, thisBS) = thisIter.next()
+ // assert(!thisIter.hasNext)
+ // val (otherValues, otherBS) = otherIter.next()
+ // assert(!otherIter.hasNext)
+ // val newBS = thisBS & otherBS
+ // index.iterator.filter{ case (k,i) => newBS(i) }.map{
+ // case (k,i) => (k, (thisValues(i), otherValues(i)))
+ // }
+ // }
+ // }
+
+
+/* This is probably useful but we are not using it
+ def zipJoinWithKeys[W: ClassManifest, Z: ClassManifest](
+ other: RDD[(K,W)])(
+ f: (K, V, W) => Z,
+ merge: (Z,Z) => Z = (a:Z, b:Z) => a):
+ IndexedRDD[K,Z] = {
+ val cleanF = index.rdd.context.clean(f)
+ val cleanMerge = index.rdd.context.clean(merge)
+ other match {
+ case other: IndexedRDD[_, _] if index == other.index => {
+ val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
+ (thisIndexIter, thisIter, otherIter) =>
+ val index = thisIndexIter.next()
+ assert(!thisIndexIter.hasNext)
+ val (thisValues, thisBS) = thisIter.next()
+ assert(!thisIter.hasNext)
+ val (otherValues, otherBS) = otherIter.next()
+ assert(!otherIter.hasNext)
+ val newValues = new Array[Z](thisValues.size)
+ val newBS = thisBS & otherBS
+ for( (k,i) <- index ) {
+ if (newBS(i)) {
+ newValues(i) = cleanF(k, thisValues(i), otherValues(i))
+ }
+ }
+ List((newValues, newBS)).iterator
+ }
+ new IndexedRDD(index, newValues)
+ }
+
+ case _ => {
+ // Get the partitioner from the index
+ val partitioner = index.rdd.partitioner match {
+ case Some(p) => p
+ case None => throw new SparkException("An index must have a partitioner.")
+ }
+ // Shuffle the other RDD using the partitioner for this index
+ val otherShuffled =
+ if (other.partitioner == Some(partitioner)) other
+ else other.partitionBy(partitioner)
+
+ val newValues = index.rdd.zipPartitions(valuesRDD, other) {
+ (thisIndexIter, thisIter, tuplesIter) =>
+ val index = thisIndexIter.next()
+ assert(!thisIndexIter.hasNext)
+ val (thisValues, thisBS) = thisIter.next()
+ assert(!thisIter.hasNext)
+
+ val newValues = new Array[Z](thisValues.size)
+ // track which values are matched with values in other
+ val tempBS = new BitSet(thisValues.size)
+
+ for( (k, w) <- tuplesIter if index.contains(k) ) {
+ val ind = index.get(k)
+ if(thisBS(ind)) {
+ val result = cleanF(k, thisValues(ind), w)
+ if(tempBS(ind)) {
+ newValues(ind) = cleanMerge(newValues(ind), result)
+ } else {
+ newValues(ind) = result
+ tempBS(ind) = true
+ }
+ }
+ }
+ List((newValues, tempBS)).iterator
+ } // end of newValues
+ new IndexedRDD(index, newValues)
+ }
+ }
+ }
+*/
+
+/*
+ def zipJoinLeftWithKeys[W: ClassManifest, Z: ClassManifest](
+ other: RDD[(K,W)])(
+ f: (K, V, Option[W]) => Z,
+ merge: (Z,Z) => Z = (a:Z, b:Z) => a):
+ IndexedRDD[K,Z] = {
+ val cleanF = index.rdd.context.clean(f)
+ val cleanMerge = index.rdd.context.clean(merge)
+ other match {
+ case other: IndexedRDD[_, _] if index == other.index => {
+ val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
+ (thisIndexIter, thisIter, otherIter) =>
+ val index = thisIndexIter.next()
+ assert(!thisIndexIter.hasNext)
+ val (thisValues, thisBS) = thisIter.next()
+ assert(!thisIter.hasNext)
+ val (otherValues, otherBS) = otherIter.next()
+ assert(!otherIter.hasNext)
+ val newValues = new Array[Z](thisValues.size)
+ for( (k,i) <- index ) {
+ if (thisBS(i)) {
+ val otherVal = if(otherBS(i)) Some(otherValues(i)) else None
+ newValues(i) = cleanF(k, thisValues(i), otherVal)
+ }
+ }
+ List((newValues, thisBS)).iterator
+ }
+ new IndexedRDD(index, newValues)
+ }
+
+ case _ => {
+ // Get the partitioner from the index
+ val partitioner = index.rdd.partitioner match {
+ case Some(p) => p
+ case None => throw new SparkException("An index must have a partitioner.")
+ }
+ // Shuffle the other RDD using the partitioner for this index
+ val otherShuffled =
+ if (other.partitioner == Some(partitioner)) other
+ else other.partitionBy(partitioner)
+ val newValues = index.rdd.zipPartitions(valuesRDD, other) {
+ (thisIndexIter, thisIter, tuplesIter) =>
+ val index = thisIndexIter.next()
+ assert(!thisIndexIter.hasNext)
+ val (thisValues, thisBS) = thisIter.next()
+ assert(!thisIter.hasNext)
+
+ val newValues = new Array[Z](thisValues.size)
+ // track which values are matched with values in other
+ val tempBS = new BitSet(thisValues.size)
+
+ for( (k, w) <- tuplesIter if index.contains(k) ) {
+ val ind = index.get(k)
+ if(thisBS(ind)) {
+ val result = cleanF(k, thisValues(ind), Option(w))
+ if(tempBS(ind)) {
+ newValues(ind) = cleanMerge(newValues(ind), result)
+ } else {
+ newValues(ind) = result
+ tempBS(ind) = true
+ }
+ }
+ }
+
+          // Process remaining keys in the left join
+ for( (k,ind) <- index if thisBS(ind) && !tempBS(ind)) {
+ newValues(ind) = cleanF(k, thisValues(ind), None)
+ }
+ List((newValues, thisBS)).iterator
+ } // end of newValues
+ new IndexedRDD(index, newValues)
+ }
+ }
+ }
+
+*/
+
+
+ /**
+ * The IndexedRDD has its own optimized version of the pairRDDFunctions.
+ */
+ override def pairRDDFunctions[K1, V1](
+ implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]):
+ PairRDDFunctions[K1, V1] = {
+ new IndexedRDDFunctions[K1,V1](this.asInstanceOf[IndexedRDD[K1,V1]])
+ }
+
+
+ override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = {
+ val cleanF = index.rdd.context.clean(f)
+ val newValues = index.rdd.zipPartitions(valuesRDD){
+ (keysIter, valuesIter) =>
+ val index = keysIter.next()
+ assert(keysIter.hasNext() == false)
+ val (oldValues, bs) = valuesIter.next()
+ assert(valuesIter.hasNext() == false)
+ // Allocate the array to store the results into
+ val newBS = new BitSet(oldValues.size)
+ // Populate the new Values
+ for( (k,i) <- index ) {
+ newBS(i) = bs(i) && cleanF( (k, oldValues(i)) )
+ }
+ Array((oldValues, newBS)).iterator
+ }
+ new IndexedRDD[K,V](index, newValues)
+ }
+
+
+ /**
+ * Provide the RDD[(K,V)] equivalent output.
+ */
+ override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+ tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
+ // Walk the index to construct the key, value pairs
+ indexMap.iterator
+ // Extract rows with key value pairs and indicators
+ .map{ case (k, ind) => (bs(ind), k, ind) }
+ // Remove tuples that aren't actually present in the array
+ .filter( _._1 )
+ // Extract the pair (removing the indicator from the tuple)
+ .map( x => (x._2, values(x._3) ) )
+ }
+ }
+
+} // End of IndexedRDD
+
+
+
+
+object IndexedRDD {
+
+
+ def apply[K: ClassManifest, V: ClassManifest](rdd: RDD[(K,V)]): IndexedRDD[K,V] =
+ apply(rdd, (a:V, b:V) => a )
+
+ def apply[K: ClassManifest, V: ClassManifest](
+ rdd: RDD[(K,V)], reduceFunc: (V, V) => V): IndexedRDD[K,V] = {
+ // Preaggregate and shuffle if necessary
+ // Preaggregation.
+ val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
+ val partitioner = new HashPartitioner(rdd.partitions.size)
+ val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
+
+ val groups = preAgg.mapPartitions( iter => {
+ val indexMap = new BlockIndex[K]()
+ val values = new ArrayBuffer[V]
+ val bs = new BitSet
+ for ((k,v) <- iter) {
+ if(!indexMap.contains(k)) {
+ val ind = indexMap.size
+ indexMap.put(k, ind)
+ values.append(v)
+ bs(ind) = true
+ } else {
+ val ind = indexMap.get(k)
+ values(ind) = reduceFunc(values(ind), v)
+ }
+ }
+ Iterator( (indexMap, (values.toIndexedSeq, bs)) )
+ }, true).cache
+ // extract the index and the values
+ val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
+ val values: RDD[(IndexedSeq[V], BitSet)] =
+ groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
+ new IndexedRDD[K,V](new RDDIndex(index), values)
+ }
+
+
+
+ def apply[K: ClassManifest, V: ClassManifest](
+ rdd: RDD[(K,V)], index: RDDIndex[K]): IndexedRDD[K,V] =
+ apply(rdd, index, (a:V,b:V) => a)
+
+
+ def apply[K: ClassManifest, V: ClassManifest](
+ rdd: RDD[(K,V)], index: RDDIndex[K],
+ reduceFunc: (V, V) => V): IndexedRDD[K,V] =
+ apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
+ // {
+ // // Get the index Partitioner
+ // val partitioner = index.rdd.partitioner match {
+ // case Some(p) => p
+ // case None => throw new SparkException("An index must have a partitioner.")
+ // }
+ // // Preaggregate and shuffle if necessary
+ // val partitioned =
+ // if (rdd.partitioner != Some(partitioner)) {
+ // // Preaggregation.
+ // val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
+ // rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
+ // } else {
+ // rdd
+ // }
+
+ // // Use the index to build the new values table
+ // val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+ // // There is only one map
+ // val index = indexIter.next()
+ // assert(!indexIter.hasNext())
+ // val values = new Array[V](index.size)
+ // val bs = new BitSet(index.size)
+ // for ((k,v) <- tblIter) {
+ // if (!index.contains(k)) {
+ // throw new SparkException("Error: Trying to bind an external index " +
+ // "to an RDD which contains keys that are not in the index.")
+ // }
+ // val ind = index(k)
+ // if (bs(ind)) {
+ // values(ind) = reduceFunc(values(ind), v)
+ // } else {
+ // values(ind) = v
+ // bs(ind) = true
+ // }
+ // }
+ // List((values, bs)).iterator
+ // })
+ // new IndexedRDD[K,V](index, values)
+ // } // end of apply
+
+
+ def apply[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+ rdd: RDD[(K,V)],
+ index: RDDIndex[K],
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C): IndexedRDD[K,C] = {
+ // Get the index Partitioner
+ val partitioner = index.rdd.partitioner match {
+ case Some(p) => p
+ case None => throw new SparkException("An index must have a partitioner.")
+ }
+ // Preaggregate and shuffle if necessary
+ val partitioned =
+ if (rdd.partitioner != Some(partitioner)) {
+ // Preaggregation.
+ val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue,
+ mergeCombiners)
+ rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
+ } else {
+ rdd.mapValues(x => createCombiner(x))
+ }
+
+ // Use the index to build the new values table
+ val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+ // There is only one map
+ val index = indexIter.next()
+ assert(!indexIter.hasNext())
+ val values = new Array[C](index.size)
+ val bs = new BitSet(index.size)
+ for ((k,c) <- tblIter) {
+ if (!index.contains(k)) {
+ throw new SparkException("Error: Trying to bind an external index " +
+ "to an RDD which contains keys that are not in the index.")
+ }
+ val ind = index(k)
+ if (bs(ind)) {
+ values(ind) = mergeCombiners(values(ind), c)
+ } else {
+ values(ind) = c
+ bs(ind) = true
+ }
+ }
+ Iterator((values, bs))
+ })
+ new IndexedRDD(index, values)
+ } // end of apply
+
+
+ /**
+   * Construct an index of the unique values in a given RDD.
+ */
+ def makeIndex[K: ClassManifest](keys: RDD[K],
+ partitioner: Option[Partitioner] = None): RDDIndex[K] = {
+    // @todo: I don't need the Boolean; it's only there to provide a second type, since I want to shuffle a single RDD
+ // Ugly hack :-(. In order to partition the keys they must have values.
+ val tbl = keys.mapPartitions(_.map(k => (k, false)), true)
+ // Shuffle the table (if necessary)
+ val shuffledTbl = partitioner match {
+ case None => {
+ if (tbl.partitioner.isEmpty) {
+          // @todo: I don't need the Boolean; it's only there to provide the second type of the shuffle.
+ new ShuffledRDD[K, Boolean, (K, Boolean)](tbl, Partitioner.defaultPartitioner(tbl))
+ } else { tbl }
+ }
+ case Some(partitioner) =>
+ tbl.partitionBy(partitioner)
+ }
+
+ val index = shuffledTbl.mapPartitions( iter => {
+ val indexMap = new BlockIndex[K]()
+ for ( (k,_) <- iter ){
+ if(!indexMap.contains(k)){
+ val ind = indexMap.size
+ indexMap.put(k, ind)
+ }
+ }
+ Iterator(indexMap)
+ }, true).cache
+ new RDDIndex(index)
+ }
+
+} // end of object IndexedRDD
+
+
+
+
+
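
A hedged usage sketch for the new IndexedRDD: index one pair RDD, bind a second data set to the same index, and join without a shuffle. It assumes a local SparkContext and the API exactly as added above; the data and names are illustrative, not taken from this patch.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.IndexedRDD

object IndexedRDDSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "IndexedRDDSketch")

    val degrees = sc.parallelize(Seq(1L -> 3, 2L -> 1, 3L -> 4))
    val ranks   = sc.parallelize(Seq(1L -> 1.0, 2L -> 1.0, 3L -> 1.0))

    // Index the first RDD, then reuse its index for the second so both share
    // the same partitioning and per-partition key layout.
    val indexedDegrees = IndexedRDD(degrees)
    val indexedRanks   = IndexedRDD(ranks, indexedDegrees.index)

    // zipJoin requires (and exploits) the shared index: a per-partition zip, no shuffle.
    val joined = indexedDegrees.zipJoin(indexedRanks)  // IndexedRDD[Long, (Int, Double)]
    joined.collect().foreach(println)

    sc.stop()
  }
}
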
diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala
new file mode 100644
index 0000000000..fd7c16089d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import scala.collection.mutable.BitSet
+
+import org.apache.spark._
+
+
+
+class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K,V])
+ extends PairRDDFunctions[K,V](self) {
+
+ /**
+ * Construct a new IndexedRDD that is indexed by only the keys in the RDD
+ */
+ def reindex(): IndexedRDD[K,V] = IndexedRDD(self)
+
+
+ // /**
+ // * Pass each value in the key-value pair RDD through a map function without changing the keys;
+ // * this also retains the original RDD's partitioning.
+ // */
+ // override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
+ // val cleanF = self.index.rdd.context.clean(f)
+ // val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{
+ // case (values, bs) =>
+ // val newValues = new Array[U](values.size)
+ // for ( ind <- bs ) {
+ // newValues(ind) = f(values(ind))
+ // }
+ // (newValues.toSeq, bs)
+ // }, preservesPartitioning = true)
+ // new IndexedRDD[K,U](self.index, newValuesRDD)
+ // }
+
+ /**
+ * Pass each value in the key-value pair RDD through a flatMap function without changing the
+ * keys; this also retains the original RDD's partitioning.
+ */
+ override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = {
+ val cleanF = self.index.rdd.context.clean(f)
+ val newValuesRDD: RDD[(IndexedSeq[U], BitSet)] = self.valuesRDD.mapPartitions(iter => iter.map{
+ case (values, bs) =>
+ val newValues = new Array[U](values.size)
+ val newBS = new BitSet(values.size)
+ for ( ind <- bs ) {
+ val res = f(values(ind))
+ if(!res.isEmpty) {
+ newValues(ind) = res.toIterator.next()
+ newBS(ind) = true
+ }
+ }
+ (newValues.toIndexedSeq, newBS)
+ }, preservesPartitioning = true)
+ new IndexedRDD[K,U](self.index, newValuesRDD)
+ }
+
+
+ /**
+ * Generic function to combine the elements for each key using a custom set of aggregation
+ * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
+ * Note that V and C can be different -- for example, one might group an RDD of type
+ * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
+ *
+ * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
+ * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
+ * - `mergeCombiners`, to combine two C's into a single one.
+ */
+ override def combineByKey[C: ClassManifest](createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C,
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true,
+ serializerClass: String = null): RDD[(K, C)] = {
+ mapValues(createCombiner)
+ }
+
+
+ // /**
+ // * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+ // * resulting RDD with the existing partitioner/parallelism level.
+ // */
+ // override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+ // val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true)
+ // new IndexedRDD[K, Seq[V]](self.index, newValues)
+ // }
+
+
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
+ override def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
+ IndexedRDD[K, (Seq[V], Seq[W])] = {
+ //RDD[(K, (Seq[V], Seq[W]))] = {
+ other match {
+ case other: IndexedRDD[_, _] if self.index == other.index => {
+ // if both RDDs share exactly the same index and therefore the same super set of keys
+ // then we simply merge the value RDDs.
+ // However it is possible that both RDDs are missing a value for a given key in
+ // which case the returned RDD should have a null value
+ val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
+ self.valuesRDD.zipPartitions(other.valuesRDD){
+ (thisIter, otherIter) =>
+ val (thisValues, thisBS) = thisIter.next()
+ assert(!thisIter.hasNext)
+ val (otherValues, otherBS) = otherIter.next()
+ assert(!otherIter.hasNext)
+
+ val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
+ val newBS = thisBS | otherBS
+
+ for( ind <- newBS ) {
+ val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
+ val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
+ newValues(ind) = (a, b)
+ }
+ Iterator((newValues.toIndexedSeq, newBS))
+ }
+ new IndexedRDD(self.index, newValues)
+ }
+ case other: IndexedRDD[_, _]
+ if self.index.rdd.partitioner == other.index.rdd.partitioner => {
+ // If both RDDs are indexed using different indices but with the same partitioners
+          // then we need to first merge the indices and then use the merged index to
+ // merge the values.
+ val newIndex =
+ self.index.rdd.zipPartitions(other.index.rdd)(
+ (thisIter, otherIter) => {
+ val thisIndex = thisIter.next()
+ assert(!thisIter.hasNext)
+ val otherIndex = otherIter.next()
+ assert(!otherIter.hasNext)
+ val newIndex = new BlockIndex[K]()
+ // @todo Merge only the keys that correspond to non-null values
+ // Merge the keys
+ newIndex.putAll(thisIndex)
+ newIndex.putAll(otherIndex)
+ // We need to rekey the index
+ var ctr = 0
+ for (e <- newIndex.entrySet) {
+ e.setValue(ctr)
+ ctr += 1
+ }
+ List(newIndex).iterator
+ }).cache()
+ // Use the new index along with the this and the other indices to merge the values
+ val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
+ newIndex.zipPartitions(self.tuples, other.tuples)(
+ (newIndexIter, thisTuplesIter, otherTuplesIter) => {
+ // Get the new index for this partition
+ val newIndex = newIndexIter.next()
+ assert(!newIndexIter.hasNext)
+              // Get the corresponding indices and values for this and the other IndexedRDD
+ val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
+ assert(!thisTuplesIter.hasNext)
+ val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
+ assert(!otherTuplesIter.hasNext)
+ // Preallocate the new Values array
+ val newValues = new Array[(Seq[V], Seq[W])](newIndex.size)
+ val newBS = new BitSet(newIndex.size)
+
+ // Lookup the sequences in both submaps
+ for ((k,ind) <- newIndex) {
+ // Get the left key
+ val a = if (thisIndex.contains(k)) {
+ val ind = thisIndex.get(k)
+ if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
+ } else Seq.empty[V]
+ // Get the right key
+ val b = if (otherIndex.contains(k)) {
+ val ind = otherIndex.get(k)
+ if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
+ } else Seq.empty[W]
+ // If at least one key was present then we generate a tuple.
+ if (!a.isEmpty || !b.isEmpty) {
+ newValues(ind) = (a, b)
+ newBS(ind) = true
+ }
+ }
+ Iterator((newValues.toIndexedSeq, newBS))
+ })
+ new IndexedRDD(new RDDIndex(newIndex), newValues)
+ }
+ case _ => {
+ // Get the partitioner from the index
+ val partitioner = self.index.rdd.partitioner match {
+ case Some(p) => p
+ case None => throw new SparkException("An index must have a partitioner.")
+ }
+ // Shuffle the other RDD using the partitioner for this index
+ val otherShuffled =
+ if (other.partitioner == Some(partitioner)) {
+ other
+ } else {
+ new ShuffledRDD[K, W, (K,W)](other, partitioner)
+ }
+ // Join the other RDD with this RDD building a new valueset and new index on the fly
+ val groups =
+ self.tuples.zipPartitions(otherShuffled)(
+ (thisTuplesIter, otherTuplesIter) => {
+            // Get the corresponding indices and values for this IndexedRDD
+ val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
+ assert(!thisTuplesIter.hasNext())
+ // Construct a new index
+ val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]]
+ // Construct a new array Buffer to store the values
+ val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
+ val newBS = new BitSet(thisValues.size)
+ // populate the newValues with the values in this IndexedRDD
+ for ((k,i) <- thisIndex) {
+ if (thisBS(i)) {
+ newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
+ newBS(i) = true
+ }
+ }
+ // Now iterate through the other tuples updating the map
+ for ((k,w) <- otherTuplesIter){
+ if (newIndex.contains(k)) {
+ val ind = newIndex.get(k)
+ if(newBS(ind)) {
+ newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
+ } else {
+ // If the other key was in the index but not in the values
+ // of this indexed RDD then create a new values entry for it
+ newBS(ind) = true
+ newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
+ }
+ } else {
+ // update the index
+ val ind = newIndex.size
+ newIndex.put(k, ind)
+ newBS(ind) = true
+ // Update the values
+ newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
+ }
+ }
+ // // Finalize the new values array
+ // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
+ // newValues.view.map{
+ // case null => null
+ // case (s, ab) => Seq((s, ab.toSeq))
+ // }.toSeq
+ Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
+ }).cache()
+
+ // Extract the index and values from the above RDD
+ val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
+ val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
+ groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
+
+ new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
+ }
+ }
+ }
+
+
+}
+
+//(self: IndexedRDD[K, V]) extends PairRDDFunctions(self) { }
+
+
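
As the comments above spell out, cogroup picks one of three strategies: identical index (merge the value blocks in place), same partitioner (merge the two indices first), or the general case (shuffle the other RDD). A hedged fragment that hits the fast path, written Spark-shell style with an assumed SparkContext sc and illustrative data:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.IndexedRDD

val left  = IndexedRDD(sc.parallelize(Seq("x" -> 1, "y" -> 2)))
val right = IndexedRDD(sc.parallelize(Seq("x" -> 10.0)), left.index)  // reuse left's index

// Because the two sides share the same RDDIndex, this takes the first branch above:
// the value blocks are zipped per partition and no shuffle is scheduled.
val grouped = left.cogroup(right)  // RDD[(String, (Seq[Int], Seq[Double]))]
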
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 93b78e1232..8a66297f6f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -68,7 +68,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
- def combineByKey[C](createCombiner: V => C,
+ def combineByKey[C: ClassManifest](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
@@ -108,7 +108,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
/**
* Simplified version of combineByKey that hash-partitions the output RDD.
*/
- def combineByKey[C](createCombiner: V => C,
+ def combineByKey[C: ClassManifest](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = {
@@ -245,7 +245,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- new ShuffledRDD[K, V, (K, V)](self, partitioner)
+ new ShuffledRDD[K, V, (K, V)](self, partitioner)
}
/**
@@ -253,7 +253,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
- def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+ def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
@@ -265,7 +265,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
- def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+
+ def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
+ RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (ws.isEmpty) {
vs.iterator.map(v => (v, None))
@@ -281,7 +283,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
- def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+ def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (vs.isEmpty) {
@@ -296,7 +298,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
- def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
+ def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
: RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
@@ -324,7 +326,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
- def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+ def join[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
join(other, defaultPartitioner(self, other))
}
@@ -333,7 +335,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
- def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
+ def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
join(other, new HashPartitioner(numPartitions))
}
@@ -343,7 +345,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* using the existing partitioner/parallelism level.
*/
- def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
+ def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
leftOuterJoin(other, defaultPartitioner(self, other))
}
@@ -353,7 +355,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
- def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
+ def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
leftOuterJoin(other, new HashPartitioner(numPartitions))
}
@@ -363,7 +365,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
- def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
+ def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
rightOuterJoin(other, defaultPartitioner(self, other))
}
@@ -373,7 +375,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
- def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
+ def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
rightOuterJoin(other, new HashPartitioner(numPartitions))
}
@@ -392,16 +394,25 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
- def mapValues[U](f: V => U): RDD[(K, U)] = {
+ def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MappedValuesRDD(self, cleanF)
}
+
+ /**
+ * Pass each value in the key-value pair RDD through a map function without changing the keys;
+ * this also retains the original RDD's partitioning.
+ */
+ def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = {
+ self.map{ case (k,v) => (k, f(k,v)) }
+ }
+
/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
- def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
+ def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
}
@@ -410,7 +421,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@@ -425,7 +436,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
- def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
+ def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
@@ -441,7 +452,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
@@ -449,7 +460,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
- def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
@@ -458,7 +469,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, new HashPartitioner(numPartitions))
}
@@ -466,18 +477,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
- def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
+ def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}
/** Alias for cogroup. */
- def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ def groupWith[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
/** Alias for cogroup. */
- def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ def groupWith[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
@@ -698,6 +709,20 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
*/
def values: RDD[V] = self.map(_._2)
+
+
+ def indexed(): IndexedRDD[K,V] = IndexedRDD(self)
+
+ def indexed(numPartitions: Int): IndexedRDD[K,V] =
+ IndexedRDD(self.partitionBy(new HashPartitioner(numPartitions)))
+
+ def indexed(partitioner: Partitioner): IndexedRDD[K,V] =
+ IndexedRDD(self.partitionBy(partitioner))
+
+ def indexed(existingIndex: RDDIndex[K]): IndexedRDD[K,V] =
+ IndexedRDD(self, existingIndex)
+
+
private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
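The four indexed() overloads added above only wrap or repartition the pair RDD before building an IndexedRDD. A minimal usage sketch, assuming a running SparkContext `sc`, the implicits from SparkContext._, and the IndexedRDD/RDDIndex API introduced elsewhere in this commit (the sample data is illustrative):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val plain       = pairs.indexed()                        // index the RDD as-is
val repartd     = pairs.indexed(8)                       // hash-partition into 8 partitions, then index
val partitioned = pairs.indexed(new HashPartitioner(8))  // partition with an explicit partitioner, then index
val shared      = pairs.indexed(plain.index)             // reuse an existing RDDIndex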
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0355618e43..d14b4c60c7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -805,6 +805,26 @@ abstract class RDD[T: ClassManifest](
return buf.toArray
}
+
+
+ /**
+   * For an RDD of key-value pairs, return a PairRDDFunctions object for this RDD.
+ */
+ def pairRDDFunctions[K, V](
+ implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]):
+ PairRDDFunctions[K, V] = {
+ new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]])
+ }
+
+
+ /**
+ * Construct an index over the unique elements in this RDD. The
+   * index can then be used to organize an RDD[(T,V)].
+ */
+ def makeIndex(partitioner: Option[Partitioner] = None): RDDIndex[T] =
+ IndexedRDD.makeIndex(this, partitioner)
+
+
/**
* Return the first element in this RDD.
*/
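A short sketch of how the new makeIndex and pairRDDFunctions hooks might be combined, again assuming a SparkContext `sc` and the IndexedRDD API added by this commit; the key range and values are arbitrary:

import org.apache.spark.SparkContext._

val keys    = sc.parallelize(1 to 100)
val index   = keys.makeIndex()                    // RDDIndex[Int] over the unique keys
val squares = sc.parallelize((1 to 100).map(i => (i, i * i))).indexed(index)
// pairRDDFunctions exposes the pair operations explicitly when the element type is a pair
val grouped = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c"))).pairRDDFunctions.groupByKey()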
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 55b25f145a..263ff59ba6 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -63,14 +63,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
// Allow the user to register their own classes by setting spark.kryo.registrator
- try {
- Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
- logDebug("Running user registrator: " + regCls)
- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
- reg.registerClasses(kryo)
- }
- } catch {
- case _: Exception => println("Failed to register spark.kryo.registrator")
+ Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+ logDebug("Running user registrator: " + regCls)
+ val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+ reg.registerClasses(kryo)
}
// Register Chill's classes; we do this after our ranges and the user's own classes to let
@@ -122,7 +118,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
}
}
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
+private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging {
val kryo = ks.newKryo()
// Make these lazy vals to avoid creating a buffer unless we use them
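With the try/catch removed, a missing or broken spark.kryo.registrator now fails loudly instead of printing a message and continuing. A minimal sketch of wiring up a custom registrator; MyVertex, MyEdge, and the package name are hypothetical, while the GraphKryoRegistrator referenced later in this commit follows the same pattern:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

case class MyVertex(id: Long, rank: Double)
case class MyEdge(src: Long, dst: Long)

// Example registrator for application classes.
class MyKryoRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyVertex])
    kryo.register(classOf[MyEdge])
  }
}

// Selected via system properties before the SparkContext is created:
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.example.MyKryoRegistrator")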
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 2955986fec..5082730ae3 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
* instance of the serializer object has been created, the get method returns that instead of
* creating a new one.
*/
-private[spark] class SerializerManager {
+private[spark] class SerializerManager extends org.apache.spark.Logging {
private val serializers = new ConcurrentHashMap[String, Serializer]
private var _default: Serializer = _
diff --git a/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala b/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala
new file mode 100644
index 0000000000..dadb183bdc
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+
+import org.scalatest.FunSuite
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import com.google.common.io.Files
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.rdd.IndexedRDD
+
+import org.apache.spark.SparkContext._
+import org.apache.spark._
+
+
+
+class IndexedRDDSuite extends FunSuite with SharedSparkContext {
+
+ def lineage(rdd: RDD[_]): collection.mutable.HashSet[RDD[_]] = {
+ val set = new collection.mutable.HashSet[RDD[_]]
+ def visit(rdd: RDD[_]) {
+ for (dep <- rdd.dependencies) {
+ set += dep.rdd
+ visit(dep.rdd)
+ }
+ }
+ visit(rdd)
+ set
+ }
+
+ test("groupByKey") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed()
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with duplicates") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with negative key hash codes") {
+ val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))).indexed()
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesForMinus1 = groups.find(_._1 == -1).get._2
+ assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with many output partitions") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed(10)
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("reduceByKey") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
+ val sums = pairs.reduceByKey(_+_).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("reduceByKey with collectAsMap") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
+ val sums = pairs.reduceByKey(_+_).collectAsMap()
+ assert(sums.size === 2)
+ assert(sums(1) === 7)
+ assert(sums(2) === 1)
+ }
+
+ test("reduceByKey with many output partitons") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(10)
+ val sums = pairs.reduceByKey(_+_).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("reduceByKey with partitioner") {
+ val p = new Partitioner() {
+ def numPartitions = 2
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).indexed(p)
+ val sums = pairs.reduceByKey(_+_)
+ assert(sums.collect().toSet === Set((1, 4), (0, 1)))
+ assert(sums.partitioner === Some(p))
+ // count the dependencies to make sure there is only 1 ShuffledRDD
+ val deps = lineage(sums)
+
+ assert(deps.filter(_.isInstanceOf[ShuffledRDD[_,_,_]]).size === 1) // ShuffledRDD, ParallelCollection
+ }
+
+
+
+ test("joinIndexVsPair") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+ test("joinIndexVsIndex") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+ test("joinSharedIndex") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index)
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z')),
+ (4, (-4, 'w')),
+ (4, (4, 'w'))
+ ))
+ }
+
+
+ test("join all-to-all") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).indexed(rdd1.index)
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (1, 'y')),
+ (1, (2, 'x')),
+ (1, (2, 'y')),
+ (1, (3, 'x')),
+ (1, (3, 'y'))
+ ))
+ }
+
+ test("leftOuterJoinIndex") {
+ val index = sc.parallelize( 1 to 6 ).makeIndex()
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (3, (1, None))
+ ))
+ }
+
+ test("leftOuterJoinIndextoIndex") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (3, (1, None))
+ ))
+ }
+
+ test("leftOuterJoinIndextoSharedIndex") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index)
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (4, (-4, Some('w'))),
+ (3, (1, None))
+ ))
+ }
+
+test("leftOuterJoinIndextoIndexExternal") {
+ val index = sc.parallelize( 1 to 6 ).makeIndex()
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (3, (1, None))
+ ))
+ }
+
+
+ test("rightOuterJoin") {
+ val index = sc.parallelize( 1 to 6 ).makeIndex()
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.rightOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (Some(1), 'x')),
+ (1, (Some(2), 'x')),
+ (2, (Some(1), 'y')),
+ (2, (Some(1), 'z')),
+ (4, (None, 'w'))
+ ))
+ }
+
+ test("rightOuterJoinIndex2Index") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
+ val joined = rdd1.rightOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (Some(1), 'x')),
+ (1, (Some(2), 'x')),
+ (2, (Some(1), 'y')),
+ (2, (Some(1), 'z')),
+ (4, (None, 'w'))
+ ))
+ }
+
+
+ test("rightOuterJoinIndex2Indexshared") {
+ val index = sc.parallelize( 1 to 6 ).makeIndex()
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
+ val joined = rdd1.rightOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (Some(1), 'x')),
+ (1, (Some(2), 'x')),
+ (2, (Some(1), 'y')),
+ (2, (Some(1), 'z')),
+ (4, (None, 'w'))
+ ))
+ }
+
+
+ test("join with no matches index") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 0)
+ }
+
+ test("join with no matches shared index") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))).indexed(index)
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 0)
+ }
+
+
+ test("join with many output partitions") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+ test("join with many output partitions and two indices") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(20)
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+
+ test("groupWith") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
+ val joined = rdd1.groupWith(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
+ (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
+ (3, (ArrayBuffer(1), ArrayBuffer())),
+ (4, (ArrayBuffer(), ArrayBuffer('w')))
+ ))
+ }
+
+ test("zero-partition RDD") {
+ val emptyDir = Files.createTempDir()
+ val file = sc.textFile(emptyDir.getAbsolutePath)
+ assert(file.partitions.size == 0)
+ assert(file.collect().toList === Nil)
+ // Test that a shuffle on the file works, because this used to be a bug
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ }
+
+ test("keys and values") {
+ val rdd = sc.parallelize(Array((1, "a"), (2, "b"))).indexed()
+ assert(rdd.keys.collect().toList === List(1, 2))
+ assert(rdd.values.collect().toList === List("a", "b"))
+ }
+
+ test("default partitioner uses partition size") {
+ // specify 2000 partitions
+ val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
+ // do a map, which loses the partitioner
+ val b = a.map(a => (a, (a * 2).toString))
+ // then a group by, and see we didn't revert to 2 partitions
+ val c = b.groupByKey()
+ assert(c.partitions.size === 2000)
+ }
+
+ // test("default partitioner uses largest partitioner indexed to indexed") {
+ // val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2).indexed()
+ // val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000).indexed()
+ // val c = a.join(b)
+ // assert(c.partitions.size === 2000)
+ // }
+
+
+
+ test("subtract") {
+ val a = sc.parallelize(Array(1, 2, 3), 2)
+ val b = sc.parallelize(Array(2, 3, 4), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set(1))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ test("subtract with narrow dependency") {
+ // use a deterministic partitioner
+ val p = new Partitioner() {
+ def numPartitions = 5
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ // partitionBy so we have a narrow dependency
+ val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(p)
+ // more partitions/no partitioner so a shuffle dependency
+ val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+ // Ideally we could keep the original partitioner...
+ assert(c.partitioner === None)
+ }
+
+ test("subtractByKey") {
+
+ val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2).indexed()
+ val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
+ val c = a.subtractByKey(b)
+ assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ // test("subtractByKey with narrow dependency") {
+ // // use a deterministic partitioner
+ // val p = new Partitioner() {
+ // def numPartitions = 5
+ // def getPartition(key: Any) = key.asInstanceOf[Int]
+ // }
+
+ // val index = sc.parallelize( 1 to 6 ).makeIndex(Some(p))
+ // // partitionBy so we have a narrow dependency
+ // val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).indexed(index)
+ // // more partitions/no partitioner so a shuffle dependency
+ // val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4).indexed(index)
+ // val c = a.subtractByKey(b)
+ // assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+ // assert(c.partitioner.get === p)
+ // }
+
+ test("foldByKey") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index)
+ val sums = pairs.foldByKey(0)(_+_).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("foldByKey with mutable result type") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index)
+ val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
+ // Fold the values using in-place mutation
+ val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
+ assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
+ // Check that the mutable objects in the original RDD were not changed
+ assert(bufs.collect().toSet === Set(
+ (1, ArrayBuffer(1)),
+ (1, ArrayBuffer(2)),
+ (1, ArrayBuffer(3)),
+ (1, ArrayBuffer(1)),
+ (2, ArrayBuffer(1))))
+ }
+}
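Several of the tests above exercise sharing one RDDIndex across RDDs so that joins line up on the same partitioning. A minimal sketch of that pattern using only the API the suite calls (the data is illustrative):

val index = sc.parallelize(1 to 6).makeIndex()
val left  = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(index)
val right = sc.parallelize(Array((1, 10), (2, 20), (4, 40))).indexed(index)
// Both sides are organized by the same index, so the join is co-partitioned.
val joined = left.join(right)   // => (1, ("a", 10)), (2, ("b", 20))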
diff --git a/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala
new file mode 100644
index 0000000000..3a2ce4e4da
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+
+import org.scalatest.FunSuite
+import org.scalatest.prop.Checkers
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+
+import com.google.common.io.Files
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.rdd.IndexedRDD
+
+import org.apache.spark.SparkContext._
+import org.apache.spark._
+
+
+
+class IndexedRDDSuite extends FunSuite with SharedSparkContext {
+
+ def lineage(rdd: RDD[_]): collection.mutable.HashSet[RDD[_]] = {
+ val set = new collection.mutable.HashSet[RDD[_]]
+ def visit(rdd: RDD[_]) {
+ for (dep <- rdd.dependencies) {
+ set += dep.rdd
+ visit(dep.rdd)
+ }
+ }
+ visit(rdd)
+ set
+ }
+
+ test("groupByKey") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed()
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with duplicates") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with negative key hash codes") {
+ val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))).indexed()
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesForMinus1 = groups.find(_._1 == -1).get._2
+ assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey with many output partitions") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed(10)
+ val groups = pairs.groupByKey().collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("reduceByKey") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
+ val sums = pairs.reduceByKey(_+_).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("reduceByKey with collectAsMap") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed()
+ val sums = pairs.reduceByKey(_+_).collectAsMap()
+ assert(sums.size === 2)
+ assert(sums(1) === 7)
+ assert(sums(2) === 1)
+ }
+
+ test("reduceByKey with many output partitons") {
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(10)
+ val sums = pairs.reduceByKey(_+_).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("reduceByKey with partitioner") {
+ val p = new Partitioner() {
+ def numPartitions = 2
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).indexed(p)
+ val sums = pairs.reduceByKey(_+_)
+ assert(sums.collect().toSet === Set((1, 4), (0, 1)))
+ assert(sums.partitioner === Some(p))
+ // count the dependencies to make sure there is only 1 ShuffledRDD
+ val deps = lineage(sums)
+
+ assert(deps.filter(_.isInstanceOf[ShuffledRDD[_,_,_]]).size === 1) // ShuffledRDD, ParallelCollection
+ }
+
+
+
+ test("joinIndexVsPair") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+ test("joinIndexVsIndex") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+ test("joinSharedIndex") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index)
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z')),
+ (4, (-4, 'w')),
+ (4, (4, 'w'))
+ ))
+ }
+
+
+ test("join all-to-all") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).indexed(rdd1.index)
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (1, 'y')),
+ (1, (2, 'x')),
+ (1, (2, 'y')),
+ (1, (3, 'x')),
+ (1, (3, 'y'))
+ ))
+ }
+
+ test("leftOuterJoinIndex") {
+ val index = sc.parallelize( 1 to 6 ).makeIndex()
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (3, (1, None))
+ ))
+ }
+
+ test("leftOuterJoinIndextoIndex") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (3, (1, None))
+ ))
+ }
+
+ test("leftOuterJoinIndextoSharedIndex") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index)
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 6)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (4, (-4, Some('w'))),
+ (3, (1, None))
+ ))
+ }
+
+test("leftOuterJoinIndextoIndexExternal") {
+ val index = sc.parallelize( 1 to 6 ).makeIndex()
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
+ val joined = rdd1.leftOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (1, Some('x'))),
+ (1, (2, Some('x'))),
+ (2, (1, Some('y'))),
+ (2, (1, Some('z'))),
+ (3, (1, None))
+ ))
+ }
+
+
+ test("rightOuterJoin") {
+ val index = sc.parallelize( 1 to 6 ).makeIndex()
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.rightOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (Some(1), 'x')),
+ (1, (Some(2), 'x')),
+ (2, (Some(1), 'y')),
+ (2, (Some(1), 'z')),
+ (4, (None, 'w'))
+ ))
+ }
+
+ test("rightOuterJoinIndex2Index") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
+ val joined = rdd1.rightOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (Some(1), 'x')),
+ (1, (Some(2), 'x')),
+ (2, (Some(1), 'y')),
+ (2, (Some(1), 'z')),
+ (4, (None, 'w'))
+ ))
+ }
+
+
+ test("rightOuterJoinIndex2Indexshared") {
+ val index = sc.parallelize( 1 to 6 ).makeIndex()
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
+ val joined = rdd1.rightOuterJoin(rdd2).collect()
+ assert(joined.size === 5)
+ assert(joined.toSet === Set(
+ (1, (Some(1), 'x')),
+ (1, (Some(2), 'x')),
+ (2, (Some(1), 'y')),
+ (2, (Some(1), 'z')),
+ (4, (None, 'w'))
+ ))
+ }
+
+
+ test("join with no matches index") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 0)
+ }
+
+ test("join with no matches shared index") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))).indexed(index)
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 0)
+ }
+
+
+ test("join with many output partitions") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+ test("join with many output partitions and two indices") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(20)
+ val joined = rdd1.join(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (1, 'x')),
+ (1, (2, 'x')),
+ (2, (1, 'y')),
+ (2, (1, 'z'))
+ ))
+ }
+
+
+ test("groupWith") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
+ val joined = rdd1.groupWith(rdd2).collect()
+ assert(joined.size === 4)
+ assert(joined.toSet === Set(
+ (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
+ (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
+ (3, (ArrayBuffer(1), ArrayBuffer())),
+ (4, (ArrayBuffer(), ArrayBuffer('w')))
+ ))
+ }
+
+ test("zero-partition RDD") {
+ val emptyDir = Files.createTempDir()
+ val file = sc.textFile(emptyDir.getAbsolutePath)
+ assert(file.partitions.size == 0)
+ assert(file.collect().toList === Nil)
+ // Test that a shuffle on the file works, because this used to be a bug
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ }
+
+ test("keys and values") {
+ val rdd = sc.parallelize(Array((1, "a"), (2, "b"))).indexed()
+ assert(rdd.keys.collect().toList === List(1, 2))
+ assert(rdd.values.collect().toList === List("a", "b"))
+ }
+
+ test("default partitioner uses partition size") {
+ // specify 2000 partitions
+ val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
+ // do a map, which loses the partitioner
+ val b = a.map(a => (a, (a * 2).toString))
+ // then a group by, and see we didn't revert to 2 partitions
+ val c = b.groupByKey()
+ assert(c.partitions.size === 2000)
+ }
+
+ // test("default partitioner uses largest partitioner indexed to indexed") {
+ // val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2).indexed()
+ // val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000).indexed()
+ // val c = a.join(b)
+ // assert(c.partitions.size === 2000)
+ // }
+
+
+
+ test("subtract") {
+ val a = sc.parallelize(Array(1, 2, 3), 2)
+ val b = sc.parallelize(Array(2, 3, 4), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set(1))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ test("subtract with narrow dependency") {
+ // use a deterministic partitioner
+ val p = new Partitioner() {
+ def numPartitions = 5
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ // partitionBy so we have a narrow dependency
+ val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(p)
+ // more partitions/no partitioner so a shuffle dependency
+ val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+ // Ideally we could keep the original partitioner...
+ assert(c.partitioner === None)
+ }
+
+ test("subtractByKey") {
+
+ val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2).indexed()
+ val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
+ val c = a.subtractByKey(b)
+ assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ // test("subtractByKey with narrow dependency") {
+ // // use a deterministic partitioner
+ // val p = new Partitioner() {
+ // def numPartitions = 5
+ // def getPartition(key: Any) = key.asInstanceOf[Int]
+ // }
+
+ // val index = sc.parallelize( 1 to 6 ).makeIndex(Some(p))
+ // // partitionBy so we have a narrow dependency
+ // val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).indexed(index)
+ // // more partitions/no partitioner so a shuffle dependency
+ // val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4).indexed(index)
+ // val c = a.subtractByKey(b)
+ // assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+ // assert(c.partitioner.get === p)
+ // }
+
+ test("foldByKey") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index)
+ val sums = pairs.foldByKey(0)(_+_).collect()
+ assert(sums.toSet === Set((1, 7), (2, 1)))
+ }
+
+ test("foldByKey with mutable result type") {
+ val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
+
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index)
+ val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
+ // Fold the values using in-place mutation
+ val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
+ assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
+ // Check that the mutable objects in the original RDD were not changed
+ assert(bufs.collect().toSet === Set(
+ (1, ArrayBuffer(1)),
+ (1, ArrayBuffer(2)),
+ (1, ArrayBuffer(3)),
+ (1, ArrayBuffer(1)),
+ (2, ArrayBuffer(1))))
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
index cfafbaf23e..8dd7fb40e8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
@@ -31,16 +31,16 @@ import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
import com.esotericsoftware.kryo._
class PageRankUtils extends Serializable {
- def computeWithCombiner(numVertices: Long, epsilon: Double)(
+ def computeWithCombiner(numVertices: Long, epsilon: Double, terminateSteps: Int = 10)(
self: PRVertex, messageSum: Option[Double], superstep: Int
): (PRVertex, Array[PRMessage]) = {
val newValue = messageSum match {
case Some(msgSum) if msgSum != 0 =>
- 0.15 / numVertices + 0.85 * msgSum
+ 0.15 + 0.85 * msgSum
case _ => self.value
}
- val terminate = superstep >= 10
+ val terminate = superstep >= terminateSteps
val outbox: Array[PRMessage] =
if (!terminate)
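The combiner now applies the un-normalized update 0.15 + 0.85 * msgSum and terminates after a configurable number of supersteps. A small sketch of the update rule in isolation; the helper name is just for illustration:

// Damped PageRank update used above: keep the old value when no messages arrived.
def updatedRank(oldValue: Double, messageSum: Option[Double]): Double =
  messageSum match {
    case Some(msgSum) if msgSum != 0 => 0.15 + 0.85 * msgSum
    case _                           => oldValue
  }

updatedRank(1.0, Some(0.5))  // 0.575
updatedRank(1.0, None)       // 1.0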
diff --git a/graph/pom.xml b/graph/pom.xml
new file mode 100644
index 0000000000..1cd9cda98b
--- /dev/null
+++ b/graph/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.spark-project</groupId>
+ <artifactId>parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-graph</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Graph</name>
+ <url>http://spark-project.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
new file mode 100644
index 0000000000..92632db491
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -0,0 +1,659 @@
+package org.apache.spark.graph
+
+import org.apache.spark._
+
+
+
+object Analytics extends Logging {
+
+ /**
+ * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
+ */
+ def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
+ numIter: Int,
+ resetProb: Double = 0.15) = {
+ // Compute the out degree of each vertex
+ val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
+ (vid, vdata, deg) => (deg.getOrElse(0), 1.0)
+ }
+
+ println(pagerankGraph.statistics)
+
+ Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
+ (vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply
+ (me_id, edge) => Some(edge.srcAttr._2 / edge.srcAttr._1), // gather
+ (a: Double, b: Double) => a + b, // merge
+ 1.0,
+ numIter).mapVertices{ case (id, (outDeg, r)) => r }
+ }
+
+
+ /**
+   * Compute the PageRank of a graph dynamically, iterating until the change at each vertex falls below tol (or maxIter is reached), and return the pagerank of each vertex as an RDD
+ */
+ def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
+ tol: Float,
+ maxIter: Int = Integer.MAX_VALUE,
+ resetProb: Double = 0.15) = {
+ // Compute the out degree of each vertex
+ val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
+ (id, data, degIter) => (degIter.sum, 1.0, 1.0)
+ }
+
+ // Run PageRank
+ GraphLab.iterate(pagerankGraph)(
+ (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather
+ (a: Double, b: Double) => a + b,
+ (id, data, a: Option[Double]) =>
+ (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
+ (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter
+ maxIter).mapVertices { case (vid, data) => data._2 }
+ }
+
+
+ /**
+ * Compute the connected component membership of each vertex
+ * and return an RDD with the vertex value containing the
+ * lowest vertex id in the connected component containing
+ * that vertex.
+ */
+ def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
+ val ccGraph = graph.mapVertices { case (vid, _) => vid }
+ GraphLab.iterate(ccGraph)(
+ (me_id, edge) => edge.otherVertexAttr(me_id), // gather
+ (a: Vid, b: Vid) => math.min(a, b), // merge
+ (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply
+ (me_id, edge) => (edge.vertexAttr(me_id) < edge.otherVertexAttr(me_id)), // scatter
+ numIter,
+ gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
+ )
+ }
+
+ def main(args: Array[String]) = {
+ val host = args(0)
+ val taskType = args(1)
+ val fname = args(2)
+ val options = args.drop(3).map { arg =>
+ arg.dropWhile(_ == '-').split('=') match {
+ case Array(opt, v) => (opt -> v)
+ case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ }
+ }
+
+ def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = {
+ loggers.map{
+ loggerName =>
+ val logger = org.apache.log4j.Logger.getLogger(loggerName)
+ val prevLevel = logger.getLevel()
+ logger.setLevel(level)
+ loggerName -> prevLevel
+ }.toMap
+ }
+// setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark"))
+
+ val serializer = "org.apache.spark.serializer.KryoSerializer"
+ System.setProperty("spark.serializer", serializer)
+ //System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+ taskType match {
+ case "pagerank" => {
+
+ var numIter = Int.MaxValue
+ var isDynamic = false
+ var tol:Float = 0.001F
+ var outFname = ""
+ var numVPart = 4
+ var numEPart = 4
+
+ options.foreach{
+ case ("numIter", v) => numIter = v.toInt
+ case ("dynamic", v) => isDynamic = v.toBoolean
+ case ("tol", v) => tol = v.toFloat
+ case ("output", v) => outFname = v
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("numEPart", v) => numEPart = v.toInt
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ if(!isDynamic && numIter == Int.MaxValue) {
+ println("Set number of iterations!")
+ sys.exit(1)
+ }
+ println("======================================")
+ println("| PageRank |")
+ println("--------------------------------------")
+ println(" Using parameters:")
+ println(" \tDynamic: " + isDynamic)
+ if(isDynamic) println(" \t |-> Tolerance: " + tol)
+ println(" \tNumIter: " + numIter)
+ println("======================================")
+
+ val sc = new SparkContext(host, "PageRank(" + fname + ")")
+
+ val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
+ minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+
+ val startTime = System.currentTimeMillis
+ logInfo("GRAPHX: starting tasks")
+ logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
+ logInfo("GRAPHX: Number of edges " + graph.edges.count)
+
+ val pr = Analytics.pagerank(graph, numIter)
+ // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
+ // else Analytics.pagerank(graph, numIter)
+ logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
+ if (!outFname.isEmpty) {
+ println("Saving pageranks of pages to " + outFname)
+ pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ }
+ logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+ sc.stop()
+ }
+
+ case "cc" => {
+
+ var numIter = Int.MaxValue
+ var numVPart = 4
+ var numEPart = 4
+ var isDynamic = false
+
+ options.foreach{
+ case ("numIter", v) => numIter = v.toInt
+ case ("dynamic", v) => isDynamic = v.toBoolean
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("numVPart", v) => numVPart = v.toInt
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ if(!isDynamic && numIter == Int.MaxValue) {
+ println("Set number of iterations!")
+ sys.exit(1)
+ }
+ println("======================================")
+ println("| Connected Components |")
+ println("--------------------------------------")
+ println(" Using parameters:")
+ println(" \tDynamic: " + isDynamic)
+ println(" \tNumIter: " + numIter)
+ println("======================================")
+
+ val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
+ //val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
+ val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
+ minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
+ val cc = Analytics.connectedComponents(graph, numIter)
+ //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
+ // else Analytics.connectedComponents(graph, numIter)
+ println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+
+ sc.stop()
+ }
+//
+// case "shortestpath" => {
+//
+// var numIter = Int.MaxValue
+// var isDynamic = true
+// var sources: List[Int] = List.empty
+//
+// options.foreach{
+// case ("numIter", v) => numIter = v.toInt
+// case ("dynamic", v) => isDynamic = v.toBoolean
+// case ("source", v) => sources ++= List(v.toInt)
+// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+// }
+//
+//
+// if(!isDynamic && numIter == Int.MaxValue) {
+// println("Set number of iterations!")
+// sys.exit(1)
+// }
+//
+// if(sources.isEmpty) {
+// println("No sources provided!")
+// sys.exit(1)
+// }
+//
+// println("======================================")
+// println("| Shortest Path |")
+// println("--------------------------------------")
+// println(" Using parameters:")
+// println(" \tDynamic: " + isDynamic)
+// println(" \tNumIter: " + numIter)
+// println(" \tSources: [" + sources.mkString(", ") + "]")
+// println("======================================")
+//
+// val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
+// val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
+// //val sp = Analytics.shortestPath(graph, sources, numIter)
+// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
+// // else Analytics.shortestPath(graph, sources, numIter)
+// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
+//
+// sc.stop()
+// }
+
+
+ // case "als" => {
+
+ // var numIter = 5
+ // var lambda = 0.01
+ // var latentK = 10
+ // var usersFname = "usersFactors.tsv"
+ // var moviesFname = "moviesFname.tsv"
+ // var numVPart = 4
+ // var numEPart = 4
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("lambda", v) => lambda = v.toDouble
+ // case ("latentK", v) => latentK = v.toInt
+ // case ("usersFname", v) => usersFname = v
+ // case ("moviesFname", v) => moviesFname = v
+ // case ("numVPart", v) => numVPart = v.toInt
+ // case ("numEPart", v) => numEPart = v.toInt
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+ // println("======================================")
+ // println("| Alternating Least Squares |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tNumIter: " + numIter)
+ // println(" \tLambda: " + lambda)
+ // println(" \tLatentK: " + latentK)
+ // println(" \tusersFname: " + usersFname)
+ // println(" \tmoviesFname: " + moviesFname)
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "ALS(" + fname + ")")
+ // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
+ // graph.numVPart = numVPart
+ // graph.numEPart = numEPart
+
+ // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
+ // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
+ // assert(maxUser < minMovie)
+
+ // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
+ // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
+ // .saveAsTextFile(usersFname)
+ // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
+ // .saveAsTextFile(moviesFname)
+
+ // sc.stop()
+ // }
+
+
+ case _ => {
+ println("Invalid task type.")
+ }
+ }
+ }
+
+ // /**
+ // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
+ // */
+ // def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
+ // tol: Double, maxIter: Int = 10) = {
+ // // Compute the out degree of each vertex
+ // val pagerankGraph = graph.updateVertices[Int, (Int, Double, Double)](graph.outDegrees,
+ // (vertex, degIter) => (degIter.sum, 1.0, 1.0)
+ // )
+
+ // // Run PageRank
+ // GraphLab.iterateGAS(pagerankGraph)(
+ // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
+ // (a: Double, b: Double) => a + b,
+ // (vertex, a: Option[Double]) =>
+ // (vertex.data._1, (0.15 + 0.85 * a.getOrElse(0.0)), vertex.data._2), // apply
+ // (me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter
+ // maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) }
+ // }
+
+ // /**
+ // * Compute the connected component membership of each vertex
+ // * and return an RDD with the vertex value containing the
+ // * lowest vertex id in the connected component containing
+ // * that vertex.
+ // */
+ // def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
+ // val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) }
+ // GraphLab.iterateGA[Int, ED, Int](ccGraph)(
+ // (me_id, edge) => edge.otherVertex(me_id).data, // gather
+ // (a: Int, b: Int) => math.min(a, b), // merge
+ // (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply
+ // numIter,
+ // gatherDirection = EdgeDirection.Both)
+ // }
+
+ // /**
+ // * Compute the shortest path to a set of markers
+ // */
+ // def shortestPath[VD: Manifest](graph: Graph[VD, Double], sources: List[Int], numIter: Int) = {
+ // val sourceSet = sources.toSet
+ // val spGraph = graph.mapVertices {
+ // case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0 else Double.MaxValue))
+ // }
+ // GraphLab.iterateGA[Double, Double, Double](spGraph)(
+ // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
+ // (a: Double, b: Double) => math.min(a, b), // merge
+ // (v, a: Option[Double]) => math.min(v.data, a.getOrElse(Double.MaxValue)), // apply
+ // numIter,
+ // gatherDirection = EdgeDirection.In)
+ // }
+
+ // /**
+ // * Compute the connected component membership of each vertex
+ // * and return an RDD with the vertex value containing the
+ // * lowest vertex id in the connected component containing
+ // * that vertex.
+ // */
+ // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
+ // numIter: Int = Int.MaxValue) = {
+
+ // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) })
+ // val edges = graph.edges // .mapValues(v => None)
+ // val ccGraph = new Graph(vertices, edges)
+
+ // ccGraph.iterateDynamic(
+ // (me_id, edge) => edge.otherVertex(me_id).data, // gather
+ // (a: Int, b: Int) => math.min(a, b), // merge
+ // Integer.MAX_VALUE,
+ // (v, a: Int) => math.min(v.data, a), // apply
+ // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter
+ // numIter,
+ // gatherEdges = EdgeDirection.Both,
+ // scatterEdges = EdgeDirection.Both).vertices
+ // //
+ // // graph_ret.vertices.collect.foreach(println)
+ // // graph_ret.edges.take(10).foreach(println)
+ // }
+
+
+ // /**
+ // * Compute the shortest path to a set of markers
+ // */
+ // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Double],
+ // sources: List[Int], numIter: Int) = {
+ // val sourceSet = sources.toSet
+ // val vertices = graph.vertices.mapPartitions(
+ // iter => iter.map {
+ // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Double.MaxValue) )
+ // });
+
+ // val edges = graph.edges // .mapValues(v => None)
+ // val spGraph = new Graph(vertices, edges)
+
+ // val niterations = Int.MaxValue
+ // spGraph.iterateDynamic(
+ // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
+ // (a: Double, b: Double) => math.min(a, b), // merge
+ // Double.MaxValue,
+ // (v, a: Double) => math.min(v.data, a), // apply
+ // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter
+ // numIter,
+ // gatherEdges = EdgeDirection.In,
+ // scatterEdges = EdgeDirection.Out).vertices
+ // }
+
+
+ // /**
+ // *
+ // */
+ // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double],
+ // latentK: Int, lambda: Double, numIter: Int) = {
+ // val vertices = graph.vertices.mapPartitions( _.map {
+ // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } )
+ // }).cache
+ // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
+ // val edges = graph.edges // .mapValues(v => None)
+ // val alsGraph = new Graph(vertices, edges)
+ // alsGraph.numVPart = graph.numVPart
+ // alsGraph.numEPart = graph.numEPart
+
+ // val niterations = Int.MaxValue
+ // alsGraph.iterateDynamic[(Array[Double], Array[Double])](
+ // (me_id, edge) => { // gather
+ // val X = edge.otherVertex(me_id).data
+ // val y = edge.data
+ // val Xy = X.map(_ * y)
+ // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray
+ // (Xy, XtX)
+ // },
+ // (a, b) => {
+ // // The difference between the while loop and the zip is a FACTOR OF TWO in overall
+ // // runtime
+ // var i = 0
+ // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 }
+ // i = 0
+ // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 }
+ // a
+ // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r })
+ // },
+ // (Array.empty[Double], Array.empty[Double]), // default value is empty
+ // (vertex, accum) => { // apply
+ // val XyArray = accum._1
+ // val XtXArray = accum._2
+ // if(XyArray.isEmpty) vertex.data // no neighbors
+ // else {
+ // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) =>
+ // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) +
+ // (if(i == j) lambda else 1.0F) //regularization
+ // }
+ // val Xy = DenseMatrix.create(latentK,1,XyArray)
+ // val w = XtX \ Xy
+ // w.data
+ // }
+ // },
+ // (me_id, edge) => true,
+ // numIter,
+ // gatherEdges = EdgeDirection.Both,
+ // scatterEdges = EdgeDirection.Both,
+ // vertex => vertex.id < maxUser).vertices
+ // }
+
+ // def main(args: Array[String]) = {
+ // val host = args(0)
+ // val taskType = args(1)
+ // val fname = args(2)
+ // val options = args.drop(3).map { arg =>
+ // arg.dropWhile(_ == '-').split('=') match {
+ // case Array(opt, v) => (opt -> v)
+ // case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ // }
+ // }
+
+ // System.setProperty("spark.serializer", "spark.KryoSerializer")
+ // //System.setProperty("spark.shuffle.compress", "false")
+ // System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
+
+ // taskType match {
+ // case "pagerank" => {
+
+ // var numIter = Int.MaxValue
+ // var isDynamic = false
+ // var tol:Double = 0.001
+ // var outFname = ""
+ // var numVPart = 4
+ // var numEPart = 4
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("dynamic", v) => isDynamic = v.toBoolean
+ // case ("tol", v) => tol = v.toDouble
+ // case ("output", v) => outFname = v
+ // case ("numVPart", v) => numVPart = v.toInt
+ // case ("numEPart", v) => numEPart = v.toInt
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+ // if(!isDynamic && numIter == Int.MaxValue) {
+ // println("Set number of iterations!")
+ // sys.exit(1)
+ // }
+ // println("======================================")
+ // println("| PageRank |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tDynamic: " + isDynamic)
+ // if(isDynamic) println(" \t |-> Tolerance: " + tol)
+ // println(" \tNumIter: " + numIter)
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "PageRank(" + fname + ")")
+
+ // val graph = GraphLoader.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache()
+
+ // val startTime = System.currentTimeMillis
+ // logInfo("GRAPHX: starting tasks")
+ // logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
+ // logInfo("GRAPHX: Number of edges " + graph.edges.count)
+
+ // val pr = Analytics.pagerank(graph, numIter)
+ // // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
+ // // else Analytics.pagerank(graph, numIter)
+ // logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
+ // if (!outFname.isEmpty) {
+ // println("Saving pageranks of pages to " + outFname)
+ // pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ // }
+ // logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+ // sc.stop()
+ // }
+
+ // case "cc" => {
+
+ // var numIter = Int.MaxValue
+ // var isDynamic = false
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("dynamic", v) => isDynamic = v.toBoolean
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+ // if(!isDynamic && numIter == Int.MaxValue) {
+ // println("Set number of iterations!")
+ // sys.exit(1)
+ // }
+ // println("======================================")
+ // println("| Connected Components |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tDynamic: " + isDynamic)
+ // println(" \tNumIter: " + numIter)
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
+ // val graph = GraphLoader.textFile(sc, fname, a => 1.0)
+ // val cc = Analytics.connectedComponents(graph, numIter)
+ // // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
+ // // else Analytics.connectedComponents(graph, numIter)
+ // println("Components: " + cc.vertices.map(_.data).distinct())
+
+ // sc.stop()
+ // }
+
+ // case "shortestpath" => {
+
+ // var numIter = Int.MaxValue
+ // var isDynamic = true
+ // var sources: List[Int] = List.empty
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("dynamic", v) => isDynamic = v.toBoolean
+ // case ("source", v) => sources ++= List(v.toInt)
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+
+ // if(!isDynamic && numIter == Int.MaxValue) {
+ // println("Set number of iterations!")
+ // sys.exit(1)
+ // }
+
+ // if(sources.isEmpty) {
+ // println("No sources provided!")
+ // sys.exit(1)
+ // }
+
+ // println("======================================")
+ // println("| Shortest Path |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tDynamic: " + isDynamic)
+ // println(" \tNumIter: " + numIter)
+ // println(" \tSources: [" + sources.mkString(", ") + "]")
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
+ // val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) )
+ // val sp = Analytics.shortestPath(graph, sources, numIter)
+ // // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
+ // // else Analytics.shortestPath(graph, sources, numIter)
+ // println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
+
+ // sc.stop()
+ // }
+
+
+ // case "als" => {
+
+ // var numIter = 5
+ // var lambda = 0.01
+ // var latentK = 10
+ // var usersFname = "usersFactors.tsv"
+ // var moviesFname = "moviesFname.tsv"
+ // var numVPart = 4
+ // var numEPart = 4
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("lambda", v) => lambda = v.toDouble
+ // case ("latentK", v) => latentK = v.toInt
+ // case ("usersFname", v) => usersFname = v
+ // case ("moviesFname", v) => moviesFname = v
+ // case ("numVPart", v) => numVPart = v.toInt
+ // case ("numEPart", v) => numEPart = v.toInt
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+ // println("======================================")
+ // println("| Alternating Least Squares |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tNumIter: " + numIter)
+ // println(" \tLambda: " + lambda)
+ // println(" \tLatentK: " + latentK)
+ // println(" \tusersFname: " + usersFname)
+ // println(" \tmoviesFname: " + moviesFname)
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "ALS(" + fname + ")")
+ // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
+ // graph.numVPart = numVPart
+ // graph.numEPart = numEPart
+
+ // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
+ // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
+ // assert(maxUser < minMovie)
+
+ // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
+ // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
+ // .saveAsTextFile(usersFname)
+ // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
+ // .saveAsTextFile(moviesFname)
+
+ // sc.stop()
+ // }
+
+
+ // case _ => {
+ // println("Invalid task type.")
+ // }
+ // }
+ // }
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
new file mode 100644
index 0000000000..67b6454017
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
@@ -0,0 +1,34 @@
+package org.apache.spark.graph
+
+
+/**
+ * A single directed edge consisting of a source id, target id,
+ * and the data associated with the edge.
+ *
+ * @tparam ED type of the edge attribute
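+ *
+ * @example A minimal sketch (this assumes Vid is an alias for Long, as used
+ * elsewhere in this package, and the attribute value is illustrative only):
+ * {{{
+ * val e = Edge(1L, 2L, "follows")
+ * e.otherVertexId(1L)      // 2L
+ * e.relativeDirection(1L)  // EdgeDirection.Out
+ * }}}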
+ */
+case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
+ var srcId: Vid = 0,
+ var dstId: Vid = 0,
+ var attr: ED = nullValue[ED]) {
+
+ /**
+ * Given one vertex in the edge return the other vertex.
+ *
+   * @param vid the id of one of the two vertices on the edge.
+ * @return the id of the other vertex on the edge.
+ */
+ def otherVertexId(vid: Vid): Vid =
+ if (srcId == vid) dstId else { assert(dstId == vid); srcId }
+
+
+ /**
+ * Return the relative direction of the edge to the corresponding vertex.
+ *
+ * @param vid the id of one of the two vertices in the edge.
+ * @return the relative direction of the edge to the corresponding vertex.
+ */
+ def relativeDirection(vid: Vid): EdgeDirection =
+ if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
new file mode 100644
index 0000000000..99af2d5458
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
@@ -0,0 +1,32 @@
+package org.apache.spark.graph
+
+
+/**
+ * The direction of a directed edge relative to a vertex, used to select
+ * the set of adjacent neighbors when running a neighborhood query.
+ */
+sealed abstract class EdgeDirection {
+ def reverse: EdgeDirection = this match {
+    case EdgeDirection.In => EdgeDirection.Out
+    case EdgeDirection.Out => EdgeDirection.In
+ case EdgeDirection.Both => EdgeDirection.Both
+ }
+}
+
+
+object EdgeDirection {
+ /**
+ * Edges arriving at a vertex.
+ */
+ case object In extends EdgeDirection
+
+ /**
+ * Edges originating from a vertex
+ */
+ case object Out extends EdgeDirection
+
+ /**
+ * All edges adjacent to a vertex
+ */
+ case object Both extends EdgeDirection
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
new file mode 100644
index 0000000000..ef3aa199bd
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
@@ -0,0 +1,56 @@
+package org.apache.spark.graph
+
+/**
+ * An edge triplet represents an edge along with the two vertices it connects and their attributes.
+ *
+ * @tparam VD the type of the vertex attribute.
+ * @tparam ED the type of the edge attribute
+ *
+ * @todo specialize edge triplet for basic types, though when I last tried
+ * specializing I got a warning about inheriting from a type that is not
+ * a trait.
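+ *
+ * @example A minimal sketch (assuming et is a triplet taken from a
+ * Graph[String, Double]):
+ * {{{
+ * val srcName: String = et.srcAttr
+ * val neighborOfSrc: String = et.otherVertexAttr(et.srcId)  // same as et.dstAttr
+ * }}}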
+ */
+class EdgeTriplet[VD, ED] extends Edge[ED] {
+// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,
+// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] {
+
+
+ /**
+ * The source vertex attribute
+ */
+ var srcAttr: VD = _ //nullValue[VD]
+
+ /**
+ * The destination vertex attribute
+ */
+ var dstAttr: VD = _ //nullValue[VD]
+
+ /**
+ * Set the edge properties of this triplet.
+ */
+ protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD,ED] = {
+ srcId = other.srcId
+ dstId = other.dstId
+ attr = other.attr
+ this
+ }
+
+ /**
+ * Given one vertex in the edge return the other vertex.
+ *
+   * @param vid the id of one of the two vertices on the edge.
+ * @return the attribute for the other vertex on the edge.
+ */
+ def otherVertexAttr(vid: Vid): VD =
+ if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr }
+
+ /**
+ * Get the vertex object for the given vertex in the edge.
+ *
+ * @param vid the id of one of the two vertices on the edge
+ * @return the attr for the vertex with that id.
+ */
+ def vertexAttr(vid: Vid): VD =
+ if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
new file mode 100644
index 0000000000..50a44e51e5
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -0,0 +1,313 @@
+package org.apache.spark.graph
+
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.ClosureCleaner
+
+
+
+/**
+ * The Graph abstractly represents a graph with arbitrary objects associated
+ * with vertices and edges. The graph provides basic operations to access and
+ * manipulate the data associated with vertices and edges as well as the
+ * underlying structure. Like Spark RDDs, the graph is a functional
+ * data-structure in which mutating operations return new graphs.
+ *
+ * @tparam VD The type of object associated with each vertex.
+ *
+ * @tparam ED The type of object associated with each edge
+ */
+abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
+
+ /**
+ * Get the vertices and their data.
+ *
+ * @note vertex ids are unique.
+ * @return An RDD containing the vertices in this graph
+ *
+ * @see Vertex for the vertex type.
+ *
+ */
+ val vertices: RDD[(Vid,VD)]
+
+ /**
+ * Get the Edges and their data as an RDD. The entries in the RDD contain
+ * just the source id and target id along with the edge data.
+ *
+ * @return An RDD containing the edges in this graph
+ *
+ * @see Edge for the edge type.
+   * @see triplets to get an RDD which contains all the edges along
+ * with their vertex data.
+ *
+ * @todo Should edges return 3 tuples instead of Edge objects? In this case
+ * we could rename EdgeTriplet to Edge?
+ */
+ val edges: RDD[Edge[ED]]
+
+ /**
+ * Get the edges with the vertex data associated with the adjacent pair of
+ * vertices.
+ *
+ * @return An RDD containing edge triplets.
+ *
+ * @example This operation might be used to evaluate a graph coloring where
+ * we would like to check that both vertices are a different color.
+ * {{{
+ * type Color = Int
+   * val graph: Graph[Color, Int] = GraphLoader.textFile(sc, "hdfs://file.tsv", a => 1)
+   * val numInvalid = graph.triplets
+   *   .map(e => if (e.srcAttr == e.dstAttr) 1 else 0).sum
+ * }}}
+ *
+ * @see edges() If only the edge data and adjacent vertex ids are required.
+ *
+ */
+ val triplets: RDD[EdgeTriplet[VD, ED]]
+
+ /**
+ * Return a graph that is cached when first created. This is used to pin a
+ * graph in memory enabling multiple queries to reuse the same construction
+ * process.
+ *
+ * @see RDD.cache() for a more detailed explanation of caching.
+ */
+ def cache(): Graph[VD, ED]
+
+
+ /**
+ * Compute statistics describing the graph representation.
+ */
+ def statistics: Map[String, Any]
+
+
+
+ /**
+ * Construct a new graph where each vertex value has been transformed by the
+ * map function.
+ *
+   * @note This graph is not changed, and the new graph has the same
+ * structure. As a consequence the underlying index structures can be
+ * reused.
+ *
+ * @param map the function from a vertex object to a new vertex value.
+ *
+ * @tparam VD2 the new vertex data type
+ *
+ * @example We might use this operation to change the vertex values from one
+ * type to another to initialize an algorithm.
+ * {{{
+   * val rawGraph: Graph[Int, Int] = GraphLoader.textFile(sc, "hdfs://file", a => 1)
+   * val root = 42
+   * var bfsGraph = rawGraph
+   *   .mapVertices[Int]((vid, data) => if (vid == root) 0 else Int.MaxValue)
+ * }}}
+ *
+ */
+ def mapVertices[VD2: ClassManifest](map: (Vid, VD) => VD2): Graph[VD2, ED]
+
+   * Construct a new graph where the value of each edge is transformed by
+ * Construct a new graph where each the value of each edge is transformed by
+ * the map operation. This function is not passed the vertex value for the
+ * vertices adjacent to the edge. If vertex values are desired use the
+ * mapTriplets function.
+ *
+   * @note This graph is not changed, and the new graph has the same
+ * structure. As a consequence the underlying index structures can be
+ * reused.
+ *
+ * @param map the function from an edge object to a new edge value.
+ *
+ * @tparam ED2 the new edge data type
+ *
+ * @example This function might be used to initialize edge attributes.
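+   * A minimal sketch (assuming an existing graph: Graph[Int, Int]):
+   * {{{
+   * val unitWeights: Graph[Int, Double] = graph.mapEdges(e => 1.0)
+   * }}}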
+ *
+ */
+ def mapEdges[ED2: ClassManifest](map: Edge[ED] => ED2): Graph[VD, ED2]
+
+ /**
+   * Construct a new graph where the value of each edge is transformed by
+ * the map operation. This function passes vertex values for the adjacent
+ * vertices to the map function. If adjacent vertex values are not required,
+ * consider using the mapEdges function instead.
+ *
+   * @note This graph is not changed, and the new graph has the same
+ * structure. As a consequence the underlying index structures can be
+ * reused.
+ *
+ * @param map the function from an edge object to a new edge value.
+ *
+ * @tparam ED2 the new edge data type
+ *
+ * @example This function might be used to initialize edge attributes based
+ * on the attributes associated with each vertex.
+ * {{{
+ * val rawGraph: Graph[Int, Int] = someLoadFunction()
+ * val graph = rawGraph.mapTriplets[Int]( edge =>
+   *   edge.srcAttr - edge.dstAttr)
+ * }}}
+ *
+ */
+ def mapTriplets[ED2: ClassManifest](
+ map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
+
+
+ /**
+ * Construct a new graph with all the edges reversed. If this graph contains
+ * an edge from a to b then the returned graph contains an edge from b to a.
+ *
+ */
+ def reverse: Graph[VD, ED]
+
+
+ /**
+ * This function takes a vertex and edge predicate and constructs the subgraph
+   * that consists of the vertices and edges that satisfy the predicates. The resulting
+ * graph contains the vertices and edges that satisfy:
+ *
+ * V' = {v : for all v in V where vpred(v)}
+ * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
+ *
+ * @param epred the edge predicate which takes a triplet and evaluates to true
+ * if the edge is to remain in the subgraph. Note that only edges in which both
+ * vertices satisfy the vertex predicate are considered.
+ *
+ * @param vpred the vertex predicate which takes a vertex object and evaluates
+ * to true if the vertex is to be included in the subgraph
+ *
+ * @return the subgraph containing only the vertices and edges that satisfy the
+ * predicates.
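+   *
+   * @example A hypothetical sketch that drops self loops and keeps only
+   * vertices with non-negative attributes (assuming graph: Graph[Int, Int]):
+   * {{{
+   * val clean = graph.subgraph(
+   *   epred = t => t.srcId != t.dstId,
+   *   vpred = (vid, attr) => attr >= 0)
+   * }}}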
+ */
+ def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+ vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
+
+
+
+ /**
+ * @todo document function
+ */
+ def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2]
+
+
+ /**
+ * @todo document function
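+   *
+   * A hedged sketch of the intended use, merging parallel edges that land in
+   * the same partition by summing their attributes (assuming graph: Graph[Int, Int]):
+   * {{{
+   * val merged: Graph[Int, Int] = graph.groupEdges(es => es.map(_.attr).sum)
+   * }}}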
+ */
+ def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): Graph[VD,ED2]
+
+
+ /**
+ * The mapReduceTriplets function is used to compute statistics about
+ * the neighboring edges and vertices of each vertex. The user supplied
+ * `mapFunc` function is invoked on each edge of the graph generating 0 or
+ * more "messages" to be "sent" to either vertex in the edge.
+ * The `reduceFunc` is then used to combine the output of the map phase
+ * destined to each vertex.
+ *
+ * @tparam A the type of "message" to be sent to each vertex
+ *
+ * @param mapFunc the user defined map function which returns 0 or
+ * more messages to neighboring vertices.
+ * @param reduceFunc the user defined reduce function which should be
+   * commutative and associative and is used to combine the output of
+ * the map phase.
+ *
+ * @example We can use this function to compute the inDegree of each
+ * vertex
+ * {{{
+   * val rawGraph: Graph[Int, Int] = GraphLoader.textFile(sc, "twittergraph", a => 1)
+   * val inDeg: RDD[(Vid, Int)] =
+   *   rawGraph.mapReduceTriplets[Int](et => Array((et.dstId, 1)), _ + _)
+ * }}}
+ *
+ * @note By expressing computation at the edge level we achieve maximum
+   * parallelism. This is one of the core functions in the Graph API in that it enables
+ * neighborhood level computation. For example this function can be used to
+ * count neighbors satisfying a predicate or implement PageRank.
+ *
+ */
+ def mapReduceTriplets[A: ClassManifest](
+ mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
+ reduceFunc: (A, A) => A)
+ : RDD[(Vid, A)]
+
+
+ /**
+   * Join the vertices with an RDD and then apply a function from the
+   * vertex and RDD entry to a new vertex value and type.
+   * The input table should contain at most one entry for each vertex.
+   * If no entry is provided the map function is invoked passing None.
+ *
+ * @tparam U the type of entry in the table of updates
+ * @tparam VD2 the new vertex value type
+ *
+ * @param table the table to join with the vertices in the graph. The table
+ * should contain at most one entry for each vertex.
+ * @param mapFunc the function used to compute the new vertex values. The
+ * map function is invoked for all vertices, even those that do not have a
+ * corresponding entry in the table.
+ *
+ * @example This function is used to update the vertices with new values
+ * based on external data. For example we could add the out degree to each
+ * vertex record
+ * {{{
+   * val rawGraph: Graph[Int, Int] = GraphLoader.textFile(sc, "webgraph", a => 1)
+   * val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees
+ * val graph = rawGraph.outerJoinVertices(outDeg) {
+ * (vid, data, optDeg) => optDeg.getOrElse(0)
+ * }
+ * }}}
+ *
+ */
+ def outerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)])
+ (mapFunc: (Vid, VD, Option[U]) => VD2)
+ : Graph[VD2, ED]
+
+
+ // Save a copy of the GraphOps object so there is always one unique GraphOps object
+ // for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
+ val ops = new GraphOps(this)
+}
+
+
+/**
+ * The Graph Singleton contains basic routines to create graphs
+ */
+object Graph {
+
+ import org.apache.spark.graph.impl._
+ import org.apache.spark.SparkContext._
+
+ /**
+   * Construct a graph from a collection of edges encoded as (source id, target id) pairs.
+ *
+ * @param rawEdges a collection of edges in (src,dst) form.
+ * @param uniqueEdges if multiple identical edges are found they are combined
+ * and the edge attribute is set to the sum. Otherwise duplicate edges are
+ * treated as separate.
+ *
+ *
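+   * @example A minimal sketch (sc is assumed to be an existing SparkContext):
+   * {{{
+   * val rawEdges: RDD[(Vid, Vid)] = sc.parallelize(Array((1L, 2L), (2L, 3L), (1L, 2L)))
+   * val graph = Graph(rawEdges)  // the duplicated (1L, 2L) edge gets attribute 2
+   * }}}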
+ */
+ def apply(rawEdges: RDD[(Vid, Vid)], uniqueEdges: Boolean = true): Graph[Int, Int] = {
+ // Reduce to unique edges.
+ val edges: RDD[Edge[Int]] =
+ if (uniqueEdges) {
+ rawEdges.map((_, 1)).reduceByKey(_ + _).map { case ((s, t), cnt) => Edge(s, t, cnt) }
+ } else {
+ rawEdges.map { case (s, t) => Edge(s, t, 1) }
+ }
+ // Determine unique vertices
+ /** @todo Should this reduceByKey operation be indexed? */
+ val vertices: RDD[(Vid, Int)] =
+ edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _)
+
+ // Return graph
+ GraphImpl(vertices, edges)
+ }
+
+ def apply[VD: ClassManifest, ED: ClassManifest](
+ vertices: RDD[(Vid,VD)], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
+ GraphImpl(vertices, edges)
+ }
+
+ implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
new file mode 100644
index 0000000000..29ea38ec67
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.graph
+
+import com.esotericsoftware.kryo.Kryo
+
+import org.apache.spark.graph.impl.MessageToPartition
+import org.apache.spark.serializer.KryoRegistrator
+import org.apache.spark.graph.impl._
+
+class GraphKryoRegistrator extends KryoRegistrator {
+
+ def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[Edge[Object]])
+ kryo.register(classOf[MutableTuple2[Object, Object]])
+ kryo.register(classOf[MessageToPartition[Object]])
+ kryo.register(classOf[(Vid, Object)])
+ kryo.register(classOf[EdgePartition[Object]])
+
+ // This avoids a large number of hash table lookups.
+ kryo.setReferences(false)
+ }
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
new file mode 100644
index 0000000000..8ba708ba32
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -0,0 +1,135 @@
+package org.apache.spark.graph
+
+import scala.collection.JavaConversions._
+import org.apache.spark.rdd.RDD
+
+/**
+ * This object implements the GraphLab gather-apply-scatter API.
+ */
+object GraphLab {
+
+ /**
+ * Execute the GraphLab Gather-Apply-Scatter API
+ *
+ * @todo finish documenting GraphLab Gather-Apply-Scatter API
+ *
+ * @param graph The graph on which to execute the GraphLab API
+ * @param gatherFunc The gather function is executed on each edge triplet
+ * adjacent to a vertex and returns an accumulator which
+ * is then merged using the merge function.
+   * @param mergeFunc A commutative, associative operation used to combine the
+   *                  results of the gather phase.
+ * @param applyFunc Takes a vertex and the final result of the merge operations
+ * on the adjacent edges and returns a new vertex value.
+   * @param scatterFunc Executed after the apply function, the scatter function takes
+ * a triplet and signals whether the neighboring vertex program
+ * must be recomputed.
+ * @param numIter The maximum number of iterations to run.
+ * @param gatherDirection The direction of edges to consider during the gather phase
+ * @param scatterDirection The direction of edges to consider during the scatter phase
+ *
+ * @tparam VD The graph vertex attribute type
+ * @tparam ED The graph edge attribute type
+ * @tparam A The type accumulated during the gather phase
+ * @return the resulting graph after the algorithm converges
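+   *
+   * @example A hypothetical sketch propagating the minimum vertex id to all
+   * reachable vertices (a connected-components style computation). It assumes
+   * ccGraph: Graph[Long, Double] whose vertex attribute is initialized to the
+   * vertex id:
+   * {{{
+   * val components = GraphLab.iterate[Long, Double, Long](ccGraph)(
+   *   (vid, et) => et.otherVertexAttr(vid),                        // gather
+   *   (a: Long, b: Long) => math.min(a, b),                        // merge
+   *   (vid, data, accum) => math.min(data, accum.getOrElse(data)), // apply
+   *   (vid, et) => et.vertexAttr(vid) > et.otherVertexAttr(vid),   // scatter
+   *   gatherDirection = EdgeDirection.Both,
+   *   scatterDirection = EdgeDirection.Both)
+   * }}}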
+ */
+ def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+ gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
+ mergeFunc: (A, A) => A,
+ applyFunc: (Vid, VD, Option[A]) => VD,
+ scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
+ numIter: Int = Integer.MAX_VALUE,
+ gatherDirection: EdgeDirection = EdgeDirection.In,
+ scatterDirection: EdgeDirection = EdgeDirection.Out): Graph[VD, ED] = {
+
+
+ // Add an active attribute to all vertices to track convergence.
+ var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
+ case (id, data) => (true, data)
+ }.cache()
+
+ // The gather function wrapper strips the active attribute and
+ // only invokes the gather function on active vertices
+ def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
+ if (e.vertexAttr(vid)._1) {
+ val edgeTriplet = new EdgeTriplet[VD,ED]
+ edgeTriplet.set(e)
+ edgeTriplet.srcAttr = e.srcAttr._2
+ edgeTriplet.dstAttr = e.dstAttr._2
+ Some(gatherFunc(vid, edgeTriplet))
+ } else {
+ None
+ }
+ }
+
+ // The apply function wrapper strips the vertex of the active attribute
+ // and only invokes the apply function on active vertices
+ def apply(vid: Vid, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
+ val (active, vData) = data
+ if (active) (true, applyFunc(vid, vData, accum))
+ else (false, vData)
+ }
+
+ // The scatter function wrapper strips the vertex of the active attribute
+ // and only invokes the scatter function on active vertices
+ def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
+ val vid = e.otherVertexId(rawVid)
+ if (e.vertexAttr(vid)._1) {
+ val edgeTriplet = new EdgeTriplet[VD,ED]
+ edgeTriplet.set(e)
+ edgeTriplet.srcAttr = e.srcAttr._2
+ edgeTriplet.dstAttr = e.dstAttr._2
+ Some(scatterFunc(vid, edgeTriplet))
+ } else {
+ None
+ }
+ }
+
+ // Used to set the active status of vertices for the next round
+ def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = {
+ val (prevActive, vData) = data
+ (newActive, vData)
+ }
+
+ // Main Loop ---------------------------------------------------------------------
+ var i = 0
+ var numActive = activeGraph.numVertices
+ while (i < numIter && numActive > 0) {
+
+ // Gather
+ val gathered: RDD[(Vid, A)] =
+ activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
+
+ // Apply
+ activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache()
+
+
+
+ // Scatter is basically a gather in the opposite direction so we reverse the edge direction
+ // activeGraph: Graph[(Boolean, VD), ED]
+ val scattered: RDD[(Vid, Boolean)] =
+ activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
+
+ activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache()
+
+ // Calculate the number of active vertices
+ numActive = activeGraph.vertices.map{
+ case (vid, data) => if (data._1) 1 else 0
+ }.reduce(_ + _)
+      println("Number of active vertices: " + numActive)
+ i += 1
+ }
+
+ // Remove the active attribute from the vertex data before returning the graph
+ activeGraph.mapVertices{case (vid, data) => data._2 }
+ }
+}
+
+
+
+
+
+
+
+
+
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
new file mode 100644
index 0000000000..052f9acdeb
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -0,0 +1,54 @@
+package org.apache.spark.graph
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph.impl.GraphImpl
+
+
+object GraphLoader {
+
+ /**
+   * Load an edge list from a file, initializing the graph RDD.
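+   *
+   * @example A minimal sketch (sc is assumed to be an existing SparkContext
+   * and the path is a placeholder). Each line's columns after the source and
+   * target ids are handed to the edge parser:
+   * {{{
+   * val graph = GraphLoader.textFile(sc, "hdfs://edges.tsv",
+   *   a => if (a.isEmpty) 1.0 else a(0).toDouble)
+   * }}}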
+ */
+ def textFile[ED: ClassManifest](
+ sc: SparkContext,
+ path: String,
+ edgeParser: Array[String] => ED,
+ minEdgePartitions: Int = 1,
+ minVertexPartitions: Int = 1)
+ : GraphImpl[Int, ED] = {
+
+ // Parse the edge data table
+ val edges = sc.textFile(path, minEdgePartitions).flatMap { line =>
+ if (!line.isEmpty && line(0) != '#') {
+ val lineArray = line.split("\\s+")
+ if(lineArray.length < 2) {
+ println("Invalid line: " + line)
+ assert(false)
+ }
+ val source = lineArray(0)
+ val target = lineArray(1)
+ val tail = lineArray.drop(2)
+ val edata = edgeParser(tail)
+ Array(Edge(source.trim.toInt, target.trim.toInt, edata))
+ } else {
+ Array.empty[Edge[ED]]
+ }
+ }.cache()
+
+ val graph = fromEdges(edges)
+ // println("Loaded graph:" +
+ // "\n\t#edges: " + graph.numEdges +
+ // "\n\t#vertices: " + graph.numVertices)
+
+ graph
+ }
+
+ def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
+ val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
+ .reduceByKey(_ + _)
+ .map{ case (vid, degree) => (vid, degree) }
+ GraphImpl(vertices, edges)
+ }
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
new file mode 100644
index 0000000000..92198a4995
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -0,0 +1,166 @@
+package org.apache.spark.graph
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.util.ClosureCleaner
+
+
+class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
+
+
+
+ lazy val numEdges: Long = graph.edges.count()
+
+ lazy val numVertices: Long = graph.vertices.count()
+
+ lazy val inDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.In)
+
+ lazy val outDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Out)
+
+ lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both)
+
+
+ /**
+   * This function is used to compute a statistic for the neighborhood of each
+   * vertex.
+ *
+ * @param mapFunc the function applied to each edge adjacent to each vertex.
+ * The mapFunc can optionally return None in which case it does not
+ * contribute to the final sum.
+ * @param reduceFunc the function used to merge the results of each map
+ * operation.
+   * @param dir the direction of edges to consider (e.g., In, Out, Both).
+   * @tparam A the type returned by the aggregation operation.
+ *
+   * @return an RDD containing tuples of vertex identifiers and
+   * their resulting value.
+ *
+ * @example We can use this function to compute the average follower age
+ * for each user
+ * {{{
+ * val graph: Graph[Int,Int] = loadGraph()
+   * val averageFollowerAge: RDD[(Vid, Double)] =
+   *   graph.aggregateNeighbors[(Int, Int)](
+   *     (vid, edge) => Some((edge.otherVertexAttr(vid), 1)),
+   *     (a, b) => (a._1 + b._1, a._2 + b._2),
+   *     EdgeDirection.In)
+   *   .mapValues{ case (sum, followers) => sum.toDouble / followers }
+ * }}}
+ *
+ * @todo Should this return a graph with the new vertex values?
+ *
+ */
+ def aggregateNeighbors[A: ClassManifest](
+ mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+ reduceFunc: (A, A) => A,
+ dir: EdgeDirection)
+ : RDD[(Vid, A)] = {
+
+ ClosureCleaner.clean(mapFunc)
+ ClosureCleaner.clean(reduceFunc)
+
+ // Define a new map function over edge triplets
+ val mf = (et: EdgeTriplet[VD,ED]) => {
+ // Compute the message to the dst vertex
+ val dst =
+ if (dir == EdgeDirection.In || dir == EdgeDirection.Both) {
+ mapFunc(et.dstId, et)
+ } else { Option.empty[A] }
+ // Compute the message to the source vertex
+ val src =
+ if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) {
+ mapFunc(et.srcId, et)
+ } else { Option.empty[A] }
+ // construct the return array
+ (src, dst) match {
+ case (None, None) => Array.empty[(Vid, A)]
+ case (Some(srcA),None) => Array((et.srcId, srcA))
+ case (None, Some(dstA)) => Array((et.dstId, dstA))
+ case (Some(srcA), Some(dstA)) =>
+ Array((et.srcId, srcA), (et.dstId, dstA))
+ }
+ }
+
+ ClosureCleaner.clean(mf)
+ graph.mapReduceTriplets(mf, reduceFunc)
+ } // end of aggregateNeighbors
+
+
+
+
+
+
+
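+  /**
+   * Collect the ids of the neighboring vertices for each vertex. Vertices
+   * with no neighbors in the given direction map to an empty array.
+   */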
+ def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
+ val nbrs = graph.aggregateNeighbors[Array[Vid]](
+ (vid, edge) => Some(Array(edge.otherVertexId(vid))),
+ (a, b) => a ++ b,
+ edgeDirection)
+
+ graph.vertices.leftOuterJoin(nbrs).mapValues{
+ case (_, Some(nbrs)) => nbrs
+ case (_, None) => Array.empty[Vid]
+ }
+ }
+
+
+ private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = {
+ graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
+ }
+
+
+ /**
+   * Join the vertices with an RDD and then apply a function from the
+ * vertex and RDD entry to a new vertex value. The input table should
+ * contain at most one entry for each vertex. If no entry is provided the
+ * map function is skipped and the old value is used.
+ *
+ * @tparam U the type of entry in the table of updates
+ * @param table the table to join with the vertices in the graph. The table
+ * should contain at most one entry for each vertex.
+ * @param mapFunc the function used to compute the new vertex values. The
+ * map function is invoked only for vertices with a corresponding entry in
+   * the table; otherwise the old vertex value is used.
+ *
+ * @note for small tables this function can be much more efficient than
+ * leftJoinVertices
+ *
+ * @example This function is used to update the vertices with new values
+ * based on external data. For example we could add the out degree to each
+ * vertex record
+ * {{{
+   * val rawGraph: Graph[Int, Int] = GraphLoader.textFile(sc, "webgraph", a => 1)
+   *   .mapVertices((vid, data) => 0)
+   * val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees
+   * val graph = rawGraph.joinVertices(outDeg)(
+   *   (vid, data, deg) => deg)
+ * }}}
+ *
+ * @todo Should this function be curried to enable type inference? For
+ * example
+ * {{{
+ * graph.joinVertices(tbl)( (v, row) => row )
+ * }}}
+ */
+ def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
+ : Graph[VD, ED] = {
+ ClosureCleaner.clean(mapFunc)
+ val uf = (id: Vid, data: VD, o: Option[U]) => {
+ o match {
+ case Some(u) => mapFunc(id, data, u)
+ case None => data
+ }
+ }
+ ClosureCleaner.clean(uf)
+ graph.outerJoinVertices(table)(uf)
+ }
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
new file mode 100644
index 0000000000..065d196ff6
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -0,0 +1,36 @@
+package org.apache.spark.graph
+
+import org.apache.spark.rdd.RDD
+
+
+object Pregel {
+
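+  /**
+   * Run a fixed number of Pregel-style supersteps: in each superstep messages
+   * are gathered along in-edges with sendMsg, combined with mergeMsg, and
+   * applied to the receiving vertex with vprog.
+   *
+   * A hypothetical sketch of a shortest-path style relaxation. It assumes
+   * distGraph: Graph[Double, Double] whose vertex attribute holds the current
+   * distance (0.0 at the source, Double.MaxValue elsewhere):
+   * {{{
+   * val shortest = Pregel.iterate[Double, Double, Double](distGraph)(
+   *   (vid, dist, msg) => math.min(dist, msg),          // vertex program
+   *   (vid, et) => Some(et.vertexAttr(vid) + et.attr),  // message along the out-edge
+   *   (a: Double, b: Double) => math.min(a, b),         // merge messages
+   *   Double.MaxValue,                                  // initial message
+   *   20)                                               // number of supersteps
+   * }}}
+   */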
+ def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
+ vprog: (Vid, VD, A) => VD,
+ sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+ mergeMsg: (A, A) => A,
+ initialMsg: A,
+ numIter: Int)
+ : Graph[VD, ED] = {
+
+ var g = graph
+ //var g = graph.cache()
+ var i = 0
+
+ def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
+
+ // Receive the first set of messages
+    g = g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
+
+ while (i < numIter) {
+ // compute the messages
+ val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
+ // receive the messages
+ g = g.joinVertices(messages)(vprog)
+ // count the iteration
+ i += 1
+ }
+ // Return the final graph
+ g
+ }
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
new file mode 100644
index 0000000000..dbfccde8b9
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -0,0 +1,65 @@
+package org.apache.spark.graph.impl
+
+import scala.collection.mutable.ArrayBuilder
+import org.apache.spark.graph._
+
+
+/**
+ * A partition of edges in 3 large columnar arrays.
+ */
+class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
+ val srcIds: Array[Vid],
+ val dstIds: Array[Vid],
+ val data: Array[ED]
+ ){
+
+ // private var _data: Array[ED] = _
+ // private var _dataBuilder = ArrayBuilder.make[ED]
+
+ // var srcIds = new VertexArrayList
+ // var dstIds = new VertexArrayList
+
+ def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data)
+
+ def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = {
+ val newData = new Array[ED2](data.size)
+ val edge = new Edge[ED]()
+ for(i <- 0 until data.size){
+ edge.srcId = srcIds(i)
+ edge.dstId = dstIds(i)
+ edge.attr = data(i)
+ newData(i) = f(edge)
+ }
+ new EdgePartition(srcIds, dstIds, newData)
+ }
+
+ def foreach(f: Edge[ED] => Unit) {
+ val edge = new Edge[ED]
+ for(i <- 0 until data.size){
+ edge.srcId = srcIds(i)
+ edge.dstId = dstIds(i)
+ edge.attr = data(i)
+ f(edge)
+ }
+ }
+
+
+ def size: Int = srcIds.size
+
+ def iterator = new Iterator[Edge[ED]] {
+ private val edge = new Edge[ED]
+ private var pos = 0
+
+ override def hasNext: Boolean = pos < EdgePartition.this.size
+
+ override def next(): Edge[ED] = {
+ edge.srcId = srcIds(pos)
+ edge.dstId = dstIds(pos)
+ edge.attr = data(pos)
+ pos += 1
+ edge
+ }
+ }
+}
+
+
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
new file mode 100644
index 0000000000..cc3a443fa2
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
@@ -0,0 +1,31 @@
+package org.apache.spark.graph.impl
+
+import scala.collection.mutable.ArrayBuilder
+import org.apache.spark.graph._
+
+
+//private[graph]
+class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
+ED: ClassManifest]{
+ val srcIds = new VertexArrayList
+ val dstIds = new VertexArrayList
+ var dataBuilder = ArrayBuilder.make[ED]
+
+
+ /** Add a new edge to the partition. */
+ def add(src: Vid, dst: Vid, d: ED) {
+ srcIds.add(src)
+ dstIds.add(dst)
+ dataBuilder += d
+ }
+
+ def toEdgePartition: EdgePartition[ED] = {
+ new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result())
+ }
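+
+  // A minimal usage sketch (Vid is assumed to be an alias for Long, as
+  // elsewhere in this package):
+  //   val builder = new EdgePartitionBuilder[Double]
+  //   builder.add(1L, 2L, 0.5)
+  //   val part = builder.toEdgePartition  // columnar EdgePartition with one edge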
+
+
+}
+
+
+
+
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
new file mode 100644
index 0000000000..413177b2da
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -0,0 +1,671 @@
+package org.apache.spark.graph.impl
+
+import scala.collection.JavaConversions._
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArrayBuilder
+import scala.collection.mutable.BitSet
+
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.Partitioner
+import org.apache.spark.HashPartitioner
+import org.apache.spark.util.ClosureCleaner
+
+import org.apache.spark.rdd
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.IndexedRDD
+import org.apache.spark.rdd.RDDIndex
+
+
+import org.apache.spark.graph._
+import org.apache.spark.graph.impl.GraphImpl._
+import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
+
+/**
+ * The Iterator type returned when constructing edge triplets
+ */
+class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
+ val vidToIndex: VertexIdToIndexMap,
+ val vertexArray: Array[VD],
+ val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
+
+ private var pos = 0
+ private val et = new EdgeTriplet[VD, ED]
+
+ override def hasNext: Boolean = pos < edgePartition.size
+ override def next() = {
+ et.srcId = edgePartition.srcIds(pos)
+ // assert(vmap.containsKey(e.src.id))
+ et.srcAttr = vertexArray(vidToIndex(et.srcId))
+ et.dstId = edgePartition.dstIds(pos)
+ // assert(vmap.containsKey(e.dst.id))
+ et.dstAttr = vertexArray(vidToIndex(et.dstId))
+ et.attr = edgePartition.data(pos)
+ pos += 1
+ et
+ }
+
+ override def toList: List[EdgeTriplet[VD, ED]] = {
+ val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
+ val currentEdge = new EdgeTriplet[VD, ED]
+ for (i <- (0 until edgePartition.size)) {
+ currentEdge.srcId = edgePartition.srcIds(i)
+ // assert(vmap.containsKey(e.src.id))
+ currentEdge.srcAttr = vertexArray(vidToIndex(currentEdge.srcId))
+ currentEdge.dstId = edgePartition.dstIds(i)
+ // assert(vmap.containsKey(e.dst.id))
+ currentEdge.dstAttr = vertexArray(vidToIndex(currentEdge.dstId))
+ currentEdge.attr = edgePartition.data(i)
+ lb += currentEdge
+ }
+ lb.toList
+ }
+} // end of Edge Triplet Iterator
+
+
+
+object EdgeTripletBuilder {
+ def makeTriplets[VD: ClassManifest, ED: ClassManifest](
+ localVidMap: IndexedRDD[Pid, VertexIdToIndexMap],
+ vTableReplicatedValues: IndexedRDD[Pid, Array[VD]],
+ eTable: IndexedRDD[Pid, EdgePartition[ED]]): RDD[EdgeTriplet[VD, ED]] = {
+ val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => {
+ val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next()
+ assert(iter.hasNext == false)
+ new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
+ }
+ ClosureCleaner.clean(iterFun)
+ localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable)
+ .mapPartitions( iterFun ) // end of map partition
+ }
+}
+
+
+// {
+// val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => {
+// val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next()
+// assert(iter.hasNext == false)
+// // Return an iterator that looks up the hash map to find matching
+// // vertices for each edge.
+// new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
+// }
+// ClosureCleaner.clean(iterFun)
+// localVidMap.zipJoin(vTableReplicatedValues).zipJoinRDD(eTable)
+// .mapPartitions( iterFun ) // end of map partition
+// }
+// }
+
+
+/**
+ * A Graph RDD that supports computation on graphs.
+ */
+class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
+ @transient val vTable: IndexedRDD[Vid, VD],
+ @transient val vid2pid: IndexedRDD[Vid, Array[Pid]],
+ @transient val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap],
+ @transient val eTable: IndexedRDD[Pid, EdgePartition[ED]])
+ extends Graph[VD, ED] {
+
+// def this() = this(null,null,null)
+
+
+ /**
+ * (localVidMap: IndexedRDD[Pid, VertexIdToIndexMap]) is a version of the
+ * vertex data after it is replicated. Within each partition, it holds a map
+ * from vertex ID to the index where that vertex's attribute is stored. This
+ * index refers to an array in the same partition in vTableReplicatedValues.
+ *
+ * (vTableReplicatedValues: IndexedRDD[Pid, Array[VD]]) holds the vertex data
+ * and is arranged as described above.
+ */
+ @transient val vTableReplicatedValues =
+ createVTableReplicated(vTable, vid2pid, localVidMap)
+
+
+ /** Return a RDD of vertices. */
+ @transient override val vertices: RDD[(Vid, VD)] = vTable
+
+
+ /** Return a RDD of edges. */
+ @transient override val edges: RDD[Edge[ED]] = {
+ eTable.mapPartitions { iter => iter.next()._2.iterator }
+ }
+
+
+ /** Return a RDD that brings edges with its source and destination vertices together. */
+ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
+ EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable)
+
+
+ // {
+ // val iterFun = (iter: Iterator[(Pid, (VertexHashMap[VD], EdgePartition[ED]))]) => {
+ // val (pid, (vmap, edgePartition)) = iter.next()
+ // //assert(iter.hasNext == false)
+ // // Return an iterator that looks up the hash map to find matching
+ // // vertices for each edge.
+ // new EdgeTripletIterator(vmap, edgePartition)
+ // }
+ // ClosureCleaner.clean(iterFun)
+ // vTableReplicated.join(eTable).mapPartitions( iterFun ) // end of map partition
+ // }
+
+
+
+
+ override def cache(): Graph[VD, ED] = {
+ eTable.cache()
+ vid2pid.cache()
+ vTable.cache()
+ this
+ }
+
+
+ override def statistics: Map[String, Any] = {
+ val numVertices = this.numVertices
+ val numEdges = this.numEdges
+ val replicationRatio =
+ vid2pid.map(kv => kv._2.size).sum / vTable.count
+ val loadArray =
+ eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
+ val minLoad = loadArray.min
+ val maxLoad = loadArray.max
+ Map(
+ "Num Vertices" -> numVertices, "Num Edges" -> numEdges,
+ "Replication" -> replicationRatio, "Load Array" -> loadArray,
+ "Min Load" -> minLoad, "Max Load" -> maxLoad)
+ }
+
+
+ override def reverse: Graph[VD, ED] = {
+ val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]]
+ new GraphImpl(vTable, vid2pid, localVidMap, etable)
+ }
+
+
+ override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
+ val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data))
+ .asInstanceOf[IndexedRDD[Vid, VD2]]
+ new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
+ }
+
+ override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
+ val newETable = eTable.mapValues(eBlock => eBlock.map(f))
+ .asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
+ new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+ }
+
+
+ override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2):
+ Graph[VD, ED2] = {
+ val newETable = eTable.zipJoin(localVidMap).zipJoin(vTableReplicatedValues).mapValues{
+ case ((edgePartition, vidToIndex), vertexArray) =>
+ val et = new EdgeTriplet[VD, ED]
+ edgePartition.map{e =>
+ et.set(e)
+ et.srcAttr = vertexArray(vidToIndex(e.srcId))
+ et.dstAttr = vertexArray(vidToIndex(e.dstId))
+ f(et)
+ }
+ }.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
+ new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+ }
+
+ // override def correctEdges(): Graph[VD, ED] = {
+ // val sc = vertices.context
+ // val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
+ // val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
+ // Graph(vertices, newEdges)
+ // }
+
+
+ override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+ vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = {
+
+ /** @todo The following code behaves deterministically on each
+     * vertex predicate but uses additional space. Should we switch to
+ * this version
+ */
+ // val predGraph = mapVertices(v => (v.data, vpred(v)))
+ // val newETable = predGraph.triplets.filter(t =>
+ // if(v.src.data._2 && v.dst.data._2) {
+ // val src = Vertex(t.src.id, t.src.data._1)
+ // val dst = Vertex(t.dst.id, t.dst.data._1)
+ // epred(new EdgeTriplet[VD, ED](src, dst, t.data))
+ // } else { false })
+
+ // val newVTable = predGraph.vertices.filter(v => v.data._1)
+ // .map(v => (v.id, v.data._1)).indexed()
+
+ // Reuse the partitioner (but not the index) from this graph
+ val newVTable = vertices.filter(v => vpred(v._1, v._2)).indexed(vTable.index.partitioner)
+
+
+ // Restrict the set of edges to those that satisfy the vertex and the edge predicate.
+ val newETable = createETable(
+ triplets.filter(
+ t => vpred( t.srcId, t.srcAttr ) && vpred( t.dstId, t.dstAttr ) && epred(t)
+ )
+ .map( t => Edge(t.srcId, t.dstId, t.attr) ),
+ eTable.index.partitioner.numPartitions
+ )
+
+ // Construct the Vid2Pid map. Here we assume that the filter operation
+ // behaves deterministically.
+ // @todo reindex the vertex and edge tables
+ val newVid2Pid = createVid2Pid(newETable, newVTable.index)
+ val newVidMap = createLocalVidMap(newETable)
+
+    new GraphImpl(newVTable, newVid2Pid, newVidMap, newETable)
+ }
+
+
+ // Because of the edgepartitioner, we know that all edges with the same src and dst
+ // will be in the same partition
+
+ // We will want to keep the same partitioning scheme. Use newGraph() rather than
+ // new GraphImpl()
+ // TODO(crankshaw) is there a better way to do this using RDD.groupBy()
+ // functions?
+
+ override def groupEdgeTriplets[ED2: ClassManifest](
+ f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = {
+ //override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ):
+
+ // I think that
+ // myRDD.mapPartitions { part =>
+ // val (vmap, edges) = part.next()
+ // gives me access to the vertex map and the set of
+ // edges within that partition
+
+ // This is what happens during mapPartitions
+ // The iterator iterates over all partitions
+ // val result: RDD[U] = new RDD[T]().mapPartitions(f: Iterator[T] => Iterator[U])
+
+ // TODO(crankshaw) figure out how to actually get the new Edge RDD and what
+ // type that should have
+ val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter =>
+ // toList lets us operate on all EdgeTriplets in a single partition at once
+ partIter
+ .toList
+ // groups all ETs in this partition that have the same src and dst
+ // Because all ETs with the same src and dst will live on the same
+ // partition due to the EdgePartitioner, this guarantees that these
+ // ET groups will be complete.
+ .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
+ //.groupBy { e => (e.src, e.dst) }
+        // Apply the user supplied edge group function to
+ // each group of edges
+        // The result of this line is Map[(Long, Long), ED2]
+ .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
+ // convert the resulting map back to a list of tuples
+ .toList
+ // TODO(crankshaw) needs an iterator over the tuples?
+ // Why can't I map over the list?
+ .toIterator
+ // map over those tuples that contain src and dst info plus the
+ // new edge data to make my new edges
+ .map { case ((src, dst), data) => Edge(src, dst, data) }
+
+ // How do I convert from a scala map to a list?
+ // I want to be able to apply a function like:
+ // f: (key, value): (K, V) => result: [R]
+      // so that I can transform a Map[K, V] to List[R]
+
+ // Maybe look at collections.breakOut
+ // see http://stackoverflow.com/questions/1715681/scala-2-8-breakout
+ // and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list
+
+ }
+
+ // @todo eliminate the need to call createETable
+ val newETable = createETable(newEdges,
+ eTable.index.partitioner.numPartitions)
+
+
+ new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+
+ }
+
+
+ override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ):
+ Graph[VD,ED2] = {
+
+ val newEdges: RDD[Edge[ED2]] = edges.mapPartitions { partIter =>
+ partIter.toList
+ .groupBy { e: Edge[ED] => (e.srcId, e.dstId) }
+ .mapValues { ts => f(ts.toIterator) }
+ .toList
+ .toIterator
+ .map { case ((src, dst), data) => Edge(src, dst, data) }
+ }
+ // @todo eliminate the need to call createETable
+ val newETable = createETable(newEdges,
+ eTable.index.partitioner.numPartitions)
+
+ new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+ }
+
+
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // Lower level transformation methods
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+ override def mapReduceTriplets[A: ClassManifest](
+ mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
+ reduceFunc: (A, A) => A)
+ : RDD[(Vid, A)] = {
+
+ ClosureCleaner.clean(mapFunc)
+ ClosureCleaner.clean(reduceFunc)
+
+ // Map and preaggregate
+ val preAgg = localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable).flatMap{
+ case (pid, ((vidToIndex, vertexArray), edgePartition)) =>
+ // We can reuse the vidToIndex map for aggregation here as well.
+ /** @todo Since this has the downside of not allowing "messages" to arbitrary
+ * vertices we should consider just using a fresh map.
+ */
+ val msgArray = new Array[A](vertexArray.size)
+ val msgBS = new BitSet(vertexArray.size)
+ // Iterate over the partition
+ val et = new EdgeTriplet[VD, ED]
+ edgePartition.foreach{e =>
+ et.set(e)
+ et.srcAttr = vertexArray(vidToIndex(e.srcId))
+ et.dstAttr = vertexArray(vidToIndex(e.dstId))
+ mapFunc(et).foreach{ case (vid, msg) =>
+ // verify that the vid is valid
+ assert(vid == et.srcId || vid == et.dstId)
+ val ind = vidToIndex(vid)
+ // Populate the aggregator map
+ if(msgBS(ind)) {
+ msgArray(ind) = reduceFunc(msgArray(ind), msg)
+ } else {
+ msgArray(ind) = msg
+ msgBS(ind) = true
+ }
+ }
+ }
+ // Return the aggregate map
+ vidToIndex.long2IntEntrySet().fastIterator()
+ // Remove the entries that did not receive a message
+ .filter{ entry => msgBS(entry.getValue()) }
+ // Construct the actual pairs
+ .map{ entry =>
+ val vid = entry.getLongKey()
+ val ind = entry.getValue()
+ val msg = msgArray(ind)
+ (vid, msg)
+ }
+ }.partitionBy(vTable.index.rdd.partitioner.get)
+ // do the final reduction reusing the index map
+ IndexedRDD(preAgg, vTable.index, reduceFunc)
+ }
+
+
+ override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
+ (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2)
+ : Graph[VD2, ED] = {
+ ClosureCleaner.clean(updateF)
+ val newVTable = vTable.leftJoin(updates).mapValuesWithKeys(
+ (vid, vu) => updateF(vid, vu._1, vu._2) )
+ new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
+ }
+
+
+} // end of class GraphImpl
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+object GraphImpl {
+
+ def apply[VD: ClassManifest, ED: ClassManifest](
+ vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]]):
+ GraphImpl[VD,ED] = {
+
+ apply(vertices, edges,
+ vertices.context.defaultParallelism, edges.context.defaultParallelism)
+ }
+
+
+ def apply[VD: ClassManifest, ED: ClassManifest](
+ vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
+ numVPart: Int, numEPart: Int): GraphImpl[VD,ED] = {
+
+ val vtable = vertices.indexed(numVPart)
+ val etable = createETable(edges, numEPart)
+ val vid2pid = createVid2Pid(etable, vtable.index)
+ val localVidMap = createLocalVidMap(etable)
+ new GraphImpl(vtable, vid2pid, localVidMap, etable)
+ }
+
+
+
+ /**
+ * Create the edge table RDD, which is much more efficient for Java heap storage than the
+ * normal edges data structure (RDD[(Vid, Vid, ED)]).
+ *
+ * The edge table contains multiple partitions, and each partition contains only one RDD
+ * key-value pair: the key is the partition id, and the value is an EdgePartition object
+ * containing all the edges in a partition.
+ */
+ protected def createETable[ED: ClassManifest](
+ edges: RDD[Edge[ED]], numPartitions: Int)
+ : IndexedRDD[Pid, EdgePartition[ED]] = {
+ val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
+ edges
+ .map { e =>
+ // Random partitioning based on the source vertex id.
+ // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
+ // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
+ val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
+ //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
+
+ // Should we be using 3-tuple or an optimized class
+ MessageToPartition(part, (e.srcId, e.dstId, e.attr))
+ }
+ .partitionBy(new HashPartitioner(numPartitions))
+ .mapPartitionsWithIndex({ (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]
+ iter.foreach { message =>
+ val data = message.data
+ builder.add(data._1, data._2, data._3)
+ }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true).indexed()
+ }
+
+
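+  /**
+   * For each vertex id, collect the ids of the edge partitions that contain
+   * an adjacent edge. This table is later used to route replicated vertex
+   * data to the edge partitions that need it.
+   */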
+ protected def createVid2Pid[ED: ClassManifest](
+ eTable: IndexedRDD[Pid, EdgePartition[ED]],
+ vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Array[Pid]] = {
+ val preAgg = eTable.mapPartitions { iter =>
+ val (pid, edgePartition) = iter.next()
+ val vSet = new VertexSet
+ edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
+ vSet.iterator.map { vid => (vid.toLong, pid) }
+ }
+ IndexedRDD[Vid, Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
+ (p: Pid) => ArrayBuffer(p),
+ (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
+ (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
+ .mapValues(a => a.toArray).asInstanceOf[IndexedRDD[Vid, Array[Pid]]]
+ }
+
+
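+  /**
+   * For each edge partition, build a map from vertex id to the local index
+   * at which that vertex's replicated attribute is stored.
+   */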
+ protected def createLocalVidMap[ED: ClassManifest](
+ eTable: IndexedRDD[Pid, EdgePartition[ED]]): IndexedRDD[Pid, VertexIdToIndexMap] = {
+ eTable.mapValues{ epart =>
+ val vidToIndex = new VertexIdToIndexMap()
+ var i = 0
+ epart.foreach{ e =>
+ if(!vidToIndex.contains(e.srcId)) {
+ vidToIndex.put(e.srcId, i)
+ i += 1
+ }
+ if(!vidToIndex.contains(e.dstId)) {
+ vidToIndex.put(e.dstId, i)
+ i += 1
+ }
+ }
+ vidToIndex
+ }
+ }
+
+
+ protected def createVTableReplicated[VD: ClassManifest](
+ vTable: IndexedRDD[Vid, VD],
+ vid2pid: IndexedRDD[Vid, Array[Pid]],
+ replicationMap: IndexedRDD[Pid, VertexIdToIndexMap]):
+ IndexedRDD[Pid, Array[VD]] = {
+ // Join vid2pid and vTable, generate a shuffle dependency on the joined
+ // result, and get the shuffle id so we can use it on the slave.
+ val msgsByPartition = vTable.zipJoin(vid2pid)
+ .flatMap { case (vid, (vdata, pids)) =>
+ pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
+ }
+ .partitionBy(replicationMap.partitioner.get).cache()
+
+ val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){
+ (mapIter, msgsIter) =>
+ val (IndexedSeq(vidToIndex), bs) = mapIter.next()
+ assert(!mapIter.hasNext)
+ // Populate the vertex array using the vidToIndex map
+ val vertexArray = new Array[VD](vidToIndex.size)
+ for (msg <- msgsIter) {
+ val ind = vidToIndex(msg.data._1)
+ vertexArray(ind) = msg.data._2
+ }
+ Iterator((IndexedSeq(vertexArray), bs))
+ }
+
+ new IndexedRDD(replicationMap.index, newValuesRDD)
+
+ // @todo assert edge table has partitioner
+
+ // val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap] =
+ // msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
+ // val vidToIndex = new VertexIdToIndexMap
+ // var i = 0
+ // for (msg <- iter) {
+ // vidToIndex.put(msg.data._1, i)
+ // i += 1
+ // }
+ // Array((pid, vidToIndex)).iterator
+ // }, preservesPartitioning = true).indexed(eTable.index)
+
+ // val vTableReplicatedValues: IndexedRDD[Pid, Array[VD]] =
+ // msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
+ // val vertexArray = ArrayBuilder.make[VD]
+ // for (msg <- iter) {
+ // vertexArray += msg.data._2
+ // }
+ // Array((pid, vertexArray.result)).iterator
+ // }, preservesPartitioning = true).indexed(eTable.index)
+
+ // (localVidMap, vTableReplicatedValues)
+ }
+
+
+ protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ val mixingPrime: Vid = 1125899906842597L
+ (math.abs(src) * mixingPrime).toInt % numParts
+ }
+
+
+
+ /**
+ * This function implements a classic 2D-Partitioning of a sparse matrix.
+   * Suppose we have a graph with 12 vertices (v0 through v11) that we want
+   * to partition over 9 machines. We can use the following sparse matrix
+   * representation:
+ *
+ * __________________________________
+ * v0 | P0 * | P1 | P2 * |
+ * v1 | **** | * | |
+ * v2 | ******* | ** | **** |
+ * v3 | ***** | * * | * |
+ * ----------------------------------
+ * v4 | P3 * | P4 *** | P5 ** * |
+ * v5 | * * | * | |
+ * v6 | * | ** | **** |
+ * v7 | * * * | * * | * |
+ * ----------------------------------
+ * v8 | P6 * | P7 * | P8 * *|
+ * v9 | * | * * | |
+ * v10 | * | ** | * * |
+ * v11 | * <-E | *** | ** |
+ * ----------------------------------
+ *
+ * The edge denoted by E connects v11 with v1 and is assigned to
+ * processor P6. To get the processor number we divide the matrix
+ * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
+   * adjacent to v11 can only be in the first column of
+ * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
+ * As a consequence we can guarantee that v11 will need to be
+ * replicated to at most 2 * sqrt(numProc) machines.
+ *
+ * Notice that P0 has many edges and as a consequence this
+ * partitioning would lead to poor work balance. To improve
+ * balance we first multiply each vertex id by a large prime
+ * to effectively shuffle the vertex locations.
+ *
+   * One limitation of this approach is that the number of machines
+   * must be a perfect square. We partially address this limitation by
+   * computing the machine assignment for the next largest perfect
+   * square and then mapping back down to the actual number of
+   * machines. Unfortunately, this can also lead to work imbalance,
+   * so using a perfect square number of machines is recommended.
+   */
+ protected def edgePartitionFunction2D(src: Vid, dst: Vid,
+ numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
+ val mixingPrime: Vid = 1125899906842597L
+ val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+ val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+ (col * ceilSqrtNumParts + row) % numParts
+ }
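+
+  // Editor's sketch (not part of the original commit): with numParts = 9 and
+  // ceilSqrtNumParts = 3 the partitions form a 3 x 3 grid, so the edges of any one
+  // vertex fall into at most one row plus one column of that grid (2 * 3 blocks):
+  //   val pid: Pid = edgePartitionFunction2D(11L, 1L, 9, 3)   // deterministic value in [0, 9)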
+
+
+ /**
+   * Assign edges to an arbitrary machine corresponding to a
+ * random vertex cut.
+ */
+ protected def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ math.abs((src, dst).hashCode()) % numParts
+ }
+
+
+ /**
+   * @todo(crankshaw) how does this affect load balancing?
+ */
+ protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
+ numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
+ val mixingPrime: Vid = 1125899906842597L
+ // Partitions by canonical edge direction
+ val src = math.min(srcOrig, dstOrig)
+ val dst = math.max(srcOrig, dstOrig)
+ val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+ val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+ (col * ceilSqrtNumParts + row) % numParts
+ }
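+
+  // Editor's sketch (not part of the original commit): because the endpoints are put
+  // in canonical (min, max) order first, both directions of an edge co-locate:
+  //   canonicalEdgePartitionFunction2D(1L, 7L, 9, 3) == canonicalEdgePartitionFunction2D(7L, 1L, 9, 3)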
+
+} // end of object GraphImpl
+
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
new file mode 100644
index 0000000000..b7bbf257a4
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
@@ -0,0 +1,49 @@
+package org.apache.spark.graph.impl
+
+import org.apache.spark.Partitioner
+import org.apache.spark.graph.Pid
+import org.apache.spark.rdd.{ShuffledRDD, RDD}
+
+
+/**
+ * A message used to send a specific value to a partition.
+ * @param partition index of the target partition.
+ * @param data value to send
+ */
+class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
+ @transient var partition: Pid,
+ var data: T)
+ extends Product2[Pid, T] {
+
+ override def _1 = partition
+
+ override def _2 = data
+
+ override def canEqual(that: Any): Boolean = that.isInstanceOf[MessageToPartition[_]]
+}
+
+/**
+ * Companion object for MessageToPartition.
+ */
+object MessageToPartition {
+ def apply[T](partition: Pid, value: T) = new MessageToPartition(partition, value)
+}
+
+
+class MessageToPartitionRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {
+
+ /**
+ * Return a copy of the RDD partitioned using the specified partitioner.
+ */
+ def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
+ new ShuffledRDD[Pid, T, MessageToPartition[T]](self, partitioner)
+ }
+
+}
+
+
+object MessageToPartitionRDDFunctions {
+ implicit def rdd2PartitionRDDFunctions[T: ClassManifest](rdd: RDD[MessageToPartition[T]]) = {
+ new MessageToPartitionRDDFunctions(rdd)
+ }
+}
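+
+// Editor's sketch (not part of the original commit): importing the implicit conversion
+// adds partitionBy to an RDD of messages, mirroring its use in GraphImpl.createETable:
+//   import org.apache.spark.HashPartitioner
+//   import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
+//   // msgs: RDD[MessageToPartition[(Vid, Vid, ED)]]
+//   // msgs.partitionBy(new HashPartitioner(numPartitions))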
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
new file mode 100644
index 0000000000..4627c3566c
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -0,0 +1,25 @@
+package org.apache.spark
+
+package object graph {
+
+ type Vid = Long
+ type Pid = Int
+
+ type VertexHashMap[T] = it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap[T]
+ type VertexSet = it.unimi.dsi.fastutil.longs.LongOpenHashSet
+ type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList
+ // @todo replace with rxin's fast hashmap
+ type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
+
+ /**
+ * Return the default null-like value for a data type T.
+ */
+ def nullValue[T] = null.asInstanceOf[T]
+
+
+ private[graph]
+ case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U,
+ @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V](
+ var _1: U, var _2: V)
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala b/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala
new file mode 100644
index 0000000000..eaff27a33e
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala
@@ -0,0 +1,76 @@
+///// This file creates circular dependencies between the examples, bagel and graph projects
+
+// package org.apache.spark.graph.perf
+
+// import org.apache.spark._
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.bagel.Bagel
+
+// import org.apache.spark.examples.bagel
+// //import org.apache.spark.bagel.examples._
+// import org.apache.spark.graph._
+
+
+// object BagelTest {
+
+// def main(args: Array[String]) {
+// val host = args(0)
+// val taskType = args(1)
+// val fname = args(2)
+// val options = args.drop(3).map { arg =>
+// arg.dropWhile(_ == '-').split('=') match {
+// case Array(opt, v) => (opt -> v)
+// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+// }
+// }
+
+// System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+// //System.setProperty("spark.shuffle.compress", "false")
+// System.setProperty("spark.kryo.registrator", "org.apache.spark.bagel.examples.PRKryoRegistrator")
+
+// var numIter = Int.MaxValue
+// var isDynamic = false
+// var tol:Float = 0.001F
+// var outFname = ""
+// var numVPart = 4
+// var numEPart = 4
+
+// options.foreach{
+// case ("numIter", v) => numIter = v.toInt
+// case ("dynamic", v) => isDynamic = v.toBoolean
+// case ("tol", v) => tol = v.toFloat
+// case ("output", v) => outFname = v
+// case ("numVPart", v) => numVPart = v.toInt
+// case ("numEPart", v) => numEPart = v.toInt
+// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+// }
+
+// val sc = new SparkContext(host, "PageRank(" + fname + ")")
+// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+// val startTime = System.currentTimeMillis
+
+// val numVertices = g.vertices.count()
+
+// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+// }
+
+// // Do the computation
+// val epsilon = 0.01 / numVertices
+// val messages = sc.parallelize(Array[(String, PRMessage)]())
+// val utils = new PageRankUtils
+// val result =
+// Bagel.run(
+// sc, vertices, messages, combiner = new PRCombiner(),
+// numPartitions = numVPart)(
+// utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+// if (!outFname.isEmpty) {
+// println("Saving pageranks of pages to " + outFname)
+// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+// }
+// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+// sc.stop()
+// }
+// }
diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala b/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala
new file mode 100644
index 0000000000..01bd968550
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala
@@ -0,0 +1,75 @@
+///// This file creates circular dependencies between the examples, bagel and graph projects
+
+
+// package org.apache.spark.graph.perf
+
+// import org.apache.spark._
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.bagel.Bagel
+// import org.apache.spark.bagel.examples._
+// import org.apache.spark.graph._
+
+
+// object SparkTest {
+
+// def main(args: Array[String]) {
+// val host = args(0)
+// val taskType = args(1)
+// val fname = args(2)
+// val options = args.drop(3).map { arg =>
+// arg.dropWhile(_ == '-').split('=') match {
+// case Array(opt, v) => (opt -> v)
+// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+// }
+// }
+
+// System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")
+// //System.setProperty("spark.shuffle.compress", "false")
+// System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
+
+// var numIter = Int.MaxValue
+// var isDynamic = false
+// var tol:Float = 0.001F
+// var outFname = ""
+// var numVPart = 4
+// var numEPart = 4
+
+// options.foreach{
+// case ("numIter", v) => numIter = v.toInt
+// case ("dynamic", v) => isDynamic = v.toBoolean
+// case ("tol", v) => tol = v.toFloat
+// case ("output", v) => outFname = v
+// case ("numVPart", v) => numVPart = v.toInt
+// case ("numEPart", v) => numEPart = v.toInt
+// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+// }
+
+// val sc = new SparkContext(host, "PageRank(" + fname + ")")
+// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+// val startTime = System.currentTimeMillis
+
+// val numVertices = g.vertices.count()
+
+// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+// }
+
+// // Do the computation
+// val epsilon = 0.01 / numVertices
+// val messages = sc.parallelize(Array[(String, PRMessage)]())
+// val utils = new PageRankUtils
+// val result =
+// Bagel.run(
+// sc, vertices, messages, combiner = new PRCombiner(),
+// numPartitions = numVPart)(
+// utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+// if (!outFname.isEmpty) {
+// println("Saving pageranks of pages to " + outFname)
+// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+// }
+// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+// sc.stop()
+// }
+// }
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala b/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala
new file mode 100644
index 0000000000..bc00ce2151
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala
@@ -0,0 +1,114 @@
+package org.apache.spark.graph.util
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.util.Utils
+
+import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
+import org.objectweb.asm.Opcodes._
+
+
+
+private[spark] object BytecodeUtils {
+
+ /**
+ * Test whether the given closure invokes the specified method in the specified class.
+ */
+ def invokedMethod(closure: AnyRef, targetClass: Class[_], targetMethod: String): Boolean = {
+ if (_invokedMethod(closure.getClass, "apply", targetClass, targetMethod)) {
+ true
+ } else {
+ // look at closures enclosed in this closure
+ for (f <- closure.getClass.getDeclaredFields
+ if f.getType.getName.startsWith("scala.Function")) {
+ f.setAccessible(true)
+ if (invokedMethod(f.get(closure), targetClass, targetMethod)) {
+ return true
+ }
+ }
+ return false
+ }
+ }
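+
+  // Editor's sketch (not part of the original commit): invokedMethod inspects the
+  // closure's bytecode (including nested closures) for calls into targetClass.
+  // With a hypothetical `class Foo { def bar: Int = 1 }`:
+  //   BytecodeUtils.invokedMethod((f: Foo) => f.bar, classOf[Foo], "bar")        // true
+  //   BytecodeUtils.invokedMethod((f: Foo) => f.hashCode, classOf[Foo], "bar")   // false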
+
+ private def _invokedMethod(cls: Class[_], method: String,
+ targetClass: Class[_], targetMethod: String): Boolean = {
+
+ val seen = new HashSet[(Class[_], String)]
+ var stack = List[(Class[_], String)]((cls, method))
+
+ while (stack.nonEmpty) {
+ val (c, m) = stack.head
+ stack = stack.tail
+ seen.add((c, m))
+ val finder = new MethodInvocationFinder(c.getName, m)
+ getClassReader(c).accept(finder, 0)
+ for (classMethod <- finder.methodsInvoked) {
+ //println(classMethod)
+ if (classMethod._1 == targetClass && classMethod._2 == targetMethod) {
+ return true
+ } else if (!seen.contains(classMethod)) {
+ stack = classMethod :: stack
+ }
+ }
+ }
+ return false
+ }
+
+ /**
+ * Get an ASM class reader for a given class from the JAR that loaded it.
+ */
+ private def getClassReader(cls: Class[_]): ClassReader = {
+ // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
+ val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
+ val resourceStream = cls.getResourceAsStream(className)
+ // todo: Fixme - continuing with earlier behavior ...
+ if (resourceStream == null) return new ClassReader(resourceStream)
+
+ val baos = new ByteArrayOutputStream(128)
+ Utils.copyStream(resourceStream, baos, true)
+ new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+ }
+
+ /**
+   * Given the class name, return whether we should look into the class or not. This is used to
+   * skip examining a large number of Java or Scala classes that we know for sure won't access
+   * the closures. Note that the class name is expected in ASM style (i.e. use "/" instead of ".").
+ */
+ private def skipClass(className: String): Boolean = {
+ val c = className
+ c.startsWith("java/") || c.startsWith("scala/") || c.startsWith("javax/")
+ }
+
+ /**
+ * Find the set of methods invoked by the specified method in the specified class.
+   * For example, after running the visitor
+   *   MethodInvocationFinder("spark/graph/Foo", "test"),
+   * its methodsInvoked variable will contain the set of methods invoked directly by
+   * Foo.test(). Interface invocations are not returned as part of the result set because we cannot
+   * determine the actual method invoked by inspecting the bytecode.
+ */
+ private class MethodInvocationFinder(className: String, methodName: String)
+ extends ClassVisitor(ASM4) {
+
+ val methodsInvoked = new HashSet[(Class[_], String)]
+
+ override def visitMethod(access: Int, name: String, desc: String,
+ sig: String, exceptions: Array[String]): MethodVisitor = {
+ if (name == methodName) {
+ new MethodVisitor(ASM4) {
+ override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
+ if (op == INVOKEVIRTUAL || op == INVOKESPECIAL || op == INVOKESTATIC) {
+ if (!skipClass(owner)) {
+ methodsInvoked.add((Class.forName(owner.replace("/", ".")), name))
+ }
+ }
+ }
+ }
+ } else {
+ null
+ }
+ }
+ }
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
new file mode 100644
index 0000000000..d75a678b26
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -0,0 +1,252 @@
+package org.apache.spark.graph.util
+
+import util._
+import math._
+import scala.annotation.tailrec
+//import scala.collection.mutable
+
+
+import org.apache.spark._
+import org.apache.spark.serializer._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.Graph
+import org.apache.spark.graph.Edge
+import org.apache.spark.graph.impl.GraphImpl
+
+
+// TODO(crankshaw) I might want to pull at least RMAT out into a separate class.
+// Might simplify the code to have classwide variables and such.
+object GraphGenerators {
+
+ val RMATa = 0.45
+ val RMATb = 0.15
+ val RMATc = 0.15
+ val RMATd = 0.25
+
+ def main(args: Array[String]) {
+
+
+ val serializer = "org.apache.spark.serializer.KryoSerializer"
+ System.setProperty("spark.serializer", serializer)
+ //System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
+ val host = "local[4]"
+ val sc = new SparkContext(host, "Lognormal graph generator")
+
+ val lnGraph = logNormalGraph(sc, 10000)
+
+ val rmat = rmatGraph(sc, 1000, 3000)
+
+ //for (v <- lnGraph.vertices) {
+ // println(v.id + ":\t" + v.data)
+ //}
+
+ val times = 100000
+ //val nums = (1 to times).flatMap { n => List(sampleLogNormal(4.0, 1.3, times)) }.toList
+ //val avg = nums.sum / nums.length
+ //val sumSquares = nums.foldLeft(0.0) {(total, next) =>
+ // (total + math.pow((next - avg), 2)) }
+ //val stdev = math.sqrt(sumSquares/(nums.length - 1))
+
+ //println("avg: " + avg + "+-" + stdev)
+
+
+ //for (i <- 1 to 1000) {
+ // println(sampleLogNormal(4.0, 1.3, 1000))
+ //}
+
+ sc.stop()
+
+ }
+
+
+ // Right now it just generates a bunch of edges where
+ // the edge data is the weight (default 1)
+ def logNormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = {
+ // based on Pregel settings
+ val mu = 4
+ val sigma = 1.3
+ //val vertsAndEdges = (0 until numVertices).flatMap { src => {
+
+ val vertices: RDD[(Vid, Int)] = sc.parallelize(0 until numVertices).map{
+ src => (src, sampleLogNormal(mu, sigma, numVertices))
+ }
+
+ val edges = vertices.flatMap{
+ v => generateRandomEdges(v._1.toInt, v._2, numVertices)
+ }
+
+ GraphImpl(vertices, edges)
+ //println("Vertices:")
+ //for (v <- vertices) {
+ // println(v.id)
+ //}
+
+ //println("Edges")
+ //for (e <- edges) {
+ // println(e.src, e.dst, e.data)
+ //}
+
+ }
+
+
+ def generateRandomEdges(src: Int, numEdges: Int, maxVid: Int): Array[Edge[Int]] = {
+ val rand = new Random()
+ var dsts: Set[Int] = Set()
+ while (dsts.size < numEdges) {
+ val nextDst = rand.nextInt(maxVid)
+ if (nextDst != src) {
+ dsts += nextDst
+ }
+ }
+ dsts.map {dst => Edge[Int](src, dst, 1) }.toArray
+ }
+
+
+ /**
+   * Randomly samples from a log normal distribution whose
+   * corresponding normal distribution has the given mean and
+   * standard deviation. Samples are drawn as X = exp(mu + sigma*Z)
+   * where Z ~ N(0, 1), and rejected until X < maxVal. The
+   * corresponding (untruncated) log normal distribution has mean
+   * m = e^(mu + sigma^2/2) and standard deviation
+   * s = sqrt[(e^(sigma^2) - 1)(e^(2*mu + sigma^2))].
+   *
+   * @param mu the mean of the normal distribution
+   * @param sigma the standard deviation of the normal distribution
+   * @param maxVal exclusive upper bound on the value of the sample
+ */
+ def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
+ val rand = new Random()
+ val m = math.exp(mu+(sigma*sigma)/2.0)
+ val s = math.sqrt((math.exp(sigma*sigma) - 1) * math.exp(2*mu + sigma*sigma))
+ // Z ~ N(0, 1)
+ var X: Double = maxVal
+
+ while (X >= maxVal) {
+ val Z = rand.nextGaussian()
+ //X = math.exp((m + s*Z))
+ X = math.exp((mu + sigma*Z))
+ }
+ math.round(X.toFloat)
+ }
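+
+  // Editor's sketch (not part of the original commit): a quick sanity check of the
+  // sampler; the log of the accepted samples should be roughly N(mu, sigma^2),
+  // truncated above at log(maxVal):
+  //   val samples = (1 to 10000).map(_ => sampleLogNormal(4.0, 1.3, 10000))
+  //   val meanLog = samples.map(s => math.log(math.max(s, 1).toDouble)).sum / samples.size
+  //   // with maxVal this large the truncation is negligible, so meanLog is close to mu = 4.0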
+
+
+
+ def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): GraphImpl[Int, Int] = {
+ // let N = requestedNumVertices
+ // the number of vertices is 2^n where n=ceil(log2[N])
+ // This ensures that the 4 quadrants are the same size at all recursion levels
+ val numVertices = math.round(math.pow(2.0, math.ceil(math.log(requestedNumVertices)/math.log(2.0)))).toInt
+ var edges: Set[Edge[Int]] = Set()
+ while (edges.size < numEdges) {
+ if (edges.size % 100 == 0) {
+ println(edges.size + " edges")
+ }
+ edges += addEdge(numVertices)
+
+ }
+ val graph = outDegreeFromEdges(sc.parallelize(edges.toList))
+ graph
+
+ }
+
+ def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
+
+ val vertices = edges.flatMap { edge => List((edge.srcId, 1)) }
+ .reduceByKey(_ + _)
+ .map{ case (vid, degree) => (vid, degree) }
+ GraphImpl(vertices, edges)
+ }
+
+ /**
+   * @param numVertices the total number of vertices in the graph (used to get
+   * the dimensions of the adjacency matrix)
+ */
+ def addEdge(numVertices: Int): Edge[Int] = {
+ //val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0)
+ val v = math.round(numVertices.toFloat/2.0).toInt
+
+ val (src, dst) = chooseCell(v, v, v)
+ Edge[Int](src, dst, 1)
+ }
+
+
+ /**
+   * This method recursively subdivides the adjacency matrix into quadrants
+   * until it picks a single cell. The naming conventions here match
+   * those of the R-MAT paper. The number of nodes in the graph is a power of 2.
+ * The adjacency matrix looks like:
+ *
+ * dst ->
+ * (x,y) *************** _
+ * | | | |
+ * | a | b | |
+ * src | | | |
+ * | *************** | T
+ * \|/ | | | |
+ * | c | d | |
+ * | | | |
+ * *************** -
+ *
+ * where this represents the subquadrant of the adj matrix currently being
+ * subdivided. (x,y) represent the upper left hand corner of the subquadrant,
+ * and T represents the side length (guaranteed to be a power of 2).
+ *
+ * After choosing the next level subquadrant, we get the resulting sets
+ * of parameters:
+ * quad = a, x'=x, y'=y, T'=T/2
+ * quad = b, x'=x+T/2, y'=y, T'=T/2
+ * quad = c, x'=x, y'=y+T/2, T'=T/2
+ * quad = d, x'=x+T/2, y'=y+T/2, T'=T/2
+ *
+   * @param x the column of the upper left corner of the current subquadrant
+   * @param y the row of the upper left corner of the current subquadrant
+   * @param t the side length of the current subquadrant (a power of 2)
+ */
+ @tailrec
+ def chooseCell(x: Int, y: Int, t: Int): (Int, Int) = {
+ if (t <= 1)
+ (x,y)
+ else {
+ val newT = math.round(t.toFloat/2.0).toInt
+ pickQuadrant(RMATa, RMATb, RMATc, RMATd) match {
+ case 0 => chooseCell(x, y, newT)
+ case 1 => chooseCell(x+newT, y, newT)
+ case 2 => chooseCell(x, y+newT, newT)
+ case 3 => chooseCell(x+newT, y+newT, newT)
+ }
+ }
+ }
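+
+  // Editor's sketch (not part of the original commit): one step of the recursion on an
+  // 8 x 8 matrix starting from the full quadrant (x = 0, y = 0, t = 8); picking
+  // quadrant b moves to (4, 0, 4), and the recursion stops once t <= 1:
+  //   val (src, dst) = chooseCell(0, 0, 8)   // a single cell, biased by RMATa..RMATd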
+
+  // TODO(crankshaw) turn result into an enum (or case class for pattern matching)
+ def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = {
+ if (a+b+c+d != 1.0) {
+ throw new IllegalArgumentException("R-MAT probability parameters sum to " + (a+b+c+d) + ", should sum to 1.0")
+ }
+ val rand = new Random()
+ val result = rand.nextDouble()
+ result match {
+ case x if x < a => 0 // 0 corresponds to quadrant a
+ case x if (x >= a && x < a+b) => 1 // 1 corresponds to b
+ case x if (x >= a+b && x < a+b+c) => 2 // 2 corresponds to c
+ case _ => 3 // 3 corresponds to d
+ }
+ }
+
+
+
+}
+
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala b/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala
new file mode 100644
index 0000000000..cb18ef3d26
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.graph.util
+
+
+object HashUtils {
+
+ /**
+ * Compute a 64-bit hash value for the given string.
+ * See http://stackoverflow.com/questions/1660501/what-is-a-good-64bit-hash-function-in-java-for-textual-strings
+ */
+ def hash(str: String): Long = {
+ var h = 1125899906842597L
+ val len = str.length
+ var i = 0
+
+ while (i < len) {
+ h = 31 * h + str(i)
+ i += 1
+ }
+ h
+ }
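+
+  // Editor's sketch (not part of the original commit): hash is pure and deterministic,
+  // so it can be used to derive stable 64-bit ids from strings:
+  //   hash("spark") == hash("spark")   // always true
+  //   hash("spark") == hash("graph")   // almost certainly false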
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
new file mode 100644
index 0000000000..145be3c126
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -0,0 +1,104 @@
+package org.apache.spark.graph
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.LocalSparkContext._
+
+
+class GraphSuite extends FunSuite with LocalSparkContext {
+
+// val sc = new SparkContext("local[4]", "test")
+
+ System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+ test("Graph Creation") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L)
+ val edges = sc.parallelize(rawEdges)
+ val graph = Graph(edges)
+ assert( graph.edges.count() === rawEdges.size )
+ }
+ }
+
+ test("aggregateNeighbors") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val star = Graph(sc.parallelize(List((0, 1), (0, 2), (0, 3))))
+
+ val indegrees = star.aggregateNeighbors(
+ (vid, edge) => Some(1),
+ (a: Int, b: Int) => a + b,
+ EdgeDirection.In)// .map((vid, attr) => (vid, attr._2.getOrElse(0)))
+ assert(indegrees.collect().toSet === Set((1, 1), (2, 1), (3, 1))) // (0, 0),
+
+ val outdegrees = star.aggregateNeighbors(
+ (vid, edge) => Some(1),
+ (a: Int, b: Int) => a + b,
+ EdgeDirection.Out) //.map((vid, attr) => (vid, attr._2.getOrElse(0)))
+ assert(outdegrees.collect().toSet === Set((0, 3))) //, (1, 0), (2, 0), (3, 0)))
+
+ val noVertexValues = star.aggregateNeighbors[Int](
+ (vid: Vid, edge: EdgeTriplet[Int, Int]) => None,
+ (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
+ EdgeDirection.In)//.map((vid, attr) => (vid, attr))
+ assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)] ) // ((0, None), (1, None), (2, None), (3, None)))
+ }
+ }
+
+ /* test("joinVertices") {
+ sc = new SparkContext("local", "test")
+ val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two"), Vertex(3, "three")), 2)
+ val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
+ val g: Graph[String, String] = new GraphImpl(vertices, edges)
+
+ val tbl = sc.parallelize(Seq((1, 10), (2, 20)))
+ val g1 = g.joinVertices(tbl, (v: Vertex[String], u: Int) => v.data + u)
+
+ val v = g1.vertices.collect().sortBy(_.id)
+ assert(v(0).data === "one10")
+ assert(v(1).data === "two20")
+ assert(v(2).data === "three")
+
+ val e = g1.edges.collect()
+ assert(e(0).data === "onetwo")
+ }
+ */
+
+// test("graph partitioner") {
+// sc = new SparkContext("local", "test")
+// val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))
+// val edges = sc.parallelize(Seq(Edge(1, 2, "onlyedge")))
+// var g = Graph(vertices, edges)
+//
+// g = g.withPartitioner(4, 7)
+// assert(g.numVertexPartitions === 4)
+// assert(g.numEdgePartitions === 7)
+//
+// g = g.withVertexPartitioner(5)
+// assert(g.numVertexPartitions === 5)
+//
+// g = g.withEdgePartitioner(8)
+// assert(g.numEdgePartitions === 8)
+//
+// g = g.mapVertices(x => x)
+// assert(g.numVertexPartitions === 5)
+// assert(g.numEdgePartitions === 8)
+//
+// g = g.mapEdges(x => x)
+// assert(g.numVertexPartitions === 5)
+// assert(g.numEdgePartitions === 8)
+//
+// val updates = sc.parallelize(Seq((1, " more")))
+// g = g.updateVertices(
+// updates,
+// (v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data)
+// assert(g.numVertexPartitions === 5)
+// assert(g.numEdgePartitions === 8)
+//
+// g = g.reverse
+// assert(g.numVertexPartitions === 5)
+// assert(g.numEdgePartitions === 8)
+//
+// }
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala b/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
new file mode 100644
index 0000000000..4a0155b6bd
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
@@ -0,0 +1,44 @@
+package org.apache.spark.graph
+
+import org.scalatest.Suite
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkContext
+
+
+/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
+trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
+
+ @transient var sc: SparkContext = _
+
+ override def afterEach() {
+ resetSparkContext()
+ super.afterEach()
+ }
+
+ def resetSparkContext() = {
+ if (sc != null) {
+ LocalSparkContext.stop(sc)
+ sc = null
+ }
+ }
+
+}
+
+object LocalSparkContext {
+ def stop(sc: SparkContext) {
+ sc.stop()
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ }
+
+ /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+ def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
+ try {
+ f(sc)
+ } finally {
+ stop(sc)
+ }
+ }
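+
+  // Editor's sketch (not part of the original commit): typical use in a test, as in
+  // GraphSuite:
+  //   withSpark(new SparkContext("local", "test")) { sc =>
+  //     val edges = sc.parallelize((0L to 10L).zip((1L to 10L) :+ 0L))
+  //     assert(Graph(edges).edges.count() == 11)
+  //   }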
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala
new file mode 100644
index 0000000000..d85e877ddf
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala
@@ -0,0 +1,93 @@
+package org.apache.spark.graph.util
+
+import org.scalatest.FunSuite
+
+
+class BytecodeUtilsSuite extends FunSuite {
+
+ import BytecodeUtilsSuite.TestClass
+
+ test("closure invokes a method") {
+ val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); }
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+
+ val c2 = {e: TestClass => println(e.foo); println(e.bar); }
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar"))
+ assert(!BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz"))
+
+ val c3 = {e: TestClass => println(e.foo); }
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz"))
+ }
+
+ test("closure inside a closure invokes a method") {
+ val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); }
+ val c2 = {e: TestClass => c1(e); println(e.foo); }
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz"))
+ }
+
+ test("closure inside a closure inside a closure invokes a method") {
+ val c1 = {e: TestClass => println(e.baz); }
+ val c2 = {e: TestClass => c1(e); println(e.foo); }
+ val c3 = {e: TestClass => c2(e) }
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz"))
+ }
+
+ test("closure calling a function that invokes a method") {
+ def zoo(e: TestClass) {
+ println(e.baz)
+ }
+ val c1 = {e: TestClass => zoo(e)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ test("closure calling a function that invokes a method which uses another closure") {
+ val c2 = {e: TestClass => println(e.baz)}
+ def zoo(e: TestClass) {
+ c2(e)
+ }
+ val c1 = {e: TestClass => zoo(e)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ test("nested closure") {
+ val c2 = {e: TestClass => println(e.baz)}
+ def zoo(e: TestClass, c: TestClass => Unit) {
+ c(e)
+ }
+ val c1 = {e: TestClass => zoo(e, c2)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ // The following doesn't work yet, because the byte code doesn't contain any information
+ // about what exactly "c" is.
+// test("invoke interface") {
+// val c1 = {e: TestClass => c(e)}
+// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+// assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+// }
+
+ private val c = {e: TestClass => println(e.baz)}
+}
+
+
+object BytecodeUtilsSuite {
+ class TestClass(val foo: Int, val bar: Long) {
+ def baz: Boolean = false
+ }
+}
diff --git a/pom.xml b/pom.xml
index 5ad7b1befb..aa380ec2ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
<modules>
<module>core</module>
<module>bagel</module>
+ <module>graph</module>
<module>examples</module>
<module>mllib</module>
<module>tools</module>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f2bbe5358f..079e698ea0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -42,15 +42,17 @@ object SparkBuild extends Build {
lazy val core = Project("core", file("core"), settings = coreSettings)
lazy val repl = Project("repl", file("repl"), settings = replSettings)
- .dependsOn(core, bagel, mllib)
+ .dependsOn(core, graph, bagel, mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
- .dependsOn(core, mllib, bagel, streaming)
+ .dependsOn(core, mllib, graph, bagel, streaming)
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
+ lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn(core)
+
lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core)
lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
@@ -58,7 +60,7 @@ object SparkBuild extends Build {
lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
- .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
+ .dependsOn(core, graph, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
// A configuration to set an alternative publishLocalConfiguration
lazy val MavenCompile = config("m2r") extend(Compile)
@@ -75,7 +77,7 @@ object SparkBuild extends Build {
lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
lazy val allProjects = Seq[ProjectReference](
- core, repl, examples, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
+ core, repl, examples, graph, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
@@ -259,6 +261,10 @@ object SparkBuild extends Build {
name := "spark-tools"
)
+ def graphSettings = sharedSettings ++ Seq(
+ name := "spark-graphx"
+ )
+
def bagelSettings = sharedSettings ++ Seq(
name := "spark-bagel"
)