package spark

import java.io._
import java.net._
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}

import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
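
/**
 * A Broadcast implementation that distributes its value through a shared
 * file system (e.g. HDFS): the creating node serializes the value to a
 * per-broadcast file, and each worker JVM reads that file back the first
 * time it deserializes the broadcast object.
 */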
@serializable
class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean)
extends Broadcast[T] with Logging {

  def value = value_
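
  // Cache the value locally as soon as the broadcast is created. The uuid
  // used as the cache key is assumed to be inherited from the Broadcast trait.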
  DfsBroadcast.synchronized {
    DfsBroadcast.values.put(uuid, value_)
  }

  if (!isLocal) {
    sendBroadcast()
  }
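
  // Serializes the value and writes it to a DFS file named after this
  // broadcast's uuid, where every worker can read it.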
  def sendBroadcast(): Unit = {
    val out = new ObjectOutputStream(DfsBroadcast.openFileForWriting(uuid))
    out.writeObject(value_)
    out.close()
  }

  // Called by the JVM when deserializing this object
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    DfsBroadcast.synchronized {
      val cachedVal = DfsBroadcast.values.get(uuid)
      if (cachedVal != null) {
        // Another task in this JVM already fetched the value; reuse it
        value_ = cachedVal.asInstanceOf[T]
      } else {
        // First access in this JVM: read the serialized value from the DFS
        logInfo("Started reading broadcast variable " + uuid)
        val start = System.nanoTime

        val fileIn = new ObjectInputStream(DfsBroadcast.openFileForReading(uuid))
        value_ = fileIn.readObject.asInstanceOf[T]
        DfsBroadcast.values.put(uuid, value_)
        fileIn.close()

        val time = (System.nanoTime - start) / 1e9
        logInfo("Reading broadcast variable " + uuid + " took " + time + " s")
      }
    }
  }
}
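
/**
 * Factory used by the broadcast framework to create DfsBroadcast instances.
 */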
class DfsBroadcastFactory
extends BroadcastFactory {
  def initialize(isMaster: Boolean) = DfsBroadcast.initialize()
  def newBroadcast[T](value_ : T, isLocal: Boolean) =
    new DfsBroadcast[T](value_, isLocal)
}
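
/**
 * Shared state for all DFS broadcasts in this JVM: the cache of fetched
 * values, the file system handle, and configuration read from system
 * properties.
 */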
private object DfsBroadcast
extends Logging {
  val values = Cache.newKeySpace()

  private var initialized = false
  private var fileSystem: FileSystem = null
  private var workDir: String = null
  private var compress: Boolean = false
  private var bufferSize: Int = 65536
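
  // initialize() reads its settings from system properties:
  //   spark.buffer.size     - I/O buffer size in bytes (default 65536)
  //   spark.dfs             - file system URI (default "file:///")
  //   spark.dfs.replication - replication factor for broadcast files (default 3)
  //   spark.dfs.workdir     - directory for broadcast files (default "/tmp")
  //   spark.compress        - whether to LZF-compress the files (default false)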
  def initialize(): Unit = {
    synchronized {
      if (!initialized) {
        bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
        val dfs = System.getProperty("spark.dfs", "file:///")
        if (!dfs.startsWith("file://")) {
          val conf = new Configuration()
          conf.setInt("io.file.buffer.size", bufferSize)
          val rep = System.getProperty("spark.dfs.replication", "3").toInt
          conf.setInt("dfs.replication", rep)
          fileSystem = FileSystem.get(new URI(dfs), conf)
        }
        workDir = System.getProperty("spark.dfs.workdir", "/tmp")
        compress = System.getProperty("spark.compress", "false").toBoolean
        initialized = true
      }
    }
  }

  private def getPath(uuid: UUID) = new Path(workDir + "/broadcast-" + uuid)
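
  // Opens the broadcast file for the given uuid, using the Hadoop FileSystem
  // API when one is configured and plain java.io streams otherwise.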
  def openFileForReading(uuid: UUID): InputStream = {
    val fileStream = if (fileSystem != null) {
      fileSystem.open(getPath(uuid))
    } else {
      // Local filesystem
      new FileInputStream(getPath(uuid).toString)
    }
    if (compress) {
      // LZF stream does its own buffering
      new LZFInputStream(fileStream)
    } else if (fileSystem == null) {
      new BufferedInputStream(fileStream, bufferSize)
    } else {
      // Hadoop streams do their own buffering
      fileStream
    }
  }
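
  // Creates the broadcast file for the given uuid, mirroring
  // openFileForReading's choice of compression and buffering.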
  def openFileForWriting(uuid: UUID): OutputStream = {
    val fileStream = if (fileSystem != null) {
      fileSystem.create(getPath(uuid))
    } else {
      // Local filesystem
      new FileOutputStream(getPath(uuid).toString)
    }
    if (compress) {
      // LZF stream does its own buffering
      new LZFOutputStream(fileStream)
    } else if (fileSystem == null) {
      new BufferedOutputStream(fileStream, bufferSize)
    } else {
      // Hadoop streams do their own buffering
      fileStream
    }
  }
}
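
// A minimal usage sketch (illustrative, not part of this file): assumes
// DfsBroadcast.initialize() has run (e.g. via DfsBroadcastFactory) and that
// "spark.dfs.workdir" points at a directory visible to every node.
//
//   val bc = new DfsBroadcast[Array[Int]](Array(1, 2, 3), isLocal = false)
//   // On a worker, once the task closure referencing bc is deserialized:
//   val data = bc.value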