Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala) | 30
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala) | 34
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala | 28
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala | 30
4 files changed, 101 insertions, 21 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 4fc72b9e47..9b65419dba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,17 +29,25 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+}
+
+
+abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
-  private val nullDF = (for {
+  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
   } yield (i % 5, s, i % 13)).toDF("i", "j", "k")
@@ -224,8 +232,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
-  private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
-  private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
+  private lazy val df1 =
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
+  private lazy val df2 =
+    (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
 
   case class BucketedTableTestSpec(
       bucketSpec: Option[BucketSpec],
@@ -535,7 +545,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val warehouseFilePath = new URI(hiveContext.sparkSession.getWarehousePath).getPath
+      val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
       val tableDir = new File(warehouseFilePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)
@@ -553,9 +563,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
-      checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+      checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
 
-      checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+      checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
         df1.groupBy("j").agg(max("k")))
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 61cef2a800..9082261af7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,19 +20,29 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
-class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "json")
+}
+
+abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
+  protected def fileFormatsToTest: Seq[String]
+
   test("bucketed by non-existing column") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
     intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
@@ -76,11 +86,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     assert(e.getMessage == "'insertInto' does not support bucketing right now;")
   }
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val df = {
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  }
 
   def tableDir: File = {
     val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
+    new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier)))
   }
 
   /**
@@ -141,7 +153,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -157,7 +169,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
      withTable("bucketed_table") {
        df.write
          .format(source)
@@ -190,7 +202,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data without partitionBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -203,7 +215,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data without partitionBy with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -219,7 +231,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   test("write bucketed data with bucketing disabled") {
     // The configuration BUCKETING_ENABLED does not affect the writing path
     withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
-      for (source <- Seq("parquet", "json", "orc")) {
+      for (source <- fileFormatsToTest) {
        withTable("bucketed_table") {
          df.write
            .format(source)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
new file mode 100644
index 0000000000..f277f99805
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
new file mode 100644
index 0000000000..454e2f65d5
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "orc")
+}
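Taken together, the patch turns each suite into an abstract base (BucketedReadSuite, BucketedWriteSuite) in sql/core plus concrete with/without-Hive subclasses that differ only in their session provider (SharedSQLContext vs. TestHiveSingleton), a catalog-implementation guard in beforeAll, and, for the write suite, the file formats to exercise. A minimal sketch of how a further concrete suite could plug into the abstract write base follows; the class name and format list are hypothetical, while every member mirrors the diff above:

package org.apache.spark.sql.sources

import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical concrete suite: inherits every test in the abstract
// BucketedWriteSuite but runs them against Parquet only.
class BucketedWriteParquetOnlySuite extends BucketedWriteSuite with SharedSQLContext {

  protected override def beforeAll(): Unit = {
    super.beforeAll()
    // Skip (rather than fail) the suite if the session under test was
    // built with a different catalog implementation.
    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
  }

  // The abstract base declares fileFormatsToTest without a body, so each
  // concrete suite chooses the formats its catalog can actually handle.
  override protected def fileFormatsToTest: Seq[String] = Seq("parquet")
}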