aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala32
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala61
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala7
-rw-r--r--sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b1
-rw-r--r--sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b1
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala58
7 files changed, 169 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index e09182dd8d..6b58b9322c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -31,6 +31,7 @@ trait Catalog {
alias: Option[String] = None): LogicalPlan
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
+ def unregisterTable(databaseName: Option[String], tableName: String): Unit
}
class SimpleCatalog extends Catalog {
@@ -40,7 +41,7 @@ class SimpleCatalog extends Catalog {
tables += ((tableName, plan))
}
- def dropTable(tableName: String) = tables -= tableName
+ def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName }
def lookupRelation(
databaseName: Option[String],
@@ -87,6 +88,10 @@ trait OverrideCatalog extends Catalog {
plan: LogicalPlan): Unit = {
overrides.put((databaseName, tableName), plan)
}
+
+ override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
+ overrides.remove((databaseName, tableName))
+ }
}
/**
@@ -104,4 +109,8 @@ object EmptyCatalog extends Catalog {
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}
+
+ def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
+ throw new UnsupportedOperationException
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f950ea08ec..69bbbdc894 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,8 +26,9 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
import org.apache.spark.sql.execution._
/**
@@ -111,6 +112,35 @@ class SQLContext(@transient val sparkContext: SparkContext)
result
}
+ /** Returns the specified table as a SchemaRDD */
+ def table(tableName: String): SchemaRDD =
+ new SchemaRDD(this, catalog.lookupRelation(None, tableName))
+
+ /** Caches the specified table in-memory. */
+ def cacheTable(tableName: String): Unit = {
+ val currentTable = catalog.lookupRelation(None, tableName)
+ val asInMemoryRelation =
+ InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
+
+ catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
+ }
+
+ /** Removes the specified table from the in-memory cache. */
+ def uncacheTable(tableName: String): Unit = {
+ EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
+ // This is kind of a hack to make sure that if this was just an RDD registered as a table,
+ // we reregister the RDD as a table.
+ case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
+ inMem.cachedColumnBuffers.unpersist()
+ catalog.unregisterTable(None, tableName)
+ catalog.registerTable(None, tableName, SparkLogicalPlan(e))
+ case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) =>
+ inMem.cachedColumnBuffers.unpersist()
+ catalog.unregisterTable(None, tableName)
+ case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
+ }
+ }
+
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext = self.sparkContext
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
new file mode 100644
index 0000000000..e5902c3cae
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.FunSuite
+import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+
+class CachedTableSuite extends QueryTest {
+ TestData // Load test tables.
+
+ test("read from cached table and uncache") {
+ TestSQLContext.cacheTable("testData")
+
+ checkAnswer(
+ TestSQLContext.table("testData"),
+ testData.collect().toSeq
+ )
+
+ TestSQLContext.table("testData").queryExecution.analyzed match {
+ case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+ case noCache => fail(s"No cache node found in plan $noCache")
+ }
+
+ TestSQLContext.uncacheTable("testData")
+
+ checkAnswer(
+ TestSQLContext.table("testData"),
+ testData.collect().toSeq
+ )
+
+ TestSQLContext.table("testData").queryExecution.analyzed match {
+ case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+ fail(s"Table still cached after uncache: $cachePlan")
+ case noCache => // Table uncached successfully
+ }
+ }
+
+ test("correct error on uncache of non-cached table") {
+ intercept[IllegalArgumentException] {
+ TestSQLContext.uncacheTable("testData")
+ }
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4f8353666a..29834a11f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -141,6 +141,13 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
*/
override def registerTable(
databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ???
+
+ /**
+ * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
+ * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
+ */
+ override def unregisterTable(
+ databaseName: Option[String], tableName: String): Unit = ???
}
object HiveMetastoreTypes extends RegexParsers {
diff --git a/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b b/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b
new file mode 100644
index 0000000000..60878ffb77
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b
@@ -0,0 +1 @@
+238 val_238
diff --git a/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b b/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b
new file mode 100644
index 0000000000..60878ffb77
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b
@@ -0,0 +1 @@
+238 val_238
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
new file mode 100644
index 0000000000..68d45e53cd
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.hive.execution.HiveComparisonTest
+
+class CachedTableSuite extends HiveComparisonTest {
+ TestHive.loadTestTable("src")
+
+ test("cache table") {
+ TestHive.cacheTable("src")
+ }
+
+ createQueryTest("read from cached table",
+ "SELECT * FROM src LIMIT 1")
+
+ test("check that table is cached and uncache") {
+ TestHive.table("src").queryExecution.analyzed match {
+ case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+ case noCache => fail(s"No cache node found in plan $noCache")
+ }
+ TestHive.uncacheTable("src")
+ }
+
+ createQueryTest("read from uncached table",
+ "SELECT * FROM src LIMIT 1")
+
+ test("make sure table is uncached") {
+ TestHive.table("src").queryExecution.analyzed match {
+ case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+ fail(s"Table still cached after uncache: $cachePlan")
+ case noCache => // Table uncached successfully
+ }
+ }
+
+ test("correct error on uncache of non-cached table") {
+ intercept[IllegalArgumentException] {
+ TestHive.uncacheTable("src")
+ }
+ }
+}