author    Aaron Davidson <aaron@databricks.com>    2013-10-13 11:15:02 -0700
committer Aaron Davidson <aaron@databricks.com>    2013-10-13 11:15:02 -0700
commit    da896115ec8861f78922fa9502ef257b1e54e3c2 (patch)
tree      f1b48c15709b5e6f322f699f9f04981fb3caaf5b
parent    d60352283cdc34722749024f6fd47e9309f01e37 (diff)
Change BlockId filename to name + rest of Patrick's comments
-rw-r--r--  core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java  2
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala        4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala       4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala    2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala    2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockId.scala                27
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala   2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockMessage.scala           6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala              2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala             2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala           22
11 files changed, 39 insertions(+), 36 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index 299162f12c..097b2ad6d5 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -37,7 +37,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString);
- String path = pResolver.getAbsolutePath(blockId.filename());
+ String path = pResolver.getAbsolutePath(blockId.asFilename());
// if getFilePath returns null, close the channel
if (path == null) {
//ctx.close();
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 5948fcef88..919f9765de 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -120,7 +120,7 @@ private object HttpBroadcast extends Logging {
}
def write(id: Long, value: Any) {
- val file = new File(broadcastDir, BroadcastBlockId(id).filename)
+ val file = new File(broadcastDir, BroadcastBlockId(id).asFilename)
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
@@ -136,7 +136,7 @@ private object HttpBroadcast extends Logging {
}
def read[T](id: Long): T = {
- val url = serverUri + "/" + BroadcastBlockId(id).filename
+ val url = serverUri + "/" + BroadcastBlockId(id).asFilename
val in = {
if (compress) {
compressionCodec.compressedInputStream(new URL(url).openStream())
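
With this change, the file the driver writes and the URL an executor fetches are both derived from the same BroadcastBlockId(id).asFilename, so the two sides agree on the name by construction. A minimal sketch of the naming round trip, with broadcastDir and serverUri as hypothetical stand-ins for the driver's configured values:

    import org.apache.spark.storage.BroadcastBlockId

    // Hypothetical values standing in for the driver's configuration.
    val broadcastDir = new java.io.File("/tmp/broadcast")
    val serverUri = "http://driver-host:33000"

    val blockId = BroadcastBlockId(42)
    val file = new java.io.File(broadcastDir, blockId.asFilename) // /tmp/broadcast/broadcast_42
    val url = serverUri + "/" + blockId.asFilename                // http://driver-host:33000/broadcast_42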
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
index 9ade373823..1b9fa1e53a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -30,8 +30,8 @@ private[spark] class FileHeader (
val buf = Unpooled.buffer()
buf.capacity(FileHeader.HEADER_SIZE)
buf.writeInt(fileLen)
- buf.writeInt(blockId.filename.length)
- blockId.filename.foreach((x: Char) => buf.writeByte(x))
+ buf.writeInt(blockId.name.length)
+ blockId.name.foreach((x: Char) => buf.writeByte(x))
//padding the rest of header
if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
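
The header is fixed-size: a 4-byte file length, a 4-byte name length, the name written one byte per char (so block names are effectively assumed to be ASCII), then zero padding up to FileHeader.HEADER_SIZE. A hypothetical decoder for that layout, reconstructing the BlockId from its name (a sketch, not FileHeader's actual read path):

    import io.netty.buffer.ByteBuf
    import org.apache.spark.storage.BlockId

    // Layout: [fileLen: Int][nameLen: Int][nameLen bytes of name][zero padding]
    def readHeader(buf: ByteBuf): (Int, BlockId) = {
      val fileLen = buf.readInt()
      val nameLen = buf.readInt()
      val name = new String(Array.fill(nameLen)(buf.readByte().toChar))
      (fileLen, BlockId(name))
    }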
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index bb1c8ac20e..eac2177b54 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -42,7 +42,7 @@ private[spark] class ShuffleCopier extends Logging {
try {
fc.init()
fc.connect(host, port)
- fc.sendRequest(blockId.filename)
+ fc.sendRequest(blockId.asFilename)
fc.waitForClose()
fc.close()
} catch {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 611a44e5b9..b88fbaa4d1 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
- val file = new File(subDir, blockId.filename)
+ val file = new File(subDir, blockId.asFilename)
return file.getAbsolutePath
}
}
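
For context, this resolver spreads block files across the configured local dirs with hex-named subdirectories under each: dirId consumes the low "digit" of the hash and subDirId the next one, so a block lands in one of localDirs.length * subDirsPerLocalDir buckets. A worked sketch with hypothetical values (the real hash comes from the caller; math.abs here is an assumption):

    import java.io.File
    import org.apache.spark.storage.ShuffleBlockId

    // Hypothetical setup: 2 local dirs, 64 subdirs per dir.
    val localDirs = Array(new File("/disk1/spark"), new File("/disk2/spark"))
    val subDirsPerLocalDir = 64

    val blockId = ShuffleBlockId(1, 2, 3)             // name = "shuffle_1_2_3"
    val hash = math.abs(blockId.name.hashCode)
    val dirId = hash % localDirs.length               // which local dir
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // which hex subdir
    val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
    val file = new File(subDir, blockId.asFilename)   // e.g. /disk1/spark/1f/shuffle_1_2_3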
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 0fc212d8f9..b477a82e33 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -25,8 +25,11 @@ package org.apache.spark.storage
* If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method.
*/
private[spark] abstract class BlockId {
- // Physical filename and unique identifier of this Block.
- def filename: String
+ /** A globally unique identifier for this Block. Can be used for ser/de. */
+ def name: String
+
+ /** Physical filename for this block. May not be valid for Blocks that are not file-backed. */
+ def asFilename = name
// convenience methods
def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
@@ -34,41 +37,40 @@ private[spark] abstract class BlockId {
def isShuffle = isInstanceOf[ShuffleBlockId]
def isBroadcast = isInstanceOf[BroadcastBlockId]
- override def toString = filename
- override def hashCode = filename.hashCode
+ override def toString = name
+ override def hashCode = name.hashCode
override def equals(other: Any): Boolean = other match {
- case o: BlockId => filename.equals(o.filename)
+ case o: BlockId => getClass == o.getClass && name.equals(o.name)
case _ => false
}
}
private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
- def filename = "rdd_" + rddId + "_" + splitIndex
+ def name = "rdd_" + rddId + "_" + splitIndex
}
private[spark]
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
- def filename = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+ def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}
private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
- def filename = "broadcast_" + broadcastId
+ def name = "broadcast_" + broadcastId
}
private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
- def filename = "taskresult_" + taskId
+ def name = "taskresult_" + taskId
}
private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
- def filename = "input-" + streamId + "-" + uniqueId
+ def name = "input-" + streamId + "-" + uniqueId
}
// Intended only for testing purposes
private[spark] case class TestBlockId(id: String) extends BlockId {
- def filename = "test_" + id
+ def name = "test_" + id
}
-// Contains deserialization logic (i.e., String -> BlockId).
private[spark] object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val Shuffle = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
@@ -77,6 +79,7 @@ private[spark] object BlockId {
val StreamInput = "input-([0-9]+)-([0-9]+)".r
val Test = "test_(.*)".r
+ /** Converts a BlockId "name" String back into a BlockId. */
def apply(id: String) = id match {
case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt)
case Shuffle(shuffleId, mapId, reduceId) =>
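
This is the core of the commit: name is the canonical, globally unique string form of a BlockId, apply() is its inverse via the regexes above, and asFilename (defaulting to name) is now a separate notion for file-backed stores. Note also that equals gained a getClass check. A small sketch of the round trip and of why the class check matters (OtherBlockId is a hypothetical subclass):

    import org.apache.spark.storage.{BlockId, RDDBlockId}

    // name <-> BlockId round trip via apply().
    val id = RDDBlockId(1, 2)
    assert(id.name == "rdd_1_2")
    assert(BlockId("rdd_1_2") == id)

    // The getClass check keeps two different BlockId subclasses whose names
    // happen to collide from comparing equal. (Case classes inherit BlockId's
    // concrete equals rather than synthesizing their own, which is what
    // BlockIdSuite's MyBlockId relies on.)
    case class OtherBlockId(name: String) extends BlockId
    assert(OtherBlockId("rdd_1_2") != RDDBlockId(1, 2))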
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index cc0c46ec16..45f51da288 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -62,7 +62,7 @@ private[storage] object BlockManagerMessages {
override def writeExternal(out: ObjectOutput) {
blockManagerId.writeExternal(out)
- out.writeUTF(blockId.filename)
+ out.writeUTF(blockId.name)
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
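
writeExternal now serializes the block by its name; a matching read side would rebuild the id by running the name back through BlockId.apply, along the lines of (a sketch, not the exact readExternal body):

    import java.io.ObjectInput
    import org.apache.spark.storage.BlockId

    // Hypothetical counterpart to the writeUTF above.
    def readBlockId(in: ObjectInput): BlockId = BlockId(in.readUTF())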
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
index 7e8ee2486e..80dcb5a207 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
@@ -117,9 +117,9 @@ private[spark] class BlockMessage() {
def toBufferMessage: BufferMessage = {
val startTime = System.currentTimeMillis
val buffers = new ArrayBuffer[ByteBuffer]()
- var buffer = ByteBuffer.allocate(4 + 4 + id.filename.length * 2)
- buffer.putInt(typ).putInt(id.filename.length)
- id.filename.foreach((x: Char) => buffer.putChar(x))
+ var buffer = ByteBuffer.allocate(4 + 4 + id.name.length * 2)
+ buffer.putInt(typ).putInt(id.name.length)
+ id.name.foreach((x: Char) => buffer.putChar(x))
buffer.flip()
buffers += buffer
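
The framing here is 4 bytes of message type, 4 bytes of name length, then the name as 2-byte chars, which is why the allocation is 4 + 4 + id.name.length * 2. A hypothetical decoder for that framing (a sketch of the inverse, not BlockMessage's actual parser):

    import java.nio.ByteBuffer
    import org.apache.spark.storage.BlockId

    // Layout: [typ: Int][len: Int][len chars, 2 bytes each]
    def readTypeAndId(buffer: ByteBuffer): (Int, BlockId) = {
      val typ = buffer.getInt()
      val len = buffer.getInt()
      val chars = Array.fill(len)(buffer.getChar())
      (typ, BlockId(new String(chars)))
    }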
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 4200935d93..c0b0076a67 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -258,7 +258,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
}
- new File(subDir, blockId.filename)
+ new File(subDir, blockId.asFilename)
}
private def createLocalDirs(): Array[File] = {
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 74e5deb35d..b83cd54f3c 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -46,7 +46,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
"Executors")
val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.
- sortWith(_._1.filename < _._1.filename)
+ sortWith(_._1.name < _._1.name)
val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
val blocks = blockStatuses.map {
case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index 538482f6ff..27f0dce9c9 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -21,21 +21,21 @@ import org.scalatest.FunSuite
class BlockIdSuite extends FunSuite {
def assertSame(id1: BlockId, id2: BlockId) {
- assert(id1.filename === id2.filename)
- assert(id1.toString === id2.toString)
+ assert(id1.name === id2.name)
+ assert(id1.asFilename === id2.asFilename)
assert(id1.hashCode === id2.hashCode)
assert(id1 === id2)
}
def assertDifferent(id1: BlockId, id2: BlockId) {
- assert(id1.filename != id2.filename)
- assert(id1.toString != id2.toString)
+ assert(id1.name != id2.name)
+ assert(id1.asFilename != id2.asFilename)
assert(id1.hashCode != id2.hashCode)
assert(id1 != id2)
}
test("basic-functions") {
- case class MyBlockId(filename: String) extends BlockId
+ case class MyBlockId(name: String) extends BlockId
val id = MyBlockId("a")
assertSame(id, MyBlockId("a"))
@@ -56,7 +56,7 @@ class BlockIdSuite extends FunSuite {
val id = RDDBlockId(1, 2)
assertSame(id, RDDBlockId(1, 2))
assertDifferent(id, RDDBlockId(1, 1))
- assert(id.toString === "rdd_1_2")
+ assert(id.name === "rdd_1_2")
assert(id.asRDDId.get.rddId === 1)
assert(id.asRDDId.get.splitIndex === 2)
assert(id.isRDD)
@@ -67,7 +67,7 @@ class BlockIdSuite extends FunSuite {
val id = ShuffleBlockId(1, 2, 3)
assertSame(id, ShuffleBlockId(1, 2, 3))
assertDifferent(id, ShuffleBlockId(3, 2, 3))
- assert(id.toString === "shuffle_1_2_3")
+ assert(id.name === "shuffle_1_2_3")
assert(id.asRDDId === None)
assert(id.shuffleId === 1)
assert(id.mapId === 2)
@@ -80,7 +80,7 @@ class BlockIdSuite extends FunSuite {
val id = BroadcastBlockId(42)
assertSame(id, BroadcastBlockId(42))
assertDifferent(id, BroadcastBlockId(123))
- assert(id.toString === "broadcast_42")
+ assert(id.name === "broadcast_42")
assert(id.asRDDId === None)
assert(id.broadcastId === 42)
assert(id.isBroadcast)
@@ -91,7 +91,7 @@ class BlockIdSuite extends FunSuite {
val id = TaskResultBlockId(60)
assertSame(id, TaskResultBlockId(60))
assertDifferent(id, TaskResultBlockId(61))
- assert(id.toString === "taskresult_60")
+ assert(id.name === "taskresult_60")
assert(id.asRDDId === None)
assert(id.taskId === 60)
assert(!id.isRDD)
@@ -102,7 +102,7 @@ class BlockIdSuite extends FunSuite {
val id = StreamBlockId(1, 100)
assertSame(id, StreamBlockId(1, 100))
assertDifferent(id, StreamBlockId(2, 101))
- assert(id.toString === "input-1-100")
+ assert(id.name === "input-1-100")
assert(id.asRDDId === None)
assert(id.streamId === 1)
assert(id.uniqueId === 100)
@@ -114,7 +114,7 @@ class BlockIdSuite extends FunSuite {
val id = TestBlockId("abc")
assertSame(id, TestBlockId("abc"))
assertDifferent(id, TestBlockId("ab"))
- assert(id.toString === "test_abc")
+ assert(id.name === "test_abc")
assert(id.asRDDId === None)
assert(id.id === "abc")
assert(!id.isShuffle)