aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
blob: f205d4f0d60b58fc9eada9543be380f249f6175c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.io

import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

import org.apache.spark.SparkFunSuite
import org.apache.spark.network.util.ByteArrayWritableChannel
import org.apache.spark.util.io.ChunkedByteBuffer

/**
 * Unit tests for [[ChunkedByteBuffer]].
 *
 * The cases below exercise construction from zero or more chunks, the conversion helpers
 * (`toArray`, `toByteBuffer`, `toNetty`, `toInputStream`), and check that read-style
 * operations leave the position of the chunks returned by `getChunks()` at zero.
 */
class ChunkedByteBufferSuite extends SparkFunSuite {

  test("no chunks") {
    val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer])
    assert(emptyChunkedByteBuffer.size === 0)
    assert(emptyChunkedByteBuffer.getChunks().isEmpty)
    // Spell out the element type so the expected value is an Array[Byte], not Array[Nothing].
    assert(emptyChunkedByteBuffer.toArray === Array.empty[Byte])
    assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0)
    assert(emptyChunkedByteBuffer.toNetty.capacity() === 0)
    // Opening and closing a stream over an empty buffer must not throw, with or without dispose.
    emptyChunkedByteBuffer.toInputStream(dispose = false).close()
    emptyChunkedByteBuffer.toInputStream(dispose = true).close()
  }

  test("chunks must be non-empty") {
    // A zero-length chunk is expected to be rejected at construction time.
    intercept[IllegalArgumentException] {
      new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))
    }
  }

  test("getChunks() duplicates chunks") {
    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
    // Moving the position of one returned chunk must not be visible through a later call.
    chunkedByteBuffer.getChunks().head.position(4)
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("copy() does not affect original buffer's position") {
    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
    chunkedByteBuffer.copy(ByteBuffer.allocate)
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("writeFully() does not affect original buffer's position") {
    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
    // The channel is sized to hold the whole buffer so the write can complete.
    val channel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)
    chunkedByteBuffer.writeFully(channel)
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

  test("toArray()") {
    val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
    // The same backing buffer is deliberately used for both chunks.
    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes))
    assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
  }

  test("toArray() throws UnsupportedOperationException if size exceeds 2GB") {
    // ByteBuffer.allocate already returns a buffer whose limit equals its capacity, so no
    // explicit limit() call is needed. Reusing one 4MB buffer 1024 times gives a logical
    // size of 4GB without allocating 4GB of heap.
    val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4)
    val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer))
    assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L))
    intercept[UnsupportedOperationException] {
      chunkedByteBuffer.toArray
    }
  }

  test("toInputStream()") {
    val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
    val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
    assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())

    val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
    try {
      val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt)
      ByteStreams.readFully(inputStream, bytesFromStream)
      assert(bytesFromStream === bytes1.array() ++ bytes2.array())
      // Once every byte has been consumed the stream must report end-of-stream.
      assert(inputStream.read() === -1)
    } finally {
      // Always release the stream, even when an assertion above fails.
      inputStream.close()
    }
    // Reading through the stream must not move the position seen via getChunks().
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }
}