Diffstat:
 -rwxr-xr-x  R/run-tests.sh                                                                              |  2
 -rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java   | 14
 -rw-r--r--  python/pyspark/java_gateway.py                                                              |  6
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                                  |  2
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala                       |  7
 -rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala                    |  3
 -rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala        | 20
 7 files changed, 30 insertions(+), 24 deletions(-)
diff --git a/R/run-tests.sh b/R/run-tests.sh
index e82ad0ba2c..18a1e13bdc 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE
-SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))
if [[ $FAILED != 0 ]]; then
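The --conf spark.buffer.pageSize=4m flag added here shrinks the page size used by Tungsten's unsafe operators so the SparkR test JVM allocates smaller memory pages. A minimal Scala sketch of the programmatic equivalent, assuming nothing beyond a plain SparkConf (the key and value are the ones from the flag above):

    import org.apache.spark.SparkConf

    // Sketch: set the same page size programmatically instead of via --conf.
    // Spark's size parser accepts suffixed values such as "4m" or "4mb".
    val conf = new SparkConf().set("spark.buffer.pageSize", "4m")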
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index c21990f4e4..866e0b4151 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -20,6 +20,9 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.io.IOException;
import java.util.LinkedList;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +93,17 @@ public final class UnsafeExternalSorter {
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();
+
+ // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
+  // the end of the task. This is necessary to avoid memory leaks when the downstream operator
+ // does not fully consume the sorter's output (e.g. sort followed by limit).
+ taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ freeMemory();
+ return null;
+ }
+ });
}
// TODO: metrics tracking + integration with shuffle write metrics
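For readers following along in Scala, a rough sketch of what the callback registration above amounts to; this assumes the same Spark 1.x TaskContext.addOnCompleteCallback(f: () => Unit) overload that the Java code reaches through AbstractFunction0, and is illustrative rather than part of the change:

    // Sketch: free the sorter's pages at task completion even if the downstream
    // operator (e.g. a Limit) stops consuming the iterator early.
    taskContext.addOnCompleteCallback { () =>
      freeMemory()
    }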
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 90cd342a6c..60be85e53e 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -52,7 +52,11 @@ def launch_gateway():
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
if os.environ.get("SPARK_TESTING"):
- submit_args = "--conf spark.ui.enabled=false " + submit_args
+ submit_args = ' '.join([
+ "--conf spark.ui.enabled=false",
+ "--conf spark.buffer.pageSize=4mb",
+ submit_args
+ ])
command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
# Start a socket that will be used by PythonGatewayServer to communicate its port to us
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2564bbd207..6644e85d4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -229,7 +229,7 @@ private[spark] object SQLConf {
" a specific query.")
val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
- defaultValue = Some(false),
+ defaultValue = Some(true),
doc = "When true, use the new optimized Tungsten physical execution backend.")
val DIALECT = stringConf(
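With the default flipped to true, the Tungsten unsafe backend is now used unless it is explicitly turned off. A short sketch of opting back out for a single session, assuming a standard SQLContext named sqlContext (the key is the one defined above):

    // Sketch: disable the unsafe backend for this SQLContext only.
    sqlContext.setConf("spark.sql.unsafe.enabled", "false")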
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 41a0c519ba..70e5031fb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -47,7 +47,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
override def canProcessSafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
+ override def canProcessUnsafeRows: Boolean = {
+ // Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
+ // an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
+ // ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
+ !newPartitioning.isInstanceOf[RangePartitioning]
+ }
/**
* Determines whether records must be defensively copied before being sent to the shuffle.
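The guard added above can be read as a pattern match over the partitioning; a sketch of the same logic, purely illustrative:

    // Sketch: equivalent formulation of canProcessUnsafeRows.
    // RangePartitioning requires an interpreted RowOrdering, which cannot be applied
    // to UnsafeRows without ClassCastExceptions (SPARK-9054), so fall back to safe rows.
    override def canProcessUnsafeRows: Boolean = newPartitioning match {
      case _: RangePartitioning => false
      case _ => true
    }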
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 5c11024108..eb64684ae0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import org.scalatest.Matchers._
-import org.apache.spark.sql.execution.Project
+import org.apache.spark.sql.execution.{Project, TungstenProject}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.test.SQLTestUtils
@@ -538,6 +538,7 @@ class ColumnExpressionSuite extends QueryTest with SQLTestUtils {
def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
val projects = df.queryExecution.executedPlan.collect {
case project: Project => project
+ case tungstenProject: TungstenProject => tungstenProject
}
assert(projects.size === expectedNumProjects)
}
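Because the unsafe backend now defaults to on (see the SQLConf change earlier in this diff), projections may be planned as TungstenProject rather than Project, which is why the collector matches both node types. A hedged sketch for inspecting which operator the planner chose for some DataFrame df (df is a placeholder, not defined in this diff):

    // Sketch: report the project operators present in df's executed plan.
    df.queryExecution.executedPlan.collect {
      case p: Project => "Project"
      case tp: TungstenProject => "TungstenProject"
    }.foreach(println)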
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
index 7a4baa9e4a..138636b0c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -36,10 +36,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.conf.setConf(SQLConf.CODEGEN_ENABLED, SQLConf.CODEGEN_ENABLED.defaultValue.get)
}
- ignore("sort followed by limit should not leak memory") {
- // TODO: this test is going to fail until we implement a proper iterator interface
- // with a close() method.
- TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
+ test("sort followed by limit") {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
(child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
@@ -48,21 +45,6 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
)
}
- test("sort followed by limit") {
- TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
- try {
- checkThatPlansAgree(
- (1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
- (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
- sortAnswers = false
- )
- } finally {
- TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
-
- }
- }
-
test("sorting does not crash for large inputs") {
val sortOrder = 'a.asc :: Nil
val stringLength = 1024 * 1024 * 2