diff options
Diffstat (limited to 'core/src/main/scala/spark/SparkContext.scala')
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 21 |
1 files changed, 17 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index b4799d7c08..fda2ee3be7 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -1,6 +1,7 @@ package spark import java.io._ +import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer @@ -34,6 +35,8 @@ extends Logging { scheduler.start() Cache.initialize() Broadcast.initialize(true) + MapOutputTracker.initialize(true) + RDDCache.initialize(true) // Methods for creating RDDs @@ -42,6 +45,12 @@ extends Logging { def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] = parallelize(seq, numCores) + + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] = + parallelize(seq, numSlices) + + def makeRDD[T: ClassManifest](seq: Seq[T]): RDD[T] = + parallelize(seq, numCores) def textFile(path: String): RDD[String] = new HadoopTextFile(this, path) @@ -158,12 +167,16 @@ extends Logging { // Get the number of cores available to run tasks (as reported by Scheduler) def numCores = scheduler.numCores - private var nextShuffleId: Int = 0 + private var nextShuffleId = new AtomicInteger(0) private[spark] def newShuffleId(): Int = { - val id = nextShuffleId - nextShuffleId += 1 - id + nextShuffleId.getAndIncrement() + } + + private var nextRddId = new AtomicInteger(0) + + private[spark] def newRddId(): Int = { + nextRddId.getAndIncrement() } } |