aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-03-24 12:10:30 -0700
committerMichael Armbrust <michael@databricks.com>2015-03-24 12:10:30 -0700
commit26c6ce3d2947df5a294b1ad4a22fae5d31d06c19 (patch)
tree52c15a61c697a344866f482dc87c7bb190909bf4 /sql
parent32efadd0500f10bddf2ae8456c9e719ec52940f1 (diff)
downloadspark-26c6ce3d2947df5a294b1ad4a22fae5d31d06c19.tar.gz
spark-26c6ce3d2947df5a294b1ad4a22fae5d31d06c19.tar.bz2
spark-26c6ce3d2947df5a294b1ad4a22fae5d31d06c19.zip
[SPARK-6437][SQL] Use completion iterator to close external sorter
Otherwise we will leak files when spilling occurs. Author: Michael Armbrust <michael@databricks.com> Closes #5161 from marmbrus/cleanupAfterSort and squashes the following commits: cb13d3c [Michael Armbrust] hint to inferencer cdebdf5 [Michael Armbrust] Use completion iterator to close external sorter
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala6
1 files changed, 4 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 20c9bc3e75..1f5251a203 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.util.MutablePair
+import org.apache.spark.util.{CompletionIterator, MutablePair}
import org.apache.spark.util.collection.ExternalSorter
/**
@@ -194,7 +194,9 @@ case class ExternalSort(
val ordering = newOrdering(sortOrder, child.output)
val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
sorter.insertAll(iterator.map(r => (r, null)))
- sorter.iterator.map(_._1)
+ val baseIterator = sorter.iterator.map(_._1)
+ // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
+ CompletionIterator[Row, Iterator[Row]](baseIterator, sorter.stop())
}, preservesPartitioning = true)
}