diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-01-07 12:44:22 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-01-07 12:44:22 -0800 |
commit | 80ba9f8ba06e623600469ddb3e59dffcbedee1d0 (patch) | |
tree | 7cf7f9c773c93ba3a97996a76d3fe71b8a98e548 /core/src/main/scala/org | |
parent | 4de9c9554ca6464b806496dbffe0ba99c0ae6b45 (diff) | |
download | spark-80ba9f8ba06e623600469ddb3e59dffcbedee1d0.tar.gz spark-80ba9f8ba06e623600469ddb3e59dffcbedee1d0.tar.bz2 spark-80ba9f8ba06e623600469ddb3e59dffcbedee1d0.zip |
Get SparkConf from SparkEnv, rather than creating new ones
Diffstat (limited to 'core/src/main/scala/org')
3 files changed, 6 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 292e32e7c8..08a96b0c34 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -31,7 +31,7 @@ case class Aggregator[K, V, C] ( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { - private val sparkConf = new SparkConf() + private val sparkConf = SparkEnv.get.conf private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 7dc7094aac..b7c7773e58 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -22,7 +22,7 @@ import java.io.{ObjectOutputStream, IOException} import scala.collection.mutable.ArrayBuffer import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext} -import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency, SparkConf} +import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap} private[spark] sealed trait CoGroupSplitDep extends Serializable @@ -66,7 +66,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: private type CoGroupValue = (Any, Int) // Int is dependency number private type CoGroupCombiner = Seq[CoGroup] - private val sparkConf = new SparkConf() + private val sparkConf = SparkEnv.get.conf private var serializerClass: String = null def setSerializer(cls: String): CoGroupedRDD[K] = { @@ -122,7 +122,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle val fetcher = SparkEnv.get.shuffleFetcher - val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf) + val ser = SparkEnv.get.serializerManager.get(serializerClass, sparkConf) val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser) rddIterators += ((it, depNum)) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index c348168a8b..a5897e8066 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream import scala.collection.mutable.{ArrayBuffer, PriorityQueue} -import org.apache.spark.{SparkConf, Logging, SparkEnv} +import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter} @@ -68,7 +68,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( private var currentMap = new SizeTrackingAppendOnlyMap[K, C] private val spilledMaps = new ArrayBuffer[DiskMapIterator] - private val sparkConf = new SparkConf() + private val sparkConf = SparkEnv.get.conf // Collective memory threshold shared across all running tasks private val maxMemoryThreshold = { |