diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-07-30 10:45:32 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-30 10:45:32 -0700 |
commit | 520ec0ff9db75267f627dc4615b2316a1a3d44d7 (patch) | |
tree | 8a810354ee505d06e6a6e80dbc99d5a0a81e87f6 /core/src | |
parent | ab78b1d2a6ce26833ea3878a63921efd805a3737 (diff) | |
download | spark-520ec0ff9db75267f627dc4615b2316a1a3d44d7.tar.gz spark-520ec0ff9db75267f627dc4615b2316a1a3d44d7.tar.bz2 spark-520ec0ff9db75267f627dc4615b2316a1a3d44d7.zip |
[SPARK-8850] [SQL] Enable Unsafe mode by default
This pull request enables Unsafe mode by default in Spark SQL. In order to do this, we had to fix a number of small issues:
**List of fixed blockers**:
- [x] Make some default buffer sizes configurable so that HiveCompatibilitySuite can run properly (#7741).
- [x] Memory leak on grouped aggregation of empty input (fixed by #7560 to fix this)
- [x] Update planner to also check whether codegen is enabled before planning unsafe operators.
- [x] Investigate failing HiveThriftBinaryServerSuite test. This turns out to be caused by a ClassCastException that occurs when Exchange tries to apply an interpreted RowOrdering to an UnsafeRow when range partitioning an RDD. This could be fixed by #7408, but a shorter-term fix is to just skip the Unsafe exchange path when RangePartitioner is used.
- [x] Memory leak exceptions masking exceptions that actually caused tasks to fail (will be fixed by #7603).
- [x] ~~https://issues.apache.org/jira/browse/SPARK-9162, to implement code generation for ScalaUDF. This is necessary for `UDFSuite` to pass. For now, I've just ignored this test in order to try to find other problems while we wait for a fix.~~ This is no longer necessary as of #7682.
- [x] Memory leaks from Limit after UnsafeExternalSort cause the memory leak detector to fail tests. This is a huge problem in the HiveCompatibilitySuite (fixed by f4ac642a4e5b2a7931c5e04e086bb10e263b1db6).
- [x] Tests in `AggregationQuerySuite` are failing due to NaN-handling issues in UnsafeRow, which were fixed in #7736.
- [x] `org.apache.spark.sql.ColumnExpressionSuite.rand` needs to be updated so that the planner check also matches `TungstenProject`.
- [x] After having lowered the buffer sizes to 4MB so that most of HiveCompatibilitySuite runs:
- [x] Wrong answer in `join_1to1` (fixed by #7680)
- [x] Wrong answer in `join_nulls` (fixed by #7680)
- [x] Managed memory OOM / leak in `lateral_view`
- [x] Seems to hang indefinitely in `partcols1`. This might be a deadlock in script transformation or a bug in error-handling code? The hang was fixed by #7710.
- [x] Error while freeing memory in `partcols1`: will be fixed by #7734.
- [x] After fixing the `partcols1` hang, it appears that a number of later tests have issues as well.
- [x] Fix thread-safety bug in codegen fallback expression evaluation (#7759).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #7564 from JoshRosen/unsafe-by-default and squashes the following commits:
83c0c56 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
f4cc859 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
963f567 [Josh Rosen] Reduce buffer size for R tests
d6986de [Josh Rosen] Lower page size in PySpark tests
013b9da [Josh Rosen] Also match TungstenProject in checkNumProjects
5d0b2d3 [Josh Rosen] Add task completion callback to avoid leak in limit after sort
ea250da [Josh Rosen] Disable unsafe Exchange path when RangePartitioning is used
715517b [Josh Rosen] Enable Unsafe by default
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 14 |
1 files changed, 14 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index c21990f4e4..866e0b4151 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -20,6 +20,9 @@ package org.apache.spark.util.collection.unsafe.sort; import java.io.IOException; import java.util.LinkedList; +import scala.runtime.AbstractFunction0; +import scala.runtime.BoxedUnit; + import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +93,17 @@ public final class UnsafeExternalSorter { this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m"); initializeForWriting(); + + // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at + // the end of the task. This is necessary to avoid memory leaks in when the downstream operator + // does not fully consume the sorter's output (e.g. sort followed by limit). + taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + freeMemory(); + return null; + } + }); } // TODO: metrics tracking + integration with shuffle write metrics |