---
layout: global
title: Spark Configuration
---

Spark provides three locations to configure the system:

* [Spark properties](#spark-properties) control most application parameters and can be set by passing
  a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object to SparkContext, or through Java
  system properties.
* [Environment variables](#environment-variables) can be used to set per-machine settings, such as
  the IP address, through the `conf/spark-env.sh` script on each node.
* [Logging](#configuring-logging) can be configured through `log4j.properties`.

# Spark Properties

Spark properties control most application settings and are configured separately for each application.
The preferred way to set them is by passing a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf)
object to your SparkContext constructor. Alternatively, Spark will also load them from Java system
properties, for compatibility with old versions of Spark.

SparkConf lets you configure most of the common properties to initialize a cluster (e.g., master URL and
application name), as well as arbitrary key-value pairs through the `set()` method. For example, we could
initialize an application as follows:

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
             .setMaster("local")
             .setAppName("My application")
             .set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
{% endhighlight %}

Most of the properties control internal settings that have reasonable default values. However,
there are at least five properties that you will commonly want to control:


| Property Name | Default | Meaning |
| --- | --- | --- |
| `spark.executor.memory` | 512m | Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. `512m`, `2g`). |
| `spark.serializer` | `org.apache.spark.serializer.JavaSerializer` | Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using `org.apache.spark.serializer.KryoSerializer` and configuring Kryo serialization when speed is necessary. Can be any subclass of `org.apache.spark.serializer.Serializer`. |
| `spark.kryo.registrator` | (none) | If you use Kryo serialization, set this class to register your custom classes with Kryo. It should be set to a class that extends `KryoRegistrator`. See the tuning guide for more details. |
| `spark.local.dir` | /tmp | Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. NOTE: In Spark 1.0 and later this will be overridden by the `SPARK_LOCAL_DIRS` (Standalone, Mesos) or `LOCAL_DIRS` (YARN) environment variables set by the cluster manager. |
| `spark.cores.max` | (not set) | When running on a standalone deploy cluster or a Mesos cluster in "coarse-grained" sharing mode, the maximum number of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be `spark.deploy.defaultCores` on Spark's standalone cluster manager, or infinite (all available cores) on Mesos. |

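
The five properties above can all be set on a single `SparkConf`. The following is a minimal sketch rather than a recommended configuration: the `Point` class, the `MyRegistrator` name, the scratch directory path, and every value shown are assumptions chosen for illustration.

{% highlight scala %}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical application class that should be serialized with Kryo.
case class Point(x: Double, y: Double)

// Hypothetical registrator; Spark instantiates it by the class name
// given in spark.kryo.registrator.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Point])
  }
}

object CommonPropertiesExample {
  // loadDefaults = false skips spark.* Java system properties,
  // so the resulting configuration contains only what is set here.
  val conf: SparkConf = new SparkConf(false)
    .set("spark.executor.memory", "2g")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", classOf[MyRegistrator].getName)
    .set("spark.local.dir", "/tmp/spark-scratch")   // illustrative path
    .set("spark.cores.max", "8")                    // standalone / coarse-grained Mesos only
}
{% endhighlight %}

Passing the class name through `classOf[...].getName` instead of a string literal keeps the property in sync if the registrator is later renamed or moved to another package.
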
Apart from these, the following properties are also available, and may be useful in some situations:

| Property Name | Default | Meaning |
| --- | --- | --- |
| `spark.default.parallelism` | Mesos fine-grained mode: 8; local mode: number of cores on the local machine; others: total number of cores on all executor nodes or 2, whichever is larger | Default number of tasks to use across the cluster for distributed shuffle operations (`groupByKey`, `reduceByKey`, etc.) when not set by the user. |
| `spark.storage.memoryFraction` | 0.6 | Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase it if you configure your own old generation size. |
| `spark.shuffle.memoryFraction` | 0.3 | Fraction of Java heap to use for aggregation and cogroups during shuffles, if `spark.shuffle.spill` is true. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills occur often, consider increasing this value at the expense of `spark.storage.memoryFraction`. |
| `spark.tachyonStore.baseDir` | `System.getProperty("java.io.tmpdir")` | Directories of the Tachyon file system that store RDDs. The Tachyon file system's URL is set by `spark.tachyonStore.url`. It can also be a comma-separated list of multiple directories on the Tachyon file system. |
| `spark.tachyonStore.url` | tachyon://localhost:19998 | The URL of the underlying Tachyon file system in the TachyonStore. |
| `spark.mesos.coarse` | false | If set to "true", runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use for the whole duration of the Spark job. |
| `spark.ui.port` | 4040 | Port for your application's dashboard, which shows memory and workload data. |
| `spark.ui.retainedStages` | 1000 | How many stages the Spark UI remembers before garbage collecting. |
| `spark.ui.filters` | None | Comma-separated list of filter class names to apply to the Spark web UI. Each filter should be a standard javax servlet Filter. Parameters to each filter can also be specified by setting a Java system property of the form `spark.<class name of filter>.params='param1=value1,param2=value2'` (e.g. `-Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing'`). |
| `spark.ui.acls.enable` | false | Whether Spark web UI ACLs are enabled. If enabled, this checks whether the user has access permissions to view the web UI. See `spark.ui.view.acls` for more details. Note that this requires the user to be known; if the user comes across as null, no checks are done. Filters can be used to authenticate and set the user. |
| `spark.ui.view.acls` | Empty | Comma-separated list of users that have view access to the Spark web UI. By default only the user that started the Spark job has view access. |
| `spark.ui.killEnabled` | true | Allows stages and corresponding jobs to be killed from the web UI. |
| `spark.shuffle.compress` | true | Whether to compress map output files. Generally a good idea. |
| `spark.shuffle.spill.compress` | true | Whether to compress data spilled during shuffles. |
| `spark.broadcast.compress` | true | Whether to compress broadcast variables before sending them. Generally a good idea. |
| `spark.rdd.compress` | false | Whether to compress serialized RDD partitions (e.g. for `StorageLevel.MEMORY_ONLY_SER`). Can save substantial space at the cost of some extra CPU time. |
| `spark.io.compression.codec` | `org.apache.spark.io.LZFCompressionCodec` | The codec used to compress internal data such as RDD partitions and shuffle outputs. By default, Spark provides two codecs: `org.apache.spark.io.LZFCompressionCodec` and `org.apache.spark.io.SnappyCompressionCodec`. |
| `spark.io.compression.snappy.block.size` | 32768 | Block size (in bytes) used in Snappy compression, when the Snappy compression codec is used. |
| `spark.scheduler.mode` | FIFO | The scheduling mode between jobs submitted to the same SparkContext. Can be set to `FAIR` to use fair sharing instead of queueing jobs one after another. Useful for multi-user services. |
| `spark.scheduler.revive.interval` | 1000 | The interval, in milliseconds, at which the scheduler revives the worker resource offers to run tasks. |
| `spark.reducer.maxMbInFlight` | 48 | Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since each output requires us to create a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory. |
| `spark.closure.serializer` | `org.apache.spark.serializer.JavaSerializer` | Serializer class to use for closures. Generally Java is fine unless your distributed functions (e.g. map functions) reference large objects in the driver program. |
| `spark.kryo.referenceTracking` | true | Whether to track references to the same object when serializing data with Kryo, which is necessary if your object graphs have loops and useful for efficiency if they contain multiple copies of the same object. Can be disabled to improve performance if you know this is not the case. |
| `spark.kryoserializer.buffer.mb` | 2 | Maximum object size to allow within Kryo, in megabytes (the library needs to create a buffer at least as large as the largest single object you'll serialize). Increase this if you get a "buffer limit exceeded" exception inside Kryo. Note that there will be one buffer per core on each worker. |
| `spark.serializer.objectStreamReset` | 10000 | When serializing using `org.apache.spark.serializer.JavaSerializer`, the serializer caches objects to prevent writing redundant data; however, that stops garbage collection of those objects. Calling 'reset' flushes that information from the serializer and allows old objects to be collected. To turn off this periodic reset, set it to a value <= 0. By default it will reset the serializer every 10,000 objects. |
| `spark.broadcast.factory` | `org.apache.spark.broadcast.HttpBroadcastFactory` | Which broadcast implementation to use. |
| `spark.locality.wait` | 3000 | Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting `spark.locality.wait.node`, etc. You should increase this setting if your tasks are long and you see poor locality, but the default usually works well. |
| `spark.locality.wait.process` | `spark.locality.wait` | Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process. |
| `spark.locality.wait.node` | `spark.locality.wait` | Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information). |
| `spark.locality.wait.rack` | `spark.locality.wait` | Customize the locality wait for rack locality. |
| `spark.worker.timeout` | 60 | Number of seconds after which the standalone deploy master considers a worker lost if it receives no heartbeats. |
| `spark.worker.cleanup.enabled` | true | Enable periodic cleanup of worker / application directories. Note that this only affects standalone mode, as YARN works differently. |
| `spark.worker.cleanup.interval` | 1800 (30 minutes) | Controls the interval, in seconds, at which the worker cleans up old application work dirs on the local machine. |
| `spark.worker.cleanup.appDataTtl` | 7 * 24 * 3600 (7 days) | The number of seconds to retain application work directories on each worker. This is a Time To Live and should depend on the amount of available disk space you have. Application logs and jars are downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space, especially if you run jobs very frequently. |
| `spark.akka.frameSize` | 10 | Maximum message size to allow in "control plane" communication (for serialized tasks and task results), in MB. Increase this if your tasks need to send back large results to the driver (e.g. using `collect()` on a large dataset). |
| `spark.akka.threads` | 4 | Number of actor threads to use for communication. Can be useful to increase on large clusters when the driver has a lot of CPU cores. |
| `spark.akka.timeout` | 100 | Communication timeout between Spark nodes, in seconds. |
| `spark.akka.heartbeat.pauses` | 600 | Acceptable heartbeat pause for Akka, in seconds. The default is set to a large value to disable the failure detector built into Akka. It can be enabled again if you plan to use this feature (not recommended). This can be used to control sensitivity to GC pauses. Tune this in combination with `spark.akka.heartbeat.interval` and `spark.akka.failure-detector.threshold` if you need to. |
| `spark.akka.failure-detector.threshold` | 300.0 | This maps to Akka's `akka.remote.transport-failure-detector.threshold`. The default is set to a large value to disable the failure detector built into Akka. It can be enabled again if you plan to use this feature (not recommended). Tune this in combination with `spark.akka.heartbeat.pauses` and `spark.akka.heartbeat.interval` if you need to. |
| `spark.akka.heartbeat.interval` | 1000 | Heartbeat interval for Akka, in seconds. The default is set to a large value to disable the failure detector built into Akka. It can be enabled again if you plan to use this feature (not recommended). A larger interval reduces network overhead, while a smaller value (~1 s) might be more informative for Akka's failure detector. Tune this in combination with `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. The only positive use case for the failure detector is that a sensitive failure detector can help evict rogue executors quickly. However, this is usually not the case, as GC pauses and network lags are expected in a real Spark cluster. In addition, enabling it leads to many heartbeat exchanges between nodes, which can flood the network. |
| `spark.driver.host` | (local hostname) | Hostname or IP address for the driver to listen on. |
| `spark.driver.port` | (random) | Port for the driver to listen on. |
| `spark.cleaner.ttl` | (infinite) | Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications). Note that any RDD that persists in memory for more than this duration will be cleared as well. |
| `spark.streaming.blockInterval` | 200 | Duration (milliseconds) of how long to batch new objects coming from network receivers used in Spark Streaming. |
| `spark.streaming.unpersist` | true | Force RDDs generated and persisted by Spark Streaming to be automatically unpersisted from Spark's memory. The raw input data received by Spark Streaming is also automatically cleared. Setting this to false will allow the raw data and persisted RDDs to be accessible outside the streaming application, as they will not be cleared automatically, but it comes at the cost of higher memory usage in Spark. |
| `spark.task.maxFailures` | 4 | Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1. |
| `spark.broadcast.blockSize` | 4096 | Size of each piece of a block in kilobytes for `TorrentBroadcastFactory`. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, `BlockManager` might take a performance hit. |
| `spark.shuffle.consolidateFiles` | false | If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations. |
| `spark.shuffle.file.buffer.kb` | 100 | Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers reduce the number of disk seeks and system calls made in creating intermediate shuffle files. |
| `spark.shuffle.spill` | true | If set to "true", limits the amount of memory used during reduces by spilling data out to disk. This spilling threshold is specified by `spark.shuffle.memoryFraction`. |
| `spark.speculation` | false | If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched. |
| `spark.speculation.interval` | 100 | How often Spark will check for tasks to speculate, in milliseconds. |
| `spark.speculation.quantile` | 0.75 | Fraction of tasks which must be complete before speculation is enabled for a particular stage. |
| `spark.speculation.multiplier` | 1.5 | How many times slower a task is than the median to be considered for speculation. |
| `spark.logConf` | false | Whether to log the supplied SparkConf as INFO at start of the SparkContext. |
| `spark.eventLog.enabled` | false | Whether to log Spark events, useful for reconstructing the web UI after the application has finished. |
| `spark.eventLog.compress` | false | Whether to compress logged events, if `spark.eventLog.enabled` is true. |
| `spark.eventLog.dir` | file:///tmp/spark-events | Base directory in which Spark events are logged, if `spark.eventLog.enabled` is true. Within this base directory, Spark creates a sub-directory for each application, and logs the events specific to the application in this directory. |
| `spark.deploy.spreadOut` | true | Whether the standalone cluster manager should spread applications out across nodes or try to consolidate them onto as few nodes as possible. Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. **Note:** this setting needs to be configured in the standalone cluster master, not in individual applications; you can set it through `SPARK_MASTER_OPTS` in `spark-env.sh`. |
| `spark.deploy.defaultCores` | (infinite) | Default number of cores to give to applications in Spark's standalone mode if they don't set `spark.cores.max`. If not set, applications always get all available cores unless they configure `spark.cores.max` themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default. **Note:** this setting needs to be configured in the standalone cluster master, not in individual applications; you can set it through `SPARK_MASTER_OPTS` in `spark-env.sh`. |
| `spark.files.overwrite` | false | Whether to overwrite files added through `SparkContext.addFile()` when the target file exists and its contents do not match those of the source. |
| `spark.files.fetchTimeout` | 60 | Communication timeout, in seconds, to use when fetching files added through `SparkContext.addFile()` from the driver. |
| `spark.files.userClassPathFirst` | false | (Experimental) Whether to give user-added jars precedence over Spark's own jars when loading classes in executors. This feature can be used to mitigate conflicts between Spark's dependencies and user dependencies. |
| `spark.authenticate` | false | Whether Spark authenticates its internal connections. See `spark.authenticate.secret` if not running on YARN. |
| `spark.authenticate.secret` | None | Set the secret key used for Spark to authenticate between components. This needs to be set if not running on YARN and authentication is enabled. |
| `spark.core.connection.auth.wait.timeout` | 30 | Number of seconds for the connection to wait for authentication to occur before timing out and giving up. |
| `spark.task.cpus` | 1 | Number of cores to allocate for each task. |
| `spark.executor.extraJavaOptions` | (none) | A string of extra JVM options to pass to executors, for instance GC settings or other logging. Note that it is illegal to set Spark properties or heap size settings with this option. Spark properties should be set using a SparkConf object or the `spark-defaults.conf` file used with the `spark-submit` script. Heap size settings can be set with `spark.executor.memory`. |
| `spark.executor.extraClassPath` | (none) | Extra classpath entries to append to the classpath of executors. This exists primarily for backwards compatibility with older versions of Spark. Users typically should not need to set this option. |
| `spark.executor.extraLibraryPath` | (none) | Set a special library path to use when launching executor JVMs. |

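
As a rough sketch of how a few of these less common properties might be combined, the snippet below shifts heap from the cache to shuffle aggregation, turns on speculation and fair scheduling, and reads values back through `SparkConf`'s typed getters. The object name and all values are assumptions for illustration only; suitable settings depend on the workload.

{% highlight scala %}
import org.apache.spark.SparkConf

object TuningPropertiesExample {
  // loadDefaults = false: ignore spark.* Java system properties in this sketch.
  val conf: SparkConf = new SparkConf(false)
    // Give shuffles more heap at the expense of the cache (illustrative values).
    .set("spark.storage.memoryFraction", "0.5")
    .set("spark.shuffle.memoryFraction", "0.4")
    // Re-launch tasks that run slowly within a stage.
    .set("spark.speculation", "true")
    // Share the SparkContext fairly between concurrently submitted jobs.
    .set("spark.scheduler.mode", "FAIR")

  // Typed getters fall back to the supplied default when the key is unset;
  // the defaults passed here are the ones listed in the table.
  val maxFailures: Int = conf.getInt("spark.task.maxFailures", 4)
  val allowedRetries: Int = maxFailures - 1 // retries = value - 1, as stated in the table
  val speculation: Boolean = conf.getBoolean("spark.speculation", false)

  def main(args: Array[String]): Unit = {
    println(s"Allowed retries per task: $allowedRetries")
    // One key=value pair per line for every property that was set explicitly.
    println(conf.toDebugString)
  }
}
{% endhighlight %}

Only explicitly set properties appear in `toDebugString`; properties left at their defaults, such as `spark.task.maxFailures` here, are not listed.
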
## Viewing Spark Properties

The application web UI at `http://<driver>:4040` lists Spark properties in the "Environment" tab.
This is a useful place to check that your properties have been set correctly.

# Environment Variables

Certain Spark settings can be configured through environment variables, which are read from the
`conf/spark-env.sh` script in the directory where Spark is installed (or `conf/spark-env.cmd` on
Windows). In Standalone and Mesos modes, this file can give machine-specific information such as
hostnames. It is also sourced when running local Spark applications or submission scripts.

Note that `conf/spark-env.sh` does not exist by default when Spark is installed. However, you can copy
`conf/spark-env.sh.template` to create it. Make sure you make the copy executable.

The following variables can be set in `spark-env.sh`:

* `JAVA_HOME`, the location where Java is installed (if it's not on your default `PATH`).
* `PYSPARK_PYTHON`, the Python binary to use for PySpark.
* `SPARK_LOCAL_IP`, to configure which IP address of the machine to bind to.
* `SPARK_PUBLIC_DNS`, the hostname your Spark program will advertise to other machines.
* Options for the Spark [standalone cluster scripts](spark-standalone.html#cluster-launch-scripts),
  such as number of cores to use on each machine and maximum memory.

Since `spark-env.sh` is a shell script, some of these can be set programmatically; for example, you
might compute `SPARK_LOCAL_IP` by looking up the IP of a specific network interface.

# Configuring Logging

Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can configure it by adding a
`log4j.properties` file in the `conf` directory. One way to start is to copy the existing
`log4j.properties.template` located there.
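
Editing `log4j.properties` is the approach described above. As an alternative sketch for quick experiments, the log level can also be changed from application code through the log4j 1.x API. This assumes log4j 1.x is on the classpath; the `SparkLogging` object and its method name are hypothetical, and `org.apache.spark` is used here as the logger name covering Spark's own classes.

{% highlight scala %}
import org.apache.log4j.{Level, Logger}

object SparkLogging {
  // Raise the threshold for loggers under org.apache.spark only;
  // loggers belonging to the application itself are left untouched.
  def setSparkLogLevel(level: Level = Level.WARN): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(level)
  }
}
{% endhighlight %}

Calling `SparkLogging.setSparkLogLevel()` before creating the SparkContext would suppress Spark's INFO messages for that JVM only. A `log4j.properties` file remains the better place for settings that should apply to every run.
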