-rw-r--r--  core/src/main/scala/spark/BlockRDD.scala                                        3
-rw-r--r--  core/src/main/scala/spark/HadoopRDD.scala                                       5
-rw-r--r--  core/src/main/scala/spark/KryoSerializer.scala                                  6
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala                               88
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala                        9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala              1
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala    2
-rw-r--r--  core/src/main/scala/spark/storage/BlockMessage.scala                           14
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala                                  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala                       2
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCount2.scala              8
11 files changed, 105 insertions, 35 deletions
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
index ea009f0f4f..daabc0d566 100644
--- a/core/src/main/scala/spark/BlockRDD.scala
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -7,7 +7,8 @@ class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
}
-class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) {
+class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
+ extends RDD[T](sc) {
@transient
val splits_ = (0 until blockIds.size).map(i => {
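
A note on the @transient above: the RDD object itself is Java-serialized into every task, and the splits computed below already capture the block IDs, so the constructor argument does not need to travel. A standalone sketch of what Java serialization keeps and drops (the Example class is invented for illustration, not Spark code):

    import java.io._

    class Example(bigInput: Array[String]) extends Serializable {
      @transient val big: Array[String] = bigInput          // skipped by Java serialization
      val first: String = if (big.nonEmpty) big(0) else ""  // small derived value, serialized normally
    }

    object TransientDemo {
      def main(args: Array[String]) {
        val bytes = new ByteArrayOutputStream
        val out = new ObjectOutputStream(bytes)
        out.writeObject(new Example(Array("a", "b")))
        out.close()
        val back = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
          .readObject().asInstanceOf[Example]
        println(back.big)    // null: the transient field did not travel
        println(back.first)  // "a": the derived field did
      }
    }
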
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index f282a4023b..0befca582d 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -42,7 +42,8 @@ class HadoopRDD[K, V](
minSplits: Int)
extends RDD[(K, V)](sc) {
- val serializableConf = new SerializableWritable(conf)
+ // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
+ val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@transient
val splits_ : Array[Split] = {
@@ -66,7 +67,7 @@ class HadoopRDD[K, V](
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
- val conf = serializableConf.value
+ val conf = confBroadcast.value.value
val fmt = createInputFormat(conf)
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
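
The same wrap-and-broadcast pattern works from user code whenever a sizable JobConf has to reach every task: broadcast it once, then unwrap twice (the broadcast, then the SerializableWritable) inside the closure. A rough sketch assuming a local SparkContext; the demo object and the "my.setting" key are made up:

    import org.apache.hadoop.mapred.JobConf
    import spark.{SerializableWritable, SparkContext}

    object BroadcastConfDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "BroadcastConfDemo")
        val conf = new JobConf()
        conf.set("my.setting", "42")
        // Ship the ~10 KB JobConf once via the broadcast mechanism, not inside every serialized task
        val confBroadcast = sc.broadcast(new SerializableWritable(conf))
        val values = sc.parallelize(1 to 4, 4).map { _ =>
          confBroadcast.value.value.get("my.setting")  // Broadcast -> SerializableWritable -> JobConf
        }.collect()
        println(values.mkString(","))
        sc.stop()
      }
    }
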
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 65d0532bd5..8a3f565071 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -10,8 +10,10 @@ import scala.collection.mutable
import com.esotericsoftware.kryo._
import com.esotericsoftware.kryo.{Serializer => KSerializer}
import com.esotericsoftware.kryo.serialize.ClassSerializer
+import com.esotericsoftware.kryo.serialize.SerializableSerializer
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
+import spark.broadcast._
import spark.storage._
/**
@@ -203,6 +205,10 @@ class KryoSerializer extends Serializer with Logging {
kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
kryo.setRegistrationOptional(true)
+ // Allow sending SerializableWritable
+ kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer())
+ kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer())
+
// Register some commonly used Scala singleton objects. Because these
// are singletons, we must return the exact same local object when we
// deserialize rather than returning a clone as FieldSerializer would.
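
SerializableSerializer tells Kryo to fall back to plain Java serialization for a registered class, which is what lets SerializableWritable and HttpBroadcast pass through the Kryo path. User code can use the same escape hatch through a custom registrator; a hedged sketch, assuming the usual spark.kryo.registrator hook in this codebase (LegacyRecord and MyRegistrator are invented names):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.serialize.SerializableSerializer
    import spark.KryoRegistrator

    // A Java-serializable class we would rather not have Kryo handle field-by-field
    class LegacyRecord(val payload: java.util.Date) extends Serializable

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[LegacyRecord], new SerializableSerializer())
      }
    }

    // Enabled with, for example:
    //   System.setProperty("spark.serializer", "spark.KryoSerializer")
    //   System.setProperty("spark.kryo.registrator", "MyRegistrator")
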
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index de23eb6f48..82c1391345 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -1,5 +1,6 @@
package spark
+import java.io.{DataInputStream, DataOutputStream, ByteArrayOutputStream, ByteArrayInputStream}
import java.util.concurrent.ConcurrentHashMap
import akka.actor._
@@ -10,6 +11,7 @@ import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
+import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.storage.BlockManagerId
@@ -18,12 +20,11 @@ sealed trait MapOutputTrackerMessage
case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
case object StopMapOutputTracker extends MapOutputTrackerMessage
-class MapOutputTrackerActor(bmAddresses: ConcurrentHashMap[Int, Array[BlockManagerId]])
-extends Actor with Logging {
+class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
def receive = {
case GetMapOutputLocations(shuffleId: Int) =>
logInfo("Asked to get map output locations for shuffle " + shuffleId)
- sender ! bmAddresses.get(shuffleId)
+ sender ! tracker.getSerializedLocations(shuffleId)
case StopMapOutputTracker =>
logInfo("MapOutputTrackerActor stopped!")
@@ -39,15 +40,19 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
val timeout = 10.seconds
- private var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
+ var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
// Incremented every time a fetch fails so that client nodes know to clear
// their cache of map output locations if this happens.
private var generation: Long = 0
private var generationLock = new java.lang.Object
+ // Cache a serialized version of the output locations for each shuffle to send them out faster
+ var cacheGeneration = generation
+ val cachedSerializedLocs = new HashMap[Int, Array[Byte]]
+
var trackerActor: ActorRef = if (isMaster) {
- val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(bmAddresses)), name = actorName)
+ val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
logInfo("Registered MapOutputTrackerActor actor")
actor
} else {
@@ -134,15 +139,16 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
}
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
- val fetched = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[BlockManagerId]]
+ val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]]
+ val fetchedLocs = deserializeLocations(fetchedBytes)
logInfo("Got the output locations")
- bmAddresses.put(shuffleId, fetched)
+ bmAddresses.put(shuffleId, fetchedLocs)
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
- return fetched
+ return fetchedLocs
} else {
return locs
}
@@ -181,4 +187,70 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
}
}
}
+
+ def getSerializedLocations(shuffleId: Int): Array[Byte] = {
+ var locs: Array[BlockManagerId] = null
+ var generationGotten: Long = -1
+ generationLock.synchronized {
+ if (generation > cacheGeneration) {
+ cachedSerializedLocs.clear()
+ cacheGeneration = generation
+ }
+ cachedSerializedLocs.get(shuffleId) match {
+ case Some(bytes) =>
+ return bytes
+ case None =>
+ locs = bmAddresses.get(shuffleId)
+ generationGotten = generation
+ }
+ }
+ // If we got here, we failed to find the serialized locations in the cache, so we pulled
+ // out a snapshot of the locations as "locs"; let's serialize and return that
+ val bytes = serializeLocations(locs)
+ // Add them into the table only if the generation hasn't changed while we were working
+ generationLock.synchronized {
+ if (generation == generationGotten) {
+ cachedSerializedLocs(shuffleId) = bytes
+ }
+ }
+ return bytes
+ }
+
+ // Serialize an array of map output locations into an efficient byte format so that we can send
+ // it to reduce tasks. We do this by grouping together the locations by block manager ID.
+ def serializeLocations(locs: Array[BlockManagerId]): Array[Byte] = {
+ val out = new ByteArrayOutputStream
+ val dataOut = new DataOutputStream(out)
+ dataOut.writeInt(locs.length)
+ val grouped = locs.zipWithIndex.groupBy(_._1)
+ dataOut.writeInt(grouped.size)
+ for ((id, pairs) <- grouped if id != null) {
+ dataOut.writeUTF(id.ip)
+ dataOut.writeInt(id.port)
+ dataOut.writeInt(pairs.length)
+ for ((_, blockIndex) <- pairs) {
+ dataOut.writeInt(blockIndex)
+ }
+ }
+ dataOut.close()
+ out.toByteArray
+ }
+
+ // Opposite of serializeLocations.
+ def deserializeLocations(bytes: Array[Byte]): Array[BlockManagerId] = {
+ val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
+ val length = dataIn.readInt()
+ val array = new Array[BlockManagerId](length)
+ val numGroups = dataIn.readInt()
+ for (i <- 0 until numGroups) {
+ val ip = dataIn.readUTF()
+ val port = dataIn.readInt()
+ val id = new BlockManagerId(ip, port)
+ val numBlocks = dataIn.readInt()
+ for (j <- 0 until numBlocks) {
+ array(dataIn.readInt()) = id
+ }
+ }
+ array
+ }
}
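
To make the compact format concrete: four map outputs spread over two hosts serialize as [total count][group count] followed by one (ip, port, map indices...) record per distinct block manager, rather than four full BlockManagerId objects. A standalone toy encoder using only java.io (not the Spark code above; addresses and ports are placeholders):

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    object WireFormatDemo {
      def main(args: Array[String]) {
        val locs = Array(("10.0.0.1", 7077), ("10.0.0.2", 7077),
                         ("10.0.0.1", 7077), ("10.0.0.2", 7077))
        val out = new ByteArrayOutputStream
        val dataOut = new DataOutputStream(out)
        dataOut.writeInt(locs.length)                  // total number of map outputs
        val grouped = locs.zipWithIndex.groupBy(_._1)
        dataOut.writeInt(grouped.size)                 // number of distinct block managers
        for (((ip, port), pairs) <- grouped) {
          dataOut.writeUTF(ip)
          dataOut.writeInt(port)
          dataOut.writeInt(pairs.length)
          for ((_, mapIndex) <- pairs) dataOut.writeInt(mapIndex)
        }
        dataOut.close()
        println("Encoded " + locs.length + " locations from " +
          grouped.size + " hosts in " + out.size + " bytes")
      }
    }
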
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 73479bff01..f1eae9bc88 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -26,7 +26,8 @@ object ShuffleMapTask {
return old
} else {
val out = new ByteArrayOutputStream
- val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objOut = ser.serializeStream(new GZIPOutputStream(out))
objOut.writeObject(rdd)
objOut.writeObject(dep)
objOut.close()
@@ -45,10 +46,8 @@ object ShuffleMapTask {
} else {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val objIn = new ObjectInputStream(in) {
- override def resolveClass(desc: ObjectStreamClass) =
- Class.forName(desc.getName, false, loader)
- }
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objIn = ser.deserializeStream(in)
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
val tuple = (rdd, dep)
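
With the closure serializer doing the stream work, the cached task bytes go through whichever closure serializer SparkEnv is configured with, rather than hard-coding java.io.ObjectOutputStream. A hedged sketch of the same stream API used directly; ClosureSerDemo and its sample Map are illustrative, and a SparkContext is assumed so SparkEnv is populated:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import spark.{SparkContext, SparkEnv}

    object ClosureSerDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "ClosureSerDemo")  // sets up SparkEnv for this thread
        val ser = SparkEnv.get.closureSerializer.newInstance
        val bytes = new ByteArrayOutputStream
        val objOut = ser.serializeStream(bytes)
        objOut.writeObject(Map("a" -> 1, "b" -> 2))
        objOut.close()
        val objIn = ser.deserializeStream(new ByteArrayInputStream(bytes.toByteArray))
        println(objIn.readObject().asInstanceOf[Map[String, Int]])
        sc.stop()
      }
    }
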
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 5b59479682..20c82ad0fa 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -115,6 +115,7 @@ class ClusterScheduler(sc: SparkContext)
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
synchronized {
+ SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
slaveIdToHost(o.slaveId) = o.hostname
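
The one-line SparkEnv.set above matters because resourceOffers can be called from the scheduler backend's actor thread, and SparkEnv appears to be kept per-thread in this codebase; any thread that goes on to serialize tasks needs the env installed first. A hedged sketch of that pattern outside the scheduler (ThreadLocalEnvDemo is invented; the per-thread behavior is my reading, not stated in this diff):

    import spark.{SparkContext, SparkEnv}

    object ThreadLocalEnvDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "ThreadLocalEnvDemo")
        val env = SparkEnv.get
        val t = new Thread {
          override def run() {
            SparkEnv.set(env)  // make the driver's env visible on this thread before using it
            println(SparkEnv.get.closureSerializer)
          }
        }
        t.start()
        t.join()
        sc.stop()
      }
    }
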
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 83e7c6e036..978b4f2676 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -99,7 +99,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Remove a disconnected slave from the cluster
def removeSlave(slaveId: String) {
- logInfo("Slave " + slaveId + " disconnected, so removing it")
+ logWarning("Slave " + slaveId + " disconnected, so removing it")
val numCores = freeCores(slaveId)
actorToSlaveId -= slaveActor(slaveId)
addressToSlaveId -= slaveAddress(slaveId)
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index 0b2ed69e07..607633c6df 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -12,7 +12,7 @@ case class GetBlock(id: String)
case class GotBlock(id: String, data: ByteBuffer)
case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel)
-class BlockMessage() extends Logging{
+class BlockMessage() {
// Un-initialized: typ = 0
// GetBlock: typ = 1
// GotBlock: typ = 2
@@ -22,8 +22,6 @@ class BlockMessage() extends Logging{
private var data: ByteBuffer = null
private var level: StorageLevel = null
- initLogging()
-
def set(getBlock: GetBlock) {
typ = BlockMessage.TYPE_GET_BLOCK
id = getBlock.id
@@ -62,8 +60,6 @@ class BlockMessage() extends Logging{
}
id = idBuilder.toString()
- logDebug("Set from buffer Result: " + typ + " " + id)
- logDebug("Buffer position is " + buffer.position)
if (typ == BlockMessage.TYPE_PUT_BLOCK) {
val booleanInt = buffer.getInt()
@@ -77,23 +73,18 @@ class BlockMessage() extends Logging{
}
data.put(buffer)
data.flip()
- logDebug("Set from buffer Result 2: " + level + " " + data)
} else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
val dataLength = buffer.getInt()
- logDebug("Data length is "+ dataLength)
- logDebug("Buffer position is " + buffer.position)
data = ByteBuffer.allocate(dataLength)
if (dataLength != buffer.remaining) {
throw new Exception("Error parsing buffer")
}
data.put(buffer)
data.flip()
- logDebug("Set from buffer Result 3: " + data)
}
val finishTime = System.currentTimeMillis
- logDebug("Converted " + id + " from bytebuffer in " + (finishTime - startTime) / 1000.0 + " s")
}
def set(bufferMsg: BufferMessage) {
@@ -145,8 +136,6 @@ class BlockMessage() extends Logging{
buffers += data
}
- logDebug("Start to log buffers.")
- buffers.foreach((x: ByteBuffer) => logDebug("" + x))
/*
println()
println("BlockMessage: ")
@@ -160,7 +149,6 @@ class BlockMessage() extends Logging{
println()
*/
val finishTime = System.currentTimeMillis
- logDebug("Converted " + id + " to buffer message in " + (finishTime - startTime) / 1000.0 + " s")
return Message.createBufferMessage(buffers)
}
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 57d212e4ca..df4e23bfd6 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -31,6 +31,8 @@ object AkkaUtils {
akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d
akka.remote.netty.connection-timeout = 1s
+ akka.remote.netty.execution-pool-size = 8
+ akka.actor.default-dispatcher.throughput = 30
""".format(host, port))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
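
For context, the two new settings live inside the Typesafe config string that AkkaUtils parses before creating the actor system: a larger Netty execution pool and a higher dispatcher throughput, so that a busy actor such as the MapOutputTracker is serviced in bigger batches. A trimmed sketch of that surrounding code (the real method sets more options than shown here):

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object AkkaConfigSketch {
      def create(host: String, port: Int): ActorSystem = {
        val akkaConf = ConfigFactory.parseString("""
          akka.remote.netty.hostname = "%s"
          akka.remote.netty.port = %d
          akka.remote.netty.connection-timeout = 1s
          akka.remote.netty.execution-pool-size = 8
          akka.actor.default-dispatcher.throughput = 30
          """.format(host, port))
        ActorSystem("spark", akkaConf, getClass.getClassLoader)
      }
    }
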
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 9bf9251519..230d806a89 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
SparkEnv.set(ssc.env)
try {
val timeTaken = job.run()
- logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
+ println("Total delay: %.5f s for job %s (execution: %.5f s)".format(
(System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index aa542ba07d..c22949d7b9 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -62,10 +62,10 @@ object WordCount2_ExtraFunctions {
object WordCount2 {
def warmup(sc: SparkContext) {
- (0 until 10).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
+ (0 until 3).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
.map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _)
+ .reduceByKey(_ + _, 100)
.count()
}
}
@@ -88,7 +88,7 @@ object WordCount2 {
val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.count())
+ println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count())
println("Data count: " + data.count())
println("Data count: " + data.count())