author    Michael Armbrust <michael@databricks.com>  2014-06-12 23:09:41 -0700
committer Michael Armbrust <michael@databricks.com>  2014-06-12 23:09:41 -0700
commit    13f8cfdc04589b986554310965e83fe658085683 (patch)
tree      db175f4a7bc5e1187ddaf96a253d04b00349433e
parent    f95ac686bcba4e677254120735b0eb7a29f20d63 (diff)
[SPARK-2135][SQL] Use planner for in-memory scans
Author: Michael Armbrust <michael@databricks.com>

Closes #1072 from marmbrus/cachedStars and squashes the following commits:

8757c8e [Michael Armbrust] Use planner for in-memory scans.
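What changes, in outline: before this patch, cacheTable wrapped an already-planned physical scan in SparkLogicalPlan, which hid the cached data from the planner. Now caching registers a logical InMemoryRelation, and a new InMemoryScans strategy plans an InMemoryColumnarTableScan over it, so projections and star expansion are resolved against the cache like any other relation. A minimal, self-contained sketch of that logical/physical split in plain Scala (illustrative names only, not the real Catalyst types):

sealed trait LogicalNode
case class CachedRelation(output: Seq[String], rows: Seq[Map[String, Any]]) extends LogicalNode
case class Project(columns: Seq[String], child: LogicalNode) extends LogicalNode

sealed trait PhysicalNode { def execute(): Seq[Seq[Any]] }
case class CachedScan(columns: Seq[String], relation: CachedRelation) extends PhysicalNode {
  // Materialize only the requested columns, as the reworked
  // InMemoryColumnarTableScan does with its requested ordinals.
  def execute(): Seq[Seq[Any]] = relation.rows.map(row => columns.map(row))
}

// The planner: a strategy pattern-matches logical plans into physical ones,
// mirroring the new InMemoryScans strategy further down in this diff.
def plan(p: LogicalNode): PhysicalNode = p match {
  case Project(cols, rel: CachedRelation) => CachedScan(cols, rel)
  case rel: CachedRelation                => CachedScan(rel.output, rel)
}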
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                         | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala          | 13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                   | 15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala|  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                   |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala          |  7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                |  7
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala              |  6
10 files changed, 75 insertions(+), 35 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 264192ed1a..b6a2f1b9d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
@@ -166,10 +166,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
val useCompression =
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
val asInMemoryRelation =
- InMemoryColumnarTableScan(
- currentTable.output, executePlan(currentTable).executedPlan, useCompression)
+ InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
- catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
+ catalog.registerTable(None, tableName, asInMemoryRelation)
}
/** Removes the specified table from the in-memory cache. */
@@ -177,11 +176,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
// we reregister the RDD as a table.
- case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
+ case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
- case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) =>
+ case inMem: InMemoryRelation =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
@@ -192,7 +191,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def isCached(tableName: String): Boolean = {
val relation = catalog.lookupRelation(None, tableName)
EliminateAnalysisOperators(relation) match {
- case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
+ case _: InMemoryRelation => true
case _ => false
}
}
@@ -208,6 +207,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
PartialAggregation ::
LeftSemiJoin ::
HashJoin ::
+ InMemoryScans ::
ParquetOperations ::
BasicOperators ::
CartesianProduct ::
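For context, a hedged usage example of the public API whose internals change above, written against the Spark 1.0-era interface (the table name and data are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Record(key: Int, value: String)

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("cache-demo"))
val sqlContext = new SQLContext(sc)
import sqlContext._  // brings in createSchemaRDD and sql(...)

sc.parallelize(1 to 10).map(i => Record(i, s"val_$i")).registerAsTable("records")

sqlContext.cacheTable("records")           // now registers an InMemoryRelation
assert(sqlContext.isCached("records"))
sql("SELECT value FROM records WHERE key = 1").collect()  // planned via InMemoryScans
sqlContext.uncacheTable("records")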
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index fdf28e1bb1..e1e4f24c6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -17,18 +17,29 @@
package org.apache.spark.sql.columnar
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
import org.apache.spark.sql.Row
import org.apache.spark.SparkConf
-private[sql] case class InMemoryColumnarTableScan(
- attributes: Seq[Attribute],
- child: SparkPlan,
- useCompression: Boolean)
- extends LeafNode {
+object InMemoryRelation {
+ def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
+ new InMemoryRelation(child.output, useCompression, child)
+}
- override def output: Seq[Attribute] = attributes
+private[sql] case class InMemoryRelation(
+ output: Seq[Attribute],
+ useCompression: Boolean,
+ child: SparkPlan)
+ extends LogicalPlan with MultiInstanceRelation {
+
+ override def children = Seq.empty
+ override def references = Set.empty
+
+ override def newInstance() =
+ new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]
lazy val cachedColumnBuffers = {
val output = child.output
@@ -55,14 +66,26 @@ private[sql] case class InMemoryColumnarTableScan(
cached.count()
cached
}
+}
+
+private[sql] case class InMemoryColumnarTableScan(
+ attributes: Seq[Attribute],
+ relation: InMemoryRelation)
+ extends LeafNode {
+
+ override def output: Seq[Attribute] = attributes
override def execute() = {
- cachedColumnBuffers.mapPartitions { iterator =>
+ relation.cachedColumnBuffers.mapPartitions { iterator =>
val columnBuffers = iterator.next()
assert(!iterator.hasNext)
new Iterator[Row] {
- val columnAccessors = columnBuffers.map(ColumnAccessor(_))
+ // Find the ordinals of the requested columns. If none are requested, use the first.
+ val requestedColumns =
+ if (attributes.isEmpty) Seq(0) else attributes.map(relation.output.indexOf(_))
+
+ val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
val nextRow = new GenericMutableRow(columnAccessors.length)
override def next() = {
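A standalone illustration of the ordinal lookup introduced above (made-up data, not Spark code): requested attributes are resolved to positions in the relation's output, so only the matching column buffers are ever decoded:

val relationOutput = Seq("key", "value", "extra")
val columnBuffers  = Seq("keyBuf", "valueBuf", "extraBuf") // stand-ins for the cached byte buffers

def requestedOrdinals(attributes: Seq[String]): Seq[Int] =
  if (attributes.isEmpty) Seq(0) else attributes.map(a => relationOutput.indexOf(a))

requestedOrdinals(Seq("value", "key")).map(columnBuffers) // Seq("valueBuf", "keyBuf")
requestedOrdinals(Nil).map(columnBuffers)                 // Seq("keyBuf"): count-only queries still read one column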
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 4613df1039..07967fe75e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -77,8 +77,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
- case scan @ InMemoryColumnarTableScan(output, _, _) =>
- scan.copy(attributes = output.map(_.newInstance))
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
}
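The case removed here resurfaces as InMemoryRelation.newInstance via MultiInstanceRelation. A rough sketch, with illustrative types, of why fresh attribute ids matter for the self-join test added below:

case class Attr(name: String, id: Int)
object Attr {
  private var next = 0
  def fresh(name: String): Attr = { next += 1; Attr(name, next) }
}
case class CachedRel(output: Seq[Attr]) {
  // A self-join references the same cached relation twice; fresh ids keep
  // a.key and b.key distinguishable during analysis.
  def newInstance: CachedRel = CachedRel(output.map(a => Attr.fresh(a.name)))
}

val base  = CachedRel(Seq(Attr.fresh("key"), Attr.fresh("value")))
val other = base.newInstance
assert(base.output.map(_.name) == other.output.map(_.name)) // same schema...
assert(base.output.map(_.id) != other.output.map(_.id))     // ...distinct identities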
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f2f95dfe27..a657911afe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -191,6 +192,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
+ object InMemoryScans extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
+ pruneFilterProject(
+ projectList,
+ filters,
+ identity[Seq[Expression]], // No filters are pushed down.
+ InMemoryColumnarTableScan(_, mem)) :: Nil
+ case _ => Nil
+ }
+ }
+
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def numPartitions = self.numPartitions
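A simplified sketch of what PhysicalOperation hands the strategy above (not the Catalyst implementation, which also handles aliasing and plans with other node types): adjacent Project/Filter nodes over a leaf relation are collapsed into a single (projectList, filters, relation) triple that pruneFilterProject can plan as one scan:

sealed trait Plan
case class Leaf(name: String) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Proj(columns: Seq[String], child: Plan) extends Plan

def collapse(plan: Plan, filters: List[String] = Nil,
             columns: Option[Seq[String]] = None): (Seq[String], List[String], Leaf) =
  plan match {
    case leaf: Leaf           => (columns.getOrElse(Nil), filters, leaf)
    case Filter(c, child)     => collapse(child, c :: filters, columns)
    case Proj(cols, child)    => collapse(child, filters, columns.orElse(Some(cols)))
  }

collapse(Proj(Seq("key"), Filter("key = 1", Leaf("cached"))))
// == (Seq("key"), List("key = 1"), Leaf("cached"))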
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index ebca3adc2f..c794da4da4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
-import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
import org.apache.spark.sql.test.TestSQLContext
class CachedTableSuite extends QueryTest {
@@ -34,7 +33,7 @@ class CachedTableSuite extends QueryTest {
)
TestSQLContext.table("testData").queryExecution.analyzed match {
- case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+ case _ : InMemoryRelation => // Found evidence of caching
case noCache => fail(s"No cache node found in plan $noCache")
}
@@ -46,7 +45,7 @@ class CachedTableSuite extends QueryTest {
)
TestSQLContext.table("testData").queryExecution.analyzed match {
- case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+ case cachePlan: InMemoryRelation =>
fail(s"Table still cached after uncache: $cachePlan")
case noCache => // Table uncached successfully
}
@@ -61,13 +60,17 @@ class CachedTableSuite extends QueryTest {
test("SELECT Star Cached Table") {
TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
TestSQLContext.cacheTable("selectStar")
- TestSQLContext.sql("SELECT * FROM selectStar")
+ TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect()
TestSQLContext.uncacheTable("selectStar")
}
test("Self-join cached") {
+ val unCachedAnswer =
+ TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect()
TestSQLContext.cacheTable("testData")
- TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
+ checkAnswer(
+ TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key"),
+ unCachedAnswer.toSeq)
TestSQLContext.uncacheTable("testData")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 31c5dfba92..86727b93f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+ val scan = InMemoryRelation(useCompression = true, plan)
checkAnswer(scan, testData.collect().toSeq)
}
test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+ val scan = InMemoryRelation(useCompression = true, plan)
checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+ val scan = InMemoryRelation(useCompression = true, plan)
checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
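In miniature, the companion-object pattern these tests now exercise (illustrative names): the two-argument apply derives output from the child, so call sites like InMemoryRelation(useCompression = true, plan) cannot pass a mismatched schema, while the full constructor remains available for copy and transform:

case class Child(output: Seq[String])
case class Rel(output: Seq[String], useCompression: Boolean, child: Child)
object Rel {
  def apply(useCompression: Boolean, child: Child): Rel =
    new Rel(child.output, useCompression, child)
}

val rel = Rel(useCompression = true, Child(Seq("key", "value")))
assert(rel.output == Seq("key", "value"))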
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6497821554..9cd13f6ae0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -230,6 +230,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
CommandStrategy(self),
TakeOrdered,
ParquetOperations,
+ InMemoryScans,
HiveTableScans,
DataSinks,
Scripts,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index a91b520765..e9e6497f7e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.hive.execution.{HiveTableScan, InsertIntoHiveTable}
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -130,8 +130,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
castChildOutput(p, table, child)
- case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
- _, HiveTableScan(_, table, _), _)), _, child, _) =>
+ case p @ logical.InsertIntoTable(
+ InMemoryRelation(_, _,
+ HiveTableScan(_, table, _)), _, child, _) =>
castChildOutput(p, table, child)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 8b51957162..ed6cd5a11d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.InMemoryRelation
private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
@@ -44,8 +44,9 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
- case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
- _, HiveTableScan(_, table, _), _)), partition, child, overwrite) =>
+ case logical.InsertIntoTable(
+ InMemoryRelation(_, _,
+ HiveTableScan(_, table, _)), partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case _ => Nil
}
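The rewritten matches in both Hive files destructure the nested case class directly; in miniature (illustrative types), pulling the Hive table out of a cached plan:

case class HiveScan(table: String)
case class MemRel(output: Seq[String], useCompression: Boolean, child: Any)

def tableOf(plan: Any): Option[String] = plan match {
  case MemRel(_, _, HiveScan(t)) => Some(t) // only cached relations built over a Hive scan
  case _                         => None
}

assert(tableOf(MemRel(Nil, true, HiveScan("src"))) == Some("src"))
assert(tableOf(HiveScan("src")).isEmpty)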
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 91ac03ca30..3132d0112c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql.execution.SparkLogicalPlan
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
import org.apache.spark.sql.hive.execution.HiveComparisonTest
import org.apache.spark.sql.hive.test.TestHive
@@ -34,7 +34,7 @@ class CachedTableSuite extends HiveComparisonTest {
test("check that table is cached and uncache") {
TestHive.table("src").queryExecution.analyzed match {
- case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+ case _ : InMemoryRelation => // Found evidence of caching
case noCache => fail(s"No cache node found in plan $noCache")
}
TestHive.uncacheTable("src")
@@ -45,7 +45,7 @@ class CachedTableSuite extends HiveComparisonTest {
test("make sure table is uncached") {
TestHive.table("src").queryExecution.analyzed match {
- case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+ case cachePlan: InMemoryRelation =>
fail(s"Table still cached after uncache: $cachePlan")
case noCache => // Table uncached successfully
}