aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-05-19 12:12:42 -0700
committerAndrew Or <andrew@databricks.com>2016-05-19 12:12:42 -0700
commit5ccecc078aa757d3f1f6632aa6df5659490f602f (patch)
treee8f37bd325eb96eceb4413a2c8dcaea8ea9c6dc5
parent4e3cb7a5d965fd490390398ecfe35f1fc05e8511 (diff)
downloadspark-5ccecc078aa757d3f1f6632aa6df5659490f602f.tar.gz
spark-5ccecc078aa757d3f1f6632aa6df5659490f602f.tar.bz2
spark-5ccecc078aa757d3f1f6632aa6df5659490f602f.zip
[SPARK-15392][SQL] fix default value of size estimation of logical plan
## What changes were proposed in this pull request? We use autoBroadcastJoinThreshold + 1L as the default value of size estimation, that is not good in 2.0, because we will calculate the size based on size of schema, then the estimation could be less than autoBroadcastJoinThreshold if you have an SELECT on top of an DataFrame created from RDD. This PR change the default value to Long.MaxValue. ## How was this patch tested? Added regression tests. Author: Davies Liu <davies@databricks.com> Closes #13183 from davies/fix_default_size.
-rw-r--r--python/pyspark/sql/dataframe.py2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala34
4 files changed, 40 insertions, 7 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a68ef33d39..4fa799ac55 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -576,7 +576,7 @@ class DataFrame(object):
>>> df_as2 = df.alias("df_as2")
>>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
>>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()
- [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)]
+ [Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)]
"""
assert isinstance(alias, basestring), "alias should be a string"
return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 518430f16d..5d18689801 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -120,8 +120,8 @@ object SQLConf {
"nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
"Note that currently statistics are only supported for Hive Metastore tables where the " +
"command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.")
- .intConf
- .createWithDefault(10 * 1024 * 1024)
+ .longConf
+ .createWithDefault(10L * 1024 * 1024)
val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
.internal()
@@ -599,14 +599,13 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def subexpressionEliminationEnabled: Boolean =
getConf(SUBEXPRESSION_ELIMINATION_ENABLED)
- def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
+ def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
- def defaultSizeInBytes: Long =
- getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L)
+ def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)
def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index a6b83b3d07..a5d8cb19ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -438,7 +438,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
spark.cacheManager.clearCache()
sql("CACHE TABLE testData")
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
classOf[BroadcastHashJoinExec]),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
new file mode 100644
index 0000000000..9523f6f9f5
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+class StatisticsSuite extends QueryTest with SharedSQLContext {
+
+ test("SPARK-15392: DataFrame created from RDD should not be broadcasted") {
+ val rdd = sparkContext.range(1, 100).map(i => Row(i, i))
+ val df = spark.createDataFrame(rdd, new StructType().add("a", LongType).add("b", LongType))
+ assert(df.queryExecution.analyzed.statistics.sizeInBytes >
+ spark.wrapped.conf.autoBroadcastJoinThreshold)
+ assert(df.selectExpr("a").queryExecution.analyzed.statistics.sizeInBytes >
+ spark.wrapped.conf.autoBroadcastJoinThreshold)
+ }
+
+}