author    Michael Armbrust <michael@databricks.com>  2014-06-12 23:09:41 -0700
committer Michael Armbrust <michael@databricks.com>  2014-06-12 23:09:41 -0700
commit 13f8cfdc04589b986554310965e83fe658085683
tree   db175f4a7bc5e1187ddaf96a253d04b00349433e /sql/core
parent f95ac686bcba4e677254120735b0eb7a29f20d63
[SPARK-2135][SQL] Use planner for in-memory scans
Author: Michael Armbrust <michael@databricks.com>

Closes #1072 from marmbrus/cachedStars and squashes the following commits:

8757c8e [Michael Armbrust] Use planner for in-memory scans.
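For context, a minimal usage sketch (not part of the patch): after this change a cached table is represented by a logical InMemoryRelation, so queries against it are planned like any other relation, including the filtered SELECT exercised by the updated tests below. The table name cachedCopy is illustrative.

```scala
import org.apache.spark.sql.test.TestSQLContext

// Register a copy of the test data as a table, cache it, and query it.
// With this patch the filtered query is matched by the new InMemoryScans
// strategy rather than a pre-planned scan wrapped in SparkLogicalPlan.
TestSQLContext.sql("SELECT * FROM testData").registerAsTable("cachedCopy")
TestSQLContext.cacheTable("cachedCopy")
val rows = TestSQLContext.sql("SELECT key FROM cachedCopy WHERE key = 1").collect()
TestSQLContext.uncacheTable("cachedCopy")
```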
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                           14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala   39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala            13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                     15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala   6
6 files changed, 63 insertions, 26 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 264192ed1a..b6a2f1b9d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
@@ -166,10 +166,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
val useCompression =
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
val asInMemoryRelation =
- InMemoryColumnarTableScan(
- currentTable.output, executePlan(currentTable).executedPlan, useCompression)
+ InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
- catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
+ catalog.registerTable(None, tableName, asInMemoryRelation)
}
/** Removes the specified table from the in-memory cache. */
@@ -177,11 +176,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
// we reregister the RDD as a table.
- case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
+ case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
- case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) =>
+ case inMem: InMemoryRelation =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
@@ -192,7 +191,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def isCached(tableName: String): Boolean = {
val relation = catalog.lookupRelation(None, tableName)
EliminateAnalysisOperators(relation) match {
- case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
+ case _: InMemoryRelation => true
case _ => false
}
}
@@ -208,6 +207,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
PartialAggregation ::
LeftSemiJoin ::
HashJoin ::
+ InMemoryScans ::
ParquetOperations ::
BasicOperators ::
CartesianProduct ::
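A sketch of the cache API's observable behavior after these hunks, assuming the TestSQLContext helper and the testData table used in the suites below; isCached now matches the logical InMemoryRelation directly instead of unwrapping a SparkLogicalPlan:

```scala
import org.apache.spark.sql.test.TestSQLContext

TestSQLContext.cacheTable("testData")
assert(TestSQLContext.isCached("testData"))   // analyzed plan is an InMemoryRelation
TestSQLContext.uncacheTable("testData")
assert(!TestSQLContext.isCached("testData"))  // relation reverted to the original plan
```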
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index fdf28e1bb1..e1e4f24c6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -17,18 +17,29 @@
package org.apache.spark.sql.columnar
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
import org.apache.spark.sql.Row
import org.apache.spark.SparkConf
-private[sql] case class InMemoryColumnarTableScan(
- attributes: Seq[Attribute],
- child: SparkPlan,
- useCompression: Boolean)
- extends LeafNode {
+object InMemoryRelation {
+ def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
+ new InMemoryRelation(child.output, useCompression, child)
+}
- override def output: Seq[Attribute] = attributes
+private[sql] case class InMemoryRelation(
+ output: Seq[Attribute],
+ useCompression: Boolean,
+ child: SparkPlan)
+ extends LogicalPlan with MultiInstanceRelation {
+
+ override def children = Seq.empty
+ override def references = Set.empty
+
+ override def newInstance() =
+ new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]
lazy val cachedColumnBuffers = {
val output = child.output
@@ -55,14 +66,26 @@ private[sql] case class InMemoryColumnarTableScan(
cached.count()
cached
}
+}
+
+private[sql] case class InMemoryColumnarTableScan(
+ attributes: Seq[Attribute],
+ relation: InMemoryRelation)
+ extends LeafNode {
+
+ override def output: Seq[Attribute] = attributes
override def execute() = {
- cachedColumnBuffers.mapPartitions { iterator =>
+ relation.cachedColumnBuffers.mapPartitions { iterator =>
val columnBuffers = iterator.next()
assert(!iterator.hasNext)
new Iterator[Row] {
- val columnAccessors = columnBuffers.map(ColumnAccessor(_))
+ // Find the ordinals of the requested columns. If none are requested, use the first.
+ val requestedColumns =
+ if (attributes.isEmpty) Seq(0) else attributes.map(relation.output.indexOf(_))
+
+ val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
val nextRow = new GenericMutableRow(columnAccessors.length)
override def next() = {
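The core new logic in the physical scan is the ordinal lookup that supports column pruning. A self-contained sketch of that computation, with plain strings standing in for Catalyst Attributes:

```scala
// Stand-ins for the relation's full output and a pruned projection.
val relationOutput      = Seq("key", "value", "extra")
val requestedAttributes = Seq("value", "key")

// Mirrors the scan above: if no columns are requested (e.g. a bare COUNT),
// read only the first; otherwise map each attribute to its position in the
// cached column buffers.
val requestedColumns =
  if (requestedAttributes.isEmpty) Seq(0)
  else requestedAttributes.map(relationOutput.indexOf(_))

assert(requestedColumns == Seq(1, 0))
```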
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 4613df1039..07967fe75e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -77,8 +77,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
- case scan @ InMemoryColumnarTableScan(output, _, _) =>
- scan.copy(attributes = output.map(_.newInstance))
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f2f95dfe27..a657911afe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -191,6 +192,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
+ object InMemoryScans extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
+ pruneFilterProject(
+ projectList,
+ filters,
+ identity[Seq[Expression]], // No filters are pushed down.
+ InMemoryColumnarTableScan(_, mem)) :: Nil
+ case _ => Nil
+ }
+ }
+
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def numPartitions = self.numPartitions
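For readers unfamiliar with Catalyst's planner, a simplified sketch of the contract a Strategy satisfies; the toy node types below are stand-ins, not Spark's classes:

```scala
// Toy logical and physical node types standing in for Catalyst's plans.
sealed trait LogicalNode
sealed trait PhysicalNode
case class MemRelation(table: String) extends LogicalNode
case class MemScan(table: String) extends PhysicalNode

// A strategy turns a logical plan into zero or more candidate physical plans;
// returning Nil makes the planner fall through to the next strategy in its list.
object ToyInMemoryScans {
  def apply(plan: LogicalNode): Seq[PhysicalNode] = plan match {
    case MemRelation(t) => MemScan(t) :: Nil
    case _              => Nil
  }
}
```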
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index ebca3adc2f..c794da4da4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
-import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
import org.apache.spark.sql.test.TestSQLContext
class CachedTableSuite extends QueryTest {
@@ -34,7 +33,7 @@ class CachedTableSuite extends QueryTest {
)
TestSQLContext.table("testData").queryExecution.analyzed match {
- case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+ case _ : InMemoryRelation => // Found evidence of caching
case noCache => fail(s"No cache node found in plan $noCache")
}
@@ -46,7 +45,7 @@ class CachedTableSuite extends QueryTest {
)
TestSQLContext.table("testData").queryExecution.analyzed match {
- case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+ case cachePlan: InMemoryRelation =>
fail(s"Table still cached after uncache: $cachePlan")
case noCache => // Table uncached successfully
}
@@ -61,13 +60,17 @@ class CachedTableSuite extends QueryTest {
test("SELECT Star Cached Table") {
TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
TestSQLContext.cacheTable("selectStar")
- TestSQLContext.sql("SELECT * FROM selectStar")
+ TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect()
TestSQLContext.uncacheTable("selectStar")
}
test("Self-join cached") {
+ val unCachedAnswer =
+ TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect()
TestSQLContext.cacheTable("testData")
- TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
+ checkAnswer(
+ TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key"),
+ unCachedAnswer.toSeq)
TestSQLContext.uncacheTable("testData")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 31c5dfba92..86727b93f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+ val scan = InMemoryRelation(useCompression = true, plan)
checkAnswer(scan, testData.collect().toSeq)
}
test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+ val scan = InMemoryRelation(useCompression = true, plan)
checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+ val scan = InMemoryRelation(useCompression = true, plan)
checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)