author     Andrew Or <andrewor14@gmail.com>    2014-01-03 16:13:40 -0800
committer  Andrew Or <andrewor14@gmail.com>    2014-01-03 16:13:40 -0800
commit     838b0e7d154699291f9915d400c59a3580173d01 (patch)
tree       9079bff5940c70c744894ba63849fa1249a976f7
parent     df413e996fb7a4d7e05698e21d130387cf771811 (diff)
download   spark-838b0e7d154699291f9915d400c59a3580173d01.tar.gz
           spark-838b0e7d154699291f9915d400c59a3580173d01.tar.bz2
           spark-838b0e7d154699291f9915d400c59a3580173d01.zip
Refactor using SparkConf
4 files changed, 21 insertions, 19 deletions
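Every hunk below applies the same refactoring: reads of process-wide JVM properties through `System.getProperty` become lookups on a `SparkConf` instance, and the typed getters (`getLong`, `getDouble`, `getInt`) absorb the manual `.toLong`/`.toFloat`/`.toInt` parsing. A minimal sketch of the before/after pattern, using property keys from this diff (the object name is illustrative; a no-arg `new SparkConf()` still loads defaults from `spark.*` system properties, which is what keeps the old behavior intact):

```scala
import org.apache.spark.SparkConf

// Illustrative sketch of the refactoring pattern, not part of the commit.
object SparkConfPatternSketch extends App {
  // Before: every read went through mutable, process-wide JVM properties,
  // parsing the string result by hand:
  //   System.getProperty("spark.shuffle.externalSorting", "false").toBoolean

  // After: a SparkConf is created once and queried thereafter. With no
  // arguments it loads defaults from spark.* system properties.
  val sparkConf = new SparkConf()

  // String-valued flags are still parsed explicitly...
  val externalSorting =
    sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean

  // ...while numeric settings use typed getters with typed defaults.
  val bufferSize = sparkConf.getLong("spark.shuffle.buffer.mb", 1024)
  val bufferPercent = sparkConf.getDouble("spark.shuffle.buffer.fraction", 0.8)
  val memoryThresholdMB = bufferSize * bufferPercent

  println(s"externalSorting=$externalSorting, thresholdMB=$memoryThresholdMB")
}
```

Hoisting `externalSorting` into a field (as `Aggregator` does below) also means the property is read once per instance rather than once per method call.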
```diff
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index c408d5f145..c9e3e8ec5a 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -33,8 +33,10 @@ case class Aggregator[K, V, C: ClassTag] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
+  private val sparkConf = new SparkConf()
+  private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kv: Product2[K, V] = null
@@ -58,7 +60,6 @@ case class Aggregator[K, V, C: ClassTag] (
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kc: Product2[K, C] = null
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 44494c7e0a..7dc7094aac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,7 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
-import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
+import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency, SparkConf}
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
 
 private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -66,6 +66,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   private type CoGroupValue = (Any, Int)  // Int is dependency number
   private type CoGroupCombiner = Seq[CoGroup]
 
+  private val sparkConf = new SparkConf()
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -106,7 +107,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+    val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 9e147feec4..68a23192c0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
 
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{SparkConf, Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
 
@@ -57,14 +57,16 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskIterator]
+
+  private val sparkConf = new SparkConf()
 
   private val memoryThresholdMB = {
     // TODO: Turn this into a fraction of memory per reducer
-    val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
-    val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
+    val bufferSize = sparkConf.getLong("spark.shuffle.buffer.mb", 1024)
+    val bufferPercent = sparkConf.getDouble("spark.shuffle.buffer.fraction", 0.8)
     bufferSize * bufferPercent
   }
-  private val fileBufferSize =
-    System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
   private val comparator = new KCComparator[K, C]
   private val ser = serializer.newInstance()
   private var spillCount = 0
@@ -84,7 +86,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!")
     logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+" so far)")
     val (blockId, file) = diskBlockManager.createTempBlock()
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity)
+    val writer =
+      new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
     try {
       val it = currentMap.destructiveSortedIterator(comparator)
       while (it.hasNext) {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index a18d466baa..6c93b1f5a0 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -4,20 +4,17 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{HashPartitioner, SparkContext, SparkEnv, LocalSparkContext}
+import org.apache.spark._
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
 
 class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
   override def beforeEach() {
-    sc = new SparkContext("local", "test")
-    System.setProperty("spark.shuffle.externalSorting", "true")
-  }
-
-  after {
-    System.setProperty("spark.shuffle.externalSorting", "false")
-    System.setProperty("spark.shuffle.buffer.mb", "1024")
-    System.setProperty("spark.shuffle.buffer.fraction", "0.8")
+    val conf = new SparkConf(false)
+    conf.set("spark.shuffle.externalSorting", "true")
+    conf.set("spark.shuffle.buffer.mb", "1024")
+    conf.set("spark.shuffle.buffer.fraction", "0.8")
+    sc = new SparkContext("local", "test", conf)
   }
 
   val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
```
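The test-suite hunk shows the payoff of the refactoring: instead of mutating global properties in `beforeEach` and resetting them in an `after` block, the settings are scoped to a single context. A short usage sketch under the same assumptions (illustrative object name; `new SparkConf(false)` skips the `spark.*` system-property defaults so nothing leaks between tests, and the three-argument `SparkContext(master, appName, conf)` constructor carries the settings in):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // rddToPairRDDFunctions for reduceByKey

// Illustrative sketch, not part of the commit.
object ScopedConfSketch extends App {
  // loadDefaults = false: ignore spark.* JVM system properties entirely,
  // so this context sees exactly the settings below and nothing else.
  val conf = new SparkConf(false)
    .set("spark.shuffle.externalSorting", "true")
    .set("spark.shuffle.buffer.mb", "1024")
    .set("spark.shuffle.buffer.fraction", "0.8")

  // The conf travels with the context instead of living in global state,
  // so no after-test reset is needed.
  val sc = new SparkContext("local", "scoped-conf-sketch", conf)
  try {
    // A small shuffle that exercises the external-sorting code path.
    val sums = sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _)
    println(sums.collect().toSeq)
  } finally {
    sc.stop()
  }
}
```

Because no JVM-global state is touched, two such contexts can run in the same JVM without the cleanup bookkeeping the old `after` block needed.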