diff options
40 files changed, 5248 insertions, 54 deletions
@@ -1,3 +1,7 @@ +# GraphX Branch of Spark + +This is experimental code for the Apache spark project. + # Apache Spark Lightning-Fast Cluster Computing - <http://spark.incubator.apache.org/> diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index c7819d4932..6ad1ca6ef8 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -30,6 +30,7 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then . $FWDIR/conf/spark-env.sh fi + # Build up classpath CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf" if [ -f "$FWDIR/RELEASE" ]; then @@ -45,6 +46,7 @@ if [[ $SPARK_TESTING == 1 ]]; then CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/graph/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes" fi diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0aafc0a2fc..b3a2cb39fc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -959,7 +959,7 @@ object SparkContext { // TODO: Add AccumulatorParams for other types, e.g. lists and strings implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = - new PairRDDFunctions(rdd) + rdd.pairRDDFunctions implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 29968c273c..9b8384bcbb 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -156,6 +156,7 @@ object SparkEnv extends Logging { val serializer = serializerManager.setDefault( System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer")) + logInfo("spark.serializer is " + System.getProperty("spark.serializer")) val closureSerializer = serializerManager.get( System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index a6518abf45..2f94ae5fa8 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -264,8 +264,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ - def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other, partitioner)) + } /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -275,6 +278,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other, partitioner) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -287,6 +292,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other, partitioner) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -325,16 +332,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other)) + } /** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other, numPartitions)) + } /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -343,6 +356,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * using the existing partitioner/parallelism level. */ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -354,6 +369,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * into `numPartitions` partitions. */ def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other, numPartitions) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -365,6 +382,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * RDD using the existing partitioner/parallelism level. */ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -376,6 +395,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * RDD into the given number of partitions. */ def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other, numPartitions) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -412,55 +433,86 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner))) + } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.cogroup(other))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2))) + } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] - = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) - + def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) + } /** Alias for cogroup. */ - def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.groupWith(other))) + } /** Alias for cogroup. */ def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2))) + } /** * Return the list of values in the RDD for key `key`. This operation is done efficiently if the diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala new file mode 100644 index 0000000000..5f95559f15 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala @@ -0,0 +1,650 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import java.nio.ByteBuffer + + +import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.BitSet + + +import org.apache.spark._ +import org.apache.spark.rdd._ +import org.apache.spark.SparkContext._ +import org.apache.spark.Partitioner._ + +import org.apache.spark.storage.StorageLevel + + + + + + +/** + * The BlockIndex is the internal map structure used inside the index + * of the IndexedRDD. + */ +class BlockIndex[@specialized K: ClassManifest] extends JHashMap[K,Int] + + +/** + * The RDDIndex is an opaque type used to represent the organization + * of values in an RDD + */ +class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockIndex[K]]) { + def persist(newLevel: StorageLevel): RDDIndex[K] = { + rdd.persist(newLevel) + return this + } + + def partitioner: Partitioner = rdd.partitioner.get +} + + + +/** + * An IndexedRDD[K,V] extends the RDD[(K,V)] by pre-indexing the keys and + * organizing the values to enable faster join operations. + * + * In addition to providing the basic RDD[(K,V)] functionality the IndexedRDD + * exposes an index member which can be used to "key" other IndexedRDDs + * + */ +class IndexedRDD[K: ClassManifest, V: ClassManifest]( + @transient val index: RDDIndex[K], + @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ]) + extends RDD[(K, V)](index.rdd.context, + List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { + + + /** + * An internal representation which joins the block indices with the values + */ + protected[spark] val tuples = + new ZippedRDD(index.rdd.context, index.rdd, valuesRDD) + + + /** + * The partitioner is defined by the index + */ + override val partitioner = index.rdd.partitioner + + + /** + * The actual partitions are defined by the tuples. + */ + override def getPartitions: Array[Partition] = tuples.getPartitions + + + /** + * The preferred locations are computed based on the preferred locations of the tuples. + */ + override def getPreferredLocations(s: Partition): Seq[String] = + tuples.getPreferredLocations(s) + + + /** + * Caching an IndexedRDD causes the index and values to be cached separately. + */ + override def persist(newLevel: StorageLevel): RDD[(K,V)] = { + index.persist(newLevel) + valuesRDD.persist(newLevel) + return this + } + + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = { + val cleanF = index.rdd.context.clean(f) + val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = + valuesRDD.mapPartitions(iter => iter.map{ + case (values, bs) => + val newValues = new Array[U](values.size) + for ( ind <- bs ) { + newValues(ind) = f(values(ind)) + } + (newValues.toIndexedSeq, bs) + }, preservesPartitioning = true) + new IndexedRDD[K,U](index, newValuesRDD) + } + + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = { + val cleanF = index.rdd.context.clean(f) + val newValues: RDD[ (IndexedSeq[U], BitSet) ] = + index.rdd.zipPartitions(valuesRDD){ + (keysIter, valuesIter) => + val index = keysIter.next() + assert(keysIter.hasNext() == false) + val (oldValues, bs) = valuesIter.next() + assert(valuesIter.hasNext() == false) + // Allocate the array to store the results into + val newValues: Array[U] = new Array[U](oldValues.size) + // Populate the new Values + for( (k,i) <- index ) { + if (bs(i)) { newValues(i) = f(k, oldValues(i)) } + } + Array((newValues.toIndexedSeq, bs)).iterator + } + new IndexedRDD[K,U](index, newValues) + } + + + def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = { + if(index != other.index) { + throw new SparkException("A zipJoin can only be applied to RDDs with the same index!") + } + val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){ + (thisIter, otherIter) => + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + val newBS = thisBS & otherBS + val newValues = thisValues.view.zip(otherValues) + Iterator((newValues.toIndexedSeq, newBS)) + } + new IndexedRDD(index, newValuesRDD) + } + + + def leftZipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,Option[W])] = { + if(index != other.index) { + throw new SparkException("A zipJoin can only be applied to RDDs with the same index!") + } + val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){ + (thisIter, otherIter) => + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + val otherOption = otherValues.view.zipWithIndex + .map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None } + val newValues = thisValues.view.zip(otherOption) + Iterator((newValues.toIndexedSeq, thisBS)) + } + new IndexedRDD(index, newValuesRDD) + } + + + + def leftJoin[W: ClassManifest]( + other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a): + IndexedRDD[K, (V, Option[W]) ] = { + val cleanMerge = index.rdd.context.clean(merge) + + other match { + case other: IndexedRDD[_, _] if index == other.index => { + leftZipJoin(other) + } + case _ => { + // Get the partitioner from the index + val partitioner = index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Shuffle the other RDD using the partitioner for this index + val otherShuffled = + if (other.partitioner == Some(partitioner)) other + else other.partitionBy(partitioner) + val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = + index.rdd.zipPartitions(valuesRDD, other) { + (thisIndexIter, thisIter, tuplesIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val newW = new Array[W](thisValues.size) + // track which values are matched with values in other + val wBS = new BitSet(thisValues.size) + for( (k, w) <- tuplesIter if index.contains(k) ) { + val ind = index.get(k) + if(thisBS(ind)) { + if(wBS(ind)) { + newW(ind) = cleanMerge(newW(ind), w) + } else { + newW(ind) = w + wBS(ind) = true + } + } + } + + val otherOption = newW.view.zipWithIndex + .map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None } + val newValues = thisValues.view.zip(otherOption) + + Iterator((newValues.toIndexedSeq, thisBS)) + } // end of newValues + new IndexedRDD(index, newValues) + } + } + } + + + + // + // def zipJoinToRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = { + // if(index != other.index) { + // throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!") + // } + // index.rdd.zipPartitions(valuesRDD, other.valuesRDD){ + // (thisIndexIter, thisIter, otherIter) => + // val index = thisIndexIter.next() + // assert(!thisIndexIter.hasNext) + // val (thisValues, thisBS) = thisIter.next() + // assert(!thisIter.hasNext) + // val (otherValues, otherBS) = otherIter.next() + // assert(!otherIter.hasNext) + // val newBS = thisBS & otherBS + // index.iterator.filter{ case (k,i) => newBS(i) }.map{ + // case (k,i) => (k, (thisValues(i), otherValues(i))) + // } + // } + // } + + +/* This is probably useful but we are not using it + def zipJoinWithKeys[W: ClassManifest, Z: ClassManifest]( + other: RDD[(K,W)])( + f: (K, V, W) => Z, + merge: (Z,Z) => Z = (a:Z, b:Z) => a): + IndexedRDD[K,Z] = { + val cleanF = index.rdd.context.clean(f) + val cleanMerge = index.rdd.context.clean(merge) + other match { + case other: IndexedRDD[_, _] if index == other.index => { + val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){ + (thisIndexIter, thisIter, otherIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + val newValues = new Array[Z](thisValues.size) + val newBS = thisBS & otherBS + for( (k,i) <- index ) { + if (newBS(i)) { + newValues(i) = cleanF(k, thisValues(i), otherValues(i)) + } + } + List((newValues, newBS)).iterator + } + new IndexedRDD(index, newValues) + } + + case _ => { + // Get the partitioner from the index + val partitioner = index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Shuffle the other RDD using the partitioner for this index + val otherShuffled = + if (other.partitioner == Some(partitioner)) other + else other.partitionBy(partitioner) + + val newValues = index.rdd.zipPartitions(valuesRDD, other) { + (thisIndexIter, thisIter, tuplesIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + + val newValues = new Array[Z](thisValues.size) + // track which values are matched with values in other + val tempBS = new BitSet(thisValues.size) + + for( (k, w) <- tuplesIter if index.contains(k) ) { + val ind = index.get(k) + if(thisBS(ind)) { + val result = cleanF(k, thisValues(ind), w) + if(tempBS(ind)) { + newValues(ind) = cleanMerge(newValues(ind), result) + } else { + newValues(ind) = result + tempBS(ind) = true + } + } + } + List((newValues, tempBS)).iterator + } // end of newValues + new IndexedRDD(index, newValues) + } + } + } +*/ + +/* + def zipJoinLeftWithKeys[W: ClassManifest, Z: ClassManifest]( + other: RDD[(K,W)])( + f: (K, V, Option[W]) => Z, + merge: (Z,Z) => Z = (a:Z, b:Z) => a): + IndexedRDD[K,Z] = { + val cleanF = index.rdd.context.clean(f) + val cleanMerge = index.rdd.context.clean(merge) + other match { + case other: IndexedRDD[_, _] if index == other.index => { + val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){ + (thisIndexIter, thisIter, otherIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + val newValues = new Array[Z](thisValues.size) + for( (k,i) <- index ) { + if (thisBS(i)) { + val otherVal = if(otherBS(i)) Some(otherValues(i)) else None + newValues(i) = cleanF(k, thisValues(i), otherVal) + } + } + List((newValues, thisBS)).iterator + } + new IndexedRDD(index, newValues) + } + + case _ => { + // Get the partitioner from the index + val partitioner = index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Shuffle the other RDD using the partitioner for this index + val otherShuffled = + if (other.partitioner == Some(partitioner)) other + else other.partitionBy(partitioner) + val newValues = index.rdd.zipPartitions(valuesRDD, other) { + (thisIndexIter, thisIter, tuplesIter) => + val index = thisIndexIter.next() + assert(!thisIndexIter.hasNext) + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + + val newValues = new Array[Z](thisValues.size) + // track which values are matched with values in other + val tempBS = new BitSet(thisValues.size) + + for( (k, w) <- tuplesIter if index.contains(k) ) { + val ind = index.get(k) + if(thisBS(ind)) { + val result = cleanF(k, thisValues(ind), Option(w)) + if(tempBS(ind)) { + newValues(ind) = cleanMerge(newValues(ind), result) + } else { + newValues(ind) = result + tempBS(ind) = true + } + } + } + + // Process remaining keys in lef join + for( (k,ind) <- index if thisBS(ind) && !tempBS(ind)) { + newValues(ind) = cleanF(k, thisValues(ind), None) + } + List((newValues, thisBS)).iterator + } // end of newValues + new IndexedRDD(index, newValues) + } + } + } + +*/ + + + /** + * The IndexedRDD has its own optimized version of the pairRDDFunctions. + */ + override def pairRDDFunctions[K1, V1]( + implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]): + PairRDDFunctions[K1, V1] = { + new IndexedRDDFunctions[K1,V1](this.asInstanceOf[IndexedRDD[K1,V1]]) + } + + + override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = { + val cleanF = index.rdd.context.clean(f) + val newValues = index.rdd.zipPartitions(valuesRDD){ + (keysIter, valuesIter) => + val index = keysIter.next() + assert(keysIter.hasNext() == false) + val (oldValues, bs) = valuesIter.next() + assert(valuesIter.hasNext() == false) + // Allocate the array to store the results into + val newBS = new BitSet(oldValues.size) + // Populate the new Values + for( (k,i) <- index ) { + newBS(i) = bs(i) && cleanF( (k, oldValues(i)) ) + } + Array((oldValues, newBS)).iterator + } + new IndexedRDD[K,V](index, newValues) + } + + + /** + * Provide the RDD[(K,V)] equivalent output. + */ + override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { + tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => + // Walk the index to construct the key, value pairs + indexMap.iterator + // Extract rows with key value pairs and indicators + .map{ case (k, ind) => (bs(ind), k, ind) } + // Remove tuples that aren't actually present in the array + .filter( _._1 ) + // Extract the pair (removing the indicator from the tuple) + .map( x => (x._2, values(x._3) ) ) + } + } + +} // End of IndexedRDD + + + + +object IndexedRDD { + + + def apply[K: ClassManifest, V: ClassManifest](rdd: RDD[(K,V)]): IndexedRDD[K,V] = + apply(rdd, (a:V, b:V) => a ) + + def apply[K: ClassManifest, V: ClassManifest]( + rdd: RDD[(K,V)], reduceFunc: (V, V) => V): IndexedRDD[K,V] = { + // Preaggregate and shuffle if necessary + // Preaggregation. + val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc) + val partitioner = new HashPartitioner(rdd.partitions.size) + val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) + + val groups = preAgg.mapPartitions( iter => { + val indexMap = new BlockIndex[K]() + val values = new ArrayBuffer[V] + val bs = new BitSet + for ((k,v) <- iter) { + if(!indexMap.contains(k)) { + val ind = indexMap.size + indexMap.put(k, ind) + values.append(v) + bs(ind) = true + } else { + val ind = indexMap.get(k) + values(ind) = reduceFunc(values(ind), v) + } + } + Iterator( (indexMap, (values.toIndexedSeq, bs)) ) + }, true).cache + // extract the index and the values + val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true) + val values: RDD[(IndexedSeq[V], BitSet)] = + groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + new IndexedRDD[K,V](new RDDIndex(index), values) + } + + + + def apply[K: ClassManifest, V: ClassManifest]( + rdd: RDD[(K,V)], index: RDDIndex[K]): IndexedRDD[K,V] = + apply(rdd, index, (a:V,b:V) => a) + + + def apply[K: ClassManifest, V: ClassManifest]( + rdd: RDD[(K,V)], index: RDDIndex[K], + reduceFunc: (V, V) => V): IndexedRDD[K,V] = + apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc) + // { + // // Get the index Partitioner + // val partitioner = index.rdd.partitioner match { + // case Some(p) => p + // case None => throw new SparkException("An index must have a partitioner.") + // } + // // Preaggregate and shuffle if necessary + // val partitioned = + // if (rdd.partitioner != Some(partitioner)) { + // // Preaggregation. + // val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc) + // rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) + // } else { + // rdd + // } + + // // Use the index to build the new values table + // val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { + // // There is only one map + // val index = indexIter.next() + // assert(!indexIter.hasNext()) + // val values = new Array[V](index.size) + // val bs = new BitSet(index.size) + // for ((k,v) <- tblIter) { + // if (!index.contains(k)) { + // throw new SparkException("Error: Trying to bind an external index " + + // "to an RDD which contains keys that are not in the index.") + // } + // val ind = index(k) + // if (bs(ind)) { + // values(ind) = reduceFunc(values(ind), v) + // } else { + // values(ind) = v + // bs(ind) = true + // } + // } + // List((values, bs)).iterator + // }) + // new IndexedRDD[K,V](index, values) + // } // end of apply + + + def apply[K: ClassManifest, V: ClassManifest, C: ClassManifest]( + rdd: RDD[(K,V)], + index: RDDIndex[K], + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C): IndexedRDD[K,C] = { + // Get the index Partitioner + val partitioner = index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Preaggregate and shuffle if necessary + val partitioned = + if (rdd.partitioner != Some(partitioner)) { + // Preaggregation. + val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, + mergeCombiners) + rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner) + } else { + rdd.mapValues(x => createCombiner(x)) + } + + // Use the index to build the new values table + val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { + // There is only one map + val index = indexIter.next() + assert(!indexIter.hasNext()) + val values = new Array[C](index.size) + val bs = new BitSet(index.size) + for ((k,c) <- tblIter) { + if (!index.contains(k)) { + throw new SparkException("Error: Trying to bind an external index " + + "to an RDD which contains keys that are not in the index.") + } + val ind = index(k) + if (bs(ind)) { + values(ind) = mergeCombiners(values(ind), c) + } else { + values(ind) = c + bs(ind) = true + } + } + Iterator((values, bs)) + }) + new IndexedRDD(index, values) + } // end of apply + + + /** + * Construct and index of the unique values in a given RDD. + */ + def makeIndex[K: ClassManifest](keys: RDD[K], + partitioner: Option[Partitioner] = None): RDDIndex[K] = { + // @todo: I don't need the boolean its only there to be the second type since I want to shuffle a single RDD + // Ugly hack :-(. In order to partition the keys they must have values. + val tbl = keys.mapPartitions(_.map(k => (k, false)), true) + // Shuffle the table (if necessary) + val shuffledTbl = partitioner match { + case None => { + if (tbl.partitioner.isEmpty) { + // @todo: I don't need the boolean its only there to be the second type of the shuffle. + new ShuffledRDD[K, Boolean, (K, Boolean)](tbl, Partitioner.defaultPartitioner(tbl)) + } else { tbl } + } + case Some(partitioner) => + tbl.partitionBy(partitioner) + } + + val index = shuffledTbl.mapPartitions( iter => { + val indexMap = new BlockIndex[K]() + for ( (k,_) <- iter ){ + if(!indexMap.contains(k)){ + val ind = indexMap.size + indexMap.put(k, ind) + } + } + Iterator(indexMap) + }, true).cache + new RDDIndex(index) + } + +} // end of object IndexedRDD + + + + + diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala new file mode 100644 index 0000000000..fd7c16089d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +import scala.collection.mutable.BitSet + +import org.apache.spark._ + + + +class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K,V]) + extends PairRDDFunctions[K,V](self) { + + /** + * Construct a new IndexedRDD that is indexed by only the keys in the RDD + */ + def reindex(): IndexedRDD[K,V] = IndexedRDD(self) + + + // /** + // * Pass each value in the key-value pair RDD through a map function without changing the keys; + // * this also retains the original RDD's partitioning. + // */ + // override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { + // val cleanF = self.index.rdd.context.clean(f) + // val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{ + // case (values, bs) => + // val newValues = new Array[U](values.size) + // for ( ind <- bs ) { + // newValues(ind) = f(values(ind)) + // } + // (newValues.toSeq, bs) + // }, preservesPartitioning = true) + // new IndexedRDD[K,U](self.index, newValuesRDD) + // } + + /** + * Pass each value in the key-value pair RDD through a flatMap function without changing the + * keys; this also retains the original RDD's partitioning. + */ + override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = { + val cleanF = self.index.rdd.context.clean(f) + val newValuesRDD: RDD[(IndexedSeq[U], BitSet)] = self.valuesRDD.mapPartitions(iter => iter.map{ + case (values, bs) => + val newValues = new Array[U](values.size) + val newBS = new BitSet(values.size) + for ( ind <- bs ) { + val res = f(values(ind)) + if(!res.isEmpty) { + newValues(ind) = res.toIterator.next() + newBS(ind) = true + } + } + (newValues.toIndexedSeq, newBS) + }, preservesPartitioning = true) + new IndexedRDD[K,U](self.index, newValuesRDD) + } + + + /** + * Generic function to combine the elements for each key using a custom set of aggregation + * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C + * Note that V and C can be different -- for example, one might group an RDD of type + * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: + * + * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) + * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) + * - `mergeCombiners`, to combine two C's into a single one. + */ + override def combineByKey[C: ClassManifest](createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + partitioner: Partitioner, + mapSideCombine: Boolean = true, + serializerClass: String = null): RDD[(K, C)] = { + mapValues(createCombiner) + } + + + // /** + // * Group the values for each key in the RDD into a single sequence. Hash-partitions the + // * resulting RDD with the existing partitioner/parallelism level. + // */ + // override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + // val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) + // new IndexedRDD[K, Seq[V]](self.index, newValues) + // } + + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + override def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): + IndexedRDD[K, (Seq[V], Seq[W])] = { + //RDD[(K, (Seq[V], Seq[W]))] = { + other match { + case other: IndexedRDD[_, _] if self.index == other.index => { + // if both RDDs share exactly the same index and therefore the same super set of keys + // then we simply merge the value RDDs. + // However it is possible that both RDDs are missing a value for a given key in + // which case the returned RDD should have a null value + val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = + self.valuesRDD.zipPartitions(other.valuesRDD){ + (thisIter, otherIter) => + val (thisValues, thisBS) = thisIter.next() + assert(!thisIter.hasNext) + val (otherValues, otherBS) = otherIter.next() + assert(!otherIter.hasNext) + + val newValues = new Array[(Seq[V], Seq[W])](thisValues.size) + val newBS = thisBS | otherBS + + for( ind <- newBS ) { + val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] + val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] + newValues(ind) = (a, b) + } + Iterator((newValues.toIndexedSeq, newBS)) + } + new IndexedRDD(self.index, newValues) + } + case other: IndexedRDD[_, _] + if self.index.rdd.partitioner == other.index.rdd.partitioner => { + // If both RDDs are indexed using different indices but with the same partitioners + // then we we need to first merge the indicies and then use the merged index to + // merge the values. + val newIndex = + self.index.rdd.zipPartitions(other.index.rdd)( + (thisIter, otherIter) => { + val thisIndex = thisIter.next() + assert(!thisIter.hasNext) + val otherIndex = otherIter.next() + assert(!otherIter.hasNext) + val newIndex = new BlockIndex[K]() + // @todo Merge only the keys that correspond to non-null values + // Merge the keys + newIndex.putAll(thisIndex) + newIndex.putAll(otherIndex) + // We need to rekey the index + var ctr = 0 + for (e <- newIndex.entrySet) { + e.setValue(ctr) + ctr += 1 + } + List(newIndex).iterator + }).cache() + // Use the new index along with the this and the other indices to merge the values + val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = + newIndex.zipPartitions(self.tuples, other.tuples)( + (newIndexIter, thisTuplesIter, otherTuplesIter) => { + // Get the new index for this partition + val newIndex = newIndexIter.next() + assert(!newIndexIter.hasNext) + // Get the corresponding indicies and values for this and the other IndexedRDD + val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() + assert(!thisTuplesIter.hasNext) + val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next() + assert(!otherTuplesIter.hasNext) + // Preallocate the new Values array + val newValues = new Array[(Seq[V], Seq[W])](newIndex.size) + val newBS = new BitSet(newIndex.size) + + // Lookup the sequences in both submaps + for ((k,ind) <- newIndex) { + // Get the left key + val a = if (thisIndex.contains(k)) { + val ind = thisIndex.get(k) + if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] + } else Seq.empty[V] + // Get the right key + val b = if (otherIndex.contains(k)) { + val ind = otherIndex.get(k) + if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] + } else Seq.empty[W] + // If at least one key was present then we generate a tuple. + if (!a.isEmpty || !b.isEmpty) { + newValues(ind) = (a, b) + newBS(ind) = true + } + } + Iterator((newValues.toIndexedSeq, newBS)) + }) + new IndexedRDD(new RDDIndex(newIndex), newValues) + } + case _ => { + // Get the partitioner from the index + val partitioner = self.index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Shuffle the other RDD using the partitioner for this index + val otherShuffled = + if (other.partitioner == Some(partitioner)) { + other + } else { + new ShuffledRDD[K, W, (K,W)](other, partitioner) + } + // Join the other RDD with this RDD building a new valueset and new index on the fly + val groups = + self.tuples.zipPartitions(otherShuffled)( + (thisTuplesIter, otherTuplesIter) => { + // Get the corresponding indicies and values for this IndexedRDD + val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() + assert(!thisTuplesIter.hasNext()) + // Construct a new index + val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]] + // Construct a new array Buffer to store the values + val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null) + val newBS = new BitSet(thisValues.size) + // populate the newValues with the values in this IndexedRDD + for ((k,i) <- thisIndex) { + if (thisBS(i)) { + newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) + newBS(i) = true + } + } + // Now iterate through the other tuples updating the map + for ((k,w) <- otherTuplesIter){ + if (newIndex.contains(k)) { + val ind = newIndex.get(k) + if(newBS(ind)) { + newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) + } else { + // If the other key was in the index but not in the values + // of this indexed RDD then create a new values entry for it + newBS(ind) = true + newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) + } + } else { + // update the index + val ind = newIndex.size + newIndex.put(k, ind) + newBS(ind) = true + // Update the values + newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) + } + } + // // Finalize the new values array + // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = + // newValues.view.map{ + // case null => null + // case (s, ab) => Seq((s, ab.toSeq)) + // }.toSeq + Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) ) + }).cache() + + // Extract the index and values from the above RDD + val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) + val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = + groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + + new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues) + } + } + } + + +} + +//(self: IndexedRDD[K, V]) extends PairRDDFunctions(self) { } + + diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 93b78e1232..8a66297f6f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -68,7 +68,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, @@ -108,7 +108,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) /** * Simplified version of combineByKey that hash-partitions the output RDD. */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] = { @@ -245,7 +245,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } - new ShuffledRDD[K, V, (K, V)](self, partitioner) + new ShuffledRDD[K, V, (K, V)](self, partitioner) } /** @@ -253,7 +253,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ - def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (v, w) } @@ -265,7 +265,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to * partition the output RDD. */ - def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { + + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): + RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (ws.isEmpty) { vs.iterator.map(v => (v, None)) @@ -281,7 +283,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to * partition the output RDD. */ - def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (vs.isEmpty) { @@ -296,7 +298,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. */ - def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } @@ -324,7 +326,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, W))] = { join(other, defaultPartitioner(self, other)) } @@ -333,7 +335,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { join(other, new HashPartitioner(numPartitions)) } @@ -343,7 +345,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * using the existing partitioner/parallelism level. */ - def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, defaultPartitioner(self, other)) } @@ -353,7 +355,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * into `numPartitions` partitions. */ - def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -363,7 +365,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD using the existing partitioner/parallelism level. */ - def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, defaultPartitioner(self, other)) } @@ -373,7 +375,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -392,16 +394,25 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. */ - def mapValues[U](f: V => U): RDD[(K, U)] = { + def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MappedValuesRDD(self, cleanF) } + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = { + self.map{ case (k,v) => (k, f(k,v)) } + } + /** * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. */ - def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { + def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) new FlatMappedValuesRDD(self, cleanF) } @@ -410,7 +421,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -425,7 +436,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") @@ -441,7 +452,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(self, other)) } @@ -449,7 +460,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -458,7 +469,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, new HashPartitioner(numPartitions)) } @@ -466,18 +477,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def groupWith[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. */ - def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def groupWith[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -698,6 +709,20 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) */ def values: RDD[V] = self.map(_._2) + + + def indexed(): IndexedRDD[K,V] = IndexedRDD(self) + + def indexed(numPartitions: Int): IndexedRDD[K,V] = + IndexedRDD(self.partitionBy(new HashPartitioner(numPartitions))) + + def indexed(partitioner: Partitioner): IndexedRDD[K,V] = + IndexedRDD(self.partitionBy(partitioner)) + + def indexed(existingIndex: RDDIndex[K]): IndexedRDD[K,V] = + IndexedRDD(self, existingIndex) + + private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0355618e43..d14b4c60c7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -805,6 +805,26 @@ abstract class RDD[T: ClassManifest]( return buf.toArray } + + + /** + * For RDD[(K,V)] this function returns a pair-functions object for this RDD + */ + def pairRDDFunctions[K, V]( + implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]): + PairRDDFunctions[K, V] = { + new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]]) + } + + + /** + * Construct an index over the unique elements in this RDD. The + * index can then be used to organize a RDD[(T,V)]. + */ + def makeIndex(partitioner: Option[Partitioner] = None): RDDIndex[T] = + IndexedRDD.makeIndex(this, partitioner) + + /** * Return the first element in this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 55b25f145a..263ff59ba6 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -63,14 +63,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) // Allow the user to register their own classes by setting spark.kryo.registrator - try { - Option(System.getProperty("spark.kryo.registrator")).foreach { regCls => - logDebug("Running user registrator: " + regCls) - val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] - reg.registerClasses(kryo) - } - } catch { - case _: Exception => println("Failed to register spark.kryo.registrator") + Option(System.getProperty("spark.kryo.registrator")).foreach { regCls => + logDebug("Running user registrator: " + regCls) + val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] + reg.registerClasses(kryo) } // Register Chill's classes; we do this after our ranges and the user's own classes to let @@ -122,7 +118,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser } } -private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { +private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance with Logging { val kryo = ks.newKryo() // Make these lazy vals to avoid creating a buffer unless we use them diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index 2955986fec..5082730ae3 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap * instance of the serializer object has been created, the get method returns that instead of * creating a new one. */ -private[spark] class SerializerManager { +private[spark] class SerializerManager extends org.apache.spark.Logging { private val serializers = new ConcurrentHashMap[String, Serializer] private var _default: Serializer = _ diff --git a/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala b/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala new file mode 100644 index 0000000000..dadb183bdc --- /dev/null +++ b/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + + +import org.scalatest.FunSuite +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import com.google.common.io.Files + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashSet + +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.ShuffledRDD +import org.apache.spark.rdd.IndexedRDD + +import org.apache.spark.SparkContext._ +import org.apache.spark._ + + + +class IndexedRDDSuite extends FunSuite with SharedSparkContext { + + def lineage(rdd: RDD[_]): collection.mutable.HashSet[RDD[_]] = { + val set = new collection.mutable.HashSet[RDD[_]] + def visit(rdd: RDD[_]) { + for (dep <- rdd.dependencies) { + set += dep.rdd + visit(dep.rdd) + } + } + visit(rdd) + set + } + + test("groupByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with duplicates") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with negative key hash codes") { + val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesForMinus1 = groups.find(_._1 == -1).get._2 + assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with many output partitions") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed(10) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("reduceByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with collectAsMap") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val sums = pairs.reduceByKey(_+_).collectAsMap() + assert(sums.size === 2) + assert(sums(1) === 7) + assert(sums(2) === 1) + } + + test("reduceByKey with many output partitons") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(10) + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with partitioner") { + val p = new Partitioner() { + def numPartitions = 2 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).indexed(p) + val sums = pairs.reduceByKey(_+_) + assert(sums.collect().toSet === Set((1, 4), (0, 1))) + assert(sums.partitioner === Some(p)) + // count the dependencies to make sure there is only 1 ShuffledRDD + val deps = lineage(sums) + + assert(deps.filter(_.isInstanceOf[ShuffledRDD[_,_,_]]).size === 1) // ShuffledRDD, ParallelCollection + } + + + + test("joinIndexVsPair") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("joinIndexVsIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("joinSharedIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')), + (4, (-4, 'w')), + (4, (4, 'w')) + )) + } + + + test("join all-to-all") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).indexed(rdd1.index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (1, 'y')), + (1, (2, 'x')), + (1, (2, 'y')), + (1, (3, 'x')), + (1, (3, 'y')) + )) + } + + test("leftOuterJoinIndex") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("leftOuterJoinIndextoIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("leftOuterJoinIndextoSharedIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (4, (-4, Some('w'))), + (3, (1, None)) + )) + } + +test("leftOuterJoinIndextoIndexExternal") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + + test("rightOuterJoin") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + test("rightOuterJoinIndex2Index") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + + test("rightOuterJoinIndex2Indexshared") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + + test("join with no matches index") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 0) + } + + test("join with no matches shared index") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))).indexed(index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 0) + } + + + test("join with many output partitions") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("join with many output partitions and two indices") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(20) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + + test("groupWith") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.groupWith(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), + (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), + (3, (ArrayBuffer(1), ArrayBuffer())), + (4, (ArrayBuffer(), ArrayBuffer('w'))) + )) + } + + test("zero-partition RDD") { + val emptyDir = Files.createTempDir() + val file = sc.textFile(emptyDir.getAbsolutePath) + assert(file.partitions.size == 0) + assert(file.collect().toList === Nil) + // Test that a shuffle on the file works, because this used to be a bug + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + } + + test("keys and values") { + val rdd = sc.parallelize(Array((1, "a"), (2, "b"))).indexed() + assert(rdd.keys.collect().toList === List(1, 2)) + assert(rdd.values.collect().toList === List("a", "b")) + } + + test("default partitioner uses partition size") { + // specify 2000 partitions + val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) + // do a map, which loses the partitioner + val b = a.map(a => (a, (a * 2).toString)) + // then a group by, and see we didn't revert to 2 partitions + val c = b.groupByKey() + assert(c.partitions.size === 2000) + } + + // test("default partitioner uses largest partitioner indexed to indexed") { + // val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2).indexed() + // val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000).indexed() + // val c = a.join(b) + // assert(c.partitions.size === 2000) + // } + + + + test("subtract") { + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + // Ideally we could keep the original partitioner... + assert(c.partitioner === None) + } + + test("subtractByKey") { + + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2).indexed() + val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitions.size === a.partitions.size) + } + + // test("subtractByKey with narrow dependency") { + // // use a deterministic partitioner + // val p = new Partitioner() { + // def numPartitions = 5 + // def getPartition(key: Any) = key.asInstanceOf[Int] + // } + + // val index = sc.parallelize( 1 to 6 ).makeIndex(Some(p)) + // // partitionBy so we have a narrow dependency + // val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).indexed(index) + // // more partitions/no partitioner so a shuffle dependency + // val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4).indexed(index) + // val c = a.subtractByKey(b) + // assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + // assert(c.partitioner.get === p) + // } + + test("foldByKey") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index) + val sums = pairs.foldByKey(0)(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("foldByKey with mutable result type") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index) + val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache() + // Fold the values using in-place mutation + val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect() + assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1)))) + // Check that the mutable objects in the original RDD were not changed + assert(bufs.collect().toSet === Set( + (1, ArrayBuffer(1)), + (1, ArrayBuffer(2)), + (1, ArrayBuffer(3)), + (1, ArrayBuffer(1)), + (2, ArrayBuffer(1)))) + } +} diff --git a/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala new file mode 100644 index 0000000000..3a2ce4e4da --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + + +import org.scalatest.FunSuite +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import com.google.common.io.Files + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashSet + +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.ShuffledRDD +import org.apache.spark.rdd.IndexedRDD + +import org.apache.spark.SparkContext._ +import org.apache.spark._ + + + +class IndexedRDDSuite extends FunSuite with SharedSparkContext { + + def lineage(rdd: RDD[_]): collection.mutable.HashSet[RDD[_]] = { + val set = new collection.mutable.HashSet[RDD[_]] + def visit(rdd: RDD[_]) { + for (dep <- rdd.dependencies) { + set += dep.rdd + visit(dep.rdd) + } + } + visit(rdd) + set + } + + test("groupByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with duplicates") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with negative key hash codes") { + val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))).indexed() + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesForMinus1 = groups.find(_._1 == -1).get._2 + assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with many output partitions") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed(10) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("reduceByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with collectAsMap") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() + val sums = pairs.reduceByKey(_+_).collectAsMap() + assert(sums.size === 2) + assert(sums(1) === 7) + assert(sums(2) === 1) + } + + test("reduceByKey with many output partitons") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(10) + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with partitioner") { + val p = new Partitioner() { + def numPartitions = 2 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).indexed(p) + val sums = pairs.reduceByKey(_+_) + assert(sums.collect().toSet === Set((1, 4), (0, 1))) + assert(sums.partitioner === Some(p)) + // count the dependencies to make sure there is only 1 ShuffledRDD + val deps = lineage(sums) + + assert(deps.filter(_.isInstanceOf[ShuffledRDD[_,_,_]]).size === 1) // ShuffledRDD, ParallelCollection + } + + + + test("joinIndexVsPair") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("joinIndexVsIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("joinSharedIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')), + (4, (-4, 'w')), + (4, (4, 'w')) + )) + } + + + test("join all-to-all") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).indexed(rdd1.index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (1, 'y')), + (1, (2, 'x')), + (1, (2, 'y')), + (1, (3, 'x')), + (1, (3, 'y')) + )) + } + + test("leftOuterJoinIndex") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("leftOuterJoinIndextoIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("leftOuterJoinIndextoSharedIndex") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (4, (-4, Some('w'))), + (3, (1, None)) + )) + } + +test("leftOuterJoinIndextoIndexExternal") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + + test("rightOuterJoin") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + test("rightOuterJoinIndex2Index") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + + test("rightOuterJoinIndex2Indexshared") { + val index = sc.parallelize( 1 to 6 ).makeIndex() + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + + test("join with no matches index") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 0) + } + + test("join with no matches shared index") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))).indexed(index) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 0) + } + + + test("join with many output partitions") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("join with many output partitions and two indices") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(20) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + + test("groupWith") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) + val joined = rdd1.groupWith(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), + (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), + (3, (ArrayBuffer(1), ArrayBuffer())), + (4, (ArrayBuffer(), ArrayBuffer('w'))) + )) + } + + test("zero-partition RDD") { + val emptyDir = Files.createTempDir() + val file = sc.textFile(emptyDir.getAbsolutePath) + assert(file.partitions.size == 0) + assert(file.collect().toList === Nil) + // Test that a shuffle on the file works, because this used to be a bug + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + } + + test("keys and values") { + val rdd = sc.parallelize(Array((1, "a"), (2, "b"))).indexed() + assert(rdd.keys.collect().toList === List(1, 2)) + assert(rdd.values.collect().toList === List("a", "b")) + } + + test("default partitioner uses partition size") { + // specify 2000 partitions + val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) + // do a map, which loses the partitioner + val b = a.map(a => (a, (a * 2).toString)) + // then a group by, and see we didn't revert to 2 partitions + val c = b.groupByKey() + assert(c.partitions.size === 2000) + } + + // test("default partitioner uses largest partitioner indexed to indexed") { + // val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2).indexed() + // val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000).indexed() + // val c = a.join(b) + // assert(c.partitions.size === 2000) + // } + + + + test("subtract") { + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + // Ideally we could keep the original partitioner... + assert(c.partitioner === None) + } + + test("subtractByKey") { + + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2).indexed() + val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitions.size === a.partitions.size) + } + + // test("subtractByKey with narrow dependency") { + // // use a deterministic partitioner + // val p = new Partitioner() { + // def numPartitions = 5 + // def getPartition(key: Any) = key.asInstanceOf[Int] + // } + + // val index = sc.parallelize( 1 to 6 ).makeIndex(Some(p)) + // // partitionBy so we have a narrow dependency + // val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).indexed(index) + // // more partitions/no partitioner so a shuffle dependency + // val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4).indexed(index) + // val c = a.subtractByKey(b) + // assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + // assert(c.partitioner.get === p) + // } + + test("foldByKey") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index) + val sums = pairs.foldByKey(0)(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("foldByKey with mutable result type") { + val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) + + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index) + val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache() + // Fold the values using in-place mutation + val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect() + assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1)))) + // Check that the mutable objects in the original RDD were not changed + assert(bufs.collect().toSet === Set( + (1, ArrayBuffer(1)), + (1, ArrayBuffer(2)), + (1, ArrayBuffer(3)), + (1, ArrayBuffer(1)), + (2, ArrayBuffer(1)))) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala index cfafbaf23e..8dd7fb40e8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala @@ -31,16 +31,16 @@ import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} import com.esotericsoftware.kryo._ class PageRankUtils extends Serializable { - def computeWithCombiner(numVertices: Long, epsilon: Double)( + def computeWithCombiner(numVertices: Long, epsilon: Double, terminateSteps: Int = 10)( self: PRVertex, messageSum: Option[Double], superstep: Int ): (PRVertex, Array[PRMessage]) = { val newValue = messageSum match { case Some(msgSum) if msgSum != 0 => - 0.15 / numVertices + 0.85 * msgSum + 0.15 + 0.85 * msgSum case _ => self.value } - val terminate = superstep >= 10 + val terminate = superstep >= terminateSteps val outbox: Array[PRMessage] = if (!terminate) diff --git a/graph/pom.xml b/graph/pom.xml new file mode 100644 index 0000000000..1cd9cda98b --- /dev/null +++ b/graph/pom.xml @@ -0,0 +1,106 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.spark-project</groupId> + <artifactId>parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <groupId>org.spark-project</groupId> + <artifactId>spark-graph</artifactId> + <packaging>jar</packaging> + <name>Spark Graph</name> + <url>http://spark-project.org/</url> + + <dependencies> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.version}</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>hadoop1</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop1</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop1</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>hadoop2</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop2</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop2</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala new file mode 100644 index 0000000000..92632db491 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -0,0 +1,659 @@ +package org.apache.spark.graph + +import org.apache.spark._ + + + +object Analytics extends Logging { + + /** + * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD + */ + def pagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], + numIter: Int, + resetProb: Double = 0.15) = { + // Compute the out degree of each vertex + val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){ + (vid, vdata, deg) => (deg.getOrElse(0), 1.0) + } + + println(pagerankGraph.statistics) + + Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)( + (vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply + (me_id, edge) => Some(edge.srcAttr._2 / edge.srcAttr._1), // gather + (a: Double, b: Double) => a + b, // merge + 1.0, + numIter).mapVertices{ case (id, (outDeg, r)) => r } + } + + + /** + * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD + */ + def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], + tol: Float, + maxIter: Int = Integer.MAX_VALUE, + resetProb: Double = 0.15) = { + // Compute the out degree of each vertex + val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){ + (id, data, degIter) => (degIter.sum, 1.0, 1.0) + } + + // Run PageRank + GraphLab.iterate(pagerankGraph)( + (me_id, edge) => edge.srcAttr._2 / edge.srcAttr._1, // gather + (a: Double, b: Double) => a + b, + (id, data, a: Option[Double]) => + (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply + (me_id, edge) => math.abs(edge.srcAttr._3 - edge.srcAttr._2) > tol, // scatter + maxIter).mapVertices { case (vid, data) => data._2 } + } + + + /** + * Compute the connected component membership of each vertex + * and return an RDD with the vertex value containing the + * lowest vertex id in the connected component containing + * that vertex. + */ + def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { + val ccGraph = graph.mapVertices { case (vid, _) => vid } + GraphLab.iterate(ccGraph)( + (me_id, edge) => edge.otherVertexAttr(me_id), // gather + (a: Vid, b: Vid) => math.min(a, b), // merge + (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply + (me_id, edge) => (edge.vertexAttr(me_id) < edge.otherVertexAttr(me_id)), // scatter + numIter, + gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both + ) + } + + def main(args: Array[String]) = { + val host = args(0) + val taskType = args(1) + val fname = args(2) + val options = args.drop(3).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + + def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = { + loggers.map{ + loggerName => + val logger = org.apache.log4j.Logger.getLogger(loggerName) + val prevLevel = logger.getLevel() + logger.setLevel(level) + loggerName -> prevLevel + }.toMap + } +// setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark")) + + val serializer = "org.apache.spark.serializer.KryoSerializer" + System.setProperty("spark.serializer", serializer) + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator") + + taskType match { + case "pagerank" => { + + var numIter = Int.MaxValue + var isDynamic = false + var tol:Float = 0.001F + var outFname = "" + var numVPart = 4 + var numEPart = 4 + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("tol", v) => tol = v.toFloat + case ("output", v) => outFname = v + case ("numVPart", v) => numVPart = v.toInt + case ("numEPart", v) => numEPart = v.toInt + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } + println("======================================") + println("| PageRank |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + if(isDynamic) println(" \t |-> Tolerance: " + tol) + println(" \tNumIter: " + numIter) + println("======================================") + + val sc = new SparkContext(host, "PageRank(" + fname + ")") + + val graph = GraphLoader.textFile(sc, fname, a => 1.0F, + minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache() + + val startTime = System.currentTimeMillis + logInfo("GRAPHX: starting tasks") + logInfo("GRAPHX: Number of vertices " + graph.vertices.count) + logInfo("GRAPHX: Number of edges " + graph.edges.count) + + val pr = Analytics.pagerank(graph, numIter) + // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) + // else Analytics.pagerank(graph, numIter) + logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) ) + if (!outFname.isEmpty) { + println("Saving pageranks of pages to " + outFname) + pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) + } + logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") + sc.stop() + } + + case "cc" => { + + var numIter = Int.MaxValue + var numVPart = 4 + var numEPart = 4 + var isDynamic = false + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("numEPart", v) => numEPart = v.toInt + case ("numVPart", v) => numVPart = v.toInt + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } + println("======================================") + println("| Connected Components |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + println(" \tNumIter: " + numIter) + println("======================================") + + val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") + //val graph = GraphLoader.textFile(sc, fname, a => 1.0F) + val graph = GraphLoader.textFile(sc, fname, a => 1.0F, + minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache() + val cc = Analytics.connectedComponents(graph, numIter) + //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) + // else Analytics.connectedComponents(graph, numIter) + println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) + + sc.stop() + } +// +// case "shortestpath" => { +// +// var numIter = Int.MaxValue +// var isDynamic = true +// var sources: List[Int] = List.empty +// +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("source", v) => sources ++= List(v.toInt) +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } +// +// +// if(!isDynamic && numIter == Int.MaxValue) { +// println("Set number of iterations!") +// sys.exit(1) +// } +// +// if(sources.isEmpty) { +// println("No sources provided!") +// sys.exit(1) +// } +// +// println("======================================") +// println("| Shortest Path |") +// println("--------------------------------------") +// println(" Using parameters:") +// println(" \tDynamic: " + isDynamic) +// println(" \tNumIter: " + numIter) +// println(" \tSources: [" + sources.mkString(", ") + "]") +// println("======================================") +// +// val sc = new SparkContext(host, "ShortestPath(" + fname + ")") +// val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) +// //val sp = Analytics.shortestPath(graph, sources, numIter) +// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) +// // else Analytics.shortestPath(graph, sources, numIter) +// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) +// +// sc.stop() +// } + + + // case "als" => { + + // var numIter = 5 + // var lambda = 0.01 + // var latentK = 10 + // var usersFname = "usersFactors.tsv" + // var moviesFname = "moviesFname.tsv" + // var numVPart = 4 + // var numEPart = 4 + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("lambda", v) => lambda = v.toDouble + // case ("latentK", v) => latentK = v.toInt + // case ("usersFname", v) => usersFname = v + // case ("moviesFname", v) => moviesFname = v + // case ("numVPart", v) => numVPart = v.toInt + // case ("numEPart", v) => numEPart = v.toInt + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // println("======================================") + // println("| Alternating Least Squares |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tNumIter: " + numIter) + // println(" \tLambda: " + lambda) + // println(" \tLatentK: " + latentK) + // println(" \tusersFname: " + usersFname) + // println(" \tmoviesFname: " + moviesFname) + // println("======================================") + + // val sc = new SparkContext(host, "ALS(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble ) + // graph.numVPart = numVPart + // graph.numEPart = numEPart + + // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) + // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_)) + // assert(maxUser < minMovie) + + // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache + // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(usersFname) + // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(moviesFname) + + // sc.stop() + // } + + + case _ => { + println("Invalid task type.") + } + } + } + + // /** + // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD + // */ + // def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], + // tol: Double, maxIter: Int = 10) = { + // // Compute the out degree of each vertex + // val pagerankGraph = graph.updateVertices[Int, (Int, Double, Double)](graph.outDegrees, + // (vertex, degIter) => (degIter.sum, 1.0, 1.0) + // ) + + // // Run PageRank + // GraphLab.iterateGAS(pagerankGraph)( + // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather + // (a: Double, b: Double) => a + b, + // (vertex, a: Option[Double]) => + // (vertex.data._1, (0.15 + 0.85 * a.getOrElse(0.0)), vertex.data._2), // apply + // (me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter + // maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) } + // } + + // /** + // * Compute the connected component membership of each vertex + // * and return an RDD with the vertex value containing the + // * lowest vertex id in the connected component containing + // * that vertex. + // */ + // def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { + // val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) } + // GraphLab.iterateGA[Int, ED, Int](ccGraph)( + // (me_id, edge) => edge.otherVertex(me_id).data, // gather + // (a: Int, b: Int) => math.min(a, b), // merge + // (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply + // numIter, + // gatherDirection = EdgeDirection.Both) + // } + + // /** + // * Compute the shortest path to a set of markers + // */ + // def shortestPath[VD: Manifest](graph: Graph[VD, Double], sources: List[Int], numIter: Int) = { + // val sourceSet = sources.toSet + // val spGraph = graph.mapVertices { + // case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0 else Double.MaxValue)) + // } + // GraphLab.iterateGA[Double, Double, Double](spGraph)( + // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather + // (a: Double, b: Double) => math.min(a, b), // merge + // (v, a: Option[Double]) => math.min(v.data, a.getOrElse(Double.MaxValue)), // apply + // numIter, + // gatherDirection = EdgeDirection.In) + // } + + // /** + // * Compute the connected component membership of each vertex + // * and return an RDD with the vertex value containing the + // * lowest vertex id in the connected component containing + // * that vertex. + // */ + // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], + // numIter: Int = Int.MaxValue) = { + + // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) }) + // val edges = graph.edges // .mapValues(v => None) + // val ccGraph = new Graph(vertices, edges) + + // ccGraph.iterateDynamic( + // (me_id, edge) => edge.otherVertex(me_id).data, // gather + // (a: Int, b: Int) => math.min(a, b), // merge + // Integer.MAX_VALUE, + // (v, a: Int) => math.min(v.data, a), // apply + // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter + // numIter, + // gatherEdges = EdgeDirection.Both, + // scatterEdges = EdgeDirection.Both).vertices + // // + // // graph_ret.vertices.collect.foreach(println) + // // graph_ret.edges.take(10).foreach(println) + // } + + + // /** + // * Compute the shortest path to a set of markers + // */ + // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Double], + // sources: List[Int], numIter: Int) = { + // val sourceSet = sources.toSet + // val vertices = graph.vertices.mapPartitions( + // iter => iter.map { + // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Double.MaxValue) ) + // }); + + // val edges = graph.edges // .mapValues(v => None) + // val spGraph = new Graph(vertices, edges) + + // val niterations = Int.MaxValue + // spGraph.iterateDynamic( + // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather + // (a: Double, b: Double) => math.min(a, b), // merge + // Double.MaxValue, + // (v, a: Double) => math.min(v.data, a), // apply + // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter + // numIter, + // gatherEdges = EdgeDirection.In, + // scatterEdges = EdgeDirection.Out).vertices + // } + + + // /** + // * + // */ + // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double], + // latentK: Int, lambda: Double, numIter: Int) = { + // val vertices = graph.vertices.mapPartitions( _.map { + // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } ) + // }).cache + // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) + // val edges = graph.edges // .mapValues(v => None) + // val alsGraph = new Graph(vertices, edges) + // alsGraph.numVPart = graph.numVPart + // alsGraph.numEPart = graph.numEPart + + // val niterations = Int.MaxValue + // alsGraph.iterateDynamic[(Array[Double], Array[Double])]( + // (me_id, edge) => { // gather + // val X = edge.otherVertex(me_id).data + // val y = edge.data + // val Xy = X.map(_ * y) + // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray + // (Xy, XtX) + // }, + // (a, b) => { + // // The difference between the while loop and the zip is a FACTOR OF TWO in overall + // // runtime + // var i = 0 + // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 } + // i = 0 + // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 } + // a + // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r }) + // }, + // (Array.empty[Double], Array.empty[Double]), // default value is empty + // (vertex, accum) => { // apply + // val XyArray = accum._1 + // val XtXArray = accum._2 + // if(XyArray.isEmpty) vertex.data // no neighbors + // else { + // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) => + // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) + + // (if(i == j) lambda else 1.0F) //regularization + // } + // val Xy = DenseMatrix.create(latentK,1,XyArray) + // val w = XtX \ Xy + // w.data + // } + // }, + // (me_id, edge) => true, + // numIter, + // gatherEdges = EdgeDirection.Both, + // scatterEdges = EdgeDirection.Both, + // vertex => vertex.id < maxUser).vertices + // } + + // def main(args: Array[String]) = { + // val host = args(0) + // val taskType = args(1) + // val fname = args(2) + // val options = args.drop(3).map { arg => + // arg.dropWhile(_ == '-').split('=') match { + // case Array(opt, v) => (opt -> v) + // case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + // } + // } + + // System.setProperty("spark.serializer", "spark.KryoSerializer") + // //System.setProperty("spark.shuffle.compress", "false") + // System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator") + + // taskType match { + // case "pagerank" => { + + // var numIter = Int.MaxValue + // var isDynamic = false + // var tol:Double = 0.001 + // var outFname = "" + // var numVPart = 4 + // var numEPart = 4 + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("dynamic", v) => isDynamic = v.toBoolean + // case ("tol", v) => tol = v.toDouble + // case ("output", v) => outFname = v + // case ("numVPart", v) => numVPart = v.toInt + // case ("numEPart", v) => numEPart = v.toInt + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // if(!isDynamic && numIter == Int.MaxValue) { + // println("Set number of iterations!") + // sys.exit(1) + // } + // println("======================================") + // println("| PageRank |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tDynamic: " + isDynamic) + // if(isDynamic) println(" \t |-> Tolerance: " + tol) + // println(" \tNumIter: " + numIter) + // println("======================================") + + // val sc = new SparkContext(host, "PageRank(" + fname + ")") + + // val graph = GraphLoader.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache() + + // val startTime = System.currentTimeMillis + // logInfo("GRAPHX: starting tasks") + // logInfo("GRAPHX: Number of vertices " + graph.vertices.count) + // logInfo("GRAPHX: Number of edges " + graph.edges.count) + + // val pr = Analytics.pagerank(graph, numIter) + // // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) + // // else Analytics.pagerank(graph, numIter) + // logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) ) + // if (!outFname.isEmpty) { + // println("Saving pageranks of pages to " + outFname) + // pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) + // } + // logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") + // sc.stop() + // } + + // case "cc" => { + + // var numIter = Int.MaxValue + // var isDynamic = false + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("dynamic", v) => isDynamic = v.toBoolean + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // if(!isDynamic && numIter == Int.MaxValue) { + // println("Set number of iterations!") + // sys.exit(1) + // } + // println("======================================") + // println("| Connected Components |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tDynamic: " + isDynamic) + // println(" \tNumIter: " + numIter) + // println("======================================") + + // val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => 1.0) + // val cc = Analytics.connectedComponents(graph, numIter) + // // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) + // // else Analytics.connectedComponents(graph, numIter) + // println("Components: " + cc.vertices.map(_.data).distinct()) + + // sc.stop() + // } + + // case "shortestpath" => { + + // var numIter = Int.MaxValue + // var isDynamic = true + // var sources: List[Int] = List.empty + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("dynamic", v) => isDynamic = v.toBoolean + // case ("source", v) => sources ++= List(v.toInt) + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + + // if(!isDynamic && numIter == Int.MaxValue) { + // println("Set number of iterations!") + // sys.exit(1) + // } + + // if(sources.isEmpty) { + // println("No sources provided!") + // sys.exit(1) + // } + + // println("======================================") + // println("| Shortest Path |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tDynamic: " + isDynamic) + // println(" \tNumIter: " + numIter) + // println(" \tSources: [" + sources.mkString(", ") + "]") + // println("======================================") + + // val sc = new SparkContext(host, "ShortestPath(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) ) + // val sp = Analytics.shortestPath(graph, sources, numIter) + // // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) + // // else Analytics.shortestPath(graph, sources, numIter) + // println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) + + // sc.stop() + // } + + + // case "als" => { + + // var numIter = 5 + // var lambda = 0.01 + // var latentK = 10 + // var usersFname = "usersFactors.tsv" + // var moviesFname = "moviesFname.tsv" + // var numVPart = 4 + // var numEPart = 4 + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("lambda", v) => lambda = v.toDouble + // case ("latentK", v) => latentK = v.toInt + // case ("usersFname", v) => usersFname = v + // case ("moviesFname", v) => moviesFname = v + // case ("numVPart", v) => numVPart = v.toInt + // case ("numEPart", v) => numEPart = v.toInt + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // println("======================================") + // println("| Alternating Least Squares |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tNumIter: " + numIter) + // println(" \tLambda: " + lambda) + // println(" \tLatentK: " + latentK) + // println(" \tusersFname: " + usersFname) + // println(" \tmoviesFname: " + moviesFname) + // println("======================================") + + // val sc = new SparkContext(host, "ALS(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble ) + // graph.numVPart = numVPart + // graph.numEPart = numEPart + + // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) + // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_)) + // assert(maxUser < minMovie) + + // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache + // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(usersFname) + // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(moviesFname) + + // sc.stop() + // } + + + // case _ => { + // println("Invalid task type.") + // } + // } + // } + +} diff --git a/graph/src/main/scala/org/apache/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala new file mode 100644 index 0000000000..67b6454017 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala @@ -0,0 +1,34 @@ +package org.apache.spark.graph + + +/** + * A single directed edge consisting of a source id, target id, + * and the data associated with the Edgee. + * + * @tparam ED type of the edge attribute + */ +case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( + var srcId: Vid = 0, + var dstId: Vid = 0, + var attr: ED = nullValue[ED]) { + + /** + * Given one vertex in the edge return the other vertex. + * + * @param vid the id one of the two vertices on the edge. + * @return the id of the other vertex on the edge. + */ + def otherVertexId(vid: Vid): Vid = + if (srcId == vid) dstId else { assert(dstId == vid); srcId } + + + /** + * Return the relative direction of the edge to the corresponding vertex. + * + * @param vid the id of one of the two vertices in the edge. + * @return the relative direction of the edge to the corresponding vertex. + */ + def relativeDirection(vid: Vid): EdgeDirection = + if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In } + +} diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala new file mode 100644 index 0000000000..99af2d5458 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala @@ -0,0 +1,32 @@ +package org.apache.spark.graph + + +/** + * The direction of directed edge relative to a vertex used to select + * the set of adjacent neighbors when running a neighborhood query. + */ +sealed abstract class EdgeDirection { + def reverse: EdgeDirection = this match { + case EdgeDirection.In => EdgeDirection.In + case EdgeDirection.Out => EdgeDirection.Out + case EdgeDirection.Both => EdgeDirection.Both + } +} + + +object EdgeDirection { + /** + * Edges arriving at a vertex. + */ + case object In extends EdgeDirection + + /** + * Edges originating from a vertex + */ + case object Out extends EdgeDirection + + /** + * All edges adjacent to a vertex + */ + case object Both extends EdgeDirection +} diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala new file mode 100644 index 0000000000..ef3aa199bd --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala @@ -0,0 +1,56 @@ +package org.apache.spark.graph + +/** + * An edge triplet represents two vertices and edge along with their attributes. + * + * @tparam VD the type of the vertex attribute. + * @tparam ED the type of the edge attribute + * + * @todo specialize edge triplet for basic types, though when I last tried + * specializing I got a warning about inherenting from a type that is not + * a trait. + */ +class EdgeTriplet[VD, ED] extends Edge[ED] { +// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest, +// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] { + + + /** + * The source vertex attribute + */ + var srcAttr: VD = _ //nullValue[VD] + + /** + * The destination vertex attribute + */ + var dstAttr: VD = _ //nullValue[VD] + + /** + * Set the edge properties of this triplet. + */ + protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD,ED] = { + srcId = other.srcId + dstId = other.dstId + attr = other.attr + this + } + + /** + * Given one vertex in the edge return the other vertex. + * + * @param vid the id one of the two vertices on the edge. + * @return the attribute for the other vertex on the edge. + */ + def otherVertexAttr(vid: Vid): VD = + if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr } + + /** + * Get the vertex object for the given vertex in the edge. + * + * @param vid the id of one of the two vertices on the edge + * @return the attr for the vertex with that id. + */ + def vertexAttr(vid: Vid): VD = + if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr } + +} diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala new file mode 100644 index 0000000000..50a44e51e5 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -0,0 +1,313 @@ +package org.apache.spark.graph + + +import org.apache.spark.rdd.RDD +import org.apache.spark.util.ClosureCleaner + + + +/** + * The Graph abstractly represents a graph with arbitrary objects associated + * with vertices and edges. The graph provides basic operations to access and + * manipulate the data associated with vertices and edges as well as the + * underlying structure. Like Spark RDDs, the graph is a functional + * data-structure in which mutating operations return new graphs. + * + * @tparam VD The type of object associated with each vertex. + * + * @tparam ED The type of object associated with each edge + */ +abstract class Graph[VD: ClassManifest, ED: ClassManifest] { + + /** + * Get the vertices and their data. + * + * @note vertex ids are unique. + * @return An RDD containing the vertices in this graph + * + * @see Vertex for the vertex type. + * + */ + val vertices: RDD[(Vid,VD)] + + /** + * Get the Edges and their data as an RDD. The entries in the RDD contain + * just the source id and target id along with the edge data. + * + * @return An RDD containing the edges in this graph + * + * @see Edge for the edge type. + * @see edgesWithVertices to get an RDD which contains all the edges along + * with their vertex data. + * + * @todo Should edges return 3 tuples instead of Edge objects? In this case + * we could rename EdgeTriplet to Edge? + */ + val edges: RDD[Edge[ED]] + + /** + * Get the edges with the vertex data associated with the adjacent pair of + * vertices. + * + * @return An RDD containing edge triplets. + * + * @example This operation might be used to evaluate a graph coloring where + * we would like to check that both vertices are a different color. + * {{{ + * type Color = Int + * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv") + * val numInvalid = graph.edgesWithVertices() + * .map(e => if(e.src.data == e.dst.data) 1 else 0).sum + * }}} + * + * @see edges() If only the edge data and adjacent vertex ids are required. + * + */ + val triplets: RDD[EdgeTriplet[VD, ED]] + + /** + * Return a graph that is cached when first created. This is used to pin a + * graph in memory enabling multiple queries to reuse the same construction + * process. + * + * @see RDD.cache() for a more detailed explanation of caching. + */ + def cache(): Graph[VD, ED] + + + /** + * Compute statistics describing the graph representation. + */ + def statistics: Map[String, Any] + + + + /** + * Construct a new graph where each vertex value has been transformed by the + * map function. + * + * @note This graph is not changed and that the new graph has the same + * structure. As a consequence the underlying index structures can be + * reused. + * + * @param map the function from a vertex object to a new vertex value. + * + * @tparam VD2 the new vertex data type + * + * @example We might use this operation to change the vertex values from one + * type to another to initialize an algorithm. + * {{{ + * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file") + * val root = 42 + * var bfsGraph = rawGraph + * .mapVertices[Int]((vid, data) => if(vid == root) 0 else Math.MaxValue) + * }}} + * + */ + def mapVertices[VD2: ClassManifest](map: (Vid, VD) => VD2): Graph[VD2, ED] + + /** + * Construct a new graph where each the value of each edge is transformed by + * the map operation. This function is not passed the vertex value for the + * vertices adjacent to the edge. If vertex values are desired use the + * mapTriplets function. + * + * @note This graph is not changed and that the new graph has the same + * structure. As a consequence the underlying index structures can be + * reused. + * + * @param map the function from an edge object to a new edge value. + * + * @tparam ED2 the new edge data type + * + * @example This function might be used to initialize edge attributes. + * + */ + def mapEdges[ED2: ClassManifest](map: Edge[ED] => ED2): Graph[VD, ED2] + + /** + * Construct a new graph where each the value of each edge is transformed by + * the map operation. This function passes vertex values for the adjacent + * vertices to the map function. If adjacent vertex values are not required, + * consider using the mapEdges function instead. + * + * @note This graph is not changed and that the new graph has the same + * structure. As a consequence the underlying index structures can be + * reused. + * + * @param map the function from an edge object to a new edge value. + * + * @tparam ED2 the new edge data type + * + * @example This function might be used to initialize edge attributes based + * on the attributes associated with each vertex. + * {{{ + * val rawGraph: Graph[Int, Int] = someLoadFunction() + * val graph = rawGraph.mapTriplets[Int]( edge => + * edge.src.data - edge.dst.data) + * }}} + * + */ + def mapTriplets[ED2: ClassManifest]( + map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] + + + /** + * Construct a new graph with all the edges reversed. If this graph contains + * an edge from a to b then the returned graph contains an edge from b to a. + * + */ + def reverse: Graph[VD, ED] + + + /** + * This function takes a vertex and edge predicate and constructs the subgraph + * that consists of vertices and edges that satisfy the predict. The resulting + * graph contains the vertices and edges that satisfy: + * + * V' = {v : for all v in V where vpred(v)} + * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)} + * + * @param epred the edge predicate which takes a triplet and evaluates to true + * if the edge is to remain in the subgraph. Note that only edges in which both + * vertices satisfy the vertex predicate are considered. + * + * @param vpred the vertex predicate which takes a vertex object and evaluates + * to true if the vertex is to be included in the subgraph + * + * @return the subgraph containing only the vertices and edges that satisfy the + * predicates. + */ + def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), + vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] + + + + /** + * @todo document function + */ + def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] + + + /** + * @todo document function + */ + def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): Graph[VD,ED2] + + + /** + * The mapReduceTriplets function is used to compute statistics about + * the neighboring edges and vertices of each vertex. The user supplied + * `mapFunc` function is invoked on each edge of the graph generating 0 or + * more "messages" to be "sent" to either vertex in the edge. + * The `reduceFunc` is then used to combine the output of the map phase + * destined to each vertex. + * + * @tparam A the type of "message" to be sent to each vertex + * + * @param mapFunc the user defined map function which returns 0 or + * more messages to neighboring vertices. + * @param reduceFunc the user defined reduce function which should be + * commutative and assosciative and is used to combine the output of + * the map phase. + * + * @example We can use this function to compute the inDegree of each + * vertex + * {{{ + * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph") + * val inDeg: RDD[(Vid, Int)] = + * mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _) + * }}} + * + * @note By expressing computation at the edge level we achieve maximum + * parallelism. This is one of the core functions in the Graph API in that enables + * neighborhood level computation. For example this function can be used to + * count neighbors satisfying a predicate or implement PageRank. + * + */ + def mapReduceTriplets[A: ClassManifest]( + mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], + reduceFunc: (A, A) => A) + : RDD[(Vid, A)] + + + /** + * Join the vertices with an RDD and then apply a function from the the + * vertex and RDD entry to a new vertex value and type. + * The input table should contain at most one entry for each vertex. + * If no entry is provided the map function is invoked passing none. + * + * @tparam U the type of entry in the table of updates + * @tparam VD2 the new vertex value type + * + * @param table the table to join with the vertices in the graph. The table + * should contain at most one entry for each vertex. + * @param mapFunc the function used to compute the new vertex values. The + * map function is invoked for all vertices, even those that do not have a + * corresponding entry in the table. + * + * @example This function is used to update the vertices with new values + * based on external data. For example we could add the out degree to each + * vertex record + * {{{ + * val rawGraph: Graph[(),()] = Graph.textFile("webgraph") + * val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees() + * val graph = rawGraph.outerJoinVertices(outDeg) { + * (vid, data, optDeg) => optDeg.getOrElse(0) + * } + * }}} + * + */ + def outerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)]) + (mapFunc: (Vid, VD, Option[U]) => VD2) + : Graph[VD2, ED] + + + // Save a copy of the GraphOps object so there is always one unique GraphOps object + // for a given Graph object, and thus the lazy vals in GraphOps would work as intended. + val ops = new GraphOps(this) +} + + +/** + * The Graph Singleton contains basic routines to create graphs + */ +object Graph { + + import org.apache.spark.graph.impl._ + import org.apache.spark.SparkContext._ + + /** + * Construct a graph from a list of Edges. + * + * @param rawEdges a collection of edges in (src,dst) form. + * @param uniqueEdges if multiple identical edges are found they are combined + * and the edge attribute is set to the sum. Otherwise duplicate edges are + * treated as separate. + * + * + */ + def apply(rawEdges: RDD[(Vid, Vid)], uniqueEdges: Boolean = true): Graph[Int, Int] = { + // Reduce to unique edges. + val edges: RDD[Edge[Int]] = + if (uniqueEdges) { + rawEdges.map((_, 1)).reduceByKey(_ + _).map { case ((s, t), cnt) => Edge(s, t, cnt) } + } else { + rawEdges.map { case (s, t) => Edge(s, t, 1) } + } + // Determine unique vertices + /** @todo Should this reduceByKey operation be indexed? */ + val vertices: RDD[(Vid, Int)] = + edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _) + + // Return graph + GraphImpl(vertices, edges) + } + + def apply[VD: ClassManifest, ED: ClassManifest]( + vertices: RDD[(Vid,VD)], edges: RDD[Edge[ED]]): Graph[VD, ED] = { + GraphImpl(vertices, edges) + } + + implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops +} diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala new file mode 100644 index 0000000000..29ea38ec67 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala @@ -0,0 +1,21 @@ +package org.apache.spark.graph + +import com.esotericsoftware.kryo.Kryo + +import org.apache.spark.graph.impl.MessageToPartition +import org.apache.spark.serializer.KryoRegistrator +import org.apache.spark.graph.impl._ + +class GraphKryoRegistrator extends KryoRegistrator { + + def registerClasses(kryo: Kryo) { + kryo.register(classOf[Edge[Object]]) + kryo.register(classOf[MutableTuple2[Object, Object]]) + kryo.register(classOf[MessageToPartition[Object]]) + kryo.register(classOf[(Vid, Object)]) + kryo.register(classOf[EdgePartition[Object]]) + + // This avoids a large number of hash table lookups. + kryo.setReferences(false) + } +} diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala new file mode 100644 index 0000000000..8ba708ba32 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala @@ -0,0 +1,135 @@ +package org.apache.spark.graph + +import scala.collection.JavaConversions._ +import org.apache.spark.rdd.RDD + +/** + * This object implement the graphlab gather-apply-scatter api. + */ +object GraphLab { + + /** + * Execute the GraphLab Gather-Apply-Scatter API + * + * @todo finish documenting GraphLab Gather-Apply-Scatter API + * + * @param graph The graph on which to execute the GraphLab API + * @param gatherFunc The gather function is executed on each edge triplet + * adjacent to a vertex and returns an accumulator which + * is then merged using the merge function. + * @param mergeFunc An accumulative associative operation on the result of + * the gather type. + * @param applyFunc Takes a vertex and the final result of the merge operations + * on the adjacent edges and returns a new vertex value. + * @param scatterFunc Executed after the apply function the scatter function takes + * a triplet and signals whether the neighboring vertex program + * must be recomputed. + * @param numIter The maximum number of iterations to run. + * @param gatherDirection The direction of edges to consider during the gather phase + * @param scatterDirection The direction of edges to consider during the scatter phase + * + * @tparam VD The graph vertex attribute type + * @tparam ED The graph edge attribute type + * @tparam A The type accumulated during the gather phase + * @return the resulting graph after the algorithm converges + */ + def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( + gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A, + mergeFunc: (A, A) => A, + applyFunc: (Vid, VD, Option[A]) => VD, + scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean, + numIter: Int = Integer.MAX_VALUE, + gatherDirection: EdgeDirection = EdgeDirection.In, + scatterDirection: EdgeDirection = EdgeDirection.Out): Graph[VD, ED] = { + + + // Add an active attribute to all vertices to track convergence. + var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices { + case (id, data) => (true, data) + }.cache() + + // The gather function wrapper strips the active attribute and + // only invokes the gather function on active vertices + def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = { + if (e.vertexAttr(vid)._1) { + val edgeTriplet = new EdgeTriplet[VD,ED] + edgeTriplet.set(e) + edgeTriplet.srcAttr = e.srcAttr._2 + edgeTriplet.dstAttr = e.dstAttr._2 + Some(gatherFunc(vid, edgeTriplet)) + } else { + None + } + } + + // The apply function wrapper strips the vertex of the active attribute + // and only invokes the apply function on active vertices + def apply(vid: Vid, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = { + val (active, vData) = data + if (active) (true, applyFunc(vid, vData, accum)) + else (false, vData) + } + + // The scatter function wrapper strips the vertex of the active attribute + // and only invokes the scatter function on active vertices + def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = { + val vid = e.otherVertexId(rawVid) + if (e.vertexAttr(vid)._1) { + val edgeTriplet = new EdgeTriplet[VD,ED] + edgeTriplet.set(e) + edgeTriplet.srcAttr = e.srcAttr._2 + edgeTriplet.dstAttr = e.dstAttr._2 + Some(scatterFunc(vid, edgeTriplet)) + } else { + None + } + } + + // Used to set the active status of vertices for the next round + def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = { + val (prevActive, vData) = data + (newActive, vData) + } + + // Main Loop --------------------------------------------------------------------- + var i = 0 + var numActive = activeGraph.numVertices + while (i < numIter && numActive > 0) { + + // Gather + val gathered: RDD[(Vid, A)] = + activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) + + // Apply + activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache() + + + + // Scatter is basically a gather in the opposite direction so we reverse the edge direction + // activeGraph: Graph[(Boolean, VD), ED] + val scattered: RDD[(Vid, Boolean)] = + activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) + + activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache() + + // Calculate the number of active vertices + numActive = activeGraph.vertices.map{ + case (vid, data) => if (data._1) 1 else 0 + }.reduce(_ + _) + println("Number active vertices: " + numActive) + i += 1 + } + + // Remove the active attribute from the vertex data before returning the graph + activeGraph.mapVertices{case (vid, data) => data._2 } + } +} + + + + + + + + + diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala new file mode 100644 index 0000000000..052f9acdeb --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala @@ -0,0 +1,54 @@ +package org.apache.spark.graph + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graph.impl.GraphImpl + + +object GraphLoader { + + /** + * Load an edge list from file initializing the Graph RDD + */ + def textFile[ED: ClassManifest]( + sc: SparkContext, + path: String, + edgeParser: Array[String] => ED, + minEdgePartitions: Int = 1, + minVertexPartitions: Int = 1) + : GraphImpl[Int, ED] = { + + // Parse the edge data table + val edges = sc.textFile(path, minEdgePartitions).flatMap { line => + if (!line.isEmpty && line(0) != '#') { + val lineArray = line.split("\\s+") + if(lineArray.length < 2) { + println("Invalid line: " + line) + assert(false) + } + val source = lineArray(0) + val target = lineArray(1) + val tail = lineArray.drop(2) + val edata = edgeParser(tail) + Array(Edge(source.trim.toInt, target.trim.toInt, edata)) + } else { + Array.empty[Edge[ED]] + } + }.cache() + + val graph = fromEdges(edges) + // println("Loaded graph:" + + // "\n\t#edges: " + graph.numEdges + + // "\n\t#vertices: " + graph.numVertices) + + graph + } + + def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = { + val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) } + .reduceByKey(_ + _) + .map{ case (vid, degree) => (vid, degree) } + GraphImpl(vertices, edges) + } +} diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala new file mode 100644 index 0000000000..92198a4995 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -0,0 +1,166 @@ +package org.apache.spark.graph + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.apache.spark.util.ClosureCleaner + + +class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { + + + + lazy val numEdges: Long = graph.edges.count() + + lazy val numVertices: Long = graph.vertices.count() + + lazy val inDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.In) + + lazy val outDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Out) + + lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both) + + + /** + * This function is used to compute a statistic for the neighborhood of each + * vertex and returns a value for all vertices (including those without + * neighbors). + * + * @note Because the a default value is provided all vertices will have a + * corresponding entry in the returned RDD. + * + * @param mapFunc the function applied to each edge adjacent to each vertex. + * The mapFunc can optionally return None in which case it does not + * contribute to the final sum. + * @param reduceFunc the function used to merge the results of each map + * operation. + * @param default the default value to use for each vertex if it has no + * neighbors or the map function repeatedly evaluates to none + * @param direction the direction of edges to consider (e.g., In, Out, Both). + * @tparam VD2 The returned type of the aggregation operation. + * + * @return A Spark.RDD containing tuples of vertex identifiers and + * their resulting value. There will be exactly one entry for ever vertex in + * the original graph. + * + * @example We can use this function to compute the average follower age + * for each user + * {{{ + * val graph: Graph[Int,Int] = loadGraph() + * val averageFollowerAge: RDD[(Int, Int)] = + * graph.aggregateNeighbors[(Int,Double)]( + * (vid, edge) => (edge.otherVertex(vid).data, 1), + * (a, b) => (a._1 + b._1, a._2 + b._2), + * -1, + * EdgeDirection.In) + * .mapValues{ case (sum,followers) => sum.toDouble / followers} + * }}} + * + * @todo Should this return a graph with the new vertex values? + * + */ + def aggregateNeighbors[A: ClassManifest]( + mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], + reduceFunc: (A, A) => A, + dir: EdgeDirection) + : RDD[(Vid, A)] = { + + ClosureCleaner.clean(mapFunc) + ClosureCleaner.clean(reduceFunc) + + // Define a new map function over edge triplets + val mf = (et: EdgeTriplet[VD,ED]) => { + // Compute the message to the dst vertex + val dst = + if (dir == EdgeDirection.In || dir == EdgeDirection.Both) { + mapFunc(et.dstId, et) + } else { Option.empty[A] } + // Compute the message to the source vertex + val src = + if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) { + mapFunc(et.srcId, et) + } else { Option.empty[A] } + // construct the return array + (src, dst) match { + case (None, None) => Array.empty[(Vid, A)] + case (Some(srcA),None) => Array((et.srcId, srcA)) + case (None, Some(dstA)) => Array((et.dstId, dstA)) + case (Some(srcA), Some(dstA)) => + Array((et.srcId, srcA), (et.dstId, dstA)) + } + } + + ClosureCleaner.clean(mf) + graph.mapReduceTriplets(mf, reduceFunc) + } // end of aggregateNeighbors + + + + + + + + def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = { + val nbrs = graph.aggregateNeighbors[Array[Vid]]( + (vid, edge) => Some(Array(edge.otherVertexId(vid))), + (a, b) => a ++ b, + edgeDirection) + + graph.vertices.leftOuterJoin(nbrs).mapValues{ + case (_, Some(nbrs)) => nbrs + case (_, None) => Array.empty[Vid] + } + } + + + private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = { + graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection) + } + + + /** + * Join the vertices with an RDD and then apply a function from the the + * vertex and RDD entry to a new vertex value. The input table should + * contain at most one entry for each vertex. If no entry is provided the + * map function is skipped and the old value is used. + * + * @tparam U the type of entry in the table of updates + * @param table the table to join with the vertices in the graph. The table + * should contain at most one entry for each vertex. + * @param mapFunc the function used to compute the new vertex values. The + * map function is invoked only for vertices with a corresponding entry in + * the table otherwise the old vertex value is used. + * + * @note for small tables this function can be much more efficient than + * leftJoinVertices + * + * @example This function is used to update the vertices with new values + * based on external data. For example we could add the out degree to each + * vertex record + * {{{ + * val rawGraph: Graph[Int,()] = Graph.textFile("webgraph") + * .mapVertices(v => 0) + * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees() + * val graph = rawGraph.leftJoinVertices[Int,Int](outDeg, + * (v, deg) => deg ) + * }}} + * + * @todo Should this function be curried to enable type inference? For + * example + * {{{ + * graph.joinVertices(tbl)( (v, row) => row ) + * }}} + */ + def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD) + : Graph[VD, ED] = { + ClosureCleaner.clean(mapFunc) + val uf = (id: Vid, data: VD, o: Option[U]) => { + o match { + case Some(u) => mapFunc(id, data, u) + case None => data + } + } + ClosureCleaner.clean(uf) + graph.outerJoinVertices(table)(uf) + } + +} diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala new file mode 100644 index 0000000000..065d196ff6 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -0,0 +1,36 @@ +package org.apache.spark.graph + +import org.apache.spark.rdd.RDD + + +object Pregel { + + def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( + vprog: (Vid, VD, A) => VD, + sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A], + mergeMsg: (A, A) => A, + initialMsg: A, + numIter: Int) + : Graph[VD, ED] = { + + var g = graph + //var g = graph.cache() + var i = 0 + + def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge) + + // Receive the first set of messages + g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg)) + + while (i < numIter) { + // compute the messages + val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In) + // receive the messages + g = g.joinVertices(messages)(vprog) + // count the iteration + i += 1 + } + // Return the final graph + g + } +} diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala new file mode 100644 index 0000000000..dbfccde8b9 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -0,0 +1,65 @@ +package org.apache.spark.graph.impl + +import scala.collection.mutable.ArrayBuilder +import org.apache.spark.graph._ + + +/** + * A partition of edges in 3 large columnar arrays. + */ +class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]( + val srcIds: Array[Vid], + val dstIds: Array[Vid], + val data: Array[ED] + ){ + + // private var _data: Array[ED] = _ + // private var _dataBuilder = ArrayBuilder.make[ED] + + // var srcIds = new VertexArrayList + // var dstIds = new VertexArrayList + + def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data) + + def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = { + val newData = new Array[ED2](data.size) + val edge = new Edge[ED]() + for(i <- 0 until data.size){ + edge.srcId = srcIds(i) + edge.dstId = dstIds(i) + edge.attr = data(i) + newData(i) = f(edge) + } + new EdgePartition(srcIds, dstIds, newData) + } + + def foreach(f: Edge[ED] => Unit) { + val edge = new Edge[ED] + for(i <- 0 until data.size){ + edge.srcId = srcIds(i) + edge.dstId = dstIds(i) + edge.attr = data(i) + f(edge) + } + } + + + def size: Int = srcIds.size + + def iterator = new Iterator[Edge[ED]] { + private val edge = new Edge[ED] + private var pos = 0 + + override def hasNext: Boolean = pos < EdgePartition.this.size + + override def next(): Edge[ED] = { + edge.srcId = srcIds(pos) + edge.dstId = dstIds(pos) + edge.attr = data(pos) + pos += 1 + edge + } + } +} + + diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala new file mode 100644 index 0000000000..cc3a443fa2 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala @@ -0,0 +1,31 @@ +package org.apache.spark.graph.impl + +import scala.collection.mutable.ArrayBuilder +import org.apache.spark.graph._ + + +//private[graph] +class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) +ED: ClassManifest]{ + val srcIds = new VertexArrayList + val dstIds = new VertexArrayList + var dataBuilder = ArrayBuilder.make[ED] + + + /** Add a new edge to the partition. */ + def add(src: Vid, dst: Vid, d: ED) { + srcIds.add(src) + dstIds.add(dst) + dataBuilder += d + } + + def toEdgePartition: EdgePartition[ED] = { + new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result()) + } + + +} + + + + diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala new file mode 100644 index 0000000000..413177b2da --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -0,0 +1,671 @@ +package org.apache.spark.graph.impl + +import scala.collection.JavaConversions._ + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.ArrayBuilder +import scala.collection.mutable.BitSet + + +import org.apache.spark.SparkContext._ +import org.apache.spark.Partitioner +import org.apache.spark.HashPartitioner +import org.apache.spark.util.ClosureCleaner + +import org.apache.spark.rdd +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.IndexedRDD +import org.apache.spark.rdd.RDDIndex + + +import org.apache.spark.graph._ +import org.apache.spark.graph.impl.GraphImpl._ +import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._ + +/** + * The Iterator type returned when constructing edge triplets + */ +class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest]( + val vidToIndex: VertexIdToIndexMap, + val vertexArray: Array[VD], + val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] { + + private var pos = 0 + private val et = new EdgeTriplet[VD, ED] + + override def hasNext: Boolean = pos < edgePartition.size + override def next() = { + et.srcId = edgePartition.srcIds(pos) + // assert(vmap.containsKey(e.src.id)) + et.srcAttr = vertexArray(vidToIndex(et.srcId)) + et.dstId = edgePartition.dstIds(pos) + // assert(vmap.containsKey(e.dst.id)) + et.dstAttr = vertexArray(vidToIndex(et.dstId)) + et.attr = edgePartition.data(pos) + pos += 1 + et + } + + override def toList: List[EdgeTriplet[VD, ED]] = { + val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]] + val currentEdge = new EdgeTriplet[VD, ED] + for (i <- (0 until edgePartition.size)) { + currentEdge.srcId = edgePartition.srcIds(i) + // assert(vmap.containsKey(e.src.id)) + currentEdge.srcAttr = vertexArray(vidToIndex(currentEdge.srcId)) + currentEdge.dstId = edgePartition.dstIds(i) + // assert(vmap.containsKey(e.dst.id)) + currentEdge.dstAttr = vertexArray(vidToIndex(currentEdge.dstId)) + currentEdge.attr = edgePartition.data(i) + lb += currentEdge + } + lb.toList + } +} // end of Edge Triplet Iterator + + + +object EdgeTripletBuilder { + def makeTriplets[VD: ClassManifest, ED: ClassManifest]( + localVidMap: IndexedRDD[Pid, VertexIdToIndexMap], + vTableReplicatedValues: IndexedRDD[Pid, Array[VD]], + eTable: IndexedRDD[Pid, EdgePartition[ED]]): RDD[EdgeTriplet[VD, ED]] = { + val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => { + val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next() + assert(iter.hasNext == false) + new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition) + } + ClosureCleaner.clean(iterFun) + localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable) + .mapPartitions( iterFun ) // end of map partition + } +} + + +// { +// val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => { +// val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next() +// assert(iter.hasNext == false) +// // Return an iterator that looks up the hash map to find matching +// // vertices for each edge. +// new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition) +// } +// ClosureCleaner.clean(iterFun) +// localVidMap.zipJoin(vTableReplicatedValues).zipJoinRDD(eTable) +// .mapPartitions( iterFun ) // end of map partition +// } +// } + + +/** + * A Graph RDD that supports computation on graphs. + */ +class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( + @transient val vTable: IndexedRDD[Vid, VD], + @transient val vid2pid: IndexedRDD[Vid, Array[Pid]], + @transient val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap], + @transient val eTable: IndexedRDD[Pid, EdgePartition[ED]]) + extends Graph[VD, ED] { + +// def this() = this(null,null,null) + + + /** + * (localVidMap: IndexedRDD[Pid, VertexIdToIndexMap]) is a version of the + * vertex data after it is replicated. Within each partition, it holds a map + * from vertex ID to the index where that vertex's attribute is stored. This + * index refers to an array in the same partition in vTableReplicatedValues. + * + * (vTableReplicatedValues: IndexedRDD[Pid, Array[VD]]) holds the vertex data + * and is arranged as described above. + */ + @transient val vTableReplicatedValues = + createVTableReplicated(vTable, vid2pid, localVidMap) + + + /** Return a RDD of vertices. */ + @transient override val vertices: RDD[(Vid, VD)] = vTable + + + /** Return a RDD of edges. */ + @transient override val edges: RDD[Edge[ED]] = { + eTable.mapPartitions { iter => iter.next()._2.iterator } + } + + + /** Return a RDD that brings edges with its source and destination vertices together. */ + @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = + EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable) + + + // { + // val iterFun = (iter: Iterator[(Pid, (VertexHashMap[VD], EdgePartition[ED]))]) => { + // val (pid, (vmap, edgePartition)) = iter.next() + // //assert(iter.hasNext == false) + // // Return an iterator that looks up the hash map to find matching + // // vertices for each edge. + // new EdgeTripletIterator(vmap, edgePartition) + // } + // ClosureCleaner.clean(iterFun) + // vTableReplicated.join(eTable).mapPartitions( iterFun ) // end of map partition + // } + + + + + override def cache(): Graph[VD, ED] = { + eTable.cache() + vid2pid.cache() + vTable.cache() + this + } + + + override def statistics: Map[String, Any] = { + val numVertices = this.numVertices + val numEdges = this.numEdges + val replicationRatio = + vid2pid.map(kv => kv._2.size).sum / vTable.count + val loadArray = + eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges) + val minLoad = loadArray.min + val maxLoad = loadArray.max + Map( + "Num Vertices" -> numVertices, "Num Edges" -> numEdges, + "Replication" -> replicationRatio, "Load Array" -> loadArray, + "Min Load" -> minLoad, "Max Load" -> maxLoad) + } + + + override def reverse: Graph[VD, ED] = { + val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]] + new GraphImpl(vTable, vid2pid, localVidMap, etable) + } + + + override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { + val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data)) + .asInstanceOf[IndexedRDD[Vid, VD2]] + new GraphImpl(newVTable, vid2pid, localVidMap, eTable) + } + + override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = { + val newETable = eTable.mapValues(eBlock => eBlock.map(f)) + .asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]] + new GraphImpl(vTable, vid2pid, localVidMap, newETable) + } + + + override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): + Graph[VD, ED2] = { + val newETable = eTable.zipJoin(localVidMap).zipJoin(vTableReplicatedValues).mapValues{ + case ((edgePartition, vidToIndex), vertexArray) => + val et = new EdgeTriplet[VD, ED] + edgePartition.map{e => + et.set(e) + et.srcAttr = vertexArray(vidToIndex(e.srcId)) + et.dstAttr = vertexArray(vidToIndex(e.dstId)) + f(et) + } + }.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]] + new GraphImpl(vTable, vid2pid, localVidMap, newETable) + } + + // override def correctEdges(): Graph[VD, ED] = { + // val sc = vertices.context + // val vset = sc.broadcast(vertices.map(_.id).collect().toSet) + // val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst)) + // Graph(vertices, newEdges) + // } + + + override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), + vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = { + + /** @todo The following code behaves deterministically on each + * vertex predicate but uses additional space. Should we swithc to + * this version + */ + // val predGraph = mapVertices(v => (v.data, vpred(v))) + // val newETable = predGraph.triplets.filter(t => + // if(v.src.data._2 && v.dst.data._2) { + // val src = Vertex(t.src.id, t.src.data._1) + // val dst = Vertex(t.dst.id, t.dst.data._1) + // epred(new EdgeTriplet[VD, ED](src, dst, t.data)) + // } else { false }) + + // val newVTable = predGraph.vertices.filter(v => v.data._1) + // .map(v => (v.id, v.data._1)).indexed() + + // Reuse the partitioner (but not the index) from this graph + val newVTable = vertices.filter(v => vpred(v._1, v._2)).indexed(vTable.index.partitioner) + + + // Restrict the set of edges to those that satisfy the vertex and the edge predicate. + val newETable = createETable( + triplets.filter( + t => vpred( t.srcId, t.srcAttr ) && vpred( t.dstId, t.dstAttr ) && epred(t) + ) + .map( t => Edge(t.srcId, t.dstId, t.attr) ), + eTable.index.partitioner.numPartitions + ) + + // Construct the Vid2Pid map. Here we assume that the filter operation + // behaves deterministically. + // @todo reindex the vertex and edge tables + val newVid2Pid = createVid2Pid(newETable, newVTable.index) + val newVidMap = createLocalVidMap(newETable) + + new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable) + } + + + // Because of the edgepartitioner, we know that all edges with the same src and dst + // will be in the same partition + + // We will want to keep the same partitioning scheme. Use newGraph() rather than + // new GraphImpl() + // TODO(crankshaw) is there a better way to do this using RDD.groupBy() + // functions? + + override def groupEdgeTriplets[ED2: ClassManifest]( + f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = { + //override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): + + // I think that + // myRDD.mapPartitions { part => + // val (vmap, edges) = part.next() + // gives me access to the vertex map and the set of + // edges within that partition + + // This is what happens during mapPartitions + // The iterator iterates over all partitions + // val result: RDD[U] = new RDD[T]().mapPartitions(f: Iterator[T] => Iterator[U]) + + // TODO(crankshaw) figure out how to actually get the new Edge RDD and what + // type that should have + val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter => + // toList lets us operate on all EdgeTriplets in a single partition at once + partIter + .toList + // groups all ETs in this partition that have the same src and dst + // Because all ETs with the same src and dst will live on the same + // partition due to the EdgePartitioner, this guarantees that these + // ET groups will be complete. + .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) } + //.groupBy { e => (e.src, e.dst) } + // Apply the user supplied supplied edge group function to + // each group of edges + // The result of this line is Map[(Long, Long, ED2] + .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) } + // convert the resulting map back to a list of tuples + .toList + // TODO(crankshaw) needs an iterator over the tuples? + // Why can't I map over the list? + .toIterator + // map over those tuples that contain src and dst info plus the + // new edge data to make my new edges + .map { case ((src, dst), data) => Edge(src, dst, data) } + + // How do I convert from a scala map to a list? + // I want to be able to apply a function like: + // f: (key, value): (K, V) => result: [R] + // so that I can transfrom a Map[K, V] to List[R] + + // Maybe look at collections.breakOut + // see http://stackoverflow.com/questions/1715681/scala-2-8-breakout + // and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list + + } + + // @todo eliminate the need to call createETable + val newETable = createETable(newEdges, + eTable.index.partitioner.numPartitions) + + + new GraphImpl(vTable, vid2pid, localVidMap, newETable) + + } + + + override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): + Graph[VD,ED2] = { + + val newEdges: RDD[Edge[ED2]] = edges.mapPartitions { partIter => + partIter.toList + .groupBy { e: Edge[ED] => (e.srcId, e.dstId) } + .mapValues { ts => f(ts.toIterator) } + .toList + .toIterator + .map { case ((src, dst), data) => Edge(src, dst, data) } + } + // @todo eliminate the need to call createETable + val newETable = createETable(newEdges, + eTable.index.partitioner.numPartitions) + + new GraphImpl(vTable, vid2pid, localVidMap, newETable) + } + + + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // Lower level transformation methods + ////////////////////////////////////////////////////////////////////////////////////////////////// + + override def mapReduceTriplets[A: ClassManifest]( + mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)], + reduceFunc: (A, A) => A) + : RDD[(Vid, A)] = { + + ClosureCleaner.clean(mapFunc) + ClosureCleaner.clean(reduceFunc) + + // Map and preaggregate + val preAgg = localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable).flatMap{ + case (pid, ((vidToIndex, vertexArray), edgePartition)) => + // We can reuse the vidToIndex map for aggregation here as well. + /** @todo Since this has the downside of not allowing "messages" to arbitrary + * vertices we should consider just using a fresh map. + */ + val msgArray = new Array[A](vertexArray.size) + val msgBS = new BitSet(vertexArray.size) + // Iterate over the partition + val et = new EdgeTriplet[VD, ED] + edgePartition.foreach{e => + et.set(e) + et.srcAttr = vertexArray(vidToIndex(e.srcId)) + et.dstAttr = vertexArray(vidToIndex(e.dstId)) + mapFunc(et).foreach{ case (vid, msg) => + // verify that the vid is valid + assert(vid == et.srcId || vid == et.dstId) + val ind = vidToIndex(vid) + // Populate the aggregator map + if(msgBS(ind)) { + msgArray(ind) = reduceFunc(msgArray(ind), msg) + } else { + msgArray(ind) = msg + msgBS(ind) = true + } + } + } + // Return the aggregate map + vidToIndex.long2IntEntrySet().fastIterator() + // Remove the entries that did not receive a message + .filter{ entry => msgBS(entry.getValue()) } + // Construct the actual pairs + .map{ entry => + val vid = entry.getLongKey() + val ind = entry.getValue() + val msg = msgArray(ind) + (vid, msg) + } + }.partitionBy(vTable.index.rdd.partitioner.get) + // do the final reduction reusing the index map + IndexedRDD(preAgg, vTable.index, reduceFunc) + } + + + override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest] + (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2) + : Graph[VD2, ED] = { + ClosureCleaner.clean(updateF) + val newVTable = vTable.leftJoin(updates).mapValuesWithKeys( + (vid, vu) => updateF(vid, vu._1, vu._2) ) + new GraphImpl(newVTable, vid2pid, localVidMap, eTable) + } + + +} // end of class GraphImpl + + + + + + + + + + + + + + + + +object GraphImpl { + + def apply[VD: ClassManifest, ED: ClassManifest]( + vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]]): + GraphImpl[VD,ED] = { + + apply(vertices, edges, + vertices.context.defaultParallelism, edges.context.defaultParallelism) + } + + + def apply[VD: ClassManifest, ED: ClassManifest]( + vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]], + numVPart: Int, numEPart: Int): GraphImpl[VD,ED] = { + + val vtable = vertices.indexed(numVPart) + val etable = createETable(edges, numEPart) + val vid2pid = createVid2Pid(etable, vtable.index) + val localVidMap = createLocalVidMap(etable) + new GraphImpl(vtable, vid2pid, localVidMap, etable) + } + + + + /** + * Create the edge table RDD, which is much more efficient for Java heap storage than the + * normal edges data structure (RDD[(Vid, Vid, ED)]). + * + * The edge table contains multiple partitions, and each partition contains only one RDD + * key-value pair: the key is the partition id, and the value is an EdgePartition object + * containing all the edges in a partition. + */ + protected def createETable[ED: ClassManifest]( + edges: RDD[Edge[ED]], numPartitions: Int) + : IndexedRDD[Pid, EdgePartition[ED]] = { + val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt + edges + .map { e => + // Random partitioning based on the source vertex id. + // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions) + // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) + val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions) + //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) + + // Should we be using 3-tuple or an optimized class + MessageToPartition(part, (e.srcId, e.dstId, e.attr)) + } + .partitionBy(new HashPartitioner(numPartitions)) + .mapPartitionsWithIndex({ (pid, iter) => + val builder = new EdgePartitionBuilder[ED] + iter.foreach { message => + val data = message.data + builder.add(data._1, data._2, data._3) + } + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) + }, preservesPartitioning = true).indexed() + } + + + protected def createVid2Pid[ED: ClassManifest]( + eTable: IndexedRDD[Pid, EdgePartition[ED]], + vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Array[Pid]] = { + val preAgg = eTable.mapPartitions { iter => + val (pid, edgePartition) = iter.next() + val vSet = new VertexSet + edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)}) + vSet.iterator.map { vid => (vid.toLong, pid) } + } + IndexedRDD[Vid, Pid, ArrayBuffer[Pid]](preAgg, vTableIndex, + (p: Pid) => ArrayBuffer(p), + (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab}, + (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b) + .mapValues(a => a.toArray).asInstanceOf[IndexedRDD[Vid, Array[Pid]]] + } + + + protected def createLocalVidMap[ED: ClassManifest]( + eTable: IndexedRDD[Pid, EdgePartition[ED]]): IndexedRDD[Pid, VertexIdToIndexMap] = { + eTable.mapValues{ epart => + val vidToIndex = new VertexIdToIndexMap() + var i = 0 + epart.foreach{ e => + if(!vidToIndex.contains(e.srcId)) { + vidToIndex.put(e.srcId, i) + i += 1 + } + if(!vidToIndex.contains(e.dstId)) { + vidToIndex.put(e.dstId, i) + i += 1 + } + } + vidToIndex + } + } + + + protected def createVTableReplicated[VD: ClassManifest]( + vTable: IndexedRDD[Vid, VD], + vid2pid: IndexedRDD[Vid, Array[Pid]], + replicationMap: IndexedRDD[Pid, VertexIdToIndexMap]): + IndexedRDD[Pid, Array[VD]] = { + // Join vid2pid and vTable, generate a shuffle dependency on the joined + // result, and get the shuffle id so we can use it on the slave. + val msgsByPartition = vTable.zipJoin(vid2pid) + .flatMap { case (vid, (vdata, pids)) => + pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) } + } + .partitionBy(replicationMap.partitioner.get).cache() + + val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){ + (mapIter, msgsIter) => + val (IndexedSeq(vidToIndex), bs) = mapIter.next() + assert(!mapIter.hasNext) + // Populate the vertex array using the vidToIndex map + val vertexArray = new Array[VD](vidToIndex.size) + for (msg <- msgsIter) { + val ind = vidToIndex(msg.data._1) + vertexArray(ind) = msg.data._2 + } + Iterator((IndexedSeq(vertexArray), bs)) + } + + new IndexedRDD(replicationMap.index, newValuesRDD) + + // @todo assert edge table has partitioner + + // val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap] = + // msgsByPartition.mapPartitionsWithIndex( (pid, iter) => { + // val vidToIndex = new VertexIdToIndexMap + // var i = 0 + // for (msg <- iter) { + // vidToIndex.put(msg.data._1, i) + // i += 1 + // } + // Array((pid, vidToIndex)).iterator + // }, preservesPartitioning = true).indexed(eTable.index) + + // val vTableReplicatedValues: IndexedRDD[Pid, Array[VD]] = + // msgsByPartition.mapPartitionsWithIndex( (pid, iter) => { + // val vertexArray = ArrayBuilder.make[VD] + // for (msg <- iter) { + // vertexArray += msg.data._2 + // } + // Array((pid, vertexArray.result)).iterator + // }, preservesPartitioning = true).indexed(eTable.index) + + // (localVidMap, vTableReplicatedValues) + } + + + protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = { + val mixingPrime: Vid = 1125899906842597L + (math.abs(src) * mixingPrime).toInt % numParts + } + + + + /** + * This function implements a classic 2D-Partitioning of a sparse matrix. + * Suppose we have a graph with 11 vertices that we want to partition + * over 9 machines. We can use the following sparse matrix representation: + * + * __________________________________ + * v0 | P0 * | P1 | P2 * | + * v1 | **** | * | | + * v2 | ******* | ** | **** | + * v3 | ***** | * * | * | + * ---------------------------------- + * v4 | P3 * | P4 *** | P5 ** * | + * v5 | * * | * | | + * v6 | * | ** | **** | + * v7 | * * * | * * | * | + * ---------------------------------- + * v8 | P6 * | P7 * | P8 * *| + * v9 | * | * * | | + * v10 | * | ** | * * | + * v11 | * <-E | *** | ** | + * ---------------------------------- + * + * The edge denoted by E connects v11 with v1 and is assigned to + * processor P6. To get the processor number we divide the matrix + * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges + * adjacent to v11 can only be in the first colum of + * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8). + * As a consequence we can guarantee that v11 will need to be + * replicated to at most 2 * sqrt(numProc) machines. + * + * Notice that P0 has many edges and as a consequence this + * partitioning would lead to poor work balance. To improve + * balance we first multiply each vertex id by a large prime + * to effectively shuffle the vertex locations. + * + * One of the limitations of this approach is that the number of + * machines must either be a perfect square. We partially address + * this limitation by computing the machine assignment to the next + * largest perfect square and then mapping back down to the actual + * number of machines. Unfortunately, this can also lead to work + * imbalance and so it is suggested that a perfect square is used. + * + * + */ + protected def edgePartitionFunction2D(src: Vid, dst: Vid, + numParts: Pid, ceilSqrtNumParts: Pid): Pid = { + val mixingPrime: Vid = 1125899906842597L + val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt + val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt + (col * ceilSqrtNumParts + row) % numParts + } + + + /** + * Assign edges to an aribtrary machine corresponding to a + * random vertex cut. + */ + protected def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = { + math.abs((src, dst).hashCode()) % numParts + } + + + /** + * @todo(crankshaw) how does this effect load balancing? + */ + protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid, + numParts: Pid, ceilSqrtNumParts: Pid): Pid = { + val mixingPrime: Vid = 1125899906842597L + // Partitions by canonical edge direction + val src = math.min(srcOrig, dstOrig) + val dst = math.max(srcOrig, dstOrig) + val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt + val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt + (col * ceilSqrtNumParts + row) % numParts + } + +} // end of object GraphImpl + diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala new file mode 100644 index 0000000000..b7bbf257a4 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala @@ -0,0 +1,49 @@ +package org.apache.spark.graph.impl + +import org.apache.spark.Partitioner +import org.apache.spark.graph.Pid +import org.apache.spark.rdd.{ShuffledRDD, RDD} + + +/** + * A message used to send a specific value to a partition. + * @param partition index of the target partition. + * @param data value to send + */ +class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T]( + @transient var partition: Pid, + var data: T) + extends Product2[Pid, T] { + + override def _1 = partition + + override def _2 = data + + override def canEqual(that: Any): Boolean = that.isInstanceOf[MessageToPartition[_]] +} + +/** + * Companion object for MessageToPartition. + */ +object MessageToPartition { + def apply[T](partition: Pid, value: T) = new MessageToPartition(partition, value) +} + + +class MessageToPartitionRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) { + + /** + * Return a copy of the RDD partitioned using the specified partitioner. + */ + def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = { + new ShuffledRDD[Pid, T, MessageToPartition[T]](self, partitioner) + } + +} + + +object MessageToPartitionRDDFunctions { + implicit def rdd2PartitionRDDFunctions[T: ClassManifest](rdd: RDD[MessageToPartition[T]]) = { + new MessageToPartitionRDDFunctions(rdd) + } +} diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala new file mode 100644 index 0000000000..4627c3566c --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/package.scala @@ -0,0 +1,25 @@ +package org.apache.spark + +package object graph { + + type Vid = Long + type Pid = Int + + type VertexHashMap[T] = it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap[T] + type VertexSet = it.unimi.dsi.fastutil.longs.LongOpenHashSet + type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList + // @todo replace with rxin's fast hashmap + type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap + + /** + * Return the default null-like value for a data type T. + */ + def nullValue[T] = null.asInstanceOf[T] + + + private[graph] + case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U, + @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V]( + var _1: U, var _2: V) + +} diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala b/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala new file mode 100644 index 0000000000..eaff27a33e --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala @@ -0,0 +1,76 @@ +///// This file creates circular dependencies between examples bagle and graph + +// package org.apache.spark.graph.perf + +// import org.apache.spark._ +// import org.apache.spark.SparkContext._ +// import org.apache.spark.bagel.Bagel + +// import org.apache.spark.examples.bagel +// //import org.apache.spark.bagel.examples._ +// import org.apache.spark.graph._ + + +// object BagelTest { + +// def main(args: Array[String]) { +// val host = args(0) +// val taskType = args(1) +// val fname = args(2) +// val options = args.drop(3).map { arg => +// arg.dropWhile(_ == '-').split('=') match { +// case Array(opt, v) => (opt -> v) +// case _ => throw new IllegalArgumentException("Invalid argument: " + arg) +// } +// } + +// System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +// //System.setProperty("spark.shuffle.compress", "false") +// System.setProperty("spark.kryo.registrator", "org.apache.spark.bagel.examples.PRKryoRegistrator") + +// var numIter = Int.MaxValue +// var isDynamic = false +// var tol:Float = 0.001F +// var outFname = "" +// var numVPart = 4 +// var numEPart = 4 + +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("tol", v) => tol = v.toFloat +// case ("output", v) => outFname = v +// case ("numVPart", v) => numVPart = v.toInt +// case ("numEPart", v) => numEPart = v.toInt +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } + +// val sc = new SparkContext(host, "PageRank(" + fname + ")") +// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() +// val startTime = System.currentTimeMillis + +// val numVertices = g.vertices.count() + +// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) => +// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString))) +// } + +// // Do the computation +// val epsilon = 0.01 / numVertices +// val messages = sc.parallelize(Array[(String, PRMessage)]()) +// val utils = new PageRankUtils +// val result = +// Bagel.run( +// sc, vertices, messages, combiner = new PRCombiner(), +// numPartitions = numVPart)( +// utils.computeWithCombiner(numVertices, epsilon, numIter)) + +// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) ) +// if (!outFname.isEmpty) { +// println("Saving pageranks of pages to " + outFname) +// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname) +// } +// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") +// sc.stop() +// } +// } diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala b/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala new file mode 100644 index 0000000000..01bd968550 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala @@ -0,0 +1,75 @@ +///// This file creates circular dependencies between examples bagle and graph + + +// package org.apache.spark.graph.perf + +// import org.apache.spark._ +// import org.apache.spark.SparkContext._ +// import org.apache.spark.bagel.Bagel +// import org.apache.spark.bagel.examples._ +// import org.apache.spark.graph._ + + +// object SparkTest { + +// def main(args: Array[String]) { +// val host = args(0) +// val taskType = args(1) +// val fname = args(2) +// val options = args.drop(3).map { arg => +// arg.dropWhile(_ == '-').split('=') match { +// case Array(opt, v) => (opt -> v) +// case _ => throw new IllegalArgumentException("Invalid argument: " + arg) +// } +// } + +// System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer") +// //System.setProperty("spark.shuffle.compress", "false") +// System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator") + +// var numIter = Int.MaxValue +// var isDynamic = false +// var tol:Float = 0.001F +// var outFname = "" +// var numVPart = 4 +// var numEPart = 4 + +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("tol", v) => tol = v.toFloat +// case ("output", v) => outFname = v +// case ("numVPart", v) => numVPart = v.toInt +// case ("numEPart", v) => numEPart = v.toInt +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } + +// val sc = new SparkContext(host, "PageRank(" + fname + ")") +// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() +// val startTime = System.currentTimeMillis + +// val numVertices = g.vertices.count() + +// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) => +// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString))) +// } + +// // Do the computation +// val epsilon = 0.01 / numVertices +// val messages = sc.parallelize(Array[(String, PRMessage)]()) +// val utils = new PageRankUtils +// val result = +// Bagel.run( +// sc, vertices, messages, combiner = new PRCombiner(), +// numPartitions = numVPart)( +// utils.computeWithCombiner(numVertices, epsilon, numIter)) + +// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) ) +// if (!outFname.isEmpty) { +// println("Saving pageranks of pages to " + outFname) +// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname) +// } +// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") +// sc.stop() +// } +// } diff --git a/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala b/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala new file mode 100644 index 0000000000..bc00ce2151 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala @@ -0,0 +1,114 @@ +package org.apache.spark.graph.util + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import scala.collection.mutable.HashSet + +import org.apache.spark.util.Utils + +import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor} +import org.objectweb.asm.Opcodes._ + + + +private[spark] object BytecodeUtils { + + /** + * Test whether the given closure invokes the specified method in the specified class. + */ + def invokedMethod(closure: AnyRef, targetClass: Class[_], targetMethod: String): Boolean = { + if (_invokedMethod(closure.getClass, "apply", targetClass, targetMethod)) { + true + } else { + // look at closures enclosed in this closure + for (f <- closure.getClass.getDeclaredFields + if f.getType.getName.startsWith("scala.Function")) { + f.setAccessible(true) + if (invokedMethod(f.get(closure), targetClass, targetMethod)) { + return true + } + } + return false + } + } + + private def _invokedMethod(cls: Class[_], method: String, + targetClass: Class[_], targetMethod: String): Boolean = { + + val seen = new HashSet[(Class[_], String)] + var stack = List[(Class[_], String)]((cls, method)) + + while (stack.nonEmpty) { + val (c, m) = stack.head + stack = stack.tail + seen.add((c, m)) + val finder = new MethodInvocationFinder(c.getName, m) + getClassReader(c).accept(finder, 0) + for (classMethod <- finder.methodsInvoked) { + //println(classMethod) + if (classMethod._1 == targetClass && classMethod._2 == targetMethod) { + return true + } else if (!seen.contains(classMethod)) { + stack = classMethod :: stack + } + } + } + return false + } + + /** + * Get an ASM class reader for a given class from the JAR that loaded it. + */ + private def getClassReader(cls: Class[_]): ClassReader = { + // Copy data over, before delegating to ClassReader - else we can run out of open file handles. + val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" + val resourceStream = cls.getResourceAsStream(className) + // todo: Fixme - continuing with earlier behavior ... + if (resourceStream == null) return new ClassReader(resourceStream) + + val baos = new ByteArrayOutputStream(128) + Utils.copyStream(resourceStream, baos, true) + new ClassReader(new ByteArrayInputStream(baos.toByteArray)) + } + + /** + * Given the class name, return whether we should look into the class or not. This is used to + * skip examing a large quantity of Java or Scala classes that we know for sure wouldn't access + * the closures. Note that the class name is expected in ASM style (i.e. use "/" instead of "."). + */ + private def skipClass(className: String): Boolean = { + val c = className + c.startsWith("java/") || c.startsWith("scala/") || c.startsWith("javax/") + } + + /** + * Find the set of methods invoked by the specified method in the specified class. + * For example, after running the visitor, + * MethodInvocationFinder("spark/graph/Foo", "test") + * its methodsInvoked variable will contain the set of methods invoked directly by + * Foo.test(). Interface invocations are not returned as part of the result set because we cannot + * determine the actual metod invoked by inspecting the bytecode. + */ + private class MethodInvocationFinder(className: String, methodName: String) + extends ClassVisitor(ASM4) { + + val methodsInvoked = new HashSet[(Class[_], String)] + + override def visitMethod(access: Int, name: String, desc: String, + sig: String, exceptions: Array[String]): MethodVisitor = { + if (name == methodName) { + new MethodVisitor(ASM4) { + override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { + if (op == INVOKEVIRTUAL || op == INVOKESPECIAL || op == INVOKESTATIC) { + if (!skipClass(owner)) { + methodsInvoked.add((Class.forName(owner.replace("/", ".")), name)) + } + } + } + } + } else { + null + } + } + } +} diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala new file mode 100644 index 0000000000..d75a678b26 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala @@ -0,0 +1,252 @@ +package org.apache.spark.graph.util + +import util._ +import math._ +import scala.annotation.tailrec +//import scala.collection.mutable + + +import org.apache.spark._ +import org.apache.spark.serializer._ +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graph._ +import org.apache.spark.graph.Graph +import org.apache.spark.graph.Edge +import org.apache.spark.graph.impl.GraphImpl + + +// TODO(crankshaw) I might want to pull at least RMAT out into a separate class. +// Might simplify the code to have classwide variables and such. +object GraphGenerators { + + val RMATa = 0.45 + val RMATb = 0.15 + val RMATc = 0.15 + val RMATd = 0.25 + + def main(args: Array[String]) { + + + val serializer = "org.apache.spark.serializer.KryoSerializer" + System.setProperty("spark.serializer", serializer) + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator") + val host = "local[4]" + val sc = new SparkContext(host, "Lognormal graph generator") + + val lnGraph = logNormalGraph(sc, 10000) + + val rmat = rmatGraph(sc, 1000, 3000) + + //for (v <- lnGraph.vertices) { + // println(v.id + ":\t" + v.data) + //} + + val times = 100000 + //val nums = (1 to times).flatMap { n => List(sampleLogNormal(4.0, 1.3, times)) }.toList + //val avg = nums.sum / nums.length + //val sumSquares = nums.foldLeft(0.0) {(total, next) => + // (total + math.pow((next - avg), 2)) } + //val stdev = math.sqrt(sumSquares/(nums.length - 1)) + + //println("avg: " + avg + "+-" + stdev) + + + //for (i <- 1 to 1000) { + // println(sampleLogNormal(4.0, 1.3, 1000)) + //} + + sc.stop() + + } + + + // Right now it just generates a bunch of edges where + // the edge data is the weight (default 1) + def logNormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = { + // based on Pregel settings + val mu = 4 + val sigma = 1.3 + //val vertsAndEdges = (0 until numVertices).flatMap { src => { + + val vertices: RDD[(Vid, Int)] = sc.parallelize(0 until numVertices).map{ + src => (src, sampleLogNormal(mu, sigma, numVertices)) + } + + val edges = vertices.flatMap{ + v => generateRandomEdges(v._1.toInt, v._2, numVertices) + } + + GraphImpl(vertices, edges) + //println("Vertices:") + //for (v <- vertices) { + // println(v.id) + //} + + //println("Edges") + //for (e <- edges) { + // println(e.src, e.dst, e.data) + //} + + } + + + def generateRandomEdges(src: Int, numEdges: Int, maxVid: Int): Array[Edge[Int]] = { + val rand = new Random() + var dsts: Set[Int] = Set() + while (dsts.size < numEdges) { + val nextDst = rand.nextInt(maxVid) + if (nextDst != src) { + dsts += nextDst + } + } + dsts.map {dst => Edge[Int](src, dst, 1) }.toArray + } + + + /** + * Randomly samples from a log normal distribution + * whose corresponding normal distribution has the + * the given mean and standard deviation. It uses + * the formula X = exp(m+s*Z) where m, s are the + * mean, standard deviation of the lognormal distribution + * and Z~N(0, 1). In this function, + * m = e^(mu+sigma^2/2) and + * s = sqrt[(e^(sigma^2) - 1)(e^(2*mu+sigma^2))]. + * + * @param mu the mean of the normal distribution + * @param sigma the standard deviation of the normal distribution + * @param macVal exclusive upper bound on the value of the sample + */ + def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = { + val rand = new Random() + val m = math.exp(mu+(sigma*sigma)/2.0) + val s = math.sqrt((math.exp(sigma*sigma) - 1) * math.exp(2*mu + sigma*sigma)) + // Z ~ N(0, 1) + var X: Double = maxVal + + while (X >= maxVal) { + val Z = rand.nextGaussian() + //X = math.exp((m + s*Z)) + X = math.exp((mu + sigma*Z)) + } + math.round(X.toFloat) + } + + + + def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): GraphImpl[Int, Int] = { + // let N = requestedNumVertices + // the number of vertices is 2^n where n=ceil(log2[N]) + // This ensures that the 4 quadrants are the same size at all recursion levels + val numVertices = math.round(math.pow(2.0, math.ceil(math.log(requestedNumVertices)/math.log(2.0)))).toInt + var edges: Set[Edge[Int]] = Set() + while (edges.size < numEdges) { + if (edges.size % 100 == 0) { + println(edges.size + " edges") + } + edges += addEdge(numVertices) + + } + val graph = outDegreeFromEdges(sc.parallelize(edges.toList)) + graph + + } + + def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = { + + val vertices = edges.flatMap { edge => List((edge.srcId, 1)) } + .reduceByKey(_ + _) + .map{ case (vid, degree) => (vid, degree) } + GraphImpl(vertices, edges) + } + + /** + * @param numVertices Specifies the total number of vertices in the graph (used to get + * the dimensions of the adjacency matrix + */ + def addEdge(numVertices: Int): Edge[Int] = { + //val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0) + val v = math.round(numVertices.toFloat/2.0).toInt + + val (src, dst) = chooseCell(v, v, v) + Edge[Int](src, dst, 1) + } + + + /** + * This method recursively subdivides the the adjacency matrix into quadrants + * until it picks a single cell. The naming conventions in this paper match + * those of the R-MAT paper. There are a power of 2 number of nodes in the graph. + * The adjacency matrix looks like: + * + * dst -> + * (x,y) *************** _ + * | | | | + * | a | b | | + * src | | | | + * | *************** | T + * \|/ | | | | + * | c | d | | + * | | | | + * *************** - + * + * where this represents the subquadrant of the adj matrix currently being + * subdivided. (x,y) represent the upper left hand corner of the subquadrant, + * and T represents the side length (guaranteed to be a power of 2). + * + * After choosing the next level subquadrant, we get the resulting sets + * of parameters: + * quad = a, x'=x, y'=y, T'=T/2 + * quad = b, x'=x+T/2, y'=y, T'=T/2 + * quad = c, x'=x, y'=y+T/2, T'=T/2 + * quad = d, x'=x+T/2, y'=y+T/2, T'=T/2 + * + * @param src is the + */ + @tailrec + def chooseCell(x: Int, y: Int, t: Int): (Int, Int) = { + if (t <= 1) + (x,y) + else { + val newT = math.round(t.toFloat/2.0).toInt + pickQuadrant(RMATa, RMATb, RMATc, RMATd) match { + case 0 => chooseCell(x, y, newT) + case 1 => chooseCell(x+newT, y, newT) + case 2 => chooseCell(x, y+newT, newT) + case 3 => chooseCell(x+newT, y+newT, newT) + } + } + } + + // TODO(crankshaw) turn result into an enum (or case class for pattern matching} + def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = { + if (a+b+c+d != 1.0) { + throw new IllegalArgumentException("R-MAT probability parameters sum to " + (a+b+c+d) + ", should sum to 1.0") + } + val rand = new Random() + val result = rand.nextDouble() + result match { + case x if x < a => 0 // 0 corresponds to quadrant a + case x if (x >= a && x < a+b) => 1 // 1 corresponds to b + case x if (x >= a+b && x < a+b+c) => 2 // 2 corresponds to c + case _ => 3 // 3 corresponds to d + } + } + + + +} + + + + + + + + + + + diff --git a/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala b/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala new file mode 100644 index 0000000000..cb18ef3d26 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala @@ -0,0 +1,21 @@ +package org.apache.spark.graph.util + + +object HashUtils { + + /** + * Compute a 64-bit hash value for the given string. + * See http://stackoverflow.com/questions/1660501/what-is-a-good-64bit-hash-function-in-java-for-textual-strings + */ + def hash(str: String): Long = { + var h = 1125899906842597L + val len = str.length + var i = 0 + + while (i < len) { + h = 31 * h + str(i) + i += 1 + } + h + } +} diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala new file mode 100644 index 0000000000..145be3c126 --- /dev/null +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -0,0 +1,104 @@ +package org.apache.spark.graph + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.graph.LocalSparkContext._ + + +class GraphSuite extends FunSuite with LocalSparkContext { + +// val sc = new SparkContext("local[4]", "test") + + System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator") + + test("Graph Creation") { + withSpark(new SparkContext("local", "test")) { sc => + val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L) + val edges = sc.parallelize(rawEdges) + val graph = Graph(edges) + assert( graph.edges.count() === rawEdges.size ) + } + } + + test("aggregateNeighbors") { + withSpark(new SparkContext("local", "test")) { sc => + val star = Graph(sc.parallelize(List((0, 1), (0, 2), (0, 3)))) + + val indegrees = star.aggregateNeighbors( + (vid, edge) => Some(1), + (a: Int, b: Int) => a + b, + EdgeDirection.In)// .map((vid, attr) => (vid, attr._2.getOrElse(0))) + assert(indegrees.collect().toSet === Set((1, 1), (2, 1), (3, 1))) // (0, 0), + + val outdegrees = star.aggregateNeighbors( + (vid, edge) => Some(1), + (a: Int, b: Int) => a + b, + EdgeDirection.Out) //.map((vid, attr) => (vid, attr._2.getOrElse(0))) + assert(outdegrees.collect().toSet === Set((0, 3))) //, (1, 0), (2, 0), (3, 0))) + + val noVertexValues = star.aggregateNeighbors[Int]( + (vid: Vid, edge: EdgeTriplet[Int, Int]) => None, + (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"), + EdgeDirection.In)//.map((vid, attr) => (vid, attr)) + assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)] ) // ((0, None), (1, None), (2, None), (3, None))) + } + } + + /* test("joinVertices") { + sc = new SparkContext("local", "test") + val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two"), Vertex(3, "three")), 2) + val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo")))) + val g: Graph[String, String] = new GraphImpl(vertices, edges) + + val tbl = sc.parallelize(Seq((1, 10), (2, 20))) + val g1 = g.joinVertices(tbl, (v: Vertex[String], u: Int) => v.data + u) + + val v = g1.vertices.collect().sortBy(_.id) + assert(v(0).data === "one10") + assert(v(1).data === "two20") + assert(v(2).data === "three") + + val e = g1.edges.collect() + assert(e(0).data === "onetwo") + } + */ + +// test("graph partitioner") { +// sc = new SparkContext("local", "test") +// val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two"))) +// val edges = sc.parallelize(Seq(Edge(1, 2, "onlyedge"))) +// var g = Graph(vertices, edges) +// +// g = g.withPartitioner(4, 7) +// assert(g.numVertexPartitions === 4) +// assert(g.numEdgePartitions === 7) +// +// g = g.withVertexPartitioner(5) +// assert(g.numVertexPartitions === 5) +// +// g = g.withEdgePartitioner(8) +// assert(g.numEdgePartitions === 8) +// +// g = g.mapVertices(x => x) +// assert(g.numVertexPartitions === 5) +// assert(g.numEdgePartitions === 8) +// +// g = g.mapEdges(x => x) +// assert(g.numVertexPartitions === 5) +// assert(g.numEdgePartitions === 8) +// +// val updates = sc.parallelize(Seq((1, " more"))) +// g = g.updateVertices( +// updates, +// (v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data) +// assert(g.numVertexPartitions === 5) +// assert(g.numEdgePartitions === 8) +// +// g = g.reverse +// assert(g.numVertexPartitions === 5) +// assert(g.numEdgePartitions === 8) +// +// } +} diff --git a/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala b/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala new file mode 100644 index 0000000000..4a0155b6bd --- /dev/null +++ b/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala @@ -0,0 +1,44 @@ +package org.apache.spark.graph + +import org.scalatest.Suite +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.SparkContext + + +/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */ +trait LocalSparkContext extends BeforeAndAfterEach { self: Suite => + + @transient var sc: SparkContext = _ + + override def afterEach() { + resetSparkContext() + super.afterEach() + } + + def resetSparkContext() = { + if (sc != null) { + LocalSparkContext.stop(sc) + sc = null + } + } + +} + +object LocalSparkContext { + def stop(sc: SparkContext) { + sc.stop() + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ + def withSpark[T](sc: SparkContext)(f: SparkContext => T) = { + try { + f(sc) + } finally { + stop(sc) + } + } + +} diff --git a/graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala new file mode 100644 index 0000000000..d85e877ddf --- /dev/null +++ b/graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala @@ -0,0 +1,93 @@ +package org.apache.spark.graph.util + +import org.scalatest.FunSuite + + +class BytecodeUtilsSuite extends FunSuite { + + import BytecodeUtilsSuite.TestClass + + test("closure invokes a method") { + val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); } + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) + + val c2 = {e: TestClass => println(e.foo); println(e.bar); } + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo")) + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar")) + assert(!BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz")) + + val c3 = {e: TestClass => println(e.foo); } + assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar")) + assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz")) + } + + test("closure inside a closure invokes a method") { + val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); } + val c2 = {e: TestClass => c1(e); println(e.foo); } + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo")) + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz")) + } + + test("closure inside a closure inside a closure invokes a method") { + val c1 = {e: TestClass => println(e.baz); } + val c2 = {e: TestClass => c1(e); println(e.foo); } + val c3 = {e: TestClass => c2(e) } + assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz")) + } + + test("closure calling a function that invokes a method") { + def zoo(e: TestClass) { + println(e.baz) + } + val c1 = {e: TestClass => zoo(e)} + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) + } + + test("closure calling a function that invokes a method which uses another closure") { + val c2 = {e: TestClass => println(e.baz)} + def zoo(e: TestClass) { + c2(e) + } + val c1 = {e: TestClass => zoo(e)} + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) + } + + test("nested closure") { + val c2 = {e: TestClass => println(e.baz)} + def zoo(e: TestClass, c: TestClass => Unit) { + c(e) + } + val c1 = {e: TestClass => zoo(e, c2)} + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) + } + + // The following doesn't work yet, because the byte code doesn't contain any information + // about what exactly "c" is. +// test("invoke interface") { +// val c1 = {e: TestClass => c(e)} +// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) +// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) +// assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) +// } + + private val c = {e: TestClass => println(e.baz)} +} + + +object BytecodeUtilsSuite { + class TestClass(val foo: Int, val bar: Long) { + def baz: Boolean = false + } +} @@ -87,6 +87,7 @@ <modules> <module>core</module> <module>bagel</module> + <module>graph</module> <module>examples</module> <module>mllib</module> <module>tools</module> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f2bbe5358f..079e698ea0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -42,15 +42,17 @@ object SparkBuild extends Build { lazy val core = Project("core", file("core"), settings = coreSettings) lazy val repl = Project("repl", file("repl"), settings = replSettings) - .dependsOn(core, bagel, mllib) + .dependsOn(core, graph, bagel, mllib) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) - .dependsOn(core, mllib, bagel, streaming) + .dependsOn(core, mllib, graph, bagel, streaming) lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming) lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core) + lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn(core) + lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core) lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) @@ -58,7 +60,7 @@ object SparkBuild extends Build { lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core) lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) - .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) + .dependsOn(core, graph, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) // A configuration to set an alternative publishLocalConfiguration lazy val MavenCompile = config("m2r") extend(Compile) @@ -75,7 +77,7 @@ object SparkBuild extends Build { lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]() lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]() lazy val allProjects = Seq[ProjectReference]( - core, repl, examples, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef + core, repl, examples, graph, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.apache.spark", @@ -259,6 +261,10 @@ object SparkBuild extends Build { name := "spark-tools" ) + def graphSettings = sharedSettings ++ Seq( + name := "spark-graphx" + ) + def bagelSettings = sharedSettings ++ Seq( name := "spark-bagel" ) |