---
layout: global
title: Spark Configuration
---

Spark provides three main locations to configure the system:

* [Environment variables](#environment-variables) for launching Spark workers, which can
  be set either in your driver program or in the `conf/spark-env.sh` script.
* [Java system properties](#system-properties), which control internal configuration parameters and can be set either
  programmatically (by calling `System.setProperty` *before* creating a `SparkContext`) or through the
  `SPARK_JAVA_OPTS` environment variable in `spark-env.sh`.
* [Logging configuration](#configuring-logging), which is done through `log4j.properties`.


# Environment Variables

Spark determines how to initialize the JVM on worker nodes, or even on the local node when you run `spark-shell`,
by running the `conf/spark-env.sh` script in the directory where it is installed. This script does not exist by default
in the Git repository, but you can create it by copying `conf/spark-env.sh.template`. Make sure that you make
the copy executable.
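
A minimal sketch of that setup step follows. The function name `create_spark_env` and its single argument (the Spark installation directory) are assumptions made for illustration; Spark does not ship such a helper.

```shell
# Hypothetical helper: create conf/spark-env.sh from its template.
# $1 is the Spark installation directory (an assumption of this sketch).
create_spark_env() {
  local spark_dir="$1"
  local template="$spark_dir/conf/spark-env.sh.template"
  local target="$spark_dir/conf/spark-env.sh"

  if [ ! -f "$template" ]; then
    echo "template not found: $template" >&2
    return 1
  fi
  # Do not clobber a script that already exists.
  if [ ! -e "$target" ]; then
    cp "$template" "$target"
  fi
  # The copy must be executable.
  chmod +x "$target"
}
```

Calling `create_spark_env /path/to/spark` leaves an existing `spark-env.sh` untouched apart from making it executable.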

Inside `spark-env.sh`, you *must* set at least the following two variables:

* `SCALA_HOME`, to point to your Scala installation, or `SCALA_LIBRARY_PATH` to point to the directory for Scala
  library JARs (if you install Scala as a Debian or RPM package, there is no `SCALA_HOME`, but these libraries
  are in a separate path, typically `/usr/share/java`; look for `scala-library.jar`).
* `MESOS_NATIVE_LIBRARY`, if you are [running on a Mesos cluster](running-on-mesos.html).
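
As an illustration, a minimal `spark-env.sh` covering these variables might look like the fragment below. Every path is a placeholder assumption; substitute the locations used on your machines.

```shell
# conf/spark-env.sh (fragment). All paths are placeholder assumptions.

# Option 1: point at a full Scala installation.
SCALA_HOME="/opt/scala"

# Option 2 (package-managed Scala, use instead of SCALA_HOME): point at the
# directory that contains scala-library.jar.
# SCALA_LIBRARY_PATH="/usr/share/java"

# Only needed when running on a Mesos cluster.
# MESOS_NATIVE_LIBRARY="/usr/local/lib/libmesos.so"
```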

In addition, there are four other variables that control execution. These should be set *in the environment that
launches the job's driver program* instead of `spark-env.sh`, because they will be automatically propagated to
workers. Setting these per-job instead of in `spark-env.sh` ensures that different jobs can have different settings
for these variables.

* `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath.
* `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
   JVM's `-Xmx` option, e.g. `300m` or `1g`. Note that this option will soon be deprecated in favor of
   the `spark.executor.memory` system property, so we recommend using that in new code.
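
For example, a per-job launcher script could export the variables just before starting the driver program. This is a sketch only: the option values and directory paths are assumptions chosen for illustration.

```shell
# Per-job settings, exported in the shell that launches the driver program so
# that Spark can propagate them to the workers. All values are assumptions.
export SPARK_JAVA_OPTS="-verbose:gc -Dspark.cores.max=5"
export SPARK_CLASSPATH="/path/to/extra/classes"
export SPARK_LIBRARY_PATH="/path/to/native/libs"
export SPARK_MEM="1g"   # same format as the JVM's -Xmx option

# Start the driver program from this same shell after the exports.
```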

Beware that if you do set these variables in `spark-env.sh`, they will override the values set by user programs,
which is undesirable; if you prefer, you can choose to have `spark-env.sh` set them only if the user program
hasn't, as follows:

{% highlight bash %}
if [ -z "$SPARK_JAVA_OPTS" ] ; then
  SPARK_JAVA_OPTS="-verbose:gc"
fi
{% endhighlight %}

# System Properties

To set a system property for configuring Spark, you need to either pass it with a `-D` flag to the JVM (for example `java -Dspark.cores.max=5 MyProgram`) or call `System.setProperty` in your code *before* creating your Spark context, as follows:

{% highlight scala %}
System.setProperty("spark.cores.max", "5")
val sc = new SparkContext(...)
{% endhighlight %}

Most of the configurable system properties control internal settings that have reasonable default values. However,
there are at least five properties that you will commonly want to control:

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td>spark.executor.memory</td>
  <td>512m</td>
  <td>
    Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>).
  </td>
</tr>
<tr>
  <td>spark.serializer</td>
  <td>spark.JavaSerializer</td>
  <td>
    Class to use for serializing objects that will be sent over the network or need to be cached
    in serialized form. The default of Java serialization works with any Serializable Java object but is
    quite slow, so we recommend <a href="tuning.html">using <code>spark.KryoSerializer</code>
    and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
    <a href="api/core/index.html#spark.Serializer"><code>spark.Serializer</code></a>.
  </td>
</tr>
<tr>
  <td>spark.kryo.registrator</td>
  <td>(none)</td>
  <td>
    If you use Kryo serialization, set this class to register your custom classes with Kryo.
    You need to set it to a class that extends
    <a href="api/core/index.html#spark.KryoRegistrator"><code>spark.KryoRegistrator</code></a>.
    See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
  </td>
</tr>
<tr>
  <td>spark.local.dir</td>
  <td>/tmp</td>
  <td>
    Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored
    on disk. This should be on a fast, local disk in your system. It can also be a comma-separated
    list of multiple directories.
  </td>
</tr>
<tr>
  <td>spark.cores.max</td>
  <td>(infinite)</td>
  <td>
    When running on a <a href="spark-standalone.html">standalone deploy cluster</a> or a
    <a href="running-on-mesos.html#mesos-run-modes">Mesos cluster in "coarse-grained"
    sharing mode</a>, how many CPU cores to request at most. The default will use all available cores.
  </td>
</tr>
</table>
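
One way to apply several of these properties together is to pass them as `-D` flags through `SPARK_JAVA_OPTS`. The sketch below assumes a hypothetical helper named `add_spark_property`, a hypothetical registrator class `mypackage.MyRegistrator`, and placeholder scratch directories; only the property names come from the table above.

```shell
# Hypothetical helper: append one system property to SPARK_JAVA_OPTS as a
# -D flag, separating flags with a single space.
add_spark_property() {
  SPARK_JAVA_OPTS="${SPARK_JAVA_OPTS:+$SPARK_JAVA_OPTS }-D$1=$2"
}

# Start from an empty option string so the result is predictable.
SPARK_JAVA_OPTS=""
add_spark_property spark.executor.memory 2g
add_spark_property spark.serializer spark.KryoSerializer
add_spark_property spark.kryo.registrator mypackage.MyRegistrator   # placeholder class
add_spark_property spark.local.dir /mnt/disk1,/mnt/disk2            # placeholder paths
add_spark_property spark.cores.max 5
export SPARK_JAVA_OPTS

echo "$SPARK_JAVA_OPTS"
```

Values that contain spaces would need extra quoting, which this sketch does not handle.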


Apart from these, the following properties are also available, and may be useful in some situations:

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td>spark.mesos.coarse</td>
  <td>false</td>
  <td>
    If set to "true", runs over Mesos clusters in
    <a href="running-on-mesos.html#mesos-run-modes">"coarse-grained" sharing mode</a>,
    where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task.
    This gives lower-latency scheduling for short queries, but leaves resources in use for the whole
    duration of the Spark job.
  </td>
</tr>
<tr>
  <td>spark.default.parallelism</td>
  <td>8</td>
  <td>
    Default number of tasks to use for distributed shuffle operations (<code>groupByKey</code>,
    <code>reduceByKey</code>, etc.) when not set by user.
  </td>
</tr>
<tr>
  <td>spark.storage.memoryFraction</td>
  <td>0.66</td>
  <td>
    Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
    generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
    it if you configure your own old generation size.
  </td>
</tr>
<tr>
  <td>spark.ui.port</td>
  <td>3030</td>
  <td>
    Port for your application's dashboard, which shows memory and workload data
  </td>
</tr>
<tr>
  <td>spark.ui.retained_stages</td>
  <td>1000</td>
  <td>
    How many stages the Spark UI remembers before garbage collecting.
  </td>
</tr>
<tr>
  <td>spark.shuffle.compress</td>
  <td>true</td>
  <td>
    Whether to compress map output files. Generally a good idea.
  </td>
</tr>
<tr>
  <td>spark.broadcast.compress</td>
  <td>true</td>
  <td>
    Whether to compress broadcast variables before sending them. Generally a good idea.
  </td>
</tr>
<tr>
  <td>spark.rdd.compress</td>
  <td>false</td>
  <td>
    Whether to compress serialized RDD partitions (e.g. for <code>StorageLevel.MEMORY_ONLY_SER</code>).
    Can save substantial space at the cost of some extra CPU time.
  </td>
</tr>
<tr>
  <td>spark.io.compression.codec</td>
  <td>spark.io.SnappyCompressionCodec</td>
  <td>
    The compression codec class to use for various compressions. By default, Spark provides two
    codecs: <code>spark.io.LZFCompressionCodec</code> and <code>spark.io.SnappyCompressionCodec</code>.
  </td>
</tr>
<tr>
  <td>spark.io.compression.snappy.block.size</td>
  <td>32768</td>
  <td>
    Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec is used.
  </td>
</tr>
<tr>
  <td>spark.reducer.maxMbInFlight</td>
  <td>48</td>
  <td>
    Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since
    each output requires us to create a buffer to receive it, this represents a fixed memory overhead
    per reduce task, so keep it small unless you have a large amount of memory.
  </td>
</tr>
<tr>
  <td>spark.closure.serializer</td>
  <td>spark.JavaSerializer</td>
  <td>
    Serializer class to use for closures. Generally Java is fine unless your distributed functions
    (e.g. map functions) reference large objects in the driver program.
  </td>
</tr>
<tr>
  <td>spark.kryo.referenceTracking</td>
  <td>true</td>
  <td>
    Whether to track references to the same object when serializing data with Kryo, which is
    necessary if your object graphs have loops and useful for efficiency if they contain multiple
    copies of the same object. Can be disabled to improve performance if you know this is not the
    case.
  </td>
</tr>
<tr>
  <td>spark.kryoserializer.buffer.mb</td>
  <td>2</td>
  <td>
    Maximum object size to allow within Kryo (the library needs to create a buffer at least as
    large as the largest single object you'll serialize). Increase this if you get a "buffer limit
    exceeded" exception inside Kryo. Note that there will be one buffer <i>per core</i> on each worker.
  </td>
</tr>
<tr>
  <td>spark.broadcast.factory</td>
  <td>spark.broadcast.HttpBroadcastFactory</td>
  <td>
    Which broadcast implementation to use.
  </td>
</tr>
<tr>
  <td>spark.locality.wait</td>
  <td>3000</td>
  <td>
    Number of milliseconds to wait to launch a data-local task before giving up and launching it
    on a less-local node. The same wait will be used to step through multiple locality levels
    (process-local, node-local, rack-local and then any). It is also possible to customize the
    waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
    You should increase this setting if your tasks are long and see poor locality, but the
    default usually works well.
  </td>
</tr>
<tr>
  <td>spark.locality.wait.process</td>
  <td>spark.locality.wait</td>
  <td>
    Customize the locality wait for process locality. This affects tasks that attempt to access
    cached data in a particular executor process.
  </td>
</tr>
<tr>
  <td>spark.locality.wait.node</td>
  <td>spark.locality.wait</td>
  <td>
    Customize the locality wait for node locality. For example, you can set this to 0 to skip
    node locality and search immediately for rack locality (if your cluster has rack information).
  </td>
</tr>
<tr>
  <td>spark.locality.wait.rack</td>
  <td>spark.locality.wait</td>
  <td>
    Customize the locality wait for rack locality.
  </td>
</tr>
<tr>
  <td>spark.worker.timeout</td>
  <td>60</td>
  <td>
    Number of seconds after which the standalone deploy master considers a worker lost if it
    receives no heartbeats.
  </td>
</tr>
<tr>
  <td>spark.akka.frameSize</td>
  <td>10</td>
  <td>
    Maximum message size to allow in "control plane" communication (for serialized tasks and task
    results), in MB. Increase this if your tasks need to send back large results to the driver
    (e.g. using <code>collect()</code> on a large dataset).
  </td>
</tr>
<tr>
  <td>spark.akka.threads</td>
  <td>4</td>
  <td>
    Number of actor threads to use for communication. Can be useful to increase on large clusters
    when the driver has a lot of CPU cores.
  </td>
</tr>
<tr>
  <td>spark.akka.timeout</td>
  <td>20</td>
  <td>
    Communication timeout between Spark nodes, in seconds.
  </td>
</tr>
<tr>
  <td>spark.driver.host</td>
  <td>(local hostname)</td>
  <td>
    Hostname or IP address for the driver to listen on.
  </td>
</tr>
<tr>
  <td>spark.driver.port</td>
  <td>(random)</td>
  <td>
    Port for the driver to listen on.
  </td>
</tr>
<tr>
  <td>spark.cleaner.ttl</td>
  <td>(disabled)</td>
  <td>
    How long, in seconds, Spark remembers metadata (stages generated, tasks generated, etc.).
    Periodic cleanups ensure that metadata older than this duration is forgotten. This is
    useful when running Spark for many hours or days (for example, 24/7 Spark Streaming
    applications). Note that any RDD that persists in memory for longer than this duration is cleared as well.
  </td>
</tr>
<tr>
  <td>spark.streaming.blockInterval</td>
  <td>200</td>
  <td>
    How long, in milliseconds, to batch new objects coming from network receivers.
  </td>
</tr>
<tr>
  <td>spark.task.maxFailures</td>
  <td>4</td>
  <td>
    Number of individual task failures before giving up on the job.
    Should be greater than or equal to 1. Number of allowed retries = this value - 1.
  </td>
</tr>

</table>
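
Two of the entries above describe derived values: each per-level locality wait defaults to `spark.locality.wait`, and the number of retries is `spark.task.maxFailures` minus one. The functions below only model that arithmetic for illustration; their names are hypothetical and they are not part of Spark.

```shell
# Illustration only: these functions model the documented defaults.
# They are not Spark code.

# Effective wait (ms) for one locality level.
# $1: base spark.locality.wait value
# $2: optional per-level override (e.g. spark.locality.wait.node)
effective_locality_wait() {
  echo "${2:-$1}"
}

# Retries allowed for a task: spark.task.maxFailures - 1.
allowed_retries() {
  echo $(( $1 - 1 ))
}

effective_locality_wait 3000      # prints 3000 (no override, base value used)
effective_locality_wait 3000 0    # prints 0 (override skips this level)
allowed_retries 4                 # prints 3
```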

# Configuring Logging

Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can configure it by adding a `log4j.properties`
file in the `conf` directory. One way to start is to copy the existing `log4j.properties.template` located there.
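
As a starting point, the sketch below writes a small console-only `log4j.properties`. The helper name `write_log4j_properties`, the chosen log level, and the message pattern are assumptions for illustration; the template shipped in `conf` may differ.

```shell
# Hypothetical helper: write a minimal log4j.properties into directory $1.
# Note: this overwrites any existing log4j.properties in that directory.
write_log4j_properties() {
  mkdir -p "$1" || return 1
  cat > "$1/log4j.properties" <<'EOF'
# Send messages at INFO level and above to the console.
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
EOF
}
```

For example, `write_log4j_properties conf` run from the Spark installation directory would place the file where Spark looks for it.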