aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-03-07 21:50:01 -0800
committerAndrew Or <andrew@databricks.com>2016-03-07 21:50:01 -0800
commite52e597db48d069b98c1d404b221d3365f38fbb8 (patch)
tree4726833e40fca0e9aa9c265ea772afe5c81bdc7e /core/src/test
parent017cdf2be67776978a940609d610afea79856b17 (diff)
downloadspark-e52e597db48d069b98c1d404b221d3365f38fbb8.tar.gz
spark-e52e597db48d069b98c1d404b221d3365f38fbb8.tar.bz2
spark-e52e597db48d069b98c1d404b221d3365f38fbb8.zip
[SPARK-13659] Refactor BlockStore put*() APIs to remove returnValues
In preparation for larger refactoring, this patch removes the confusing `returnValues` option from the BlockStore put() APIs: returning the value is only useful in one place (caching) and in other situations, such as block replication, it's simpler to put() and then get(). As part of this change, I needed to refactor `BlockManager.doPut()`'s block replication code. I also changed `doPut()` to access the memory and disk stores directly rather than calling them through the BlockStore interface; this is in anticipation of a followup patch to remove the BlockStore interface so that the disk store can expose a binary-data-oriented API which is not concerned with Java objects or serialization. These changes should be covered by the existing storage unit tests. The best way to review this patch is probably to look at the individual commits, all of which are small and have useful descriptions to guide the review. /cc davies for review. Author: Josh Rosen <joshrosen@databricks.com> Closes #11502 from JoshRosen/remove-returnvalues.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 52
1 file changed, 22 insertions(+), 30 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 89b427049b..cfcbf1745d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1156,14 +1156,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
- val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ val result1 = memoryStore.putIterator("b1", smallIterator, memOnly)
+ val result2 = memoryStore.putIterator("b2", smallIterator, memOnly)
assert(memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
- assert(result1.size > 0) // unroll was successful
- assert(result2.size > 0)
- assert(result1.data.isLeft) // unroll did not drop this block to disk
- assert(result2.data.isLeft)
+ assert(result1.isRight) // unroll was successful
+ assert(result2.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Re-put these two blocks so block manager knows about them too. Otherwise, block manager
@@ -1174,9 +1172,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
- assert(result3.size > 0)
- assert(result3.data.isLeft)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memOnly)
+ assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1185,9 +1182,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
- assert(result4.size === 0) // unroll was unsuccessful
- assert(result4.data.isLeft)
+ val result4 = memoryStore.putIterator("b4", bigIterator, memOnly)
+ assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1214,8 +1210,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
- val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
- assert(result3.size > 0)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk)
+ assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1229,9 +1225,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll huge block with not enough space. This should fail and drop the new block to disk
// directly in addition to kicking out b2 in the process. Memory store should contain only
// b3, while disk store should contain b1, b2 and b4.
- val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
- assert(result4.size > 0)
- assert(result4.data.isRight) // unroll returned bytes from disk
+ val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
+ assert(result4.isRight)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1252,28 +1247,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// All unroll memory used is released because unrollSafely returned an array
- memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b1", smallIterator, memOnly)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b2", smallIterator, memOnly)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll memory is not released because unrollSafely returned an iterator
// that still depends on the underlying vector used in the process
- memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b3", smallIterator, memOnly)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b4", smallIterator, memOnly)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b5", smallIterator, memOnly)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b6", smallIterator, memOnly)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b7", smallIterator, memOnly)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
@@ -1286,11 +1281,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockId = BlockId("rdd_3_10")
store.blockInfoManager.lockNewBlockForWriting(
blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
- val result = memoryStore.putBytes(blockId, 13000, () => {
+ memoryStore.putBytes(blockId, 13000, () => {
fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
})
- assert(result.size === 13000)
- assert(result.data === null)
}
test("put a small ByteBuffer to MemoryStore") {
@@ -1298,12 +1291,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val memoryStore = store.memoryStore
val blockId = BlockId("rdd_3_10")
var bytes: ByteBuffer = null
- val result = memoryStore.putBytes(blockId, 10000, () => {
+ memoryStore.putBytes(blockId, 10000, () => {
bytes = ByteBuffer.allocate(10000)
bytes
})
- assert(result.size === 10000)
- assert(result.data === Right(bytes))
+ assert(memoryStore.getSize(blockId) === 10000)
}
test("read-locked blocks cannot be evicted from the MemoryStore") {