Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala) | 30
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala) | 34
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala | 28
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala | 30
4 files changed, 101 insertions(+), 21 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 4fc72b9e47..9b65419dba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,17 +29,25 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.BitSet
-class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
+ protected override def beforeAll(): Unit = {
+ super.beforeAll()
+ assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+ }
+}
+
+
+abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
import testImplicits._
- private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
- private val nullDF = (for {
+ private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+ private lazy val nullDF = (for {
i <- 0 to 50
s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
} yield (i % 5, s, i % 13)).toDF("i", "j", "k")
@@ -224,8 +232,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
}
}
- private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
- private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
+ private lazy val df1 =
+ (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
+ private lazy val df2 =
+ (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
case class BucketedTableTestSpec(
bucketSpec: Option[BucketSpec],
@@ -535,7 +545,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
test("error if there exists any malformed bucket files") {
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
- val warehouseFilePath = new URI(hiveContext.sparkSession.getWarehousePath).getPath
+ val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
val tableDir = new File(warehouseFilePath, "bucketed_table")
Utils.deleteRecursively(tableDir)
df1.write.parquet(tableDir.getAbsolutePath)
@@ -553,9 +563,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
- checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+ checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
- checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+ checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
df1.groupBy("j").agg(max("k")))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 61cef2a800..9082261af7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,19 +20,29 @@ package org.apache.spark.sql.sources
import java.io.File
import java.net.URI
-import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
-class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext {
+ protected override def beforeAll(): Unit = {
+ super.beforeAll()
+ assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+ }
+
+ override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "json")
+}
+
+abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
import testImplicits._
+ protected def fileFormatsToTest: Seq[String]
+
test("bucketed by non-existing column") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
@@ -76,11 +86,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
assert(e.getMessage == "'insertInto' does not support bucketing right now;")
}
- private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+ private lazy val df = {
+ (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+ }
def tableDir: File = {
val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
- new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
+ new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier)))
}
/**
@@ -141,7 +153,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
}
test("write bucketed data") {
- for (source <- Seq("parquet", "json", "orc")) {
+ for (source <- fileFormatsToTest) {
withTable("bucketed_table") {
df.write
.format(source)
@@ -157,7 +169,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
}
test("write bucketed data with sortBy") {
- for (source <- Seq("parquet", "json", "orc")) {
+ for (source <- fileFormatsToTest) {
withTable("bucketed_table") {
df.write
.format(source)
@@ -190,7 +202,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
}
test("write bucketed data without partitionBy") {
- for (source <- Seq("parquet", "json", "orc")) {
+ for (source <- fileFormatsToTest) {
withTable("bucketed_table") {
df.write
.format(source)
@@ -203,7 +215,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
}
test("write bucketed data without partitionBy with sortBy") {
- for (source <- Seq("parquet", "json", "orc")) {
+ for (source <- fileFormatsToTest) {
withTable("bucketed_table") {
df.write
.format(source)
@@ -219,7 +231,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
test("write bucketed data with bucketing disabled") {
// The configuration BUCKETING_ENABLED does not affect the writing path
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
- for (source <- Seq("parquet", "json", "orc")) {
+ for (source <- fileFormatsToTest) {
withTable("bucketed_table") {
df.write
.format(source)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
new file mode 100644
index 0000000000..f277f99805
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton {
+ protected override def beforeAll(): Unit = {
+ super.beforeAll()
+ assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
new file mode 100644
index 0000000000..454e2f65d5
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHiveSingleton {
+ protected override def beforeAll(): Unit = {
+ super.beforeAll()
+ assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+ }
+
+ override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "orc")
+}