aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala8
-rw-r--r--pom.xml11
-rw-r--r--project/SparkBuild.scala6
5 files changed, 48 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 5fd1fab580..f9b6ee351a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -48,6 +48,19 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
*/
def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel))
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ * This method blocks until all blocks are deleted.
+ */
+ def unpersist(): JavaDoubleRDD = fromRDD(srdd.unpersist())
+
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ */
+ def unpersist(blocking: Boolean): JavaDoubleRDD = fromRDD(srdd.unpersist(blocking))
+
// first() has to be overriden here in order for its return type to be Double instead of Object.
override def first(): Double = srdd.first()
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index a6518abf45..268f43b4e8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -65,6 +65,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def persist(newLevel: StorageLevel): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.persist(newLevel))
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ * This method blocks until all blocks are deleted.
+ */
+ def unpersist(): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist())
+
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ */
+ def unpersist(blocking: Boolean): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist(blocking))
+
// Transformations (return a new RDD)
/**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index eec58abdd6..662990049b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -41,9 +41,17 @@ JavaRDDLike[T, JavaRDD[T]] {
/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ * This method blocks until all blocks are deleted.
*/
def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist())
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ */
+ def unpersist(blocking: Boolean): JavaRDD[T] = wrapRDD(rdd.unpersist(blocking))
+
// Transformations (return a new RDD)
/**
diff --git a/pom.xml b/pom.xml
index 54f100c37f..53ac82efd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,17 @@
<enabled>false</enabled>
</snapshots>
</repository>
+ <repository>
+ <id>mqtt-repo</id>
+ <name>MQTT Repository</name>
+ <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
</repositories>
<pluginRepositories>
<pluginRepository>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 20f2c018fa..8d7cbae821 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -108,9 +108,9 @@ object SparkBuild extends Build {
// Shared between both core and streaming.
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
- // Shared between both examples and streaming.
+ // Shared between both examples and streaming.
resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
-
+
// For Sonatype publishing
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@@ -289,7 +289,7 @@ object SparkBuild extends Build {
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty),
- "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
+ "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
)