author    Aaron Davidson <aaron@databricks.com>  2013-10-14 00:24:17 -0700
committer Aaron Davidson <aaron@databricks.com>  2013-10-14 00:24:17 -0700
commit    4a45019fb0458b5f943253c0c16c9e257ef2c129 (patch)
tree      abb8d32dafb29492491caa906e8896abfe000c31
parent    da896115ec8861f78922fa9502ef257b1e54e3c2 (diff)
Address Matei's comments
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java    2
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala           4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala       2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala       2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockId.scala                  35
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala              2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala                 2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala             13
8 files changed, 28 insertions(+), 34 deletions(-)
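The substance of this patch, per the hunks below: BlockId.asFilename (a plain alias for name) is removed and every call site now uses BlockId.name directly, and BlockId becomes a sealed abstract class so matches over it can be checked for exhaustiveness. A minimal before/after sketch of the call-site change (subDir and blockId stand in for the values appearing in the hunks below):

    // before: two names for the same string
    val fileBefore = new java.io.File(subDir, blockId.asFilename)
    // after: name is the single canonical identifier
    val fileAfter = new java.io.File(subDir, blockId.name)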
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index 097b2ad6d5..cfd8132891 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -37,7 +37,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString);
- String path = pResolver.getAbsolutePath(blockId.asFilename());
+ String path = pResolver.getAbsolutePath(blockId.name());
// if getFilePath returns null, close the channel
if (path == null) {
//ctx.close();
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 919f9765de..609464e38d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -120,7 +120,7 @@ private object HttpBroadcast extends Logging {
}
def write(id: Long, value: Any) {
- val file = new File(broadcastDir, BroadcastBlockId(id).asFilename)
+ val file = new File(broadcastDir, BroadcastBlockId(id).name)
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
@@ -136,7 +136,7 @@ private object HttpBroadcast extends Logging {
}
def read[T](id: Long): T = {
- val url = serverUri + "/" + BroadcastBlockId(id).asFilename
+ val url = serverUri + "/" + BroadcastBlockId(id).name
val in = {
if (compress) {
compressionCodec.compressedInputStream(new URL(url).openStream())
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index eac2177b54..481ff8c3e0 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -42,7 +42,7 @@ private[spark] class ShuffleCopier extends Logging {
try {
fc.init()
fc.connect(host, port)
- fc.sendRequest(blockId.asFilename)
+ fc.sendRequest(blockId.name)
fc.waitForClose()
fc.close()
} catch {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index b88fbaa4d1..1586dff254 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
- val file = new File(subDir, blockId.asFilename)
+ val file = new File(subDir, blockId.name)
return file.getAbsolutePath
}
}
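The two-level hashing above spreads block files across local directories and sub-directories. A self-contained sketch, assuming hash is a non-negative hash of the block name (the real hash is computed in code above this hunk and is not shown here; all values below are hypothetical):

    import java.io.File

    val localDirs = Array(new File("/tmp/spark-local-0"), new File("/tmp/spark-local-1"))
    val subDirsPerLocalDir = 64
    val blockName = "shuffle_0_1_2"
    val hash = math.abs(blockName.hashCode)

    val dirId = hash % localDirs.length                            // which top-level local dir
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir  // which sub-dir inside it
    val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
    val file = new File(subDir, blockName)                         // e.g. .../0a/shuffle_0_1_2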
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index b477a82e33..c7efc67a4a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -24,13 +24,10 @@ package org.apache.spark.storage
*
* If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method.
*/
-private[spark] abstract class BlockId {
+private[spark] sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String
- /** Physical filename for this block. May not be valid for Blocks are not file-backed. */
- def asFilename = name
-
// convenience methods
def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
def isRDD = isInstanceOf[RDDBlockId]
@@ -73,21 +70,27 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
private[spark] object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
- val Shuffle = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
- val Broadcast = "broadcast_([0-9]+)".r
- val TaskResult = "taskresult_([0-9]+)".r
- val StreamInput = "input-([0-9]+)-([0-9]+)".r
- val Test = "test_(.*)".r
+ val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
+ val BROADCAST = "broadcast_([0-9]+)".r
+ val TASKRESULT = "taskresult_([0-9]+)".r
+ val STREAM = "input-([0-9]+)-([0-9]+)".r
+ val TEST = "test_(.*)".r
/** Converts a BlockId "name" String back into a BlockId. */
def apply(id: String) = id match {
- case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt)
- case Shuffle(shuffleId, mapId, reduceId) =>
+ case RDD(rddId, splitIndex) =>
+ RDDBlockId(rddId.toInt, splitIndex.toInt)
+ case SHUFFLE(shuffleId, mapId, reduceId) =>
ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
- case Broadcast(broadcastId) => BroadcastBlockId(broadcastId.toLong)
- case TaskResult(taskId) => TaskResultBlockId(taskId.toLong)
- case StreamInput(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong)
- case Test(value) => TestBlockId(value)
- case _ => throw new IllegalStateException("Unrecognized BlockId: " + id)
+ case BROADCAST(broadcastId) =>
+ BroadcastBlockId(broadcastId.toLong)
+ case TASKRESULT(taskId) =>
+ TaskResultBlockId(taskId.toLong)
+ case STREAM(streamId, uniqueId) =>
+ StreamBlockId(streamId.toInt, uniqueId.toLong)
+ case TEST(value) =>
+ TestBlockId(value)
+ case _ =>
+ throw new IllegalStateException("Unrecognized BlockId: " + id)
}
}
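A sketch of the round-trip contract this apply method implements, assuming each concrete subclass's name (defined in parts of BlockId.scala not shown in this hunk) renders the same format its regex parses; the literals follow the patterns above:

    val ids = Seq(
      BlockId("rdd_1_2"),        // RDDBlockId(1, 2)
      BlockId("shuffle_1_2_3"),  // ShuffleBlockId(1, 2, 3)
      BlockId("broadcast_4"),    // BroadcastBlockId(4)
      BlockId("taskresult_5"),   // TaskResultBlockId(5)
      BlockId("input-6-7"),      // StreamBlockId(6, 7)
      BlockId("test_abc")        // TestBlockId("abc")
    )
    // name is the inverse of apply for every recognized id:
    assert(ids.forall(id => BlockId(id.name) == id))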
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4ca86b7015..801f88a3db 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -970,7 +970,7 @@ private[spark] class BlockManager(
case ShuffleBlockId(_, _, _) => compressShuffle
case BroadcastBlockId(_) => compressBroadcast
case RDDBlockId(_, _) => compressRdds
- case _ => false // Won't happen in a real cluster, but it can in tests
+ case _ => false
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c0b0076a67..b7ca61e938 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -258,7 +258,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
}
- new File(subDir, blockId.asFilename)
+ new File(subDir, blockId.name)
}
private def createLocalDirs(): Array[File] = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index 27f0dce9c9..cb76275e39 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -22,29 +22,20 @@ import org.scalatest.FunSuite
class BlockIdSuite extends FunSuite {
def assertSame(id1: BlockId, id2: BlockId) {
assert(id1.name === id2.name)
- assert(id1.asFilename === id2.asFilename)
assert(id1.hashCode === id2.hashCode)
assert(id1 === id2)
}
def assertDifferent(id1: BlockId, id2: BlockId) {
assert(id1.name != id2.name)
- assert(id1.asFilename != id2.asFilename)
assert(id1.hashCode != id2.hashCode)
assert(id1 != id2)
}
- test("basic-functions") {
- case class MyBlockId(name: String) extends BlockId
-
- val id = MyBlockId("a")
- assertSame(id, MyBlockId("a"))
- assertDifferent(id, MyBlockId("b"))
- assert(id.asRDDId === None)
-
+ test("test-bad-deserialization") {
try {
// Try to deserialize an invalid block id.
- BlockId("a")
+ BlockId("myblock")
fail()
} catch {
case e: IllegalStateException => // OK
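The removed "basic-functions" test defined an ad-hoc MyBlockId subclass; that is exactly what sealing BlockId now forbids outside BlockId.scala, which is why the test goes away. What sealing buys in exchange, as a sketch (describe is a hypothetical helper, not part of this patch):

    // Omitting any case below now draws a "match may not be exhaustive"
    // warning from the compiler, since BlockId is sealed.
    def describe(id: BlockId): String = id match {
      case RDDBlockId(rddId, splitIndex) => "rdd " + rddId + ", partition " + splitIndex
      case ShuffleBlockId(shuffleId, mapId, reduceId) => "shuffle " + shuffleId
      case BroadcastBlockId(broadcastId) => "broadcast " + broadcastId
      case TaskResultBlockId(taskId) => "task result " + taskId
      case StreamBlockId(streamId, uniqueId) => "stream input " + streamId
      case TestBlockId(value) => "test " + value
    }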