author     Kan Zhang <kzhang@apache.org>          2014-05-07 09:41:31 -0700
committer  Patrick Wendell <pwendell@gmail.com>   2014-05-07 09:41:31 -0700
commit     967635a2425a769b932eea0984fe697d6721cab0 (patch)
tree       4375459b9bef590cc05e9470926fe273921851dc
parent     3eb53bd59e828275471d41730e6de601a887416d (diff)
[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations that do not change schema

Author: Kan Zhang <kzhang@apache.org>

Closes #448 from kanzhang/SPARK-1460 and squashes the following commits:

111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                        |  10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala               |  10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala             |  10
-rw-r--r--  project/MimaBuild.scala                                                   |   2
-rw-r--r--  python/pyspark/sql.py                                                     |  29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala              |  67
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala | 140

7 files changed, 246 insertions(+), 22 deletions(-)
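
Before this patch, SchemaRDD inherited the plain RDD[Row] return types for set-style operations, so calling distinct() or subtract() dropped back to an untyped RDD and lost the SQL-specific methods. Below is a minimal sketch of what the change enables, written against the Spark 1.0-era API; the Person case class, table name, and local master are illustrative assumptions, not part of the patch.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SchemaRDD}

    // Illustrative schema; any case class works with createSchemaRDD.
    case class Person(name: String, age: Int)

    object SchemaPreservingOps {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("schema-ops").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.createSchemaRDD // implicit RDD[Person] => SchemaRDD

        val people: SchemaRDD = sc.parallelize(Seq(Person("alice", 30), Person("bob", 25), Person("bob", 25)))
        val banned: SchemaRDD = sc.parallelize(Seq(Person("bob", 25)))

        // With this patch, distinct() and subtract() return a SchemaRDD instead of
        // RDD[Row], so the result can still be registered as a table directly.
        val allowed = people.distinct().subtract(banned)
        allowed.registerAsTable("allowed")
        sqlContext.sql("SELECT name FROM allowed WHERE age > 20").collect().foreach(println)

        sc.stop()
      }
    }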
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3b3524f33e..a1ca612cc9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -128,7 +128,7 @@ abstract class RDD[T: ClassTag](
@transient var name: String = null
/** Assign a name to this RDD */
- def setName(_name: String): RDD[T] = {
+ def setName(_name: String): this.type = {
name = _name
this
}
@@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet.
*/
- def persist(newLevel: StorageLevel): RDD[T] = {
+ def persist(newLevel: StorageLevel): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
throw new UnsupportedOperationException(
@@ -152,10 +152,10 @@ abstract class RDD[T: ClassTag](
}
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
+ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def cache(): RDD[T] = persist()
+ def cache(): this.type = persist()
/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
@@ -163,7 +163,7 @@ abstract class RDD[T: ClassTag](
* @param blocking Whether to block until all blocks are deleted.
* @return This RDD.
*/
- def unpersist(blocking: Boolean = true): RDD[T] = {
+ def unpersist(blocking: Boolean = true): this.type = {
logInfo("Removing RDD " + id + " from persistence list")
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
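
For reference, `this.type` is the singleton type of the receiver, so in subclasses such as SchemaRDD, EdgeRDD, and VertexRDD the inherited setName/persist/cache/unpersist now statically return the subclass rather than RDD[T]. A small self-contained sketch of the difference (Dataset and TableDataset are made-up names, not Spark classes):

    class Dataset {
      def setName(name: String): this.type = { println(s"name = $name"); this }
      def cache(): this.type = { println("cached"); this }
    }

    class TableDataset extends Dataset {
      def registerAsTable(table: String): Unit = println(s"registered as $table")
    }

    object ThisTypeDemo extends App {
      val t = new TableDataset
      // Because setName/cache return this.type, the chain is still statically a
      // TableDataset, so the subclass-only method remains available.
      t.setName("people").cache().registerAsTable("people")
      // Had they returned Dataset (as RDD[T] effectively did before this patch),
      // the chain would be typed as Dataset and the last call would not compile.
    }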
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 6d04bf790e..fa78ca99b8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -51,18 +51,12 @@ class EdgeRDD[@specialized ED: ClassTag](
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
- override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+ override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
}
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def cache(): EdgeRDD[ED] = persist()
-
- override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+ override def unpersist(blocking: Boolean = true): this.type = {
partitionsRDD.unpersist(blocking)
this
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index d6788d4d4b..f0fc605c88 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -71,18 +71,12 @@ class VertexRDD[@specialized VD: ClassTag](
override protected def getPreferredLocations(s: Partition): Seq[String] =
partitionsRDD.preferredLocations(s)
- override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
+ override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
}
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def cache(): VertexRDD[VD] = persist()
-
- override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
+ override def unpersist(blocking: Boolean = true): this.type = {
partitionsRDD.unpersist(blocking)
this
}
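
With persist()/cache()/unpersist() returning this.type in the base RDD, EdgeRDD and VertexRDD no longer need their own overrides of the no-argument persist() and cache(); the deleted overrides above are inherited with the correct static type. A shell-style sketch of the effect, assuming a SparkContext named sc as in spark-shell:

    import org.apache.spark.graphx.{Edge, Graph, VertexRDD}

    val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
    val graph = Graph.fromEdges(edges, 0)

    // cache() keeps the VertexRDD type (via this.type), so VertexRDD-specific
    // operations such as mapValues stay available on the cached handle.
    val cachedVertices: VertexRDD[Int] = graph.vertices.cache()
    val incremented = cachedVertices.mapValues(_ + 1)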
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index d540dc0a98..efdb38e907 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -74,6 +74,8 @@ object MimaBuild {
) ++
excludeSparkClass("rdd.ClassTags") ++
excludeSparkClass("util.XORShiftRandom") ++
+ excludeSparkClass("graphx.EdgeRDD") ++
+ excludeSparkClass("graphx.VertexRDD") ++
excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
excludeSparkClass("mllib.optimization.SquaredGradient") ++
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 1a62031db5..6789d7002b 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -360,6 +360,35 @@ class SchemaRDD(RDD):
else:
return None
+ def coalesce(self, numPartitions, shuffle=False):
+ rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
+ return SchemaRDD(rdd, self.sql_ctx)
+
+ def distinct(self):
+ rdd = self._jschema_rdd.distinct()
+ return SchemaRDD(rdd, self.sql_ctx)
+
+ def intersection(self, other):
+ if (other.__class__ is SchemaRDD):
+ rdd = self._jschema_rdd.intersection(other._jschema_rdd)
+ return SchemaRDD(rdd, self.sql_ctx)
+ else:
+ raise ValueError("Can only intersect with another SchemaRDD")
+
+ def repartition(self, numPartitions):
+ rdd = self._jschema_rdd.repartition(numPartitions)
+ return SchemaRDD(rdd, self.sql_ctx)
+
+ def subtract(self, other, numPartitions=None):
+ if (other.__class__ is SchemaRDD):
+ if numPartitions is None:
+ rdd = self._jschema_rdd.subtract(other._jschema_rdd)
+ else:
+ rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
+ return SchemaRDD(rdd, self.sql_ctx)
+ else:
+ raise ValueError("Can only subtract another SchemaRDD")
+
def _test():
import doctest
from pyspark.context import SparkContext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index d7782d6b32..34200be3ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -19,14 +19,16 @@ package org.apache.spark.sql
import net.razorvine.pickle.Pickler
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.api.java.JavaRDD
import java.util.{Map => JMap}
@@ -296,6 +298,13 @@ class SchemaRDD(
*/
def toSchemaRDD = this
+ /**
+ * Returns this RDD as a JavaSchemaRDD.
+ *
+ * @group schema
+ */
+ def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
+
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
this.mapPartitions { iter =>
@@ -314,4 +323,60 @@ class SchemaRDD(
}
}
}
+
+ /**
+ * Creates a SchemaRDD by applying its own schema to a derived RDD. Typically used to wrap the
+ * return value of base RDD functions that do not change the schema.
+ *
+ * @param rdd RDD derived from this one that has the same schema
+ *
+ * @group schema
+ */
+ private def applySchema(rdd: RDD[Row]): SchemaRDD = {
+ new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
+ }
+
+ // =======================================================================
+ // Base RDD functions that do NOT change schema
+ // =======================================================================
+
+ // Transformations (return a new RDD)
+
+ override def coalesce(numPartitions: Int, shuffle: Boolean = false)
+ (implicit ord: Ordering[Row] = null): SchemaRDD =
+ applySchema(super.coalesce(numPartitions, shuffle)(ord))
+
+ override def distinct(): SchemaRDD =
+ applySchema(super.distinct())
+
+ override def distinct(numPartitions: Int)
+ (implicit ord: Ordering[Row] = null): SchemaRDD =
+ applySchema(super.distinct(numPartitions)(ord))
+
+ override def filter(f: Row => Boolean): SchemaRDD =
+ applySchema(super.filter(f))
+
+ override def intersection(other: RDD[Row]): SchemaRDD =
+ applySchema(super.intersection(other))
+
+ override def intersection(other: RDD[Row], partitioner: Partitioner)
+ (implicit ord: Ordering[Row] = null): SchemaRDD =
+ applySchema(super.intersection(other, partitioner)(ord))
+
+ override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
+ applySchema(super.intersection(other, numPartitions))
+
+ override def repartition(numPartitions: Int)
+ (implicit ord: Ordering[Row] = null): SchemaRDD =
+ applySchema(super.repartition(numPartitions)(ord))
+
+ override def subtract(other: RDD[Row]): SchemaRDD =
+ applySchema(super.subtract(other))
+
+ override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
+ applySchema(super.subtract(other, numPartitions))
+
+ override def subtract(other: RDD[Row], p: Partitioner)
+ (implicit ord: Ordering[Row] = null): SchemaRDD =
+ applySchema(super.subtract(other, p)(ord))
}
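
The private applySchema helper above is what makes these overrides cheap: the derived RDD[Row] is wrapped back into a logical plan carrying the original output attributes, so the schema is re-attached rather than recomputed. A toy, self-contained sketch of the same pattern (an illustrative Table class, not Spark internals):

    final case class Table(columns: Seq[String], rows: Seq[Seq[Any]]) {

      // Analogue of SchemaRDD.applySchema: wrap derived rows with the same columns.
      private def withSameColumns(derived: Seq[Seq[Any]]): Table =
        Table(columns, derived)

      // Row-only operations keep the Table type, just as distinct()/filter()/
      // subtract() now keep the SchemaRDD type.
      def distinct: Table = withSameColumns(rows.distinct)
      def filter(p: Seq[Any] => Boolean): Table = withSameColumns(rows.filter(p))
      def subtract(other: Table): Table = withSameColumns(rows.filterNot(other.rows.contains))
    }

    object TableDemo extends App {
      val people = Table(Seq("name", "age"), Seq(Seq("alice", 30), Seq("bob", 25), Seq("bob", 25)))
      val banned = Table(Seq("name", "age"), Seq(Seq("bob", 25)))

      val allowed = people.distinct.subtract(banned)
      println(allowed.columns) // List(name, age): the schema survives the set operations
      println(allowed.rows)    // List(List(alice, 30))
    }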
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
index d43d672938..22f57b758d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -17,10 +17,13 @@
package org.apache.spark.sql.api.java
+import org.apache.spark.Partitioner
import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
+import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
/**
* An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
@@ -45,4 +48,141 @@ class JavaSchemaRDD(
override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
val rdd = baseSchemaRDD.map(new Row(_))
+
+ override def toString: String = baseSchemaRDD.toString
+
+ // =======================================================================
+ // Base RDD functions that do NOT change schema
+ // =======================================================================
+
+ // Common RDD functions
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ def cache(): JavaSchemaRDD = {
+ baseSchemaRDD.cache()
+ this
+ }
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ def persist(): JavaSchemaRDD = {
+ baseSchemaRDD.persist()
+ this
+ }
+
+ /**
+ * Set this RDD's storage level to persist its values across operations after the first time
+ * it is computed. This can only be used to assign a new storage level if the RDD does not
+ * have a storage level set yet.
+ */
+ def persist(newLevel: StorageLevel): JavaSchemaRDD = {
+ baseSchemaRDD.persist(newLevel)
+ this
+ }
+
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ * @return This RDD.
+ */
+ def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
+ baseSchemaRDD.unpersist(blocking)
+ this
+ }
+
+ /** Assign a name to this RDD */
+ def setName(name: String): JavaSchemaRDD = {
+ baseSchemaRDD.setName(name)
+ this
+ }
+
+ // Transformations (return a new RDD)
+
+ /**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
+ baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD
+
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
+ def distinct(): JavaSchemaRDD =
+ baseSchemaRDD.distinct().toJavaSchemaRDD
+
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
+ def distinct(numPartitions: Int): JavaSchemaRDD =
+ baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD
+
+ /**
+ * Return a new RDD containing only the elements that satisfy a predicate.
+ */
+ def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
+ baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD
+
+ /**
+ * Return the intersection of this RDD and another one. The output will not contain any
+ * duplicate elements, even if the input RDDs did.
+ *
+ * Note that this method performs a shuffle internally.
+ */
+ def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
+ this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD
+
+ /**
+ * Return the intersection of this RDD and another one. The output will not contain any
+ * duplicate elements, even if the input RDDs did.
+ *
+ * Note that this method performs a shuffle internally.
+ *
+ * @param partitioner Partitioner to use for the resulting RDD
+ */
+ def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
+ this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD
+
+ /**
+ * Return the intersection of this RDD and another one. The output will not contain any
+ * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster.
+ *
+ * Note that this method performs a shuffle internally.
+ *
+ * @param numPartitions How many partitions to use in the resulting RDD
+ */
+ def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+ this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+ /**
+ * Return a new RDD that has exactly `numPartitions` partitions.
+ *
+ * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+ * a shuffle to redistribute data.
+ *
+ * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+ * which can avoid performing a shuffle.
+ */
+ def repartition(numPartitions: Int): JavaSchemaRDD =
+ baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
+ this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+ this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
+ this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
}
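
Together with the new SchemaRDD.toJavaSchemaRDD conversion, these wrappers keep a Java-API pipeline typed as JavaSchemaRDD end to end. A shell-style sketch, assuming two existing SchemaRDDs named people and banned (for example, results of sqlContext.sql):

    import org.apache.spark.sql.api.java.JavaSchemaRDD

    val javaPeople: JavaSchemaRDD = people.toJavaSchemaRDD

    // setName/cache return JavaSchemaRDD, and the set operations wrap their
    // results back into JavaSchemaRDD, so no downcast from JavaRDD[Row] is needed.
    val allowed: JavaSchemaRDD = javaPeople
      .setName("people")
      .cache()
      .distinct()
      .subtract(banned.toJavaSchemaRDD)

    // baseSchemaRDD recovers the Scala-side SchemaRDD when needed.
    allowed.baseSchemaRDD.registerAsTable("allowed")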