aboutsummaryrefslogtreecommitdiff
path: root/src/scala/spark/DfsBroadcast.scala
blob: 7b1ebce851050585aad59ece1ad300bafa49736b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package spark

import java.io._
import java.net._
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}

import spark.compress.lzf.{LZFInputStream, LZFOutputStream}

@serializable 
class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean) 
extends Broadcast[T] with Logging {
  
  def value = value_

  DfsBroadcast.synchronized { 
    DfsBroadcast.values.put(uuid, value_) 
  }

  if (!isLocal) { 
    sendBroadcast 
  }

  def sendBroadcast (): Unit = {
    val out = new ObjectOutputStream (DfsBroadcast.openFileForWriting(uuid))
    out.writeObject (value_)
    out.close
  }

  // Called by JVM when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject
    DfsBroadcast.synchronized {
      val cachedVal = DfsBroadcast.values.get(uuid)
      if (cachedVal != null) {
        value_ = cachedVal.asInstanceOf[T]
      } else {
        logInfo( "Started reading Broadcasted variable " + uuid)
        val start = System.nanoTime
        
        val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
        value_ = fileIn.readObject.asInstanceOf[T]
        DfsBroadcast.values.put(uuid, value_)
        fileIn.close
        
        val time = (System.nanoTime - start) / 1e9
        logInfo( "Reading Broadcasted variable " + uuid + " took " + time + " s")
      }
    }
  }
}

class DfsBroadcastFactory 
extends BroadcastFactory {
  def initialize (isMaster: Boolean) = DfsBroadcast.initialize
  def newBroadcast[T] (value_ : T, isLocal: Boolean) = 
    new DfsBroadcast[T] (value_, isLocal)
}

private object DfsBroadcast
extends Logging {
  val values = Cache.newKeySpace()

  private var initialized = false

  private var fileSystem: FileSystem = null
  private var workDir: String = null
  private var compress: Boolean = false
  private var bufferSize: Int = 65536

  def initialize (): Unit = {
    synchronized {
      if (!initialized) {
        bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
        val dfs = System.getProperty("spark.dfs", "file:///")
        if (!dfs.startsWith("file://")) {
          val conf = new Configuration()
          conf.setInt("io.file.buffer.size", bufferSize)
          val rep = System.getProperty("spark.dfs.replication", "3").toInt
          conf.setInt("dfs.replication", rep)
          fileSystem = FileSystem.get(new URI(dfs), conf)
        }
        workDir = System.getProperty("spark.dfs.workdir", "/tmp")
        compress = System.getProperty("spark.compress", "false").toBoolean

        initialized = true
      }
    }
  }

  private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)

  def openFileForReading(uuid: UUID): InputStream = {
    val fileStream = if (fileSystem != null) {
      fileSystem.open(getPath(uuid))
    } else {
      // Local filesystem
      new FileInputStream(getPath(uuid).toString)
    }
    
    if (compress) {
      // LZF stream does its own buffering
      new LZFInputStream(fileStream) 
    } else if (fileSystem == null) {
      new BufferedInputStream(fileStream, bufferSize)
    } else { 
      // Hadoop streams do their own buffering
      fileStream 
    }
  }

  def openFileForWriting(uuid: UUID): OutputStream = {
    val fileStream = if (fileSystem != null) {
      fileSystem.create(getPath(uuid))
    } else {
      // Local filesystem
      new FileOutputStream(getPath(uuid).toString)
    }
    
    if (compress) {
      // LZF stream does its own buffering
      new LZFOutputStream(fileStream) 
    } else if (fileSystem == null) {
      new BufferedOutputStream(fileStream, bufferSize)
    } else {
      // Hadoop streams do their own buffering
      fileStream 
    }
  }
}