 sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                           | 10 +++++-----
 sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala |  4 ++--
 sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala                | 19 +++++++++++++++----
 3 files changed, 22 insertions(+), 11 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 50ae9667f4..1dd8818ded 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -281,7 +281,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 1.4.0
*/
def save(): Unit = {
- assertNotBucketed()
+ assertNotBucketed("save")
assertNotStreaming("save() can only be called on non-continuous queries")
val dataSource = DataSource(
df.sparkSession,
@@ -330,7 +330,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*/
@Experimental
def startStream(): ContinuousQuery = {
- assertNotBucketed()
+ assertNotBucketed("startStream")
assertStreaming("startStream() can only be called on continuous queries")
if (source == "memory") {
@@ -430,7 +430,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
private def insertInto(tableIdent: TableIdentifier): Unit = {
- assertNotBucketed()
+ assertNotBucketed("insertInto")
assertNotStreaming("insertInto() can only be called on non-continuous queries")
val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
val overwrite = mode == SaveMode.Overwrite
@@ -500,10 +500,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
s"existing columns (${validColumnNames.mkString(", ")})"))
}
- private def assertNotBucketed(): Unit = {
+ private def assertNotBucketed(operation: String): Unit = {
if (numBuckets.isDefined || sortColumnNames.isDefined) {
throw new IllegalArgumentException(
- "Currently we don't support writing bucketed data to this data source.")
+ s"'$operation' does not support bucketing right now.")
}
}
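
Pulled out of the hunk above for readability, the following is a self-contained sketch of the check as it reads after the patch. The enclosing object and its two Option fields are stand-ins for DataFrameWriter state (numBuckets is set by bucketBy, sortColumnNames by sortBy), not the real class; note that either one being defined is enough to trip the assertion.

object AssertNotBucketedSketch {
  // Stand-ins for the DataFrameWriter fields referenced in the hunk above:
  // bucketBy(n, ...) sets numBuckets, sortBy(...) sets sortColumnNames.
  private var numBuckets: Option[Int] = None
  private var sortColumnNames: Option[Seq[String]] = None

  // Same body as the patched helper: the caller passes its own name so the
  // error message points at the exact operation the user invoked.
  private def assertNotBucketed(operation: String): Unit = {
    if (numBuckets.isDefined || sortColumnNames.isDefined) {
      throw new IllegalArgumentException(
        s"'$operation' does not support bucketing right now.")
    }
  }

  def main(args: Array[String]): Unit = {
    sortColumnNames = Some(Seq("text")) // sortBy alone is enough to fail the check
    try assertNotBucketed("startStream")
    catch {
      case e: IllegalArgumentException =>
        println(e.getMessage) // prints: 'startStream' does not support bucketing right now.
    }
  }
}
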
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index a2aac69064..431a943304 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -456,7 +456,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
.stream()
val w = df.write
val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream())
- assert(e.getMessage == "Currently we don't support writing bucketed data to this data source.")
+ assert(e.getMessage == "'startStream' does not support bucketing right now.")
}
test("check sortBy() can only be called on non-continuous queries;") {
@@ -465,7 +465,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
.stream()
val w = df.write
val e = intercept[IllegalArgumentException](w.sortBy("text").startStream())
- assert(e.getMessage == "Currently we don't support writing bucketed data to this data source.")
+ assert(e.getMessage == "'startStream' does not support bucketing right now.")
}
test("check save(path) can only be called on non-continuous queries") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index ff44c6f294..61a281db85 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -59,11 +59,22 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
intercept[SparkException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
}
- test("write bucketed data to non-hive-table or existing hive table") {
+ test("write bucketed data using save()") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
- intercept[IllegalArgumentException](df.write.bucketBy(2, "i").parquet("/tmp/path"))
- intercept[IllegalArgumentException](df.write.bucketBy(2, "i").json("/tmp/path"))
- intercept[IllegalArgumentException](df.write.bucketBy(2, "i").insertInto("tt"))
+
+ val e = intercept[IllegalArgumentException] {
+ df.write.bucketBy(2, "i").parquet("/tmp/path")
+ }
+ assert(e.getMessage == "'save' does not support bucketing right now.")
+ }
+
+ test("write bucketed data using insertInto()") {
+ val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+ val e = intercept[IllegalArgumentException] {
+ df.write.bucketBy(2, "i").insertInto("tt")
+ }
+ assert(e.getMessage == "'insertInto' does not support bucketing right now.")
}
private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
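
For context on why only save() and insertInto() are asserted here: bucketing is only wired up through saveAsTable() at this point, so a working bucketed write looks roughly like the hedged sketch below. This is not part of the patch; it assumes a SparkSession built with Hive support, as used by this suite, and a table name chosen for illustration.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucketed-write-sketch")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

// Supported path: bucketed (and optionally sorted) writes go through saveAsTable().
df.write
  .format("parquet")
  .bucketBy(2, "i")
  .sortBy("j")
  .saveAsTable("bucketed_tbl")

// By contrast, the paths exercised in the tests above now fail fast with
// operation-specific messages:
//   df.write.bucketBy(2, "i").parquet("/tmp/path")  -> "'save' does not support bucketing right now."
//   df.write.bucketBy(2, "i").insertInto("tt")      -> "'insertInto' does not support bucketing right now."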