aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/storage/StorageUtils.scala
blob: 950c0cdf352f736b157a7a05d71563b95371f377 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package spark.storage

import spark.{Utils, SparkContext}
import BlockManagerMasterActor.BlockStatus

private[spark]
case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
  blocks: Map[String, BlockStatus]) {

  def memUsed(blockPrefix: String = "") = {
    blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize).
      reduceOption(_+_).getOrElse(0l)
  }

  def diskUsed(blockPrefix: String = "") = {
    blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.diskSize).
      reduceOption(_+_).getOrElse(0l)
  }

  def memRemaining : Long = maxMem - memUsed()

}

case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
  numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
  extends Ordered[RDDInfo] {
  override def toString = {
    import Utils.memoryBytesToString
    "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
      storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
  }

  override def compare(that: RDDInfo) = {
    this.id - that.id
  }
}

/* Helper methods for storage-related objects */
private[spark]
object StorageUtils {

  /* Given the current storage status of the BlockManager, returns information for each RDD */
  def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
    sc: SparkContext) : Array[RDDInfo] = {
    rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
  }

  /* Given a list of BlockStatus objets, returns information for each RDD */
  def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
    sc: SparkContext) : Array[RDDInfo] = {

    // Group by rddId, ignore the partition name
    val groupedRddBlocks = infos.filterKeys(_.startsWith("rdd_")).groupBy { case(k, v) =>
      k.substring(0,k.lastIndexOf('_'))
    }.mapValues(_.values.toArray)

    // For each RDD, generate an RDDInfo object
    val rddInfos = groupedRddBlocks.map { case (rddKey, rddBlocks) =>
      // Add up memory and disk sizes
      val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
      val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)

      // Find the id of the RDD, e.g. rdd_1 => 1
      val rddId = rddKey.split("_").last.toInt

      // Get the friendly name and storage level for the RDD, if available
      sc.persistentRdds.get(rddId).map { r =>
        val rddName = Option(r.name).getOrElse(rddKey)
        val rddStorageLevel = r.getStorageLevel
        RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
      }
    }.flatten.toArray

    scala.util.Sorting.quickSort(rddInfos)

    rddInfos
  }

  /* Removes all BlockStatus object that are not part of a block prefix */
  def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
    prefix: String) : Array[StorageStatus] = {

    storageStatusList.map { status =>
      val newBlocks = status.blocks.filterKeys(_.startsWith(prefix))
      //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _)
      StorageStatus(status.blockManagerId, status.maxMem, newBlocks)
    }

  }

}