path: root/sql/core/src/main
author     Michael Armbrust <michael@databricks.com>  2014-10-03 12:34:27 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-10-03 12:34:27 -0700
commit     6a1d48f4f02c4498b64439c3dd5f671286a90e30 (patch)
tree       0b22a278418d9f1d8a6decf3f15aafab0de3dd84 /sql/core/src/main
parent     bec0d0eaa33811fde72b84f7d53a6f6031e7b5d3 (diff)
[SPARK-3212][SQL] Use logical plan matching instead of temporary tables for table caching
_Also addresses: SPARK-1671, SPARK-1379 and SPARK-3641_

This PR introduces a new trait, `CacheManager`, which replaces the previous temporary-table-based caching system. Instead of creating a temporary table that shadows an existing table with an equivalent cached representation, the cache manager maintains a separate list of logical plans and their cached data. After optimization, this list is searched for matching plan fragments; when one is found, it is replaced with the cached data.

There are several advantages to this approach:

- Calling .cache() on a SchemaRDD now works as you would expect, and uses the more efficient columnar representation.
- It is now possible to provide a list of temporary tables without having to decide whether a given table is actually just a cached persistent table. (To be done in a follow-up PR.)
- In some cases cached data will be used even when a cached table was not explicitly requested, because we now look at the logical structure instead of the table name.
- We now correctly invalidate the cache when data is inserted into a Hive table.

Author: Michael Armbrust <michael@databricks.com>

Closes #2501 from marmbrus/caching and squashes the following commits:

63fbc2c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into caching.
0ea889e [Michael Armbrust] Address comments.
1e23287 [Michael Armbrust] Add support for cache invalidation for hive inserts.
65ed04a [Michael Armbrust] fix tests.
bdf9a3f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into caching
b4b77f2 [Michael Armbrust] Address comments
6923c9d [Michael Armbrust] More comments / tests
80f26ac [Michael Armbrust] First draft of improved semantics for Spark SQL caching.
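As an illustration of the new semantics, here is a minimal sketch against the Spark 1.2-era API; the `Record` case class, the "records" table name, and the local master setting are illustrative assumptions rather than part of this patch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TableCachingSketch {
  // Illustrative schema; any Product type usable with createSchemaRDD would do.
  case class Record(key: Int, value: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("table-caching-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    sc.parallelize(1 to 100).map(i => Record(i, s"val_$i")).registerTempTable("records")

    // Caches the optimized logical plan of table("records"), rather than registering
    // a shadow temporary table as the old implementation did.
    sqlContext.cacheTable("records")

    // The optimized plan of this query contains a fragment that returns the sameResult
    // as the cached plan, so it should be answered from the in-memory columnar data.
    sqlContext.sql("SELECT count(*) FROM records WHERE key > 50").collect()

    sqlContext.uncacheTable("records")
    sc.stop()
  }
}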
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala                       139
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                          51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala                           23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala                        5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala             10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala  28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala              119
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                 33
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala            9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala            39
10 files changed, 323 insertions, 133 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
new file mode 100644
index 0000000000..aebdbb68e4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
+
+/** Holds a cached logical plan and its data */
+private case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+
+/**
+ * Provides support in a SQLContext for caching query results and automatically using these cached
+ * results when subsequent queries are executed. Data is cached using byte buffers stored in an
+ * InMemoryRelation. This relation is automatically substituted into query plans that return
+ * the `sameResult` as the originally cached query.
+ */
+private[sql] trait CacheManager {
+ self: SQLContext =>
+
+ @transient
+ private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
+
+ @transient
+ private val cacheLock = new ReentrantReadWriteLock
+
+ /** Returns true if the table is currently cached in-memory. */
+ def isCached(tableName: String): Boolean = lookupCachedData(table(tableName)).nonEmpty
+
+ /** Caches the specified table in-memory. */
+ def cacheTable(tableName: String): Unit = cacheQuery(table(tableName))
+
+ /** Removes the specified table from the in-memory cache. */
+ def uncacheTable(tableName: String): Unit = uncacheQuery(table(tableName))
+
+ /** Acquires a read lock on the cache for the duration of `f`. */
+ private def readLock[A](f: => A): A = {
+ val lock = cacheLock.readLock()
+ lock.lock()
+ try f finally {
+ lock.unlock()
+ }
+ }
+
+ /** Acquires a write lock on the cache for the duration of `f`. */
+ private def writeLock[A](f: => A): A = {
+ val lock = cacheLock.writeLock()
+ lock.lock()
+ try f finally {
+ lock.unlock()
+ }
+ }
+
+ private[sql] def clearCache(): Unit = writeLock {
+ cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
+ cachedData.clear()
+ }
+
+ /** Caches the data produced by the logical representation of the given SchemaRDD. */
+ private[sql] def cacheQuery(
+ query: SchemaRDD,
+ storageLevel: StorageLevel = MEMORY_ONLY): Unit = writeLock {
+ val planToCache = query.queryExecution.optimizedPlan
+ if (lookupCachedData(planToCache).nonEmpty) {
+ logWarning("Asked to cache already cached data.")
+ } else {
+ cachedData +=
+ CachedData(
+ planToCache,
+ InMemoryRelation(
+ useCompression, columnBatchSize, storageLevel, query.queryExecution.executedPlan))
+ }
+ }
+
+ /** Removes the data for the given SchemaRDD from the cache */
+ private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = false): Unit = writeLock {
+ val planToCache = query.queryExecution.optimizedPlan
+ val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
+
+ if (dataIndex < 0) {
+ throw new IllegalArgumentException(s"Table $query is not cached.")
+ }
+
+ cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+ cachedData.remove(dataIndex)
+ }
+
+
+ /** Optionally returns cached data for the given SchemaRDD */
+ private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock {
+ lookupCachedData(query.queryExecution.optimizedPlan)
+ }
+
+ /** Optionally returns cached data for the given LogicalPlan. */
+ private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
+ cachedData.find(_.plan.sameResult(plan))
+ }
+
+ /** Replaces segments of the given logical plan with cached versions where possible. */
+ private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
+ plan transformDown {
+ case currentFragment =>
+ lookupCachedData(currentFragment)
+ .map(_.cachedRepresentation.withOutput(currentFragment.output))
+ .getOrElse(currentFragment)
+ }
+ }
+
+ /**
+ * Invalidates the cache of any data that contains `plan`. Note that this function may
+ * over-invalidate.
+ */
+ private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
+ cachedData.foreach {
+ case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
+ data.cachedRepresentation.recache()
+ case _ =>
+ }
+ }
+
+}
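The substitution described in the CacheManager comments above is keyed on `sameResult` over optimized plans, not on table names. A sketch of what that buys, assuming a SQLContext with a registered "records" table as in the earlier example (hypothetical setup, not part of this patch):

import org.apache.spark.sql.{SQLContext, SchemaRDD}

object PlanMatchingSketch {
  def demo(sqlContext: SQLContext): Unit = {
    val base: SchemaRDD = sqlContext.sql("SELECT key FROM records WHERE key > 50")
    base.cache()    // routes through CacheManager.cacheQuery on the optimized plan
    base.collect()  // materializes the in-memory columnar buffers

    // A separately constructed but equivalent query: its optimized plan returns the
    // sameResult as the cached one, so useCachedData should substitute the cached relation.
    val equivalent = sqlContext.sql("SELECT key FROM records WHERE key > 50")
    equivalent.collect()

    base.unpersist(blocking = true)  // removes the entry via CacheManager.uncacheQuery
  }
}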
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a42bedbe6c..7a55c5bf97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -50,6 +50,7 @@ import org.apache.spark.{Logging, SparkContext}
class SQLContext(@transient val sparkContext: SparkContext)
extends org.apache.spark.Logging
with SQLConf
+ with CacheManager
with ExpressionConversions
with UDFRegistration
with Serializable {
@@ -96,7 +97,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
SparkPlan.currentContext.set(self)
- new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
+ new SchemaRDD(this,
+ LogicalRDD(ScalaReflection.attributesFor[A], RDDConversions.productToRowRdd(rdd))(self))
}
/**
@@ -133,7 +135,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def applySchema(rowRDD: RDD[Row], schema: StructType): SchemaRDD = {
// TODO: use MutableProjection when rowRDD is another SchemaRDD and the applied
// schema differs from the existing schema on any field data type.
- val logicalPlan = SparkLogicalPlan(ExistingRdd(schema.toAttributes, rowRDD))(self)
+ val logicalPlan = LogicalRDD(schema.toAttributes, rowRDD)(self)
new SchemaRDD(this, logicalPlan)
}
@@ -272,45 +274,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
def table(tableName: String): SchemaRDD =
new SchemaRDD(this, catalog.lookupRelation(None, tableName))
- /** Caches the specified table in-memory. */
- def cacheTable(tableName: String): Unit = {
- val currentTable = table(tableName).queryExecution.analyzed
- val asInMemoryRelation = currentTable match {
- case _: InMemoryRelation =>
- currentTable
-
- case _ =>
- InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
- }
-
- catalog.registerTable(None, tableName, asInMemoryRelation)
- }
-
- /** Removes the specified table from the in-memory cache. */
- def uncacheTable(tableName: String): Unit = {
- table(tableName).queryExecution.analyzed match {
- // This is kind of a hack to make sure that if this was just an RDD registered as a table,
- // we reregister the RDD as a table.
- case inMem @ InMemoryRelation(_, _, _, e: ExistingRdd) =>
- inMem.cachedColumnBuffers.unpersist()
- catalog.unregisterTable(None, tableName)
- catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
- case inMem: InMemoryRelation =>
- inMem.cachedColumnBuffers.unpersist()
- catalog.unregisterTable(None, tableName)
- case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
- }
- }
-
- /** Returns true if the table is currently cached in-memory. */
- def isCached(tableName: String): Boolean = {
- val relation = table(tableName).queryExecution.analyzed
- relation match {
- case _: InMemoryRelation => true
- case _ => false
- }
- }
-
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext
@@ -401,10 +364,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
lazy val optimizedPlan = optimizer(analyzed)
+ lazy val withCachedData = useCachedData(optimizedPlan)
+
// TODO: Don't just pick the first one...
lazy val sparkPlan = {
SparkPlan.currentContext.set(self)
- planner(optimizedPlan).next()
+ planner(withCachedData).next()
}
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
@@ -526,6 +491,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
iter.map { m => new GenericRow(m): Row}
}
- new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema.toAttributes, rowRdd))(self))
+ new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 3b873f7c62..594bf8ffc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import java.util.{Map => JMap, List => JList}
+import org.apache.spark.storage.StorageLevel
+
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -32,7 +34,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.api.java.JavaRDD
/**
@@ -442,8 +444,7 @@ class SchemaRDD(
*/
private def applySchema(rdd: RDD[Row]): SchemaRDD = {
new SchemaRDD(sqlContext,
- SparkLogicalPlan(
- ExistingRdd(queryExecution.analyzed.output.map(_.newInstance), rdd))(sqlContext))
+ LogicalRDD(queryExecution.analyzed.output.map(_.newInstance()), rdd)(sqlContext))
}
// =======================================================================
@@ -497,4 +498,20 @@ class SchemaRDD(
override def subtract(other: RDD[Row], p: Partitioner)
(implicit ord: Ordering[Row] = null): SchemaRDD =
applySchema(super.subtract(other, p)(ord))
+
+ /** The overridden cache function always uses in-memory columnar caching. */
+ override def cache(): this.type = {
+ sqlContext.cacheQuery(this)
+ this
+ }
+
+ override def persist(newLevel: StorageLevel): this.type = {
+ sqlContext.cacheQuery(this, newLevel)
+ this
+ }
+
+ override def unpersist(blocking: Boolean): this.type = {
+ sqlContext.uncacheQuery(this, blocking)
+ this
+ }
}
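With the overrides above, persist and unpersist on a SchemaRDD manage the columnar cache rather than raw row caching. A small sketch; the SchemaRDD argument is assumed to come from any query or table lookup:

import org.apache.spark.sql.SchemaRDD
import org.apache.spark.storage.StorageLevel

object SchemaRddPersistSketch {
  def demo(rdd: SchemaRDD): Unit = {
    // persist with an explicit StorageLevel builds the InMemoryRelation at that level
    // (via CacheManager.cacheQuery) instead of caching deserialized rows.
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.collect()

    // unpersist drops the cached column buffers and removes the CacheManager entry.
    rdd.unpersist(blocking = false)
  }
}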
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index e52eeb3e1c..25ba7d88ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.execution.LogicalRDD
/**
* Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
@@ -55,8 +55,7 @@ private[sql] trait SchemaRDDLike {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile =>
- queryExecution.toRdd
- SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
+ LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
baseLogicalPlan
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 150ff8a420..c006c4330f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.{SQLContext, StructType => SStructType}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.util.DataTypeConversions.asScalaDataType
import org.apache.spark.util.Utils
@@ -100,7 +100,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
}
}
- new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext))
+ new JavaSchemaRDD(sqlContext, LogicalRDD(schema, rowRdd)(sqlContext))
}
/**
@@ -114,7 +114,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
val scalaRowRDD = rowRDD.rdd.map(r => r.row)
val scalaSchema = asScalaDataType(schema).asInstanceOf[SStructType]
val logicalPlan =
- SparkLogicalPlan(ExistingRdd(scalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+ LogicalRDD(scalaSchema.toAttributes, scalaRowRDD)(sqlContext)
new JavaSchemaRDD(sqlContext, logicalPlan)
}
@@ -151,7 +151,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
val appliedScalaSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))
val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
val logicalPlan =
- SparkLogicalPlan(ExistingRdd(appliedScalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+ LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
new JavaSchemaRDD(sqlContext, logicalPlan)
}
@@ -167,7 +167,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))).asInstanceOf[SStructType]
val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
val logicalPlan =
- SparkLogicalPlan(ExistingRdd(appliedScalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+ LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
new JavaSchemaRDD(sqlContext, logicalPlan)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 8a3612cdf1..cec82a7f2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -27,10 +27,15 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
+import org.apache.spark.storage.StorageLevel
private[sql] object InMemoryRelation {
- def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation =
- new InMemoryRelation(child.output, useCompression, batchSize, child)()
+ def apply(
+ useCompression: Boolean,
+ batchSize: Int,
+ storageLevel: StorageLevel,
+ child: SparkPlan): InMemoryRelation =
+ new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)()
}
private[sql] case class CachedBatch(buffers: Array[ByteBuffer], stats: Row)
@@ -39,6 +44,7 @@ private[sql] case class InMemoryRelation(
output: Seq[Attribute],
useCompression: Boolean,
batchSize: Int,
+ storageLevel: StorageLevel,
child: SparkPlan)
(private var _cachedColumnBuffers: RDD[CachedBatch] = null)
extends LogicalPlan with MultiInstanceRelation {
@@ -51,6 +57,16 @@ private[sql] case class InMemoryRelation(
// If the cached column buffers were not passed in, we calculate them in the constructor.
// As in Spark, the actual work of caching is lazy.
if (_cachedColumnBuffers == null) {
+ buildBuffers()
+ }
+
+ def recache() = {
+ _cachedColumnBuffers.unpersist()
+ _cachedColumnBuffers = null
+ buildBuffers()
+ }
+
+ private def buildBuffers(): Unit = {
val output = child.output
val cached = child.execute().mapPartitions { rowIterator =>
new Iterator[CachedBatch] {
@@ -80,12 +96,17 @@ private[sql] case class InMemoryRelation(
def hasNext = rowIterator.hasNext
}
- }.cache()
+ }.persist(storageLevel)
cached.setName(child.toString)
_cachedColumnBuffers = cached
}
+ def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+ InMemoryRelation(
+ newOutput, useCompression, batchSize, storageLevel, child)(_cachedColumnBuffers)
+ }
+
override def children = Seq.empty
override def newInstance() = {
@@ -93,6 +114,7 @@ private[sql] case class InMemoryRelation(
output.map(_.newInstance),
useCompression,
batchSize,
+ storageLevel,
child)(
_cachedColumnBuffers).asInstanceOf[this.type]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
new file mode 100644
index 0000000000..2ddf513b6f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SQLContext, Row}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+object RDDConversions {
+ def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
+ data.mapPartitions { iterator =>
+ if (iterator.isEmpty) {
+ Iterator.empty
+ } else {
+ val bufferedIterator = iterator.buffered
+ val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)
+
+ bufferedIterator.map { r =>
+ var i = 0
+ while (i < mutableRow.length) {
+ mutableRow(i) = ScalaReflection.convertToCatalyst(r.productElement(i))
+ i += 1
+ }
+
+ mutableRow
+ }
+ }
+ }
+ }
+
+ /*
+ def toLogicalPlan[A <: Product : TypeTag](productRdd: RDD[A]): LogicalPlan = {
+ LogicalRDD(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
+ }
+ */
+}
+
+case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext)
+ extends LogicalPlan with MultiInstanceRelation {
+
+ def children = Nil
+
+ def newInstance() =
+ LogicalRDD(output.map(_.newInstance()), rdd)(sqlContext).asInstanceOf[this.type]
+
+ override def sameResult(plan: LogicalPlan) = plan match {
+ case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id
+ case _ => false
+ }
+
+ @transient override lazy val statistics = Statistics(
+ // TODO: Instead of returning a default value here, find a way to return a meaningful size
+ // estimate for RDDs. See PR 1238 for more discussions.
+ sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+ )
+}
+
+case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
+ override def execute() = rdd
+}
+
+@deprecated("Use LogicalRDD", "1.2.0")
+case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
+ override def execute() = rdd
+}
+
+@deprecated("Use LogicalRDD", "1.2.0")
+case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)
+ extends LogicalPlan with MultiInstanceRelation {
+
+ def output = alreadyPlanned.output
+ override def children = Nil
+
+ override final def newInstance(): this.type = {
+ SparkLogicalPlan(
+ alreadyPlanned match {
+ case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
+ case _ => sys.error("Multiple instance of the same relation detected.")
+ })(sqlContext).asInstanceOf[this.type]
+ }
+
+ override def sameResult(plan: LogicalPlan) = plan match {
+ case SparkLogicalPlan(ExistingRdd(_, rdd)) =>
+ rdd.id == alreadyPlanned.asInstanceOf[ExistingRdd].rdd.id
+ case _ => false
+ }
+
+ @transient override lazy val statistics = Statistics(
+ // TODO: Instead of returning a default value here, find a way to return a meaningful size
+ // estimate for RDDs. See PR 1238 for more discussions.
+ sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+ )
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2b8913985b..b1a7948b66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -126,39 +126,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
}
-/**
- * :: DeveloperApi ::
- * Allows already planned SparkQueries to be linked into logical query plans.
- *
- * Note that in general it is not valid to use this class to link multiple copies of the same
- * physical operator into the same query plan as this violates the uniqueness of expression ids.
- * Special handling exists for ExistingRdd as these are already leaf operators and thus we can just
- * replace the output attributes with new copies of themselves without breaking any attribute
- * linking.
- */
-@DeveloperApi
-case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)
- extends LogicalPlan with MultiInstanceRelation {
-
- def output = alreadyPlanned.output
- override def children = Nil
-
- override final def newInstance(): this.type = {
- SparkLogicalPlan(
- alreadyPlanned match {
- case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
- case _ => sys.error("Multiple instance of the same relation detected.")
- })(sqlContext).asInstanceOf[this.type]
- }
-
- @transient override lazy val statistics = Statistics(
- // TODO: Instead of returning a default value here, find a way to return a meaningful size
- // estimate for RDDs. See PR 1238 for more discussions.
- sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
- )
-
-}
-
private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {
self: Product =>
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 45687d9604..cf93d5ad7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -272,10 +272,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
case logical.Sample(fraction, withReplacement, seed, child) =>
execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
+ case SparkLogicalPlan(alreadyPlanned) => alreadyPlanned :: Nil
case logical.LocalRelation(output, data) =>
- ExistingRdd(
+ PhysicalRDD(
output,
- ExistingRdd.productToRowRdd(sparkContext.parallelize(data, numPartitions))) :: Nil
+ RDDConversions.productToRowRdd(sparkContext.parallelize(data, numPartitions))) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.Limit(limit, planLater(child)) :: Nil
case Unions(unionChildren) =>
@@ -287,12 +288,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Generate(generator, join, outer, _, child) =>
execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
case logical.NoRelation =>
- execution.ExistingRdd(Nil, singleRowRdd) :: Nil
+ execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
case e @ EvaluatePython(udf, child) =>
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
- case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
+ case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index cac376608b..977f3c9f32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -212,45 +212,6 @@ case class Sort(
/**
* :: DeveloperApi ::
- */
-@DeveloperApi
-object ExistingRdd {
- def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
- data.mapPartitions { iterator =>
- if (iterator.isEmpty) {
- Iterator.empty
- } else {
- val bufferedIterator = iterator.buffered
- val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)
-
- bufferedIterator.map { r =>
- var i = 0
- while (i < mutableRow.length) {
- mutableRow(i) = ScalaReflection.convertToCatalyst(r.productElement(i))
- i += 1
- }
-
- mutableRow
- }
- }
- }
- }
-
- def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
- ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
- }
-}
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
- override def execute() = rdd
-}
-
-/**
- * :: DeveloperApi ::
* Computes the set of distinct input rows using a HashSet.
* @param partial when true the distinct operation is performed partially, per partition, without
* shuffling the data.