aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-08-04 12:59:18 -0700
committerMatei Zaharia <matei@databricks.com>2014-08-04 12:59:18 -0700
commit8e7d5ba1a20a8a1f409e9d6472ae3e6c4bc948b4 (patch)
tree22128614dde4f880b0036bb7fa816d91318ee183 /core
parent59f84a9531f7974a053fd4963ce9afd88273ea4c (diff)
downloadspark-8e7d5ba1a20a8a1f409e9d6472ae3e6c4bc948b4.tar.gz
spark-8e7d5ba1a20a8a1f409e9d6472ae3e6c4bc948b4.tar.bz2
spark-8e7d5ba1a20a8a1f409e9d6472ae3e6c4bc948b4.zip
SPARK-2792. Fix reading too much or too little data from each stream in ExternalMap / Sorter
All these changes are from mridulm's work in #1609, but extracted here to fix this specific issue and make it easier to merge not 1.1. This particular set of changes is to make sure that we read exactly the right range of bytes from each spill file in EAOM: some serializers can write bytes after the last object (e.g. the TC_RESET flag in Java serialization) and that would confuse the previous code into reading it as part of the next batch. There are also improvements to cleanup to make sure files are closed. In addition to bringing in the changes to ExternalAppendOnlyMap, I also copied them to the corresponding code in ExternalSorter and updated its test suite to test for the same issues. Author: Matei Zaharia <matei@databricks.com> Closes #1722 from mateiz/spark-2792 and squashes the following commits: 5d4bfb5 [Matei Zaharia] Make objectStreamReset counter count the last object written too 18fe865 [Matei Zaharia] Update docs on objectStreamReset 576ee83 [Matei Zaharia] Allow objectStreamReset to be 0 0374217 [Matei Zaharia] Remove super paranoid code to close file handles bda37bb [Matei Zaharia] Implement Mridul's ExternalAppendOnlyMap fixes in ExternalSorter too 0d6dad7 [Matei Zaharia] Added Mridul's test changes for ExternalAppendOnlyMap 9a78e4b [Matei Zaharia] Add @mridulm's fixes to ExternalAppendOnlyMap for batch sizes
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala86
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala104
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala33
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala47
5 files changed, 193 insertions, 82 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index a7fa057ee0..34bc312409 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -35,16 +35,15 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
/**
* Calling reset to avoid memory leak:
* http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
- * But only call it every 10,000th time to avoid bloated serialization streams (when
+ * But only call it every 100th time to avoid bloated serialization streams (when
* the stream 'resets' object class descriptions have to be re-written)
*/
def writeObject[T: ClassTag](t: T): SerializationStream = {
objOut.writeObject(t)
+ counter += 1
if (counterReset > 0 && counter >= counterReset) {
objOut.reset()
counter = 0
- } else {
- counter += 1
}
this
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index cb67a1c039..5d10a1f844 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util.collection
-import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
+import java.io._
import java.util.Comparator
import scala.collection.BufferedIterator
@@ -28,7 +28,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager}
import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
@@ -199,13 +199,16 @@ class ExternalAppendOnlyMap[K, V, C](
// Flush the disk writer's contents to disk, and update relevant variables
def flush() = {
- writer.commitAndClose()
- val bytesWritten = writer.bytesWritten
+ val w = writer
+ writer = null
+ w.commitAndClose()
+ val bytesWritten = w.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
objectsWritten = 0
}
+ var success = false
try {
val it = currentMap.destructiveSortedIterator(keyComparator)
while (it.hasNext) {
@@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten == serializerBatchSize) {
flush()
- writer.close()
writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
}
}
if (objectsWritten > 0) {
flush()
+ } else if (writer != null) {
+ val w = writer
+ writer = null
+ w.revertPartialWritesAndClose()
}
+ success = true
} finally {
- // Partial failures cannot be tolerated; do not revert partial writes
- writer.close()
+ if (!success) {
+ // This code path only happens if an exception was thrown above before we set success;
+ // close our stuff and let the exception be thrown further
+ if (writer != null) {
+ writer.revertPartialWritesAndClose()
+ }
+ if (file.exists()) {
+ file.delete()
+ }
+ }
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
* An iterator that returns (K, C) pairs in sorted order from an on-disk map
*/
private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
- extends Iterator[(K, C)] {
- private val fileStream = new FileInputStream(file)
- private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+ extends Iterator[(K, C)]
+ {
+ private val batchOffsets = batchSizes.scanLeft(0L)(_ + _) // Size will be batchSize.length + 1
+ assert(file.length() == batchOffsets(batchOffsets.length - 1))
+
+ private var batchIndex = 0 // Which batch we're in
+ private var fileStream: FileInputStream = null
// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
- private var batchStream = nextBatchStream()
- private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
- private var deserializeStream = ser.deserializeStream(compressedStream)
+ private var deserializeStream = nextBatchStream()
private var nextItem: (K, C) = null
private var objectsRead = 0
/**
* Construct a stream that reads only from the next batch.
*/
- private def nextBatchStream(): InputStream = {
- if (batchSizes.length > 0) {
- ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+ private def nextBatchStream(): DeserializationStream = {
+ // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+ // we're still in a valid batch.
+ if (batchIndex < batchOffsets.length - 1) {
+ if (deserializeStream != null) {
+ deserializeStream.close()
+ fileStream.close()
+ deserializeStream = null
+ fileStream = null
+ }
+
+ val start = batchOffsets(batchIndex)
+ fileStream = new FileInputStream(file)
+ fileStream.getChannel.position(start)
+ batchIndex += 1
+
+ val end = batchOffsets(batchIndex)
+
+ assert(end >= start, "start = " + start + ", end = " + end +
+ ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+ val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+ val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+ ser.deserializeStream(compressedStream)
} else {
// No more batches left
- bufferedStream
+ cleanup()
+ null
}
}
@@ -424,10 +463,8 @@ class ExternalAppendOnlyMap[K, V, C](
val item = deserializeStream.readObject().asInstanceOf[(K, C)]
objectsRead += 1
if (objectsRead == serializerBatchSize) {
- batchStream = nextBatchStream()
- compressedStream = blockManager.wrapForCompression(blockId, batchStream)
- deserializeStream = ser.deserializeStream(compressedStream)
objectsRead = 0
+ deserializeStream = nextBatchStream()
}
item
} catch {
@@ -439,6 +476,9 @@ class ExternalAppendOnlyMap[K, V, C](
override def hasNext: Boolean = {
if (nextItem == null) {
+ if (deserializeStream == null) {
+ return false
+ }
nextItem = readNextItem()
}
nextItem != null
@@ -455,7 +495,11 @@ class ExternalAppendOnlyMap[K, V, C](
// TODO: Ensure this gets called even if the iterator isn't drained.
private def cleanup() {
- deserializeStream.close()
+ batchIndex = batchOffsets.length // Prevent reading any other batch
+ val ds = deserializeStream
+ deserializeStream = null
+ fileStream = null
+ ds.close()
file.delete()
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 6e415a2bd8..b04c50bd3e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import com.google.common.io.ByteStreams
import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.storage.BlockId
/**
@@ -273,13 +273,16 @@ private[spark] class ExternalSorter[K, V, C](
// Flush the disk writer's contents to disk, and update relevant variables.
// The writer is closed at the end of this process, and cannot be reused.
def flush() = {
- writer.commitAndClose()
- val bytesWritten = writer.bytesWritten
+ val w = writer
+ writer = null
+ w.commitAndClose()
+ val bytesWritten = w.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
objectsWritten = 0
}
+ var success = false
try {
val it = collection.destructiveSortedIterator(partitionKeyComparator)
while (it.hasNext) {
@@ -299,13 +302,23 @@ private[spark] class ExternalSorter[K, V, C](
}
if (objectsWritten > 0) {
flush()
+ } else if (writer != null) {
+ val w = writer
+ writer = null
+ w.revertPartialWritesAndClose()
+ }
+ success = true
+ } finally {
+ if (!success) {
+ // This code path only happens if an exception was thrown above before we set success;
+ // close our stuff and let the exception be thrown further
+ if (writer != null) {
+ writer.revertPartialWritesAndClose()
+ }
+ if (file.exists()) {
+ file.delete()
+ }
}
- writer.close()
- } catch {
- case e: Exception =>
- writer.close()
- file.delete()
- throw e
}
if (usingMap) {
@@ -472,36 +485,58 @@ private[spark] class ExternalSorter[K, V, C](
* partitions to be requested in order.
*/
private[this] class SpillReader(spill: SpilledFile) {
- val fileStream = new FileInputStream(spill.file)
- val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+ // Serializer batch offsets; size will be batchSize.length + 1
+ val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _)
// Track which partition and which batch stream we're in. These will be the indices of
// the next element we will read. We'll also store the last partition read so that
// readNextPartition() can figure out what partition that was from.
var partitionId = 0
var indexInPartition = 0L
- var batchStreamsRead = 0
+ var batchId = 0
var indexInBatch = 0
var lastPartitionId = 0
skipToNextPartition()
- // An intermediate stream that reads from exactly one batch
+
+ // Intermediate file and deserializer streams that read from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
- var batchStream = nextBatchStream()
- var compressedStream = blockManager.wrapForCompression(spill.blockId, batchStream)
- var deserStream = serInstance.deserializeStream(compressedStream)
+ var fileStream: FileInputStream = null
+ var deserializeStream = nextBatchStream() // Also sets fileStream
+
var nextItem: (K, C) = null
var finished = false
/** Construct a stream that only reads from the next batch */
- def nextBatchStream(): InputStream = {
- if (batchStreamsRead < spill.serializerBatchSizes.length) {
- batchStreamsRead += 1
- ByteStreams.limit(bufferedStream, spill.serializerBatchSizes(batchStreamsRead - 1))
+ def nextBatchStream(): DeserializationStream = {
+ // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+ // we're still in a valid batch.
+ if (batchId < batchOffsets.length - 1) {
+ if (deserializeStream != null) {
+ deserializeStream.close()
+ fileStream.close()
+ deserializeStream = null
+ fileStream = null
+ }
+
+ val start = batchOffsets(batchId)
+ fileStream = new FileInputStream(spill.file)
+ fileStream.getChannel.position(start)
+ batchId += 1
+
+ val end = batchOffsets(batchId)
+
+ assert(end >= start, "start = " + start + ", end = " + end +
+ ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+ val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+ val compressedStream = blockManager.wrapForCompression(spill.blockId, bufferedStream)
+ serInstance.deserializeStream(compressedStream)
} else {
- // No more batches left; give an empty stream
- bufferedStream
+ // No more batches left
+ cleanup()
+ null
}
}
@@ -525,19 +560,17 @@ private[spark] class ExternalSorter[K, V, C](
* If no more pairs are left, return null.
*/
private def readNextItem(): (K, C) = {
- if (finished) {
+ if (finished || deserializeStream == null) {
return null
}
- val k = deserStream.readObject().asInstanceOf[K]
- val c = deserStream.readObject().asInstanceOf[C]
+ val k = deserializeStream.readObject().asInstanceOf[K]
+ val c = deserializeStream.readObject().asInstanceOf[C]
lastPartitionId = partitionId
// Start reading the next batch if we're done with this one
indexInBatch += 1
if (indexInBatch == serializerBatchSize) {
- batchStream = nextBatchStream()
- compressedStream = blockManager.wrapForCompression(spill.blockId, batchStream)
- deserStream = serInstance.deserializeStream(compressedStream)
indexInBatch = 0
+ deserializeStream = nextBatchStream()
}
// Update the partition location of the element we're reading
indexInPartition += 1
@@ -545,7 +578,9 @@ private[spark] class ExternalSorter[K, V, C](
// If we've finished reading the last partition, remember that we're done
if (partitionId == numPartitions) {
finished = true
- deserStream.close()
+ if (deserializeStream != null) {
+ deserializeStream.close()
+ }
}
(k, c)
}
@@ -578,6 +613,17 @@ private[spark] class ExternalSorter[K, V, C](
item
}
}
+
+ // Clean up our open streams and put us in a state where we can't read any more data
+ def cleanup() {
+ batchId = batchOffsets.length // Prevent reading any other batch
+ val ds = deserializeStream
+ deserializeStream = null
+ fileStream = null
+ ds.close()
+ // NOTE: We don't do file.delete() here because that is done in ExternalSorter.stop().
+ // This should also be fixed in ExternalAppendOnlyMap.
+ }
}
/**
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 7de5df6e1c..04d7338488 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
+ private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+ val conf = new SparkConf(loadDefaults)
+ // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+ // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+ conf.set("spark.serializer.objectStreamReset", "1")
+ conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+ // Ensure that we actually have multiple batches per spill file
+ conf.set("spark.shuffle.spill.batchSize", "10")
+ conf
+ }
+
test("simple insert") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -57,7 +68,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("insert with collision") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -80,7 +91,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("ordering") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)
val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -125,7 +136,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("null keys and values") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
@@ -166,7 +177,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("simple aggregator") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)
// reduceByKey
@@ -181,7 +192,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("simple cogroup") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
sc = new SparkContext("local", "test", conf)
val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
@@ -199,7 +210,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("spilling") {
- val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -249,7 +260,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("spilling with hash collisions") {
- val conf = new SparkConf(true)
+ val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -304,7 +315,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("spilling with many hash collisions") {
- val conf = new SparkConf(true)
+ val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -329,7 +340,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("spilling with hash collisions using the Int.MaxValue key") {
- val conf = new SparkConf(true)
+ val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -347,7 +358,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
}
test("spilling with null keys and values") {
- val conf = new SparkConf(true)
+ val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 65a71e5a83..57dcb4ffab 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -25,6 +25,17 @@ import org.apache.spark._
import org.apache.spark.SparkContext._
class ExternalSorterSuite extends FunSuite with LocalSparkContext {
+ private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+ val conf = new SparkConf(loadDefaults)
+ // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+ // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+ conf.set("spark.serializer.objectStreamReset", "1")
+ conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+ // Ensure that we actually have multiple batches per spill file
+ conf.set("spark.shuffle.spill.batchSize", "10")
+ conf
+ }
+
test("empty data stream") {
val conf = new SparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
@@ -60,7 +71,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("few elements per partition") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -102,7 +113,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("empty partitions with spilling") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -127,7 +138,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("spilling in local cluster") {
- val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -198,7 +209,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("spilling in local cluster with many reduce tasks") {
- val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
@@ -269,7 +280,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("cleanup of intermediate files in sorter") {
- val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -290,7 +301,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("cleanup of intermediate files in sorter if there are errors") {
- val conf = new SparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
+ val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -311,7 +322,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("cleanup of intermediate files in shuffle") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -326,7 +337,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("cleanup of intermediate files in shuffle with errors") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -348,7 +359,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("no partial aggregation or sorting") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -363,7 +374,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("partial aggregation without spill") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -379,7 +390,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("partial aggregation with spill, no ordering") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -395,7 +406,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("partial aggregation with spill, with ordering") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -412,7 +423,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("sorting without aggregation, no spill") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -429,7 +440,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("sorting without aggregation, with spill") {
- val conf = new SparkConf(false)
+ val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -446,7 +457,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("spilling with hash collisions") {
- val conf = new SparkConf(true)
+ val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -503,7 +514,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("spilling with many hash collisions") {
- val conf = new SparkConf(true)
+ val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -526,7 +537,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("spilling with hash collisions using the Int.MaxValue key") {
- val conf = new SparkConf(true)
+ val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -547,7 +558,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
}
test("spilling with null keys and values") {
- val conf = new SparkConf(true)
+ val conf = createSparkConf(true)
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)