aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYijie Shen <henry.yijieshen@gmail.com>2015-07-16 10:52:09 -0700
committerYin Huai <yhuai@databricks.com>2015-07-16 10:52:09 -0700
commit43dac2c880d6f310a958531aee0bb4ac1d9b7025 (patch)
treedb45cb9cbed7e35c4efe0b8debdb6bd81ff6ae6f /sql
parentb536d5dc6c2c712270b8130ddd9945dff19a27d9 (diff)
downloadspark-43dac2c880d6f310a958531aee0bb4ac1d9b7025.tar.gz
spark-43dac2c880d6f310a958531aee0bb4ac1d9b7025.tar.bz2
spark-43dac2c880d6f310a958531aee0bb4ac1d9b7025.zip
[SPARK-6941] [SQL] Provide a better error message when inserting into an RDD-based table
JIRA: https://issues.apache.org/jira/browse/SPARK-6941 Author: Yijie Shen <henry.yijieshen@gmail.com> Closes #7342 from yijieshen/SPARK-6941 and squashes the following commits: f82cbe7 [Yijie Shen] reorder import dd67e40 [Yijie Shen] resolve comments 09518af [Yijie Shen] fix import order in DataframeSuite 0c635d4 [Yijie Shen] make match more specific 9df388d [Yijie Shen] move check into PreWriteCheck 847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala55
2 files changed, 60 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index a3fd7f13b3..40ee048e26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.DataType
@@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")
+ case logical.InsertIntoTable(t, _, _, _, _) =>
+ if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
+ failAnalysis(s"Inserting into an RDD-based table is not allowed.")
+ } else {
+ // OK
+ }
+
case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f592a9934d..23244fd310 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,19 +17,23 @@
package org.apache.spark.sql
+import java.io.File
+
import scala.language.postfixOps
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
-import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint}
-
+import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
-class DataFrameSuite extends QueryTest {
+class DataFrameSuite extends QueryTest with SQLTestUtils {
import org.apache.spark.sql.TestData._
lazy val ctx = org.apache.spark.sql.test.TestSQLContext
import ctx.implicits._
+ def sqlContext: SQLContext = ctx
+
test("analysis error should be eagerly reported") {
val oldSetting = ctx.conf.dataFrameEagerAnalysis
// Eager analysis.
@@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest {
assert(f.getMessage.contains("column3"))
assert(!f.getMessage.contains("column2"))
}
+
+ test("SPARK-6941: Better error message for inserting into RDD-based Table") {
+ withTempDir { dir =>
+
+ val tempParquetFile = new File(dir, "tmp_parquet")
+ val tempJsonFile = new File(dir, "tmp_json")
+
+ val df = Seq(Tuple1(1)).toDF()
+ val insertion = Seq(Tuple1(2)).toDF("col")
+
+ // pass case: parquet table (HadoopFsRelation)
+ df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath)
+ val pdf = ctx.read.parquet(tempParquetFile.getCanonicalPath)
+ pdf.registerTempTable("parquet_base")
+ insertion.write.insertInto("parquet_base")
+
+ // pass case: json table (InsertableRelation)
+ df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath)
+ val jdf = ctx.read.json(tempJsonFile.getCanonicalPath)
+ jdf.registerTempTable("json_base")
+ insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
+
+ // error cases: insert into an RDD
+ df.registerTempTable("rdd_base")
+ val e1 = intercept[AnalysisException] {
+ insertion.write.insertInto("rdd_base")
+ }
+ assert(e1.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+
+ // error case: insert into a logical plan that is not a LeafNode
+ val indirectDS = pdf.select("_1").filter($"_1" > 5)
+ indirectDS.registerTempTable("indirect_ds")
+ val e2 = intercept[AnalysisException] {
+ insertion.write.insertInto("indirect_ds")
+ }
+ assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+
+ // error case: insert into an OneRowRelation
+ new DataFrame(ctx, OneRowRelation).registerTempTable("one_row")
+ val e3 = intercept[AnalysisException] {
+ insertion.write.insertInto("one_row")
+ }
+ assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+ }
+ }
}