path: root/sql/core
author    Josh Rosen <joshrosen@databricks.com>  2015-07-30 10:45:32 -0700
committer Reynold Xin <rxin@databricks.com>      2015-07-30 10:45:32 -0700
commit    520ec0ff9db75267f627dc4615b2316a1a3d44d7 (patch)
tree      8a810354ee505d06e6a6e80dbc99d5a0a81e87f6 /sql/core
parent    ab78b1d2a6ce26833ea3878a63921efd805a3737 (diff)
download  spark-520ec0ff9db75267f627dc4615b2316a1a3d44d7.tar.gz
          spark-520ec0ff9db75267f627dc4615b2316a1a3d44d7.tar.bz2
          spark-520ec0ff9db75267f627dc4615b2316a1a3d44d7.zip
[SPARK-8850] [SQL] Enable Unsafe mode by default
This pull request enables Unsafe mode by default in Spark SQL. In order to do this, we had to fix a number of small issues.

**List of fixed blockers**:

- [x] Make some default buffer sizes configurable so that HiveCompatibilitySuite can run properly (#7741).
- [x] Memory leak on grouped aggregation of empty input (fixed by #7560).
- [x] Update the planner to also check whether codegen is enabled before planning unsafe operators.
- [x] Investigate the failing HiveThriftBinaryServerSuite test. This turned out to be caused by a ClassCastException that occurs when Exchange tries to apply an interpreted RowOrdering to an UnsafeRow while range partitioning an RDD. This could be fixed by #7408, but a shorter-term fix is to simply skip the Unsafe exchange path when RangePartitioner is used.
- [x] Memory leak exceptions masking the exceptions that actually caused tasks to fail (will be fixed by #7603).
- [x] ~~https://issues.apache.org/jira/browse/SPARK-9162: implement code generation for ScalaUDF, which is necessary for `UDFSuite` to pass. For now, I've just ignored this test in order to find other problems while we wait for a fix.~~ This is no longer necessary as of #7682.
- [x] Memory leaks from Limit after UnsafeExternalSort cause the memory leak detector to fail tests. This was a huge problem in the HiveCompatibilitySuite (fixed by f4ac642a4e5b2a7931c5e04e086bb10e263b1db6).
- [x] Tests in `AggregationQuerySuite` were failing due to NaN-handling issues in UnsafeRow, which were fixed in #7736.
- [x] `org.apache.spark.sql.ColumnExpressionSuite.rand` needed to be updated so that the planner check also matches `TungstenProject`.
- [x] After lowering the buffer sizes to 4MB so that most of HiveCompatibilitySuite runs:
  - [x] Wrong answer in `join_1to1` (fixed by #7680).
  - [x] Wrong answer in `join_nulls` (fixed by #7680).
  - [x] Managed memory OOM / leak in `lateral_view`.
  - [x] Seems to hang indefinitely in `partcols1`. This might have been a deadlock in script transformation or a bug in error-handling code; the hang was fixed by #7710.
  - [x] Error while freeing memory in `partcols1`: will be fixed by #7734.
  - [x] After fixing the `partcols1` hang, it appears that a number of later tests have issues as well.
- [x] Fix a thread-safety bug in codegen fallback expression evaluation (#7759).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7564 from JoshRosen/unsafe-by-default and squashes the following commits:

83c0c56 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
f4cc859 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
963f567 [Josh Rosen] Reduce buffer size for R tests
d6986de [Josh Rosen] Lower page size in PySpark tests
013b9da [Josh Rosen] Also match TungstenProject in checkNumProjects
5d0b2d3 [Josh Rosen] Add task completion callback to avoid leak in limit after sort
ea250da [Josh Rosen] Disable unsafe Exchange path when RangePartitioning is used
715517b [Josh Rosen] Enable Unsafe by default
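Since the flag stays user-configurable, the natural escape hatch if Unsafe mode causes a regression is to flip it back off. A minimal sketch against the Spark 1.5-era `SQLContext` API, assuming an existing `sqlContext`:

```scala
// Opt a session back out of Unsafe mode; `sqlContext` is assumed to be an
// existing org.apache.spark.sql.SQLContext.
sqlContext.setConf("spark.sql.unsafe.enabled", "false")

// Or set it once at submit time:
//   spark-submit --conf spark.sql.unsafe.enabled=false ...
```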
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                            2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala                 7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala              3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala 20
4 files changed, 10 insertions, 22 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2564bbd207..6644e85d4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -229,7 +229,7 @@ private[spark] object SQLConf {
" a specific query.")
val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
- defaultValue = Some(false),
+ defaultValue = Some(true),
doc = "When true, use the new optimized Tungsten physical execution backend.")
val DIALECT = stringConf(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 41a0c519ba..70e5031fb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -47,7 +47,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
override def canProcessSafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
+ override def canProcessUnsafeRows: Boolean = {
+ // Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
+ // an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
+ // ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
+ !newPartitioning.isInstanceOf[RangePartitioning]
+ }
/**
* Determines whether records must be defensively copied before being sent to the shuffle.
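For illustration, the guard added above reduces to a type check on the output partitioning. A standalone sketch of the same pattern, not the actual Exchange code; the function name is hypothetical:

```scala
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning}

// Gate the UnsafeRow fast path on the partitioning scheme: range partitioning
// requires an interpreted RowOrdering, which cannot yet be applied to
// UnsafeRows (see SPARK-9054), so fall back to the safe path in that case.
def canUseUnsafeExchange(newPartitioning: Partitioning): Boolean =
  !newPartitioning.isInstanceOf[RangePartitioning]
```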
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 5c11024108..eb64684ae0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import org.scalatest.Matchers._
-import org.apache.spark.sql.execution.Project
+import org.apache.spark.sql.execution.{Project, TungstenProject}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.test.SQLTestUtils
@@ -538,6 +538,7 @@ class ColumnExpressionSuite extends QueryTest with SQLTestUtils {
def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
val projects = df.queryExecution.executedPlan.collect {
case project: Project => project
+ case tungstenProject: TungstenProject => tungstenProject
}
assert(projects.size === expectedNumProjects)
}
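The `collect` idiom used here walks the physical plan tree and gathers every node matching the partial function, which is how the test now counts both interpreted and Tungsten projections. A standalone sketch of that idiom, assuming a DataFrame named `df`:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{Project, TungstenProject}

// Count projection operators in the executed physical plan, whichever
// implementation the planner chose for this query.
def numProjects(df: DataFrame): Int =
  df.queryExecution.executedPlan.collect {
    case p: Project => p
    case t: TungstenProject => t
  }.size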
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
index 7a4baa9e4a..138636b0c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -36,10 +36,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.conf.setConf(SQLConf.CODEGEN_ENABLED, SQLConf.CODEGEN_ENABLED.defaultValue.get)
}
- ignore("sort followed by limit should not leak memory") {
- // TODO: this test is going to fail until we implement a proper iterator interface
- // with a close() method.
- TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
+ test("sort followed by limit") {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
(child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
@@ -48,21 +45,6 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
)
}
- test("sort followed by limit") {
- TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
- try {
- checkThatPlansAgree(
- (1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
- (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
- sortAnswers = false
- )
- } finally {
- TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
-
- }
- }
-
test("sorting does not crash for large inputs") {
val sortOrder = 'a.asc :: Nil
val stringLength = 1024 * 1024 * 2
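The leak fix referenced in commit 5d0b2d3 ("Add task completion callback to avoid leak in limit after sort") relies on Spark's task-completion hooks: cleanup runs when the task ends even if a downstream Limit stops consuming the sorted iterator early. A minimal sketch of that pattern using the public TaskContext API; the `ManagedSorter` trait and its cleanup method are hypothetical stand-ins, not Spark's actual internals:

```scala
import org.apache.spark.TaskContext

// Hypothetical stand-in for a sorter holding managed memory pages.
trait ManagedSorter { def cleanupResources(): Unit }

// Register cleanup so pages are freed when the task completes, even if a
// downstream operator such as Limit abandons the iterator before exhausting it.
def registerCleanup(sorter: ManagedSorter): Unit = {
  TaskContext.get().addTaskCompletionListener { (_: TaskContext) =>
    sorter.cleanupResources()
  }
}
```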