diff options
8 files changed, 25 insertions, 25 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 284e06ae58..29912c7266 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -54,7 +54,6 @@ class SparkContext( def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil) - // Ensure logging is initialized before we spawn any threads initLogging() diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala index ef2fa74eb7..9a90d0af79 100644 --- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala @@ -10,11 +10,12 @@ import java.lang.Double class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] { - val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]] + override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]] - lazy val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x)) + override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x)) - def wrapRDD: (RDD[Double]) => JavaDoubleRDD = rdd => new JavaDoubleRDD(rdd.map(_.doubleValue)) + override def wrapRDD(rdd: RDD[Double]): JavaDoubleRDD = + new JavaDoubleRDD(rdd.map(_.doubleValue)) // Common RDD functions diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 4a909afdc9..99d1b1e208 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -22,9 +22,9 @@ import org.apache.hadoop.conf.Configuration class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K], implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { - def wrapRDD = JavaPairRDD.fromRDD _ + override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) - def classManifest: ClassManifest[(K, V)] = + override val classManifest: ClassManifest[(K, V)] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] import JavaPairRDD._ diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index 194ea4a2a6..598d4cf15b 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -6,7 +6,7 @@ import spark.api.java.function.{Function => JFunction} class JavaRDD[T](val rdd: RDD[T])(implicit val classManifest: ClassManifest[T]) extends JavaRDDLike[T, JavaRDD[T]] { - def wrapRDD: (RDD[T]) => JavaRDD[T] = JavaRDD.fromRDD + override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd) // Common RDD functions diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 5d7ce1895b..1c6948eb7f 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -13,9 +13,9 @@ import java.lang import scala.Tuple2 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { - def wrapRDD: (RDD[T] => This) + def wrapRDD(rdd: RDD[T]): This - implicit def classManifest: ClassManifest[T] + implicit val classManifest: ClassManifest[T] def rdd: RDD[T] diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala index 11d9883c95..00e1aff58c 100644 --- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala @@ -174,15 +174,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)) } - @Override - def union[T](jrdds: java.util.List[JavaRDD[T]]): JavaRDD[T] = { + override def union[T](jrdds: java.util.List[JavaRDD[T]]): JavaRDD[T] = { val rdds: Seq[RDD[T]] = asScalaBuffer(jrdds).map(_.rdd) implicit val cm: ClassManifest[T] = jrdds.head.classManifest sc.union(rdds: _*)(cm) } - @Override - def union[K, V](jrdds: java.util.List[JavaPairRDD[K, V]]): JavaPairRDD[K, V] = { + override def union[K, V](jrdds: java.util.List[JavaPairRDD[K, V]]): JavaPairRDD[K, V] = { val rdds: Seq[RDD[(K, V)]] = asScalaBuffer(jrdds).map(_.rdd) implicit val cm: ClassManifest[(K, V)] = jrdds.head.classManifest implicit val kcm: ClassManifest[K] = jrdds.head.kManifest @@ -190,8 +188,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm) } - @Override - def union(jrdds: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = { + override def union(jrdds: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = { val rdds: Seq[RDD[Double]] = asScalaBuffer(jrdds).map(_.srdd) new JavaDoubleRDD(sc.union(rdds: _*)) } diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java index d76bcbbe85..e6ca69ff97 100644 --- a/examples/src/main/java/spark/examples/JavaTC.java +++ b/examples/src/main/java/spark/examples/JavaTC.java @@ -45,14 +45,14 @@ public class JavaTC { JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC"); Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2; - JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices); + JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache(); // Linear transitive closure: each round grows paths by one edge, // by joining the graph's edges with the already-discovered paths. // e.g. join the path (y, z) from the TC with the edge (x, y) from // the graph to obtain the path (x, z). - // Because join() joins on keys, the edges are stored in reversed order. + // Because join() joins on keys, the edges are stored in reversed order. JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { @Override @@ -62,13 +62,14 @@ public class JavaTC { }); long oldCount = 0; + long nextCount = tc.count(); do { - oldCount = tc.count(); + oldCount = nextCount; // Perform the join, obtaining an RDD of (y, (z, x)) pairs, // then project the result to obtain the new (x, z) paths. - - tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct(); - } while (tc.count() != oldCount); + tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct().cache(); + nextCount = tc.count(); + } while (nextCount != oldCount); System.out.println("TC has " + tc.count() + " edges."); System.exit(0); diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala index fa945b5082..a095476a23 100644 --- a/examples/src/main/scala/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/spark/examples/SparkTC.scala @@ -28,7 +28,7 @@ object SparkTC { } val spark = new SparkContext(args(0), "SparkTC") val slices = if (args.length > 1) args(1).toInt else 2 - var tc = spark.parallelize(generateGraph, slices) + var tc = spark.parallelize(generateGraph, slices).cache() // Linear transitive closure: each round grows paths by one edge, // by joining the graph's edges with the already-discovered paths. @@ -40,12 +40,14 @@ object SparkTC { // This join is iterated until a fixed point is reached. var oldCount = 0L + var nextCount = tc.count() do { - oldCount = tc.count() + oldCount = nextCount // Perform the join, obtaining an RDD of (y, (z, x)) pairs, // then project the result to obtain the new (x, z) paths. - tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct() - } while (tc.count() != oldCount) + tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache(); + nextCount = tc.count() + } while (nextCount != oldCount) println("TC has " + tc.count() + " edges.") System.exit(0) |