author    Andrew Or <andrewor14@gmail.com>    2014-01-03 16:13:40 -0800
committer Andrew Or <andrewor14@gmail.com>    2014-01-03 16:13:40 -0800
commit    838b0e7d154699291f9915d400c59a3580173d01 (patch)
tree      9079bff5940c70c744894ba63849fa1249a976f7
parent    df413e996fb7a4d7e05698e21d130387cf771811 (diff)
Refactor using SparkConf
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                                 |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                           |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala      | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala | 15
4 files changed, 21 insertions, 19 deletions
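The refactor applies one pattern throughout: configuration reads that previously went through System.getProperty now go through a SparkConf instance, and the test suite passes an explicit SparkConf to SparkContext instead of mutating global JVM properties. A minimal sketch of that pattern, using the same property keys and accessors that appear in the diff below (the standalone snippet itself is illustrative, not part of the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Before: ad hoc reads of global JVM properties at each call site
    // val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean

    // After: an explicit SparkConf carries the settings
    val conf = new SparkConf(false)  // false = do not pre-load defaults from system properties
      .set("spark.shuffle.externalSorting", "true")
      .set("spark.shuffle.buffer.mb", "1024")
      .set("spark.shuffle.buffer.fraction", "0.8")

    val externalSorting = conf.get("spark.shuffle.externalSorting", "false").toBoolean
    val bufferSize      = conf.getLong("spark.shuffle.buffer.mb", 1024)
    val bufferPercent   = conf.getDouble("spark.shuffle.buffer.fraction", 0.8)

    // Tests build the context from the explicit conf instead of calling System.setProperty
    val sc = new SparkContext("local", "test", conf)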
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index c408d5f145..c9e3e8ec5a 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -33,8 +33,10 @@ case class Aggregator[K, V, C: ClassTag] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
+ private val sparkConf = new SparkConf()
+ private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
- val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
@@ -58,7 +60,6 @@ case class Aggregator[K, V, C: ClassTag] (
}
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
- val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 44494c7e0a..7dc7094aac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,7 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
-import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
+import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency, SparkConf}
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -66,6 +66,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Seq[CoGroup]
+ private val sparkConf = new SparkConf()
private var serializerClass: String = null
def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -106,7 +107,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
- val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+ val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 9e147feec4..68a23192c0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{SparkConf, Logging, SparkEnv}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
@@ -57,14 +57,16 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskIterator]
+
+ private val sparkConf = new SparkConf()
private val memoryThresholdMB = {
// TODO: Turn this into a fraction of memory per reducer
- val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
- val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
+ val bufferSize = sparkConf.getLong("spark.shuffle.buffer.mb", 1024)
+ val bufferPercent = sparkConf.getDouble("spark.shuffle.buffer.fraction", 0.8)
bufferSize * bufferPercent
}
- private val fileBufferSize =
- System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+ private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+ private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
private val comparator = new KCComparator[K, C]
private val ser = serializer.newInstance()
private var spillCount = 0
@@ -84,7 +86,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!")
logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+" so far)")
val (blockId, file) = diskBlockManager.createTempBlock()
- val writer = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity)
+ val writer =
+ new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
try {
val it = currentMap.destructiveSortedIterator(comparator)
while (it.hasNext) {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index a18d466baa..6c93b1f5a0 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -4,20 +4,17 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.apache.spark.{HashPartitioner, SparkContext, SparkEnv, LocalSparkContext}
+import org.apache.spark._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
override def beforeEach() {
- sc = new SparkContext("local", "test")
- System.setProperty("spark.shuffle.externalSorting", "true")
- }
-
- after {
- System.setProperty("spark.shuffle.externalSorting", "false")
- System.setProperty("spark.shuffle.buffer.mb", "1024")
- System.setProperty("spark.shuffle.buffer.fraction", "0.8")
+ val conf = new SparkConf(false)
+ conf.set("spark.shuffle.externalSorting", "true")
+ conf.set("spark.shuffle.buffer.mb", "1024")
+ conf.set("spark.shuffle.buffer.fraction", "0.8")
+ sc = new SparkContext("local", "test", conf)
}
val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)