author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-09-12 14:54:40 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-09-12 14:54:40 -0700
commit     2d761e3353651049f6707c74bb5ffdd6e86f6f35 (patch)
tree       b9d0dac4c0da0e92b1301ec0b9d9a7dc6ac057f8 /core
parent     9b4cd1648b6c2467a63109ba817d7e7a0c46ffb9 (diff)
Ported performance and fault-tolerance (FT) improvements from the latest streaming work
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/BlockRDD.scala |  3
-rw-r--r--  core/src/main/scala/spark/DaemonThreadFactory.scala | 12
-rw-r--r--  core/src/main/scala/spark/HadoopRDD.scala |  5
-rw-r--r--  core/src/main/scala/spark/KryoSerializer.scala |  6
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala | 88
-rw-r--r--  core/src/main/scala/spark/NewHadoopRDD.scala | 10
-rw-r--r--  core/src/main/scala/spark/RDD.scala |  2
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManagerTest.scala |  5
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala |  9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala |  1
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala |  5
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala |  4
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala | 15
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerWorker.scala |  2
-rw-r--r--  core/src/main/scala/spark/storage/BlockMessage.scala | 14
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala | 30
-rw-r--r--  core/src/main/scala/spark/storage/StorageLevel.scala |  1
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala |  2
18 files changed, 153 insertions, 61 deletions
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
index ea009f0f4f..daabc0d566 100644
--- a/core/src/main/scala/spark/BlockRDD.scala
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -7,7 +7,8 @@ class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
}
-class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) {
+class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
+ extends RDD[T](sc) {
@transient
val splits_ = (0 until blockIds.size).map(i => {
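
Marking blockIds @transient keeps the whole ID array out of the serialized RDD that ships with every task; each BlockRDDSplit already carries the one blockId its partition needs. A runnable sketch of the mechanism, with hypothetical names:

    // Sketch (hypothetical class): fields marked @transient are skipped by
    // Java serialization, so only the small per-split state travels with tasks.
    class BlockHandle(@transient val allIds: Array[String], val myId: String)
      extends Serializable

    val ids = Array("block-0", "block-1")
    val handles = ids.map(id => new BlockHandle(ids, id))
    // After a serialize/deserialize round trip, handle.allIds comes back null
    // on the worker, while handle.myId survives intact.
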
diff --git a/core/src/main/scala/spark/DaemonThreadFactory.scala b/core/src/main/scala/spark/DaemonThreadFactory.scala
index 003880c5e8..56e59adeb7 100644
--- a/core/src/main/scala/spark/DaemonThreadFactory.scala
+++ b/core/src/main/scala/spark/DaemonThreadFactory.scala
@@ -6,9 +6,13 @@ import java.util.concurrent.ThreadFactory
* A ThreadFactory that creates daemon threads
*/
private object DaemonThreadFactory extends ThreadFactory {
- override def newThread(r: Runnable): Thread = {
- val t = new Thread(r)
- t.setDaemon(true)
- return t
+ override def newThread(r: Runnable): Thread = new DaemonThread(r)
+}
+
+private class DaemonThread(r: Runnable = null) extends Thread {
+ override def run() {
+ if (r != null) {
+ r.run()
+ }
}
}
\ No newline at end of file
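
One caveat worth flagging: the refactored DaemonThread never calls setDaemon(true), so despite the names, the threads the factory now hands out are not daemons. A minimal sketch that restores the old guarantee:

    private class DaemonThread(r: Runnable = null) extends Thread {
      setDaemon(true)  // restore the daemon flag the old factory set explicitly
      override def run() {
        if (r != null) {
          r.run()
        }
      }
    }
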
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index f282a4023b..0befca582d 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -42,7 +42,8 @@ class HadoopRDD[K, V](
minSplits: Int)
extends RDD[(K, V)](sc) {
- val serializableConf = new SerializableWritable(conf)
+ // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
+ val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@transient
val splits_ : Array[Split] = {
@@ -66,7 +67,7 @@ class HadoopRDD[K, V](
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
- val conf = serializableConf.value
+ val conf = confBroadcast.value.value
val fmt = createInputFormat(conf)
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
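
Broadcasting the JobConf means each executor fetches the roughly 10 KB configuration once, rather than receiving a copy inside every serialized task. A sketch of the pattern (helper name is ours):

    import org.apache.hadoop.mapred.JobConf
    import spark.{SparkContext, SerializableWritable}

    // Wrap the Writable-but-not-Serializable JobConf, broadcast it once from
    // the driver, and give tasks a cheap accessor that unwraps it on the worker.
    def broadcastJobConf(sc: SparkContext, conf: JobConf): () => JobConf = {
      val bc = sc.broadcast(new SerializableWritable(conf))
      () => bc.value.value  // unwraps the broadcast, then the Writable
    }
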
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 65d0532bd5..8a3f565071 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -10,8 +10,10 @@ import scala.collection.mutable
import com.esotericsoftware.kryo._
import com.esotericsoftware.kryo.{Serializer => KSerializer}
import com.esotericsoftware.kryo.serialize.ClassSerializer
+import com.esotericsoftware.kryo.serialize.SerializableSerializer
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
+import spark.broadcast._
import spark.storage._
/**
@@ -203,6 +205,10 @@ class KryoSerializer extends Serializer with Logging {
kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
kryo.setRegistrationOptional(true)
+ // Allow sending SerializableWritable
+ kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer())
+ kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer())
+
// Register some commonly used Scala singleton objects. Because these
// are singletons, we must return the exact same local object when we
// deserialize rather than returning a clone as FieldSerializer would.
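
SerializableSerializer is Kryo 1.x's escape hatch: a registered class is written with plain Java serialization inside the Kryo stream, which lets the Writable wrapper and HttpBroadcast handles pass through without custom Kryo serializers. The same registration for any Serializable-only class, as a sketch (helper name is ours):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.serialize.SerializableSerializer

    // Fall back to java.io.Serializable semantics for one class while the
    // rest of the object graph still uses Kryo's compact encoding.
    def registerJavaFallback(kryo: Kryo, cls: Class[_]) {
      kryo.register(cls, new SerializableSerializer())
    }
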
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index de23eb6f48..82c1391345 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -1,5 +1,6 @@
package spark
+import java.io.{DataInputStream, DataOutputStream, ByteArrayOutputStream, ByteArrayInputStream}
import java.util.concurrent.ConcurrentHashMap
import akka.actor._
@@ -10,6 +11,7 @@ import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
+import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.storage.BlockManagerId
@@ -18,12 +20,11 @@ sealed trait MapOutputTrackerMessage
case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
case object StopMapOutputTracker extends MapOutputTrackerMessage
-class MapOutputTrackerActor(bmAddresses: ConcurrentHashMap[Int, Array[BlockManagerId]])
-extends Actor with Logging {
+class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
def receive = {
case GetMapOutputLocations(shuffleId: Int) =>
logInfo("Asked to get map output locations for shuffle " + shuffleId)
- sender ! bmAddresses.get(shuffleId)
+ sender ! tracker.getSerializedLocations(shuffleId)
case StopMapOutputTracker =>
logInfo("MapOutputTrackerActor stopped!")
@@ -39,15 +40,19 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
val timeout = 10.seconds
- private var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
+ var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
// Incremented every time a fetch fails so that client nodes know to clear
// their cache of map output locations if this happens.
private var generation: Long = 0
private var generationLock = new java.lang.Object
+ // Cache a serialized version of the output locations for each shuffle to send them out faster
+ var cacheGeneration = generation
+ val cachedSerializedLocs = new HashMap[Int, Array[Byte]]
+
var trackerActor: ActorRef = if (isMaster) {
- val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(bmAddresses)), name = actorName)
+ val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
logInfo("Registered MapOutputTrackerActor actor")
actor
} else {
@@ -134,15 +139,16 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
}
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
- val fetched = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[BlockManagerId]]
+ val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]]
+ val fetchedLocs = deserializeLocations(fetchedBytes)
logInfo("Got the output locations")
- bmAddresses.put(shuffleId, fetched)
+ bmAddresses.put(shuffleId, fetchedLocs)
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
- return fetched
+ return fetchedLocs
} else {
return locs
}
@@ -181,4 +187,70 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
}
}
}
+
+ def getSerializedLocations(shuffleId: Int): Array[Byte] = {
+ var locs: Array[BlockManagerId] = null
+ var generationGotten: Long = -1
+ generationLock.synchronized {
+ if (generation > cacheGeneration) {
+ cachedSerializedLocs.clear()
+ cacheGeneration = generation
+ }
+ cachedSerializedLocs.get(shuffleId) match {
+ case Some(bytes) =>
+ return bytes
+ case None =>
+ locs = bmAddresses.get(shuffleId)
+ generationGotten = generation
+ }
+ }
+ // If we got here, we failed to find the serialized locations in the cache, so we pulled
+ // out a snapshot of the locations as "locs"; let's serialize and return that
+ val bytes = serializeLocations(locs)
+ // Add them into the table only if the generation hasn't changed while we were working
+ generationLock.synchronized {
+ if (generation == generationGotten) {
+ cachedSerializedLocs(shuffleId) = bytes
+ }
+ }
+ return bytes
+ }
+
+ // Serialize an array of map output locations into an efficient byte format so that we can send
+ // it to reduce tasks. We do this by grouping together the locations by block manager ID.
+ def serializeLocations(locs: Array[BlockManagerId]): Array[Byte] = {
+ val out = new ByteArrayOutputStream
+ val dataOut = new DataOutputStream(out)
+ dataOut.writeInt(locs.length)
+ val grouped = locs.zipWithIndex.groupBy(_._1)
+ dataOut.writeInt(grouped.size)
+ for ((id, pairs) <- grouped if id != null) {
+ dataOut.writeUTF(id.ip)
+ dataOut.writeInt(id.port)
+ dataOut.writeInt(pairs.length)
+ for ((_, blockIndex) <- pairs) {
+ dataOut.writeInt(blockIndex)
+ }
+ }
+ dataOut.close()
+ out.toByteArray
+ }
+
+ // Opposite of serializeLocations.
+ def deserializeLocations(bytes: Array[Byte]): Array[BlockManagerId] = {
+ val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
+ val length = dataIn.readInt()
+ val array = new Array[BlockManagerId](length)
+ val numGroups = dataIn.readInt()
+ for (i <- 0 until numGroups) {
+ val ip = dataIn.readUTF()
+ val port = dataIn.readInt()
+ val id = new BlockManagerId(ip, port)
+ val numBlocks = dataIn.readInt()
+ for (j <- 0 until numBlocks) {
+ array(dataIn.readInt()) = id
+ }
+ }
+ array
+ }
}
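
getSerializedLocations serializes each shuffle's output locations once per generation and serves the cached bytes to every reducer, invalidating the whole cache whenever the generation counter advances. The wire format groups indices by block manager: total location count, group count, then per group a UTF ip, a port, an index count, and the indices themselves. Note that grouped.size still counts a possible null group that the write loop skips, so writer and reader agree only when every location is non-null. A usage sketch with hypothetical addresses, assuming a MapOutputTracker in scope:

    // Four map outputs on two hosts collapse into two (ip, port) groups
    // plus their block indices.
    val idA = new BlockManagerId("10.0.0.1", 7077)
    val idB = new BlockManagerId("10.0.0.2", 7077)
    val locs = Array(idA, idB, idA, idB)
    val bytes = tracker.serializeLocations(locs)   // compact grouped form
    assert(tracker.deserializeLocations(bytes).toSeq == locs.toSeq)
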
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index d024d38aa9..14f708a3f8 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -28,7 +28,9 @@ class NewHadoopRDD[K, V](
@transient conf: Configuration)
extends RDD[(K, V)](sc) {
- private val serializableConf = new SerializableWritable(conf)
+ // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+ val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ // private val serializableConf = new SerializableWritable(conf)
private val jobtrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -41,7 +43,7 @@ class NewHadoopRDD[K, V](
@transient
private val splits_ : Array[Split] = {
val inputFormat = inputFormatClass.newInstance
- val jobContext = new JobContext(serializableConf.value, jobId)
+ val jobContext = new JobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Split](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -54,9 +56,9 @@ class NewHadoopRDD[K, V](
override def compute(theSplit: Split) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopSplit]
- val conf = serializableConf.value
+ val conf = confBroadcast.value.value
val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
- val context = new TaskAttemptContext(serializableConf.value, attemptId)
+ val context = new TaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
val reader = format.createRecordReader(split.serializableHadoopSplit.value, context)
reader.initialize(split.serializableHadoopSplit.value, context)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3fe8e8a4bf..d28f3593fe 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,7 +94,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def getStorageLevel = storageLevel
- def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = {
+ def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER_2): RDD[T] = {
if (!level.useDisk && level.replication < 2) {
throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
}
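
The guard that follows the signature requires a checkpoint to survive losing one node, hence disk or at least two in-memory copies; the new default level replicates twice. An illustrative check against the levels StorageLevel defines:

    // Which levels satisfy the guard (level.useDisk || level.replication >= 2):
    StorageLevel.DISK_AND_MEMORY_DESER_2  // disk + 2 copies:   ok (the new default)
    StorageLevel.DISK_ONLY                // disk, 1 copy:      ok
    StorageLevel.MEMORY_ONLY_2            // no disk, 2 copies: ok
    StorageLevel.MEMORY_ONLY              // no disk, 1 copy:   throws
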
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 5d21bb793f..555b3454ee 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -8,6 +8,9 @@ import scala.io.Source
import java.nio.ByteBuffer
import java.net.InetAddress
+import akka.dispatch.Await
+import akka.util.duration._
+
object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
if (args.length < 2) {
@@ -53,7 +56,7 @@ object ConnectionManagerTest extends Logging{
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
})
- val results = futures.map(f => f())
+ val results = futures.map(f => Await.result(f, 1.second))
val finishTime = System.currentTimeMillis
Thread.sleep(5000)
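
Under Akka 2.0, futures are no longer applyable as f(); blocking requires an explicit Await with a timeout. A minimal sketch:

    import akka.dispatch.{Await, Future}
    import akka.util.duration._

    // Block on a future for at most one second, as the test now does;
    // Await.result throws a TimeoutException if the result isn't ready in time.
    def blockFor[T](f: Future[T]): T = Await.result(f, 1.second)
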
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index b9f0a0d6d0..99984fb557 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -31,7 +31,8 @@ object ShuffleMapTask {
return old
} else {
val out = new ByteArrayOutputStream
- val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objOut = ser.serializeStream(new GZIPOutputStream(out))
objOut.writeObject(rdd)
objOut.writeObject(dep)
objOut.close()
@@ -63,10 +64,8 @@ object ShuffleMapTask {
synchronized {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val objIn = new ObjectInputStream(in) {
- override def resolveClass(desc: ObjectStreamClass) =
- Class.forName(desc.getName, false, loader)
- }
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objIn = ser.deserializeStream(in)
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
return (rdd, dep)
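
Using SparkEnv's closureSerializer instead of a hard-wired ObjectOutputStream means the cached task bytes honor the same Java-or-Kryo choice as closures, and the custom resolveClass class-loader override becomes unnecessary. A round-trip sketch of the stream wiring above (assuming an initialized SparkEnv):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}
    import spark.SparkEnv

    // Compress with GZIP, serialize with the pluggable closure serializer,
    // then reverse both steps and recover the original object.
    def roundTrip[T <: AnyRef](value: T): T = {
      val ser = SparkEnv.get.closureSerializer.newInstance
      val out = new ByteArrayOutputStream
      val objOut = ser.serializeStream(new GZIPOutputStream(out))
      objOut.writeObject(value)
      objOut.close()
      val in = new GZIPInputStream(new ByteArrayInputStream(out.toByteArray))
      ser.deserializeStream(in).readObject().asInstanceOf[T]
    }
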
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 750231ac31..952c9766bf 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -118,6 +118,7 @@ class ClusterScheduler(sc: SparkContext)
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
synchronized {
+ SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
slaveIdToHost(o.slaveId) = o.hostname
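
SparkEnv.get reads a thread-local, and resourceOffers can be entered from an Akka dispatcher thread that never initialized it; re-attaching the context's env up front makes everything called downstream safe. The pattern, in brief:

    // Pin the driver's SparkEnv to whatever thread is about to run scheduler
    // code, since SparkEnv.get resolves through a thread-local.
    SparkEnv.set(sc.env)
    val env = SparkEnv.get  // now resolves on this dispatcher thread
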
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 5f98a396b4..e25a11e7c5 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -243,6 +243,11 @@ class TaskSetManager(
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
+ if (info.failed) {
+ // We might get two task-lost messages for the same task in coarse-grained Mesos mode,
+ // or even from Mesos itself when acks get delayed.
+ return
+ }
val index = info.index
info.markSuccessful()
if (!finished(index)) {
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 31784985dc..fdf007ffb2 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -80,6 +80,8 @@ class CoarseMesosSchedulerBackend(
"property, the SPARK_HOME environment variable or the SparkContext constructor")
}
+ val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
+
var nextMesosTaskId = 0
def newMesosTaskId(): Int = {
@@ -177,7 +179,7 @@ class CoarseMesosSchedulerBackend(
val task = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse))
+ .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem", executorMemory))
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index cb7b0c8bc1..3a51f6bd96 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -484,8 +484,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Initiate the replication before storing it locally. This is faster as
// data is already serialized and ready for sending
val replicationFuture = if (level.replication > 1) {
+ val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper
Future {
- replicate(blockId, bytes, level)
+ replicate(blockId, bufferView, level)
}
} else {
null
@@ -537,21 +538,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Replicate block to another node.
*/
+ var firstTime = true
+ var peers : Seq[BlockManagerId] = null
private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
- var peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+ if (firstTime) {
+ peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+ firstTime = false;
+ }
for (peer: BlockManagerId <- peers) {
val start = System.nanoTime
+ data.rewind()
logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
- + data.array().length + " Bytes. To node: " + peer)
+ + data.limit() + " Bytes. To node: " + peer)
if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
new ConnectionManagerId(peer.ip, peer.port))) {
logError("Failed to call syncPutBlock to " + peer)
}
logDebug("Replicated BlockId " + blockId + " once used " +
(System.nanoTime - start) / 1e6 + " s; The size of the data is " +
- data.array().length + " bytes.")
+ data.limit() + " bytes.")
}
}
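
Two ByteBuffer subtleties carry this fix. duplicate() shares the underlying bytes but gives the replication thread an independent position/limit cursor, and rewind() restarts each peer send from offset zero; limit() also replaces array().length, which reports the backing array's capacity rather than the payload size and throws outright for direct or mapped buffers. A runnable sketch of the cursor semantics:

    import java.nio.ByteBuffer

    val bytes = ByteBuffer.wrap("block-payload".getBytes)
    val view = bytes.duplicate()   // no copy: same bytes, independent cursor
    view.position(view.limit)      // exhaust the view, e.g. after one send
    assert(bytes.position == 0)    // the original cursor is untouched
    view.rewind()                  // ready to send to the next peer
    assert(view.remaining == bytes.limit)
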
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 0eaa558f44..0ad1ad056c 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -71,7 +71,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
logDebug("PutBlock " + id + " started from " + startTimeMs + " with data: " + bytes)
blockManager.putBytes(id, bytes, level)
logDebug("PutBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
- + " with data size: " + bytes.array().length)
+ + " with data size: " + bytes.limit)
}
private def getBlock(id: String): ByteBuffer = {
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index b9833273e5..5e2ccb199a 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -12,7 +12,7 @@ case class GetBlock(id: String)
case class GotBlock(id: String, data: ByteBuffer)
case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel)
-class BlockMessage() extends Logging{
+class BlockMessage() {
// Un-initialized: typ = 0
// GetBlock: typ = 1
// GotBlock: typ = 2
@@ -22,8 +22,6 @@ class BlockMessage() extends Logging{
private var data: ByteBuffer = null
private var level: StorageLevel = null
- initLogging()
-
def set(getBlock: GetBlock) {
typ = BlockMessage.TYPE_GET_BLOCK
id = getBlock.id
@@ -62,8 +60,6 @@ class BlockMessage() extends Logging{
}
id = idBuilder.toString()
- logDebug("Set from buffer Result: " + typ + " " + id)
- logDebug("Buffer position is " + buffer.position)
if (typ == BlockMessage.TYPE_PUT_BLOCK) {
val booleanInt = buffer.getInt()
@@ -77,23 +73,18 @@ class BlockMessage() extends Logging{
}
data.put(buffer)
data.flip()
- logDebug("Set from buffer Result 2: " + level + " " + data)
} else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
val dataLength = buffer.getInt()
- logDebug("Data length is "+ dataLength)
- logDebug("Buffer position is " + buffer.position)
data = ByteBuffer.allocate(dataLength)
if (dataLength != buffer.remaining) {
throw new Exception("Error parsing buffer")
}
data.put(buffer)
data.flip()
- logDebug("Set from buffer Result 3: " + data)
}
val finishTime = System.currentTimeMillis
- logDebug("Converted " + id + " from bytebuffer in " + (finishTime - startTime) / 1000.0 + " s")
}
def set(bufferMsg: BufferMessage) {
@@ -145,8 +136,6 @@ class BlockMessage() extends Logging{
buffers += data
}
- logDebug("Start to log buffers.")
- buffers.foreach((x: ByteBuffer) => logDebug("" + x))
/*
println()
println("BlockMessage: ")
@@ -160,7 +149,6 @@ class BlockMessage() extends Logging{
println()
*/
val finishTime = System.currentTimeMillis
- logDebug("Converted " + id + " to buffer message in " + (finishTime - startTime) / 1000.0 + " s")
return Message.createBufferMessage(buffers)
}
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index f66b5bc897..09287faba0 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -87,12 +87,12 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
} else {
- val entry = new Entry(bytes, bytes.array().length, false)
- ensureFreeSpace(bytes.array.length)
+ val entry = new Entry(bytes, bytes.limit, false)
+ ensureFreeSpace(bytes.limit)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
- currentMemory += bytes.array().length
+ currentMemory += bytes.limit
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
- blockId, bytes.array().length, freeMemory))
+ blockId, bytes.limit, freeMemory))
}
}
@@ -111,12 +111,12 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return Left(elements.iterator)
} else {
val bytes = dataSerialize(values)
- ensureFreeSpace(bytes.array().length)
- val entry = new Entry(bytes, bytes.array().length, false)
+ ensureFreeSpace(bytes.limit)
+ val entry = new Entry(bytes, bytes.limit, false)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
- currentMemory += bytes.array().length
+ currentMemory += bytes.limit
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
- blockId, bytes.array.length, freeMemory))
+ blockId, bytes.limit, freeMemory))
return Right(bytes)
}
}
@@ -133,7 +133,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry.deserialized) {
return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator)
} else {
- return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer]))
+ return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate()))
}
}
@@ -219,12 +219,12 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
val file = createFile(blockId)
if (file != null) {
val channel = new RandomAccessFile(file, "rw").getChannel()
- val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.array.length)
- buffer.put(bytes.array)
+ val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
+ buffer.put(bytes)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
- blockId, bytes.array.length, (finishTime - startTime)))
+ blockId, bytes.limit, (finishTime - startTime)))
} else {
logError("File not created for block " + blockId)
}
@@ -233,7 +233,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer] = {
val bytes = dataSerialize(values)
- logDebug("Converted block " + blockId + " to " + bytes.array.length + " bytes")
+ logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes")
putBytes(blockId, bytes, level)
return Right(bytes)
}
@@ -242,9 +242,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
val file = getFile(blockId)
val length = file.length().toInt
val channel = new RandomAccessFile(file, "r").getChannel()
- val bytes = ByteBuffer.allocate(length)
- bytes.put(channel.map(MapMode.READ_WRITE, 0, length))
- return Some(bytes)
+ Some(channel.map(MapMode.READ_WRITE, 0, length))
}
def getValues(blockId: String): Option[Iterator[Any]] = {
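
Returning the mapped buffer directly spares a full heap copy of the block. One detail looks off, though: the channel is opened with mode "r" yet mapped READ_WRITE, which FileChannel.map rejects with NonWritableChannelException; READ_ONLY is presumably the intent. A sketch of the read path under that assumption:

    import java.io.RandomAccessFile
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel.MapMode

    // Map a block file straight into a ByteBuffer without copying it onto the
    // heap; READ_ONLY matches the "r" open mode (our assumption above).
    def readBlock(path: String): ByteBuffer = {
      val channel = new RandomAccessFile(path, "r").getChannel()
      try {
        channel.map(MapMode.READ_ONLY, 0, channel.size())
      } finally {
        channel.close()  // the mapping stays valid after the channel closes
      }
    }
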
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index 1d38ca13cc..b168c8e869 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -66,6 +66,7 @@ class StorageLevel(
object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
+ val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, 2)
val MEMORY_ONLY_DESER = new StorageLevel(false, true, true)
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 57d212e4ca..df4e23bfd6 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -31,6 +31,8 @@ object AkkaUtils {
akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d
akka.remote.netty.connection-timeout = 1s
+ akka.remote.netty.execution-pool-size = 8
+ akka.actor.default-dispatcher.throughput = 30
""".format(host, port))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)