aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-10-29 23:38:06 -0700
committerDavies Liu <davies.liu@gmail.com>2015-10-29 23:38:06 -0700
commit56419cf11f769c80f391b45dc41b3c7101cc5ff4 (patch)
treec40211de4baa6c9ab9d12160ac3bab977fb17db4 /sql
parentd89be0bf81029cd82008a959d191e1c7b6ceaa18 (diff)
downloadspark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.tar.gz
spark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.tar.bz2
spark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.zip
[SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed. Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling). The PrepareRDD may be not needed anymore, could be removed in follow up PR. The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration). ```python sqlContext.setConf("spark.sql.shuffle.partitions", "1") df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s") df2 = df.select(df.id.alias('id2'), df.s.alias('s2')) j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2") j.explain() print j.count() ``` For thread-safety, here what I'm got: 1) Without calling spill(), the operators should only be used by single thread, no safety problems. 2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems. 3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it. 4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning. 5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter). Author: Davies Liu <davies@databricks.com> Closes #9241 from davies/force_spill.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java7
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java2
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java19
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala22
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala54
6 files changed, 21 insertions, 89 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 810c74fd2f..f7063d1e5c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -96,15 +96,10 @@ final class UnsafeExternalRowSorter {
);
numRowsInserted++;
if (testSpillFrequency > 0 && (numRowsInserted % testSpillFrequency) == 0) {
- spill();
+ sorter.spill();
}
}
- @VisibleForTesting
- void spill() throws IOException {
- sorter.spill();
- }
-
/**
* Return the peak memory used so far, in bytes.
*/
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 82c645df28..889f970034 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -165,7 +165,7 @@ public final class UnsafeFixedWidthAggregationMap {
public KVIterator<UnsafeRow, UnsafeRow> iterator() {
return new KVIterator<UnsafeRow, UnsafeRow>() {
- private final BytesToBytesMap.BytesToBytesMapIterator mapLocationIterator =
+ private final BytesToBytesMap.MapIterator mapLocationIterator =
map.destructiveIterator();
private final UnsafeRow key = new UnsafeRow();
private final UnsafeRow value = new UnsafeRow();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 46301f0042..845f2ae685 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -17,13 +17,13 @@
package org.apache.spark.sql.execution;
-import java.io.IOException;
-
import javax.annotation.Nullable;
+import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.TaskContext;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering;
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering;
@@ -33,7 +33,6 @@ import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.collection.unsafe.sort.*;
/**
@@ -84,18 +83,16 @@ public final class UnsafeKVExternalSorter {
/* initialSize */ 4096,
pageSizeBytes);
} else {
- // Insert the records into the in-memory sorter.
- // We will use the number of elements in the map as the initialSize of the
- // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
- // we will use 1 as its initial size if the map is empty.
- // TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474)
+ // The memory needed for UnsafeInMemorySorter should be less than longArray in map.
+ map.freeArray();
+ // The memory used by UnsafeInMemorySorter will be counted later (end of this block)
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
// We cannot use the destructive iterator here because we are reusing the existing memory
// pages in BytesToBytesMap to hold records during sorting.
// The only new memory we are allocating is the pointer/prefix array.
- BytesToBytesMap.BytesToBytesMapIterator iter = map.iterator();
+ BytesToBytesMap.MapIterator iter = map.iterator();
final int numKeyFields = keySchema.size();
UnsafeRow row = new UnsafeRow();
while (iter.hasNext()) {
@@ -117,7 +114,7 @@ public final class UnsafeKVExternalSorter {
}
sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
- taskContext.taskMemoryManager(),
+ taskMemoryManager,
blockManager,
taskContext,
new KVComparator(ordering, keySchema.length()),
@@ -128,6 +125,8 @@ public final class UnsafeKVExternalSorter {
sorter.spill();
map.free();
+ // counting the memory used UnsafeInMemorySorter
+ taskMemoryManager.acquireExecutionMemory(inMemSorter.getMemoryUsage(), sorter);
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index dbf4863b76..a38623623a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -24,7 +24,7 @@ import scala.util.{Try, Random}
import org.scalatest.Matchers
import org.apache.spark.{SparkConf, TaskContextImpl, TaskContext, SparkFunSuite}
-import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.test.SharedSQLContext
@@ -48,7 +48,7 @@ class UnsafeFixedWidthAggregationMapSuite
private def emptyAggregationBuffer: InternalRow = InternalRow(0)
private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes
- private var memoryManager: GrantEverythingMemoryManager = null
+ private var memoryManager: TestMemoryManager = null
private var taskMemoryManager: TaskMemoryManager = null
def testWithMemoryLeakDetection(name: String)(f: => Unit) {
@@ -62,7 +62,7 @@ class UnsafeFixedWidthAggregationMapSuite
test(name) {
val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
- memoryManager = new GrantEverythingMemoryManager(conf)
+ memoryManager = new TestMemoryManager(conf)
taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
@@ -193,10 +193,6 @@ class UnsafeFixedWidthAggregationMapSuite
// Convert the map into a sorter
val sorter = map.destructAndCreateExternalSorter()
- withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
- assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
- }
-
// Add more keys to the sorter and make sure the results come out sorted.
val additionalKeys = randomStrings(1024)
val keyConverter = UnsafeProjection.create(groupKeySchema)
@@ -208,7 +204,7 @@ class UnsafeFixedWidthAggregationMapSuite
sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v))
if ((i % 100) == 0) {
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemoryOnce()
sorter.closeCurrentPage()
}
}
@@ -251,7 +247,7 @@ class UnsafeFixedWidthAggregationMapSuite
sorter.insertKV(keyConverter.apply(k), valueConverter.apply(v))
if ((i % 100) == 0) {
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemoryOnce()
sorter.closeCurrentPage()
}
}
@@ -294,16 +290,12 @@ class UnsafeFixedWidthAggregationMapSuite
// Convert the map into a sorter. Right now, it contains one record.
val sorter = map.destructAndCreateExternalSorter()
- withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
- assert(taskMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
- }
-
// Add more keys to the sorter and make sure the results come out sorted.
(1 to 4096).foreach { i =>
sorter.insertKV(UnsafeRow.createFromByteArray(0, 0), UnsafeRow.createFromByteArray(0, 0))
if ((i % 100) == 0) {
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemoryOnce()
sorter.closeCurrentPage()
}
}
@@ -342,7 +334,7 @@ class UnsafeFixedWidthAggregationMapSuite
buf.setInt(0, str.length)
}
// Simulate running out of space
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.limit(0)
val str = rand.nextString(1024)
val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
assert(buf == null)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 13dc1754c9..7b80963ec8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import scala.util.Random
import org.apache.spark._
-import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection}
@@ -109,7 +109,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
pageSize: Long,
spill: Boolean): Unit = {
val memoryManager =
- new GrantEverythingMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
+ new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
@@ -128,7 +128,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow])
// 1% chance we will spill
if (rand.nextDouble() < 0.01 && spill) {
- memoryManager.markExecutionAsOutOfMemory()
+ memoryManager.markExecutionAsOutOfMemoryOnce()
sorter.closeCurrentPage()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
deleted file mode 100644
index 475037bd45..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.aggregate
-
-import org.apache.spark._
-import org.apache.spark.memory.TaskMemoryManager
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.test.SharedSQLContext
-
-class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLContext {
-
- test("memory acquired on construction") {
- val taskMemoryManager = new TaskMemoryManager(SparkEnv.get.memoryManager, 0)
- val taskContext = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, null, Seq.empty)
- TaskContext.setTaskContext(taskContext)
-
- // Assert that a page is allocated before processing starts
- var iter: TungstenAggregationIterator = null
- try {
- val newMutableProjection = (expr: Seq[Expression], schema: Seq[Attribute]) => {
- () => new InterpretedMutableProjection(expr, schema)
- }
- val dummyAccum = SQLMetrics.createLongMetric(sparkContext, "dummy")
- iter = new TungstenAggregationIterator(Seq.empty, Seq.empty, Seq.empty, Seq.empty, Seq.empty,
- 0, Seq.empty, newMutableProjection, Seq.empty, None,
- dummyAccum, dummyAccum, dummyAccum, dummyAccum)
- val numPages = iter.getHashMap.getNumDataPages
- assert(numPages === 1)
- } finally {
- // Clean up
- if (iter != null) {
- iter.free()
- }
- TaskContext.unset()
- }
- }
-}