aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala
blob: a60bffe611f14d3d7a2b7cb9ddad9a2f73001530 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.collection

import java.io.OutputStream

import scala.collection.mutable.ArrayBuffer

/**
 * A logical byte buffer that wraps a list of byte arrays. All the byte arrays have equal size. The
 * advantage of this over a standard ArrayBuffer is that it can grow without claiming large amounts
 * of memory and needing to copy the full contents. The disadvantage is that the contents don't
 * occupy a contiguous segment of memory.
 *
 * Positions and sizes are `Int`s, so the logical size of the buffer cannot exceed `Int.MaxValue`.
 *
 * @param chunkSize Size in bytes of every backing array. Must be a positive power of two so that
 *                  positions can be split into (chunk index, offset in chunk) with shifts/masks.
 */
private[spark] class ChainedBuffer(chunkSize: Int) {
  // Validate with integer arithmetic only. A floating-point log/pow round trip accepts a chunk
  // size of 0 (log(0) is -Infinity, which truncates and powers back to 0) and depends on rounding.
  assert(chunkSize > 0 && (chunkSize & (chunkSize - 1)) == 0,
    s"ChainedBuffer chunk size $chunkSize must be a power of two")
  // For a power of two, the number of trailing zero bits is exactly log2.
  private val chunkSizeLog2 = java.lang.Integer.numberOfTrailingZeros(chunkSize)
  private val chunks: ArrayBuffer[Array[Byte]] = new ArrayBuffer[Array[Byte]]()
  private var _size: Int = 0

  /**
   * Validate that the range [pos, pos + len) lies inside the logical buffer.
   *
   * The end of the range is computed as a Long so that a large `pos + len` cannot wrap around to
   * a negative Int and slip past the check.
   */
  private def checkReadBounds(pos: Int, len: Int): Unit = {
    if (pos < 0 || len < 0) {
      throw new IndexOutOfBoundsException(
        s"Read of $len bytes at position $pos has a negative position or length")
    }
    if (pos.toLong + len > _size) {
      throw new IndexOutOfBoundsException(
        s"Read of $len bytes at position $pos would go past size ${_size} of buffer")
    }
  }

  /**
   * Copy bytes from this buffer into an OutputStream.
   *
   * @param pos Offset in the buffer to read from.
   * @param os OutputStream that receives the bytes.
   * @param len Number of bytes to read.
   * @throws IndexOutOfBoundsException if `pos` or `len` is negative, or if the requested range
   *                                   extends past the logical size of the buffer.
   */
  def read(pos: Int, os: OutputStream, len: Int): Unit = {
    checkReadBounds(pos, len)
    var chunkIndex = pos >> chunkSizeLog2
    // chunkSize is a power of two, so masking yields the offset within the chunk.
    var posInChunk = pos & (chunkSize - 1)
    var written = 0
    while (written < len) {
      // Never read across a chunk boundary in a single call.
      val toRead = math.min(len - written, chunkSize - posInChunk)
      os.write(chunks(chunkIndex), posInChunk, toRead)
      written += toRead
      chunkIndex += 1
      // Every chunk after the first is consumed from its start.
      posInChunk = 0
    }
  }

  /**
   * Read bytes from this buffer into a byte array.
   *
   * @param pos Offset in the buffer to read from.
   * @param bytes Byte array to read into.
   * @param offs Offset in the byte array to read to.
   * @param len Number of bytes to read.
   * @throws IndexOutOfBoundsException if `pos` or `len` is negative, or if the requested range
   *                                   extends past the logical size of the buffer.
   */
  def read(pos: Int, bytes: Array[Byte], offs: Int, len: Int): Unit = {
    checkReadBounds(pos, len)
    var chunkIndex = pos >> chunkSizeLog2
    // chunkSize is a power of two, so masking yields the offset within the chunk.
    var posInChunk = pos & (chunkSize - 1)
    var written = 0
    while (written < len) {
      // Never read across a chunk boundary in a single copy.
      val toRead = math.min(len - written, chunkSize - posInChunk)
      System.arraycopy(chunks(chunkIndex), posInChunk, bytes, offs + written, toRead)
      written += toRead
      chunkIndex += 1
      // Every chunk after the first is consumed from its start.
      posInChunk = 0
    }
  }

  /**
   * Write bytes from a byte array into this buffer, allocating new chunks as needed.
   *
   * Writes may overwrite existing contents or append at the current end of the buffer, but may
   * not leave a gap: `pos` must not be greater than the current size.
   *
   * @param pos Offset in the buffer to write to.
   * @param bytes Byte array to write from.
   * @param offs Offset in the byte array to write from.
   * @param len Number of bytes to write.
   * @throws IndexOutOfBoundsException if `pos` or `len` is negative, if `pos` is past the end of
   *                                   the buffer, or if the write would grow the buffer beyond
   *                                   `Int.MaxValue` bytes.
   */
  def write(pos: Int, bytes: Array[Byte], offs: Int, len: Int): Unit = {
    if (pos < 0 || len < 0) {
      throw new IndexOutOfBoundsException(
        s"Write of $len bytes at position $pos has a negative position or length")
    }
    if (pos > _size) {
      throw new IndexOutOfBoundsException(
        s"Write at position $pos starts after end of buffer ${_size}")
    }
    // Compute the end as a Long and reject overflow up front, before any chunk is allocated or
    // modified, so a failed write leaves the buffer untouched.
    val end = pos.toLong + len
    if (end > Int.MaxValue) {
      throw new IndexOutOfBoundsException(
        s"Write of $len bytes at position $pos would exceed the maximum buffer size " +
          s"${Int.MaxValue}")
    }
    // Grow if needed. For an empty write at position 0 the index is -1 and nothing is allocated.
    val endChunkIndex = ((end - 1) >> chunkSizeLog2).toInt
    while (endChunkIndex >= chunks.length) {
      chunks += new Array[Byte](chunkSize)
    }

    var chunkIndex = pos >> chunkSizeLog2
    // chunkSize is a power of two, so masking yields the offset within the chunk.
    var posInChunk = pos & (chunkSize - 1)
    var written = 0
    while (written < len) {
      // Never write across a chunk boundary in a single copy.
      val toWrite = math.min(len - written, chunkSize - posInChunk)
      System.arraycopy(bytes, offs + written, chunks(chunkIndex), posInChunk, toWrite)
      written += toWrite
      chunkIndex += 1
      // Every chunk after the first is filled from its start.
      posInChunk = 0
    }

    // Overwrites inside the existing contents do not shrink the buffer.
    _size = math.max(_size, end.toInt)
  }

  /**
   * Total size of buffer that can be written to without allocating additional memory.
   */
  def capacity: Int = chunks.size * chunkSize

  /**
   * Size of the logical buffer.
   */
  def size: Int = _size
}

/**
 * An OutputStream whose bulk writes are appended to a ChainedBuffer.
 *
 * The stream tracks its own write offset, starting at 0, and advances it by the length of each
 * successful bulk write. Single-byte writes are not supported.
 */
private[spark] class ChainedBufferOutputStream(chainedBuffer: ChainedBuffer) extends OutputStream {
  // Offset in the underlying buffer at which the next bulk write will be placed.
  private var cursor: Int = 0

  /** Unsupported: always throws UnsupportedOperationException. */
  override def write(b: Int): Unit =
    throw new UnsupportedOperationException()

  /**
   * Write `len` bytes of `bytes`, starting at `offs`, to the buffer at the current offset.
   *
   * The offset is advanced only after the underlying write returns normally.
   */
  override def write(bytes: Array[Byte], offs: Int, len: Int): Unit = {
    val start = cursor
    chainedBuffer.write(start, bytes, offs, len)
    cursor = start + len
  }
}