aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/network/BufferMessage.scala
blob: 7b0e489a6c0cc222afa1e8581ba4ccce3f71c93b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package spark.network

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer

import spark.storage.BlockManager


/**
 * A [[Message]] of type `Message.BUFFER_MESSAGE` whose payload is held in a list of
 * `ByteBuffer`s.
 *
 * The same class is used on both sides of a transfer:
 *  - when sending, `getChunkForSending` is called repeatedly and consumes `buffers` from the
 *    front, handing out one chunk per call until nothing is left;
 *  - when receiving, `getChunkForReceiving` hands out successive windows of the single
 *    pre-allocated data buffer.
 *
 * `typ`, `id` and `senderAddress` are not defined in this class (presumably inherited from
 * `Message`).
 *
 * @param id_     message id, passed through to the `Message` constructor
 * @param buffers payload buffers; this list is mutated while sending (drained buffers are
 *                removed). A null list is treated as an empty payload.
 * @param ackId   id of the message being acknowledged; 0 means this is not an acknowledgement
 *                (see `hasAckId`)
 */
private[spark]
class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
  extends Message(Message.BUFFER_MESSAGE, id_) {

  /** Total bytes remaining across all buffers at construction time. */
  val initialSize = currentSize()

  /** Set once the first chunk has been handed out by `getChunkForSending`. */
  var gotChunkForSendingOnce = false

  /** Size of the message as captured at construction; unaffected by later buffer consumption. */
  def size: Int = initialSize

  /**
   * Sum of `remaining` over all buffers right now.
   *
   * @return the number of bytes still remaining, or 0 when `buffers` is null or empty
   */
  def currentSize(): Int = {
    if (buffers == null) 0 else buffers.foldLeft(0)(_ + _.remaining())
  }

  /**
   * Produces the next chunk to send, advancing the position of the underlying buffer.
   *
   * A message whose `size` is 0 yields exactly one chunk with a null buffer, so that even an
   * empty message puts a header on the wire; subsequent calls return `None`.
   *
   * @param maxChunkSize upper bound on the number of payload bytes in the returned chunk
   * @return the next chunk, or `None` once every buffer has been drained
   * @throws Exception if `maxChunkSize` is not positive
   */
  def getChunkForSending(maxChunkSize: Int): Option[MessageChunk] = {
    if (maxChunkSize <= 0) {
      throw new Exception("Max chunk size is " + maxChunkSize)
    }

    if (size == 0 && !gotChunkForSendingOnce) {
      val emptyChunk = new MessageChunk(
        new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null)
      gotChunkForSendingOnce = true
      return Some(emptyChunk)
    }

    // `currentSize` accepts a null buffer list, so it is treated as "nothing to send" here as
    // well instead of failing with a NullPointerException on the call after the empty chunk.
    while (buffers != null && !buffers.isEmpty) {
      val buffer = buffers(0)
      if (!buffer.hasRemaining()) {
        // Fully consumed: hand it to BlockManager.dispose (presumably releases the underlying
        // memory -- confirm against BlockManager) and drop it from the list.
        BlockManager.dispose(buffer)
        // Remove by position: `-=` would locate the element through ByteBuffer's
        // content-based equals, which is needless work for removing the head.
        buffers.remove(0)
      } else {
        val newBuffer = if (buffer.remaining() <= maxChunkSize) {
          // The rest of this buffer fits in one chunk.
          buffer.duplicate()
        } else {
          // Window of at most maxChunkSize bytes starting at the current position.
          val window = buffer.slice()
          window.limit(maxChunkSize)
          window
        }
        // Mark the bytes just handed out as consumed in the source buffer.
        buffer.position(buffer.position() + newBuffer.remaining())
        val newChunk = new MessageChunk(new MessageChunkHeader(
            typ, id, size, newBuffer.remaining(), ackId, senderAddress), newBuffer)
        gotChunkForSendingOnce = true
        return Some(newChunk)
      }
    }
    None
  }

  /**
   * Reserves the next `chunkSize` bytes of the data buffer for an incoming chunk and advances
   * the buffer's position past them.
   *
   * STRONG ASSUMPTION: a BufferMessage created when receiving data has ONLY ONE data buffer.
   * Calling this with an empty buffer list fails with an IndexOutOfBoundsException.
   *
   * @param chunkSize number of bytes the incoming chunk will occupy
   * @return a chunk viewing the reserved region, or `None` if the data buffer is already full
   * @throws Exception if there is more than one data buffer, or if fewer than `chunkSize`
   *                   bytes remain in the data buffer
   */
  def getChunkForReceiving(chunkSize: Int): Option[MessageChunk] = {
    if (buffers.size > 1) {
      throw new Exception("Attempting to get chunk from message with multiple data buffers")
    }
    val buffer = buffers(0)
    if (buffer.hasRemaining()) {
      if (buffer.remaining() < chunkSize) {
        throw new Exception("Not enough space in data buffer for receiving chunk")
      }
      val newBuffer = buffer.slice()
      newBuffer.limit(chunkSize)
      buffer.position(buffer.position() + newBuffer.remaining())
      val newChunk = new MessageChunk(new MessageChunkHeader(
          typ, id, size, newBuffer.remaining(), ackId, senderAddress), newBuffer)
      return Some(newChunk)
    }
    None
  }

  /** Flips every buffer (limit := position, position := 0); a null buffer list is a no-op. */
  def flip(): Unit = {
    if (buffers != null) {
      buffers.foreach(_.flip())
    }
  }

  /** True when this message acknowledges another message, i.e. `ackId` is non-zero. */
  def hasAckId(): Boolean = ackId != 0

  /**
   * True when the first data buffer has no space remaining. Like `getChunkForReceiving`, this
   * relies on the single-data-buffer assumption and requires at least one buffer.
   */
  def isCompletelyReceived(): Boolean = !buffers(0).hasRemaining()

  override def toString: String = {
    if (hasAckId()) {
      "BufferAckMessage(aid = " + ackId + ", id = " + id + ", size = " + size + ")"
    } else {
      "BufferMessage(id = " + id + ", size = " + size + ")"
    }
  }
}