aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-02-27 21:52:55 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-02-27 21:52:55 -0800
commitedf8a56ab7eaee1f7c3b4579eb10464984d31d7a (patch)
tree2c7258f351eda1f5cd9e0c9faa8f62570e29263e
parent40e080a68a8fd025435e9ff84fa9280b4aba4dcf (diff)
downloadspark-edf8a56ab7eaee1f7c3b4579eb10464984d31d7a.tar.gz
spark-edf8a56ab7eaee1f7c3b4579eb10464984d31d7a.tar.bz2
spark-edf8a56ab7eaee1f7c3b4579eb10464984d31d7a.zip
Remote BlockFetchTracker trait
This trait seems to have been created a while ago when there were multiple implementations; now that there's just one, I think it makes sense to merge it into the BlockFetcherIterator trait. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #39 from kayousterhout/remove_tracker and squashes the following commits: 8173939 [Kay Ousterhout] Remote BlockFetchTracker.
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala28
2 files changed, 17 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala
deleted file mode 100644
index 2e0b0e6eda..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-private[spark] trait BlockFetchTracker {
- def totalBlocks : Int
- def numLocalBlocks: Int
- def numRemoteBlocks: Int
- def remoteFetchTime : Long
- def fetchWaitTime: Long
- def remoteBytesRead : Long
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 925022e7fe..fb50b45bd4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -44,9 +44,14 @@ import org.apache.spark.util.Utils
*/
private[storage]
-trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])]
- with Logging with BlockFetchTracker {
+trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
+ def totalBlocks: Int
+ def numLocalBlocks: Int
+ def numRemoteBlocks: Int
+ def remoteFetchTime: Long
+ def fetchWaitTime: Long
+ def remoteBytesRead: Long
}
@@ -233,7 +238,16 @@ object BlockFetcherIterator {
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}
- //an iterator that will read fetched blocks off the queue as they arrive.
+ override def totalBlocks: Int = numLocal + numRemote
+ override def numLocalBlocks: Int = numLocal
+ override def numRemoteBlocks: Int = numRemote
+ override def remoteFetchTime: Long = _remoteFetchTime
+ override def fetchWaitTime: Long = _fetchWaitTime
+ override def remoteBytesRead: Long = _remoteBytesRead
+
+
+ // Implementing the Iterator methods with an iterator that reads fetched blocks off the queue
+ // as they arrive.
@volatile protected var resultsGotten = 0
override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
@@ -251,14 +265,6 @@ object BlockFetcherIterator {
}
(result.blockId, if (result.failed) None else Some(result.deserialize()))
}
-
- // Implementing BlockFetchTracker trait.
- override def totalBlocks: Int = numLocal + numRemote
- override def numLocalBlocks: Int = numLocal
- override def numRemoteBlocks: Int = numRemote
- override def remoteFetchTime: Long = _remoteFetchTime
- override def fetchWaitTime: Long = _fetchWaitTime
- override def remoteBytesRead: Long = _remoteBytesRead
}
// End of BasicBlockFetcherIterator