author     Reynold Xin <rxin@databricks.com>               2016-12-21 23:50:35 +0100
committer  Herman van Hovell <hvanhovell@databricks.com>   2016-12-21 23:50:35 +0100
commit     354e936187708a404c0349e3d8815a47953123ec (patch)
tree       7173f87541395152d3b3f7086fdd820f553c4c08 /sql/core/src/test
parent     078c71c2dcbb1470d22f8eb8138fb17e3d7c2414 (diff)
download   spark-354e936187708a404c0349e3d8815a47953123ec.tar.gz
           spark-354e936187708a404c0349e3d8815a47953123ec.tar.bz2
           spark-354e936187708a404c0349e3d8815a47953123ec.zip
[SPARK-18775][SQL] Limit the max number of records written per file
## What changes were proposed in this pull request?

Currently, Spark writes out a single file per task, which can sometimes produce very large files. It would be useful to have an option that limits the maximum number of records written per file in a task, to avoid humongous files.

This patch introduces a new write option `maxRecordsPerFile` (defaulting to the session-wide setting `spark.sql.files.maxRecordsPerFile`) that limits the maximum number of records written to a single file. A non-positive value means there is no limit (the same behavior as not setting the option).

## How was this patch tested?

Added test cases in PartitionedWriteSuite for both dynamic partition inserts and non-dynamic partition inserts.

Author: Reynold Xin <rxin@databricks.com>

Closes #16204 from rxin/SPARK-18775.
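For context (not part of this patch), a minimal usage sketch of the two knobs named above: the per-write `maxRecordsPerFile` option exercised by the new tests, and the session-wide `spark.sql.files.maxRecordsPerFile` conf. It assumes a running `SparkSession` named `spark`; the output path is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes a local SparkSession; the app name and path are illustrative.
val spark = SparkSession.builder().appName("maxRecordsPerFile-demo").getOrCreate()

// Per-write limit: each output file holds at most 2 records, so 4 rows
// produced by a single task are split across 2 parquet files.
spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
  .write
  .option("maxRecordsPerFile", 2)
  .mode("overwrite")
  .parquet("/tmp/max-records-demo") // hypothetical output path

// Session-wide default, used when the write option is not set.
// A non-positive value (the default) means no limit.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 2)
```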
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BucketingUtilsSuite.scala   46
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala               37
2 files changed, 83 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BucketingUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BucketingUtilsSuite.scala
new file mode 100644
index 0000000000..9d892bbdba
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BucketingUtilsSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.SparkFunSuite
+
+class BucketingUtilsSuite extends SparkFunSuite {
+
+ test("generate bucket id") {
+ assert(BucketingUtils.bucketIdToString(0) == "_00000")
+ assert(BucketingUtils.bucketIdToString(10) == "_00010")
+ assert(BucketingUtils.bucketIdToString(999999) == "_999999")
+ }
+
+ test("match bucket ids") {
+ def testCase(filename: String, expected: Option[Int]): Unit = withClue(s"name: $filename") {
+ assert(BucketingUtils.getBucketId(filename) == expected)
+ }
+
+ testCase("a_1", Some(1))
+ testCase("a_1.txt", Some(1))
+ testCase("a_9999999", Some(9999999))
+ testCase("a_9999999.txt", Some(9999999))
+ testCase("a_1.c2.txt", Some(1))
+ testCase("a_1.", Some(1))
+
+ testCase("a_1:txt", None)
+ testCase("a_1-c2.txt", None)
+ }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index a2decadbe0..953604e4ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.sources
+import java.io.File
+
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
@@ -61,4 +63,39 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
assert(spark.read.parquet(path).schema.map(_.name) == Seq("j", "i"))
}
}
+
+ test("maxRecordsPerFile setting in non-partitioned write path") {
+ withTempDir { f =>
+ spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
+ .write.option("maxRecordsPerFile", 1).mode("overwrite").parquet(f.getAbsolutePath)
+ assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+
+ spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
+ .write.option("maxRecordsPerFile", 2).mode("overwrite").parquet(f.getAbsolutePath)
+ assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
+
+ spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
+ .write.option("maxRecordsPerFile", -1).mode("overwrite").parquet(f.getAbsolutePath)
+ assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
+ }
+ }
+
+ test("maxRecordsPerFile setting in dynamic partition writes") {
+ withTempDir { f =>
+ spark.range(start = 0, end = 4, step = 1, numPartitions = 1).selectExpr("id", "id id1")
+ .write
+ .partitionBy("id")
+ .option("maxRecordsPerFile", 1)
+ .mode("overwrite")
+ .parquet(f.getAbsolutePath)
+ assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+ }
+ }
+
+ /** Lists files recursively. */
+ private def recursiveList(f: File): Array[File] = {
+ require(f.isDirectory)
+ val current = f.listFiles
+ current ++ current.filter(_.isDirectory).flatMap(recursiveList)
+ }
}