author     Andrew Or <andrewor14@gmail.com>  2014-01-07 12:44:22 -0800
committer  Andrew Or <andrewor14@gmail.com>  2014-01-07 12:44:22 -0800
commit     80ba9f8ba06e623600469ddb3e59dffcbedee1d0 (patch)
tree       7cf7f9c773c93ba3a97996a76d3fe71b8a98e548 /core/src/main/scala/org
parent     4de9c9554ca6464b806496dbffe0ba99c0ae6b45 (diff)
Get SparkConf from SparkEnv, rather than creating new ones
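
The pattern this patch applies, in a minimal sketch (the object `ConfExample` and both method names are illustrative, not part of the patch): `new SparkConf()` builds a fresh configuration from the spark.* system properties of whatever JVM runs the code, which inside an executor may not reflect the application's actual settings, whereas `SparkEnv.get.conf` reuses the configuration the running SparkEnv already holds.

    import org.apache.spark.{SparkConf, SparkEnv}

    // Illustrative only: the before/after pattern this commit applies.
    private[spark] object ConfExample {
      // Before: constructs a fresh conf by re-reading spark.* system
      // properties, which may miss settings the driver propagated.
      def externalSortingBefore(): Boolean =
        new SparkConf().get("spark.shuffle.externalSorting", "false").toBoolean

      // After: reuses the conf already held by the running SparkEnv.
      def externalSortingAfter(): Boolean =
        SparkEnv.get.conf.get("spark.shuffle.externalSorting", "false").toBoolean
    }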
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                            | 2 +-
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                      | 6 +++---
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 292e32e7c8..08a96b0c34 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -31,7 +31,7 @@ case class Aggregator[K, V, C] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
-  private val sparkConf = new SparkConf()
+  private val sparkConf = SparkEnv.get.conf
   private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7dc7094aac..b7c7773e58 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,7 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
-import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency, SparkConf}
+import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
 
 private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -66,7 +66,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   private type CoGroupValue = (Any, Int)  // Int is dependency number
   private type CoGroupCombiner = Seq[CoGroup]
 
-  private val sparkConf = new SparkConf()
+  private val sparkConf = SparkEnv.get.conf
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -122,7 +122,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
-        val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
+        val ser = SparkEnv.get.serializerManager.get(serializerClass, sparkConf)
         val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser)
         rddIterators += ((it, depNum))
       }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index c348168a8b..a5897e8066 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
 
-import org.apache.spark.{SparkConf, Logging, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
@@ -68,7 +68,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
 
-  private val sparkConf = new SparkConf()
+  private val sparkConf = SparkEnv.get.conf
 
   // Collective memory threshold shared across all running tasks
   private val maxMemoryThreshold = {