From d267c2834a639aaebd0559355c6a82613abb689b Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Jul 2015 12:56:17 -0700
Subject: [SPARK-9031] Merge BlockObjectWriter and DiskBlockObjectWriter to
 remove abstract class

BlockObjectWriter has only one concrete non-test class, DiskBlockObjectWriter.
In order to simplify the code in preparation for other refactorings, I think
that we should remove this base class and have only DiskBlockObjectWriter.

While at one time we may have planned to have multiple BlockObjectWriter
implementations, that doesn't seem to have happened, so the extra abstraction
seems unnecessary.

Author: Josh Rosen

Closes #7391 from JoshRosen/shuffle-write-interface-refactoring and squashes the following commits:

c418e33 [Josh Rosen] Fix compilation
5047995 [Josh Rosen] Fix comments
d5dc548 [Josh Rosen] Update references in comments
89dc797 [Josh Rosen] Rename test suite.
5755918 [Josh Rosen] Remove unnecessary val in case class
1607c91 [Josh Rosen] Merge BlockObjectWriter and DiskBlockObjectWriter
---
 .../sort/BypassMergeSortShuffleWriterSuite.scala   |   4 +-
 .../spark/storage/BlockObjectWriterSuite.scala     | 173 ---------------------
 .../spark/storage/DiskBlockObjectWriterSuite.scala | 173 +++++++++++++++++++++
 .../PartitionedSerializedPairBufferSuite.scala     |  52 +++----
 4 files changed, 201 insertions(+), 201 deletions(-)
 delete mode 100644 core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
 create mode 100644 core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala

(limited to 'core/src/test')

diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 542f8f4512..cc7342f1ec 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -68,8 +68,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       any[SerializerInstance],
       anyInt(),
       any[ShuffleWriteMetrics]
-    )).thenAnswer(new Answer[BlockObjectWriter] {
-      override def answer(invocation: InvocationOnMock): BlockObjectWriter = {
+    )).thenAnswer(new Answer[DiskBlockObjectWriter] {
+      override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
         val args = invocation.getArguments
         new DiskBlockObjectWriter(
           args(0).asInstanceOf[BlockId],
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
deleted file mode 100644
index 7bdea724fe..0000000000
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.storage - -import java.io.File - -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.SparkConf -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.util.Utils - -class BlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { - - var tempDir: File = _ - - override def beforeEach(): Unit = { - tempDir = Utils.createTempDir() - } - - override def afterEach(): Unit = { - Utils.deleteRecursively(tempDir) - } - - test("verify write metrics") { - val file = new File(tempDir, "somefile") - val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) - - writer.write(Long.box(20), Long.box(30)) - // Record metrics update on every write - assert(writeMetrics.shuffleRecordsWritten === 1) - // Metrics don't update on every write - assert(writeMetrics.shuffleBytesWritten == 0) - // After 32 writes, metrics should update - for (i <- 0 until 32) { - writer.flush() - writer.write(Long.box(i), Long.box(i)) - } - assert(writeMetrics.shuffleBytesWritten > 0) - assert(writeMetrics.shuffleRecordsWritten === 33) - writer.commitAndClose() - assert(file.length() == writeMetrics.shuffleBytesWritten) - } - - test("verify write metrics on revert") { - val file = new File(tempDir, "somefile") - val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) - - writer.write(Long.box(20), Long.box(30)) - // Record metrics update on every write - assert(writeMetrics.shuffleRecordsWritten === 1) - // Metrics don't update on every write - assert(writeMetrics.shuffleBytesWritten == 0) - // After 32 writes, metrics should update - for (i <- 0 until 32) { - writer.flush() - writer.write(Long.box(i), Long.box(i)) - } - assert(writeMetrics.shuffleBytesWritten > 0) - assert(writeMetrics.shuffleRecordsWritten === 33) - writer.revertPartialWritesAndClose() - assert(writeMetrics.shuffleBytesWritten == 0) - assert(writeMetrics.shuffleRecordsWritten == 0) - } - - test("Reopening a closed block writer") { - val file = new File(tempDir, "somefile") - val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) - - writer.open() - writer.close() - intercept[IllegalStateException] { - writer.open() - } - } - - test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") { - val file = new File(tempDir, "somefile") - val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) - for (i <- 1 to 1000) { - writer.write(i, i) - } - writer.commitAndClose() - val bytesWritten = writeMetrics.shuffleBytesWritten - assert(writeMetrics.shuffleRecordsWritten === 1000) - writer.revertPartialWritesAndClose() - assert(writeMetrics.shuffleRecordsWritten === 1000) - assert(writeMetrics.shuffleBytesWritten === bytesWritten) - } - - test("commitAndClose() should be idempotent") 
{ - val file = new File(tempDir, "somefile") - val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) - for (i <- 1 to 1000) { - writer.write(i, i) - } - writer.commitAndClose() - val bytesWritten = writeMetrics.shuffleBytesWritten - val writeTime = writeMetrics.shuffleWriteTime - assert(writeMetrics.shuffleRecordsWritten === 1000) - writer.commitAndClose() - assert(writeMetrics.shuffleRecordsWritten === 1000) - assert(writeMetrics.shuffleBytesWritten === bytesWritten) - assert(writeMetrics.shuffleWriteTime === writeTime) - } - - test("revertPartialWritesAndClose() should be idempotent") { - val file = new File(tempDir, "somefile") - val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) - for (i <- 1 to 1000) { - writer.write(i, i) - } - writer.revertPartialWritesAndClose() - val bytesWritten = writeMetrics.shuffleBytesWritten - val writeTime = writeMetrics.shuffleWriteTime - assert(writeMetrics.shuffleRecordsWritten === 0) - writer.revertPartialWritesAndClose() - assert(writeMetrics.shuffleRecordsWritten === 0) - assert(writeMetrics.shuffleBytesWritten === bytesWritten) - assert(writeMetrics.shuffleWriteTime === writeTime) - } - - test("fileSegment() can only be called after commitAndClose() has been called") { - val file = new File(tempDir, "somefile") - val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) - for (i <- 1 to 1000) { - writer.write(i, i) - } - intercept[IllegalStateException] { - writer.fileSegment() - } - writer.close() - } - - test("commitAndClose() without ever opening or writing") { - val file = new File(tempDir, "somefile") - val writeMetrics = new ShuffleWriteMetrics() - val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) - writer.commitAndClose() - assert(writer.fileSegment().length === 0) - } -} diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala new file mode 100644 index 0000000000..66af6e1a79 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.storage + +import java.io.File + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.Utils + +class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach { + + var tempDir: File = _ + + override def beforeEach(): Unit = { + tempDir = Utils.createTempDir() + } + + override def afterEach(): Unit = { + Utils.deleteRecursively(tempDir) + } + + test("verify write metrics") { + val file = new File(tempDir, "somefile") + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + + writer.write(Long.box(20), Long.box(30)) + // Record metrics update on every write + assert(writeMetrics.shuffleRecordsWritten === 1) + // Metrics don't update on every write + assert(writeMetrics.shuffleBytesWritten == 0) + // After 32 writes, metrics should update + for (i <- 0 until 32) { + writer.flush() + writer.write(Long.box(i), Long.box(i)) + } + assert(writeMetrics.shuffleBytesWritten > 0) + assert(writeMetrics.shuffleRecordsWritten === 33) + writer.commitAndClose() + assert(file.length() == writeMetrics.shuffleBytesWritten) + } + + test("verify write metrics on revert") { + val file = new File(tempDir, "somefile") + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + + writer.write(Long.box(20), Long.box(30)) + // Record metrics update on every write + assert(writeMetrics.shuffleRecordsWritten === 1) + // Metrics don't update on every write + assert(writeMetrics.shuffleBytesWritten == 0) + // After 32 writes, metrics should update + for (i <- 0 until 32) { + writer.flush() + writer.write(Long.box(i), Long.box(i)) + } + assert(writeMetrics.shuffleBytesWritten > 0) + assert(writeMetrics.shuffleRecordsWritten === 33) + writer.revertPartialWritesAndClose() + assert(writeMetrics.shuffleBytesWritten == 0) + assert(writeMetrics.shuffleRecordsWritten == 0) + } + + test("Reopening a closed block writer") { + val file = new File(tempDir, "somefile") + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + + writer.open() + writer.close() + intercept[IllegalStateException] { + writer.open() + } + } + + test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") { + val file = new File(tempDir, "somefile") + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + for (i <- 1 to 1000) { + writer.write(i, i) + } + writer.commitAndClose() + val bytesWritten = writeMetrics.shuffleBytesWritten + assert(writeMetrics.shuffleRecordsWritten === 1000) + writer.revertPartialWritesAndClose() + assert(writeMetrics.shuffleRecordsWritten === 1000) + assert(writeMetrics.shuffleBytesWritten === bytesWritten) + } + + test("commitAndClose() should be idempotent") { + val file = new File(tempDir, "somefile") + val writeMetrics = new ShuffleWriteMetrics() + val 
writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + for (i <- 1 to 1000) { + writer.write(i, i) + } + writer.commitAndClose() + val bytesWritten = writeMetrics.shuffleBytesWritten + val writeTime = writeMetrics.shuffleWriteTime + assert(writeMetrics.shuffleRecordsWritten === 1000) + writer.commitAndClose() + assert(writeMetrics.shuffleRecordsWritten === 1000) + assert(writeMetrics.shuffleBytesWritten === bytesWritten) + assert(writeMetrics.shuffleWriteTime === writeTime) + } + + test("revertPartialWritesAndClose() should be idempotent") { + val file = new File(tempDir, "somefile") + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + for (i <- 1 to 1000) { + writer.write(i, i) + } + writer.revertPartialWritesAndClose() + val bytesWritten = writeMetrics.shuffleBytesWritten + val writeTime = writeMetrics.shuffleWriteTime + assert(writeMetrics.shuffleRecordsWritten === 0) + writer.revertPartialWritesAndClose() + assert(writeMetrics.shuffleRecordsWritten === 0) + assert(writeMetrics.shuffleBytesWritten === bytesWritten) + assert(writeMetrics.shuffleWriteTime === writeTime) + } + + test("fileSegment() can only be called after commitAndClose() has been called") { + val file = new File(tempDir, "somefile") + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + for (i <- 1 to 1000) { + writer.write(i, i) + } + intercept[IllegalStateException] { + writer.fileSegment() + } + writer.close() + } + + test("commitAndClose() without ever opening or writing") { + val file = new File(tempDir, "somefile") + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) + writer.commitAndClose() + assert(writer.fileSegment().length === 0) + } +} diff --git a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala index 6d2459d48d..3b67f62064 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala @@ -17,15 +17,20 @@ package org.apache.spark.util.collection -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import com.google.common.io.ByteStreams +import org.mockito.Matchers.any +import org.mockito.Mockito._ +import org.mockito.Mockito.RETURNS_SMART_NULLS +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.scalatest.Matchers._ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.storage.{FileSegment, BlockObjectWriter} +import org.apache.spark.storage.DiskBlockObjectWriter class PartitionedSerializedPairBufferSuite extends SparkFunSuite { test("OrderedInputStream single record") { @@ -79,13 +84,13 @@ class PartitionedSerializedPairBufferSuite extends SparkFunSuite { val struct = 
SomeStruct("something", 5)
     buffer.insert(4, 10, struct)
     val it = buffer.destructiveSortedWritablePartitionedIterator(None)
-    val writer = new SimpleBlockObjectWriter
+    val (writer, baos) = createMockWriter()
     assert(it.hasNext)
     it.nextPartition should be (4)
     it.writeNext(writer)
     assert(!it.hasNext)
-    val stream = serializerInstance.deserializeStream(writer.getInputStream)
+    val stream = serializerInstance.deserializeStream(new ByteArrayInputStream(baos.toByteArray))
     stream.readObject[AnyRef]() should be (10)
     stream.readObject[AnyRef]() should be (struct)
   }
@@ -101,7 +106,7 @@ class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
     buffer.insert(5, 3, struct3)
     val it = buffer.destructiveSortedWritablePartitionedIterator(None)
-    val writer = new SimpleBlockObjectWriter
+    val (writer, baos) = createMockWriter()
     assert(it.hasNext)
     it.nextPartition should be (4)
     it.writeNext(writer)
@@ -113,7 +118,7 @@ class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
     it.writeNext(writer)
     assert(!it.hasNext)
-    val stream = serializerInstance.deserializeStream(writer.getInputStream)
+    val stream = serializerInstance.deserializeStream(new ByteArrayInputStream(baos.toByteArray))
     val iter = stream.asIterator
     iter.next() should be (2)
     iter.next() should be (struct2)
@@ -123,26 +128,21 @@ class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
     iter.next() should be (struct1)
     assert(!iter.hasNext)
   }
-}
-
-case class SomeStruct(val str: String, val num: Int)
-
-class SimpleBlockObjectWriter extends BlockObjectWriter(null) {
-  val baos = new ByteArrayOutputStream()
-
-  override def write(bytes: Array[Byte], offs: Int, len: Int): Unit = {
-    baos.write(bytes, offs, len)
+  def createMockWriter(): (DiskBlockObjectWriter, ByteArrayOutputStream) = {
+    val writer = mock(classOf[DiskBlockObjectWriter], RETURNS_SMART_NULLS)
+    val baos = new ByteArrayOutputStream()
+    when(writer.write(any(), any(), any())).thenAnswer(new Answer[Unit] {
+      override def answer(invocationOnMock: InvocationOnMock): Unit = {
+        val args = invocationOnMock.getArguments
+        val bytes = args(0).asInstanceOf[Array[Byte]]
+        val offset = args(1).asInstanceOf[Int]
+        val length = args(2).asInstanceOf[Int]
+        baos.write(bytes, offset, length)
+      }
+    })
+    (writer, baos)
   }
-
-  def getInputStream(): InputStream = new ByteArrayInputStream(baos.toByteArray)
-
-  override def open(): BlockObjectWriter = this
-  override def close(): Unit = { }
-  override def isOpen: Boolean = true
-  override def commitAndClose(): Unit = { }
-  override def revertPartialWritesAndClose(): Unit = { }
-  override def fileSegment(): FileSegment = null
-  override def write(key: Any, value: Any): Unit = { }
-  override def recordWritten(): Unit = { }
-  override def write(b: Int): Unit = { }
 }
+
+case class SomeStruct(str: String, num: Int)
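
The createMockWriter() helper above replaces the hand-rolled SimpleBlockObjectWriter test double: once the abstract BlockObjectWriter is removed there is nothing left to subclass, so the suite mocks DiskBlockObjectWriter and stubs write(bytes, offset, length) with a Mockito Answer that copies each byte range into a ByteArrayOutputStream, letting the test deserialize exactly what the iterator wrote. The same Answer mechanism appears in the BypassMergeSortShuffleWriterSuite hunk, where a stubbed factory call builds a real DiskBlockObjectWriter out of the invocation's arguments. The sketch below is a minimal, self-contained version of the capture pattern: ByteWriter and CapturingWriterSketch are hypothetical stand-ins introduced here (the real DiskBlockObjectWriter needs a file, a SerializerInstance, and a ShuffleWriteMetrics to construct), and it assumes Mockito 1.x, matching the org.mockito.Matchers import used in the patch.

import java.io.ByteArrayOutputStream

import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

object CapturingWriterSketch {

  // Hypothetical stand-in for DiskBlockObjectWriter's byte-level write method.
  trait ByteWriter {
    def write(bytes: Array[Byte], offset: Int, length: Int): Unit
  }

  // Same shape as createMockWriter() in the patch: stub write(...) with an
  // Answer that appends the given byte range to an in-memory buffer, and
  // return the buffer alongside the mock so callers can inspect the output.
  def createCapturingWriter(): (ByteWriter, ByteArrayOutputStream) = {
    val writer = mock(classOf[ByteWriter], RETURNS_SMART_NULLS)
    val baos = new ByteArrayOutputStream()
    when(writer.write(any(), any(), any())).thenAnswer(new Answer[Unit] {
      override def answer(invocation: InvocationOnMock): Unit = {
        val args = invocation.getArguments
        baos.write(
          args(0).asInstanceOf[Array[Byte]],
          args(1).asInstanceOf[Int],
          args(2).asInstanceOf[Int])
      }
    })
    (writer, baos)
  }

  def main(args: Array[String]): Unit = {
    val (writer, captured) = createCapturingWriter()
    val payload = "partition-4".getBytes("UTF-8")
    writer.write(payload, 0, payload.length)
    // The bytes "written" through the mock are now visible to the caller.
    assert(new String(captured.toByteArray, "UTF-8") == "partition-4")
  }
}

Two details carried over from the patch are worth noting: stubbing a Unit-returning method through when(...) compiles in Scala (the equivalent Java would not, which is why void methods usually need doAnswer there), and RETURNS_SMART_NULLS makes any unstubbed call surface a descriptive failure when its result is first used, rather than a bare NullPointerException.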