aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorYucai Yu <yucai.yu@intel.com>2016-01-12 13:23:23 +0000
committerSean Owen <sowen@cloudera.com>2016-01-12 13:23:23 +0000
commit7e15044d9d9f9839c8d422bae71f27e855d559b4 (patch)
treeca905ac445c09193d8ac1b8c02ccf8e1f501ea52 /core/src/test/scala/org/apache
parent9f0995bb0d0bbe5d9b15a1ca9fa18e246ff90d66 (diff)
downloadspark-7e15044d9d9f9839c8d422bae71f27e855d559b4.tar.gz
spark-7e15044d9d9f9839c8d422bae71f27e855d559b4.tar.bz2
spark-7e15044d9d9f9839c8d422bae71f27e855d559b4.zip
[SPARK-12582][TEST] IndexShuffleBlockResolverSuite fails in windows
[SPARK-12582][Test] IndexShuffleBlockResolverSuite fails in windows * IndexShuffleBlockResolverSuite fails in windows due to file is not closed. * mv IndexShuffleBlockResolverSuite.scala from "test/java" to "test/scala". https://issues.apache.org/jira/browse/SPARK-12582 Author: Yucai Yu <yucai.yu@intel.com> Closes #10526 from yucai/master.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala136
1 files changed, 136 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
new file mode 100644
index 0000000000..d21ce73f40
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{File, FileInputStream, FileOutputStream}
+
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+
+class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+ @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _
+ @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _
+
+ private var tempDir: File = _
+ private val conf: SparkConf = new SparkConf(loadDefaults = false)
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ tempDir = Utils.createTempDir()
+ MockitoAnnotations.initMocks(this)
+
+ when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
+ when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
+ new Answer[File] {
+ override def answer(invocation: InvocationOnMock): File = {
+ new File(tempDir, invocation.getArguments.head.toString)
+ }
+ })
+ }
+
+ override def afterEach(): Unit = {
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ test("commit shuffle files multiple times") {
+ val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+ val lengths = Array[Long](10, 0, 20)
+ val dataTmp = File.createTempFile("shuffle", null, tempDir)
+ val out = new FileOutputStream(dataTmp)
+ Utils.tryWithSafeFinally {
+ out.write(new Array[Byte](30))
+ } {
+ out.close()
+ }
+ resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
+
+ val dataFile = resolver.getDataFile(1, 2)
+ assert(dataFile.exists())
+ assert(dataFile.length() === 30)
+ assert(!dataTmp.exists())
+
+ val lengths2 = new Array[Long](3)
+ val dataTmp2 = File.createTempFile("shuffle", null, tempDir)
+ val out2 = new FileOutputStream(dataTmp2)
+ Utils.tryWithSafeFinally {
+ out2.write(Array[Byte](1))
+ out2.write(new Array[Byte](29))
+ } {
+ out2.close()
+ }
+ resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+ assert(lengths2.toSeq === lengths.toSeq)
+ assert(dataFile.exists())
+ assert(dataFile.length() === 30)
+ assert(!dataTmp2.exists())
+
+ // The dataFile should be the previous one
+ val firstByte = new Array[Byte](1)
+ val in = new FileInputStream(dataFile)
+ Utils.tryWithSafeFinally {
+ in.read(firstByte)
+ } {
+ in.close()
+ }
+ assert(firstByte(0) === 0)
+
+ // remove data file
+ dataFile.delete()
+
+ val lengths3 = Array[Long](10, 10, 15)
+ val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
+ val out3 = new FileOutputStream(dataTmp3)
+ Utils.tryWithSafeFinally {
+ out3.write(Array[Byte](2))
+ out3.write(new Array[Byte](34))
+ } {
+ out3.close()
+ }
+ resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
+ assert(lengths3.toSeq != lengths.toSeq)
+ assert(dataFile.exists())
+ assert(dataFile.length() === 35)
+ assert(!dataTmp2.exists())
+
+ // The dataFile should be the previous one
+ val firstByte2 = new Array[Byte](1)
+ val in2 = new FileInputStream(dataFile)
+ Utils.tryWithSafeFinally {
+ in2.read(firstByte2)
+ } {
+ in2.close()
+ }
+ assert(firstByte2(0) === 2)
+ }
+}