From 43dac2c880d6f310a958531aee0bb4ac1d9b7025 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 16 Jul 2015 10:52:09 -0700 Subject: [SPARK-6941] [SQL] Provide a better error message to when inserting into RDD based table JIRA: https://issues.apache.org/jira/browse/SPARK-6941 Author: Yijie Shen Closes #7342 from yijieshen/SPARK-6941 and squashes the following commits: f82cbe7 [Yijie Shen] reorder import dd67e40 [Yijie Shen] resolve comments 09518af [Yijie Shen] fix import order in DataframeSuite 0c635d4 [Yijie Shen] make match more specific 9df388d [Yijie Shen] move check into PreWriteCheck 847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy --- .../scala/org/apache/spark/sql/sources/rules.scala | 9 +++- .../org/apache/spark/sql/DataFrameSuite.scala | 55 ++++++++++++++++++++-- 2 files changed, 60 insertions(+), 4 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index a3fd7f13b3..40ee048e26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException} import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog} import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias} import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.DataType @@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") + case logical.InsertIntoTable(t, _, _, _, _) => + if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) { + failAnalysis(s"Inserting into an RDD-based table is not allowed.") + } else { + // OK + } + case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) => // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f592a9934d..23244fd310 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -17,19 +17,23 @@ package org.apache.spark.sql +import java.io.File + import scala.language.postfixOps +import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint} - +import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils} -class DataFrameSuite extends QueryTest { +class DataFrameSuite extends QueryTest with SQLTestUtils { import org.apache.spark.sql.TestData._ lazy val ctx = org.apache.spark.sql.test.TestSQLContext import ctx.implicits._ + def sqlContext: SQLContext = ctx + test("analysis error should be eagerly reported") { val oldSetting = ctx.conf.dataFrameEagerAnalysis // Eager analysis. @@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest { assert(f.getMessage.contains("column3")) assert(!f.getMessage.contains("column2")) } + + test("SPARK-6941: Better error message for inserting into RDD-based Table") { + withTempDir { dir => + + val tempParquetFile = new File(dir, "tmp_parquet") + val tempJsonFile = new File(dir, "tmp_json") + + val df = Seq(Tuple1(1)).toDF() + val insertion = Seq(Tuple1(2)).toDF("col") + + // pass case: parquet table (HadoopFsRelation) + df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath) + val pdf = ctx.read.parquet(tempParquetFile.getCanonicalPath) + pdf.registerTempTable("parquet_base") + insertion.write.insertInto("parquet_base") + + // pass case: json table (InsertableRelation) + df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath) + val jdf = ctx.read.json(tempJsonFile.getCanonicalPath) + jdf.registerTempTable("json_base") + insertion.write.mode(SaveMode.Overwrite).insertInto("json_base") + + // error cases: insert into an RDD + df.registerTempTable("rdd_base") + val e1 = intercept[AnalysisException] { + insertion.write.insertInto("rdd_base") + } + assert(e1.getMessage.contains("Inserting into an RDD-based table is not allowed.")) + + // error case: insert into a logical plan that is not a LeafNode + val indirectDS = pdf.select("_1").filter($"_1" > 5) + indirectDS.registerTempTable("indirect_ds") + val e2 = intercept[AnalysisException] { + insertion.write.insertInto("indirect_ds") + } + assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed.")) + + // error case: insert into an OneRowRelation + new DataFrame(ctx, OneRowRelation).registerTempTable("one_row") + val e3 = intercept[AnalysisException] { + insertion.write.insertInto("one_row") + } + assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed.")) + } + } } -- cgit v1.2.3