about summary refs log tree commit diff
path: root/core/src/main/scala/spark/SparkContext.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/SparkContext.scala')
-rw-r--r-- core/src/main/scala/spark/SparkContext.scala | 21
1 files changed, 17 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index b4799d7c08..fda2ee3be7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,6 +1,7 @@
package spark
import java.io._
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
@@ -34,6 +35,8 @@ extends Logging {
scheduler.start()
Cache.initialize()
Broadcast.initialize(true)
+ MapOutputTracker.initialize(true)
+ RDDCache.initialize(true)
// Methods for creating RDDs
@@ -42,6 +45,12 @@ extends Logging {
def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] =
parallelize(seq, numCores)
+
+ def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] =
+ parallelize(seq, numSlices)
+
+ def makeRDD[T: ClassManifest](seq: Seq[T]): RDD[T] =
+ parallelize(seq, numCores)
def textFile(path: String): RDD[String] =
new HadoopTextFile(this, path)
@@ -158,12 +167,16 @@ extends Logging {
// Get the number of cores available to run tasks (as reported by Scheduler)
def numCores = scheduler.numCores
- private var nextShuffleId: Int = 0
+ private var nextShuffleId = new AtomicInteger(0)
private[spark] def newShuffleId(): Int = {
- val id = nextShuffleId
- nextShuffleId += 1
- id
+ nextShuffleId.getAndIncrement()
+ }
+
+ private var nextRddId = new AtomicInteger(0)
+
+ private[spark] def newRddId(): Int = {
+ nextRddId.getAndIncrement()
}
}