path: root/core/src/test
author     Josh Rosen <joshrosen@databricks.com>   2015-07-14 12:56:17 -0700
committer  Reynold Xin <rxin@databricks.com>       2015-07-14 12:56:17 -0700
commit     d267c2834a639aaebd0559355c6a82613abb689b (patch)
tree       d96f025b32797c82611332d1499986f9f081f5de /core/src/test
parent     8fb3a65cbb714120d612e58ef9d12b0521a83260 (diff)
download   spark-d267c2834a639aaebd0559355c6a82613abb689b.tar.gz
           spark-d267c2834a639aaebd0559355c6a82613abb689b.tar.bz2
           spark-d267c2834a639aaebd0559355c6a82613abb689b.zip
[SPARK-9031] Merge BlockObjectWriter and DiskBlockObjectWriter to remove abstract class
BlockObjectWriter has only one concrete non-test class, DiskBlockObjectWriter. In order to simplify the code in preparation for other refactorings, I think that we should remove this base class and have only DiskBlockObjectWriter. While at one time we may have planned to have multiple BlockObjectWriter implementations, that doesn't seem to have happened, so the extra abstraction seems unnecessary.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7391 from JoshRosen/shuffle-write-interface-refactoring and squashes the following commits:

c418e33 [Josh Rosen] Fix compilation
5047995 [Josh Rosen] Fix comments
d5dc548 [Josh Rosen] Update references in comments
89dc797 [Josh Rosen] Rename test suite.
5755918 [Josh Rosen] Remove unnecessary val in case class
1607c91 [Josh Rosen] Merge BlockObjectWriter and DiskBlockObjectWriter
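As a rough sketch of the shape of the change (simplified constructor and method signatures, not the actual Spark source; the parameters and method bodies below are placeholders): the abstract base class, which had exactly one concrete non-test subclass, is folded into that subclass, and every former BlockObjectWriter reference simply becomes a DiskBlockObjectWriter reference.

    // Before (sketch): an abstract base with a single concrete, non-test subclass.
    //
    //   abstract class BlockObjectWriter(blockId: BlockId) {
    //     def write(bytes: Array[Byte], offs: Int, len: Int): Unit
    //     def commitAndClose(): Unit
    //   }
    //   class DiskBlockObjectWriter(blockId: BlockId, file: File)
    //     extends BlockObjectWriter(blockId) { ... }

    // After (sketch): the base class is removed; DiskBlockObjectWriter is the only
    // writer type, and call sites (including the tests in the diff below) refer to
    // it directly.
    import java.io.File
    import org.apache.spark.storage.BlockId

    class DiskBlockObjectWriter(blockId: BlockId, file: File) {
      def write(bytes: Array[Byte], offs: Int, len: Int): Unit = { /* append to the partial file */ }
      def commitAndClose(): Unit = { /* flush, sync, and close the partial file */ }
    }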
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala   4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala (renamed from core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala)   2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala  52
3 files changed, 29 insertions, 29 deletions
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 542f8f4512..cc7342f1ec 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -68,8 +68,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
any[SerializerInstance],
anyInt(),
any[ShuffleWriteMetrics]
- )).thenAnswer(new Answer[BlockObjectWriter] {
- override def answer(invocation: InvocationOnMock): BlockObjectWriter = {
+ )).thenAnswer(new Answer[DiskBlockObjectWriter] {
+ override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
val args = invocation.getArguments
new DiskBlockObjectWriter(
args(0).asInstanceOf[BlockId],
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 7bdea724fe..66af6e1a79 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils
-class BlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
+class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
var tempDir: File = _
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
index 6d2459d48d..3b67f62064 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
@@ -17,15 +17,20 @@
package org.apache.spark.util.collection
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.google.common.io.ByteStreams
+import org.mockito.Matchers.any
+import org.mockito.Mockito._
+import org.mockito.Mockito.RETURNS_SMART_NULLS
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
import org.scalatest.Matchers._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.storage.{FileSegment, BlockObjectWriter}
+import org.apache.spark.storage.DiskBlockObjectWriter
class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
test("OrderedInputStream single record") {
@@ -79,13 +84,13 @@ class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
val struct = SomeStruct("something", 5)
buffer.insert(4, 10, struct)
val it = buffer.destructiveSortedWritablePartitionedIterator(None)
- val writer = new SimpleBlockObjectWriter
+ val (writer, baos) = createMockWriter()
assert(it.hasNext)
it.nextPartition should be (4)
it.writeNext(writer)
assert(!it.hasNext)
- val stream = serializerInstance.deserializeStream(writer.getInputStream)
+ val stream = serializerInstance.deserializeStream(new ByteArrayInputStream(baos.toByteArray))
stream.readObject[AnyRef]() should be (10)
stream.readObject[AnyRef]() should be (struct)
}
@@ -101,7 +106,7 @@ class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
buffer.insert(5, 3, struct3)
val it = buffer.destructiveSortedWritablePartitionedIterator(None)
- val writer = new SimpleBlockObjectWriter
+ val (writer, baos) = createMockWriter()
assert(it.hasNext)
it.nextPartition should be (4)
it.writeNext(writer)
@@ -113,7 +118,7 @@ class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
it.writeNext(writer)
assert(!it.hasNext)
- val stream = serializerInstance.deserializeStream(writer.getInputStream)
+ val stream = serializerInstance.deserializeStream(new ByteArrayInputStream(baos.toByteArray))
val iter = stream.asIterator
iter.next() should be (2)
iter.next() should be (struct2)
@@ -123,26 +128,21 @@ class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
iter.next() should be (struct1)
assert(!iter.hasNext)
}
-}
-
-case class SomeStruct(val str: String, val num: Int)
-
-class SimpleBlockObjectWriter extends BlockObjectWriter(null) {
- val baos = new ByteArrayOutputStream()
- override def write(bytes: Array[Byte], offs: Int, len: Int): Unit = {
- baos.write(bytes, offs, len)
+ def createMockWriter(): (DiskBlockObjectWriter, ByteArrayOutputStream) = {
+ val writer = mock(classOf[DiskBlockObjectWriter], RETURNS_SMART_NULLS)
+ val baos = new ByteArrayOutputStream()
+ when(writer.write(any(), any(), any())).thenAnswer(new Answer[Unit] {
+ override def answer(invocationOnMock: InvocationOnMock): Unit = {
+ val args = invocationOnMock.getArguments
+ val bytes = args(0).asInstanceOf[Array[Byte]]
+ val offset = args(1).asInstanceOf[Int]
+ val length = args(2).asInstanceOf[Int]
+ baos.write(bytes, offset, length)
+ }
+ })
+ (writer, baos)
}
-
- def getInputStream(): InputStream = new ByteArrayInputStream(baos.toByteArray)
-
- override def open(): BlockObjectWriter = this
- override def close(): Unit = { }
- override def isOpen: Boolean = true
- override def commitAndClose(): Unit = { }
- override def revertPartialWritesAndClose(): Unit = { }
- override def fileSegment(): FileSegment = null
- override def write(key: Any, value: Any): Unit = { }
- override def recordWritten(): Unit = { }
- override def write(b: Int): Unit = { }
}
+
+case class SomeStruct(str: String, num: Int)
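The new createMockWriter helper replaces the hand-rolled SimpleBlockObjectWriter stub with a Mockito mock whose write(bytes, offs, len) calls are captured into an in-memory buffer, so the test can deserialize exactly what the buffer iterator wrote without touching disk. A minimal self-contained sketch of that capture pattern, using the same Mockito 1.x APIs as the diff above (ByteSink, createCapturingMock, and MockCaptureExample are hypothetical names standing in for DiskBlockObjectWriter and the test helper):

    import java.io.ByteArrayOutputStream

    import org.mockito.Matchers.any
    import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
    import org.mockito.invocation.InvocationOnMock
    import org.mockito.stubbing.Answer

    // Hypothetical stand-in for the byte-oriented write method of DiskBlockObjectWriter.
    trait ByteSink {
      def write(bytes: Array[Byte], offs: Int, len: Int): Unit
    }

    object MockCaptureExample {

      // Returns a mock whose write(bytes, offs, len) calls are copied into an
      // in-memory buffer, so a test can inspect or deserialize what was "written".
      def createCapturingMock(): (ByteSink, ByteArrayOutputStream) = {
        val sink = mock(classOf[ByteSink], RETURNS_SMART_NULLS)
        val baos = new ByteArrayOutputStream()
        when(sink.write(any(), any(), any())).thenAnswer(new Answer[Unit] {
          override def answer(invocation: InvocationOnMock): Unit = {
            val args = invocation.getArguments
            baos.write(
              args(0).asInstanceOf[Array[Byte]],
              args(1).asInstanceOf[Int],
              args(2).asInstanceOf[Int])
          }
        })
        (sink, baos)
      }

      def main(args: Array[String]): Unit = {
        val (sink, baos) = createCapturingMock()
        val payload = "hello".getBytes("UTF-8")
        sink.write(payload, 0, payload.length)
        // No real I/O happened; the captured bytes live entirely in memory.
        assert(new String(baos.toByteArray, "UTF-8") == "hello")
      }
    }

Stubbing a Unit-returning method with when(...).thenAnswer(...) works in Scala because the stubbed call expression is itself a value, and RETURNS_SMART_NULLS makes any un-stubbed call return a "smart null" that raises a descriptive error if it is later dereferenced, rather than a bare null.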