-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                             12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala   42
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala                    8
3 files changed, 52 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6ce59e885a..78b74f948e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
@Experimental
def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+ assertNotPartitioned("foreach")
assertNotBucketed("foreach")
assertStreaming(
"foreach() can only be called on streaming Datasets/DataFrames.")
@@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private def assertNotBucketed(operation: String): Unit = {
if (numBuckets.isDefined || sortColumnNames.isDefined) {
- throw new IllegalArgumentException(
- s"'$operation' does not support bucketing right now.")
+ throw new AnalysisException(s"'$operation' does not support bucketing right now")
+ }
+ }
+
+ private def assertNotPartitioned(operation: String): Unit = {
+ if (partitioningColumns.isDefined) {
+ throw new AnalysisException(s"'$operation' does not support partitioning")
}
}
@@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+ assertNotPartitioned("jdbc")
+ assertNotBucketed("jdbc")
assertNotStreaming("jdbc() can only be called on non-continuous queries")
val props = new Properties()
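
As a usage sketch (not part of the patch), the new guards mean that combining partitionBy() or bucketBy() with foreach() or jdbc() now fails fast with an AnalysisException instead of being silently accepted or surfacing as an IllegalArgumentException. The snippet below assumes the same pre-2.0 API exercised by the tests (DataFrameReader.stream(), DataFrameWriter.foreach()); `spark`, `writer`, and the table name are hypothetical placeholders.

    import java.util.Properties
    import org.apache.spark.sql.AnalysisException

    // Streaming case: partitioning is rejected before the ForeachWriter ever runs.
    val streaming = spark.read.format("org.apache.spark.sql.streaming.test").stream()
    try {
      streaming.write.partitionBy("value").foreach(writer)
    } catch {
      case e: AnalysisException => println(e.getMessage) // "'foreach' does not support partitioning;"
    }

    // Batch case: bucketing is rejected before any JDBC connection is opened.
    val batch = spark.range(10).toDF("value")
    try {
      batch.write.bucketBy(2, "value").jdbc("jdbc:h2:mem:test", "t", new Properties())
    } catch {
      case e: AnalysisException => println(e.getMessage) // "'jdbc' does not support bucketing right now;"
    }
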
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index bf6063a4c4..6e0d66ae7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
- val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream())
- assert(e.getMessage == "'startStream' does not support bucketing right now.")
+ val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream())
+ assert(e.getMessage == "'startStream' does not support bucketing right now;")
}
test("check sortBy() can only be called on non-continuous queries;") {
@@ -464,8 +464,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
- val e = intercept[IllegalArgumentException](w.sortBy("text").startStream())
- assert(e.getMessage == "'startStream' does not support bucketing right now.")
+ val e = intercept[AnalysisException](w.sortBy("text").startStream())
+ assert(e.getMessage == "'startStream' does not support bucketing right now;")
}
test("check save(path) can only be called on non-continuous queries") {
@@ -558,6 +558,40 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
assert(e.getMessage == "csv() can only be called on non-continuous queries;")
}
+ test("check foreach() does not support partitioning or bucketing") {
+ val df = spark.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+
+ var w = df.write.partitionBy("value")
+ var e = intercept[AnalysisException](w.foreach(null))
+ Seq("foreach", "partitioning").foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+
+ w = df.write.bucketBy(2, "value")
+ e = intercept[AnalysisException](w.foreach(null))
+ Seq("foreach", "bucketing").foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+ }
+
+ test("check jdbc() does not support partitioning or bucketing") {
+ val df = spark.read.text(newTextInput)
+
+ var w = df.write.partitionBy("value")
+ var e = intercept[AnalysisException](w.jdbc(null, null, null))
+ Seq("jdbc", "partitioning").foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+
+ w = df.write.bucketBy(2, "value")
+ e = intercept[AnalysisException](w.jdbc(null, null, null))
+ Seq("jdbc", "bucketing").foreach { s =>
+ assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+ }
+ }
+
test("ConsoleSink can be correctly loaded") {
LastOptions.clear()
val df = spark.read
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 61a281db85..997445114b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -62,19 +62,19 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
test("write bucketed data using save()") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
- val e = intercept[IllegalArgumentException] {
+ val e = intercept[AnalysisException] {
df.write.bucketBy(2, "i").parquet("/tmp/path")
}
- assert(e.getMessage == "'save' does not support bucketing right now.")
+ assert(e.getMessage == "'save' does not support bucketing right now;")
}
test("write bucketed data using insertInto()") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
- val e = intercept[IllegalArgumentException] {
+ val e = intercept[AnalysisException] {
df.write.bucketBy(2, "i").insertInto("tt")
}
- assert(e.getMessage == "'insertInto' does not support bucketing right now.")
+ assert(e.getMessage == "'insertInto' does not support bucketing right now;")
}
private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
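
A side note on the trailing ";" in the expected strings: the assertions above compare against AnalysisException.getMessage, which in this era of Spark appends a ";" (plus optional line/position annotations) to the raw message passed to the constructor. A minimal sketch of that behavior, assuming that implementation rather than quoting it:

    // Simplified reconstruction, for illustration only.
    class AnalysisExceptionSketch(
        message: String,
        line: Option[Int] = None,
        startPosition: Option[Int] = None) extends Exception {
      override def getMessage: String = {
        val lineAnnotation = line.map(l => s" line $l").getOrElse("")
        val positionAnnotation = startPosition.map(p => s" pos $p").getOrElse("")
        s"$message;$lineAnnotation$positionAnnotation"
      }
    }

    // new AnalysisExceptionSketch("'save' does not support bucketing right now").getMessage
    //   returns "'save' does not support bucketing right now;"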