diff options
author | Yijie Shen <henry.yijieshen@gmail.com> | 2015-07-16 10:52:09 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-07-16 10:52:09 -0700 |
commit | 43dac2c880d6f310a958531aee0bb4ac1d9b7025 (patch) | |
tree | db45cb9cbed7e35c4efe0b8debdb6bd81ff6ae6f /sql | |
parent | b536d5dc6c2c712270b8130ddd9945dff19a27d9 (diff) | |
download | spark-43dac2c880d6f310a958531aee0bb4ac1d9b7025.tar.gz spark-43dac2c880d6f310a958531aee0bb4ac1d9b7025.tar.bz2 spark-43dac2c880d6f310a958531aee0bb4ac1d9b7025.zip |
[SPARK-6941] [SQL] Provide a better error message when inserting into an RDD-based table
JIRA: https://issues.apache.org/jira/browse/SPARK-6941
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes #7342 from yijieshen/SPARK-6941 and squashes the following commits:
f82cbe7 [Yijie Shen] reorder import
dd67e40 [Yijie Shen] resolve comments
09518af [Yijie Shen] fix import order in DataFrameSuite
0c635d4 [Yijie Shen] make match more specific
9df388d [Yijie Shen] move check into PreWriteCheck
847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala | 9 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 55 |
2 files changed, 60 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index a3fd7f13b3..40ee048e26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException} import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog} import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias} import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.DataType @@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") + case logical.InsertIntoTable(t, _, _, _, _) => + if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) { + failAnalysis(s"Inserting into an RDD-based table is not allowed.") + } else { + // OK + } + case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) => // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f592a9934d..23244fd310 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -17,19 +17,23 @@ package org.apache.spark.sql +import java.io.File + import scala.language.postfixOps +import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint} - +import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils} -class DataFrameSuite extends QueryTest { +class DataFrameSuite extends QueryTest with SQLTestUtils { import org.apache.spark.sql.TestData._ lazy val ctx = org.apache.spark.sql.test.TestSQLContext import ctx.implicits._ + def sqlContext: SQLContext = ctx + test("analysis error should be eagerly reported") { val oldSetting = ctx.conf.dataFrameEagerAnalysis // Eager analysis. 
@@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest { assert(f.getMessage.contains("column3")) assert(!f.getMessage.contains("column2")) } + + test("SPARK-6941: Better error message for inserting into RDD-based Table") { + withTempDir { dir => + + val tempParquetFile = new File(dir, "tmp_parquet") + val tempJsonFile = new File(dir, "tmp_json") + + val df = Seq(Tuple1(1)).toDF() + val insertion = Seq(Tuple1(2)).toDF("col") + + // pass case: parquet table (HadoopFsRelation) + df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath) + val pdf = ctx.read.parquet(tempParquetFile.getCanonicalPath) + pdf.registerTempTable("parquet_base") + insertion.write.insertInto("parquet_base") + + // pass case: json table (InsertableRelation) + df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath) + val jdf = ctx.read.json(tempJsonFile.getCanonicalPath) + jdf.registerTempTable("json_base") + insertion.write.mode(SaveMode.Overwrite).insertInto("json_base") + + // error cases: insert into an RDD + df.registerTempTable("rdd_base") + val e1 = intercept[AnalysisException] { + insertion.write.insertInto("rdd_base") + } + assert(e1.getMessage.contains("Inserting into an RDD-based table is not allowed.")) + + // error case: insert into a logical plan that is not a LeafNode + val indirectDS = pdf.select("_1").filter($"_1" > 5) + indirectDS.registerTempTable("indirect_ds") + val e2 = intercept[AnalysisException] { + insertion.write.insertInto("indirect_ds") + } + assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed.")) + + // error case: insert into an OneRowRelation + new DataFrame(ctx, OneRowRelation).registerTempTable("one_row") + val e3 = intercept[AnalysisException] { + insertion.write.insertInto("one_row") + } + assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed.")) + } + } } |