aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/broadcast/Broadcast.scala
blob: 415bde5d677cb3b0acf59ecc29d86be776f519c9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package spark.broadcast

import java.io._
import java.util.concurrent.atomic.AtomicLong

import spark._

/**
 * Base class for a broadcast variable.
 *
 * Each instance carries a numeric `id` and exposes the broadcast payload
 * through `value`, which concrete subclasses must implement.
 *
 * @param id identifier of this broadcast variable; readable only from
 *           within the `spark` package
 * @tparam T type of the value being broadcast
 */
abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {

  /** The broadcast payload; supplied by the concrete subclass. */
  def value: T

  // No abstract readObject is declared at this level: readObject has to be
  // 'private' in sub-classes, and declaring it abstract here ran into some
  // weird issues because of that.

  /** Renders this variable as `spark.Broadcast(<id>)`. */
  override def toString: String = "spark.Broadcast(" + id.toString + ")"
}

/**
 * Creates broadcast variables through a pluggable [[BroadcastFactory]].
 *
 * The factory implementation is chosen by the `spark.broadcast.factory`
 * system property (default `spark.broadcast.HttpBroadcastFactory`), is
 * instantiated reflectively, and is initialized once while this manager is
 * being constructed.
 *
 * @param _isDriver flag forwarded to the factory's `initialize`; also exposed
 *                  through `isDriver`
 */
private[spark]
class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable {

  // These fields must be declared before the initialize() call below:
  // Scala runs field initializers in declaration order, so declaring them
  // afterwards would overwrite whatever initialize() had stored in them.
  private var initialized = false
  private var broadcastFactory: BroadcastFactory = null

  // Source of ids for new broadcast variables; the first id handed out is 0.
  private val nextBroadcastId = new AtomicLong(0)

  initialize()

  /**
   * Instantiates and initializes the configured factory. Guarded by the
   * `initialized` flag so the work is done at most once per manager.
   * NOTE(review): the original comment said this is "called by SparkContext
   * or Executor before using Broadcast"; in the visible code it is private
   * and invoked only from the constructor.
   */
  private def initialize(): Unit = synchronized {
    if (!initialized) {
      val factoryClassName = System.getProperty(
        "spark.broadcast.factory", "spark.broadcast.HttpBroadcastFactory")

      // The factory class needs a no-argument constructor for newInstance.
      broadcastFactory =
        Class.forName(factoryClassName).newInstance.asInstanceOf[BroadcastFactory]

      // Let the chosen factory set itself up for the driver or non-driver role.
      broadcastFactory.initialize(isDriver)

      initialized = true
    }
  }

  /** Delegates shutdown to the underlying factory. */
  def stop(): Unit = broadcastFactory.stop()

  /**
   * Creates a new broadcast variable through the factory, tagging it with the
   * next id from this manager's counter (ids increase by one per call).
   *
   * @param value_  the value to broadcast
   * @param isLocal passed through unchanged to the factory
   * @tparam T type of the value being broadcast
   */
  def newBroadcast[T](value_ : T, isLocal: Boolean) = {
    val broadcastId = nextBroadcastId.getAndIncrement()
    broadcastFactory.newBroadcast[T](value_, isLocal, broadcastId)
  }

  /** The `_isDriver` flag this manager was constructed with. */
  def isDriver: Boolean = _isDriver
}