 core/src/main/scala/spark/SparkContext.scala              |  1
 core/src/main/scala/spark/api/java/JavaDoubleRDD.scala    |  7
 core/src/main/scala/spark/api/java/JavaPairRDD.scala      |  4
 core/src/main/scala/spark/api/java/JavaRDD.scala          |  2
 core/src/main/scala/spark/api/java/JavaRDDLike.scala      |  4
 core/src/main/scala/spark/api/java/JavaSparkContext.scala |  9
 examples/src/main/java/spark/examples/JavaTC.java         | 13
 examples/src/main/scala/spark/examples/SparkTC.scala      | 10
 8 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 284e06ae58..29912c7266 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -54,7 +54,6 @@ class SparkContext(
def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil)
-
// Ensure logging is initialized before we spawn any threads
initLogging()
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index ef2fa74eb7..9a90d0af79 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -10,11 +10,12 @@ import java.lang.Double
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
- val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
+ override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
- lazy val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
+ override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
- def wrapRDD: (RDD[Double]) => JavaDoubleRDD = rdd => new JavaDoubleRDD(rdd.map(_.doubleValue))
+ override def wrapRDD(rdd: RDD[Double]): JavaDoubleRDD =
+ new JavaDoubleRDD(rdd.map(_.doubleValue))
// Common RDD functions
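Note on this hunk: the function-valued `wrapRDD` member becomes a real method, and `rdd` goes from `lazy val` to an eager `override val`, so both are compiler-checked implementations of the JavaRDDLike contract (see the trait diff further down). A minimal sketch of the contrast, with hypothetical types and scala List standing in for RDD so it is self-contained:

trait Wrapper[T, This] {
  def wrapRDD(rdd: List[T]): This   // abstract method, not a Function1 field
}

class DoubleColl(val xs: List[java.lang.Double])
    extends Wrapper[java.lang.Double, DoubleColl] {
  // `override` makes the compiler verify this signature against the trait;
  // the old function-valued member offered no such check and allocated a
  // Function1 object per instance.
  override def wrapRDD(rdd: List[java.lang.Double]): DoubleColl =
    new DoubleColl(rdd)
}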
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 4a909afdc9..99d1b1e208 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -22,9 +22,9 @@ import org.apache.hadoop.conf.Configuration
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
- def wrapRDD = JavaPairRDD.fromRDD _
+ override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
- def classManifest: ClassManifest[(K, V)] =
+ override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
import JavaPairRDD._
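The manifest body is unchanged here (only `def` becomes a stable `override val`), but the cast it performs is worth a note: after JVM erasure a Tuple2 is just a reference type, so a manifest used to build arrays of pairs can be the AnyRef manifest force-cast to the pair type. A small illustration, using the Scala 2.9-era ClassManifest API this codebase is written against:

object ManifestCastDemo extends App {
  val pairManifest: ClassManifest[(String, Int)] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[(String, Int)]]
  // newArray allocates an Array[AnyRef] under the hood, which is exactly
  // what an Array[(String, Int)] is after erasure, so element stores work.
  val arr = pairManifest.newArray(2)
  arr(0) = ("a", 1)
  println(arr.toList)
}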
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 194ea4a2a6..598d4cf15b 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -6,7 +6,7 @@ import spark.api.java.function.{Function => JFunction}
class JavaRDD[T](val rdd: RDD[T])(implicit val classManifest: ClassManifest[T]) extends
JavaRDDLike[T, JavaRDD[T]] {
- def wrapRDD: (RDD[T]) => JavaRDD[T] = JavaRDD.fromRDD
+ override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
// Common RDD functions
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 5d7ce1895b..1c6948eb7f 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -13,9 +13,9 @@ import java.lang
import scala.Tuple2
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
- def wrapRDD: (RDD[T] => This)
+ def wrapRDD(rdd: RDD[T]): This
- implicit def classManifest: ClassManifest[T]
+ implicit val classManifest: ClassManifest[T]
def rdd: RDD[T]
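This trait is the hub of the refactoring: `wrapRDD` becomes an ordinary abstract method, and `classManifest` becomes an abstract `implicit val`, so each implementation supplies one stable manifest that is automatically in implicit scope inside the trait's own method bodies. A self-contained sketch of that mechanism (hypothetical `Like` trait, Scala 2.9-era ClassManifest):

trait Like[T] {
  implicit val classManifest: ClassManifest[T]   // abstract, but already implicit

  // Array creation needs a manifest; the abstract implicit val supplies it.
  def singleton(x: T): Array[T] = {
    val a = new Array[T](1)
    a(0) = x
    a
  }
}

class IntLike extends Like[Int] {
  override implicit val classManifest: ClassManifest[Int] =
    implicitly[ClassManifest[Int]]
}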
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 11d9883c95..00e1aff58c 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -174,15 +174,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
- @Override
- def union[T](jrdds: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
+ override def union[T](jrdds: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = asScalaBuffer(jrdds).map(_.rdd)
implicit val cm: ClassManifest[T] = jrdds.head.classManifest
sc.union(rdds: _*)(cm)
}
- @Override
- def union[K, V](jrdds: java.util.List[JavaPairRDD[K, V]]): JavaPairRDD[K, V] = {
+ override def union[K, V](jrdds: java.util.List[JavaPairRDD[K, V]]): JavaPairRDD[K, V] = {
val rdds: Seq[RDD[(K, V)]] = asScalaBuffer(jrdds).map(_.rdd)
implicit val cm: ClassManifest[(K, V)] = jrdds.head.classManifest
implicit val kcm: ClassManifest[K] = jrdds.head.kManifest
@@ -190,8 +188,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm)
}
- @Override
- def union(jrdds: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
+ override def union(jrdds: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
val rdds: Seq[RDD[Double]] = asScalaBuffer(jrdds).map(_.srdd)
new JavaDoubleRDD(sc.union(rdds: _*))
}
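`@Override` is Java's annotation; scalac accepts it but never checks it, so in Scala source it silently does nothing. The `override` modifier is the checked equivalent: if a signature drifts from the parent's, compilation fails instead of creating an accidental overload. A tiny illustration, with a hypothetical abstract parent standing in for JavaSparkContextVarargsWorkaround:

abstract class VarargsParent {
  def union(rdds: java.util.List[String]): String
}

class Ctx extends VarargsParent {
  // With `override`, renaming this method or changing a parameter type is
  // a compile error; with Java's @Override annotation, scalac would accept
  // the mismatch without complaint.
  override def union(rdds: java.util.List[String]): String = rdds.get(0)
}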
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java
index d76bcbbe85..e6ca69ff97 100644
--- a/examples/src/main/java/spark/examples/JavaTC.java
+++ b/examples/src/main/java/spark/examples/JavaTC.java
@@ -45,14 +45,14 @@ public class JavaTC {
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC");
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
- JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices);
+ JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
// Linear transitive closure: each round grows paths by one edge,
// by joining the graph's edges with the already-discovered paths.
// e.g. join the path (y, z) from the TC with the edge (x, y) from
// the graph to obtain the path (x, z).
- // Because join() joins on keys, the edges are stored in reversed order.
+ // Because join() joins on keys, the edges are stored in reversed order.
JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>,
Integer, Integer>() {
@Override
@@ -62,13 +62,14 @@ public class JavaTC {
});
long oldCount = 0;
+ long nextCount = tc.count();
do {
- oldCount = tc.count();
+ oldCount = nextCount;
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
// then project the result to obtain the new (x, z) paths.
-
- tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct();
- } while (tc.count() != oldCount);
+ tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct().cache();
+ nextCount = tc.count();
+ } while (nextCount != oldCount);
System.out.println("TC has " + tc.count() + " edges.");
System.exit(0);
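Both example fixes follow the same recipe: cache every iterate (the lineage otherwise grows by one join per round and gets recomputed from scratch), and call count() exactly once per round instead of once in the loop body and again in the while condition. The Scala diff below applies the identical change. As a generic sketch of the pattern, here is a hypothetical fixedPoint helper (not part of this codebase, e.g. placed inside an example's object) written against the spark.RDD API:

import spark.RDD

// Iterate `step` until the element count stops changing, caching each
// iterate so every round launches one job over materialized data.
def fixedPoint[T](start: RDD[T])(step: RDD[T] => RDD[T]): RDD[T] = {
  var cur = start.cache()
  var oldCount = 0L
  var nextCount = cur.count()
  do {
    oldCount = nextCount
    cur = step(cur).cache()
    nextCount = cur.count()          // the only count() per iteration
  } while (nextCount != oldCount)
  cur
}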
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
index fa945b5082..a095476a23 100644
--- a/examples/src/main/scala/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -28,7 +28,7 @@ object SparkTC {
}
val spark = new SparkContext(args(0), "SparkTC")
val slices = if (args.length > 1) args(1).toInt else 2
- var tc = spark.parallelize(generateGraph, slices)
+ var tc = spark.parallelize(generateGraph, slices).cache()
// Linear transitive closure: each round grows paths by one edge,
// by joining the graph's edges with the already-discovered paths.
@@ -40,12 +40,14 @@ object SparkTC {
// This join is iterated until a fixed point is reached.
var oldCount = 0L
+ var nextCount = tc.count()
do {
- oldCount = tc.count()
+ oldCount = nextCount
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
// then project the result to obtain the new (x, z) paths.
- tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct()
- } while (tc.count() != oldCount)
+ tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache();
+ nextCount = tc.count()
+ } while (nextCount != oldCount)
println("TC has " + tc.count() + " edges.")
System.exit(0)
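Expressed with the hypothetical fixedPoint helper sketched after the Java diff, the Scala loop above would read:

// Usage sketch only; `spark`, `slices`, `edges`, and `generateGraph` are
// the values defined in SparkTC.scala above.
val tc = fixedPoint(spark.parallelize(generateGraph, slices)) { cur =>
  cur.union(cur.join(edges).map(x => (x._2._2, x._2._1))).distinct()
}
println("TC has " + tc.count() + " edges.")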