/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Random, Try}

import com.esotericsoftware.kryo.Kryo

import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
  test("Test byteString conversion") {
    val conf = new SparkConf()
    // Simply exercise the API; a complete conversion test is handled in
    // UtilsSuite.scala.
    assert(conf.getSizeAsBytes("fake", "1k") === ByteUnit.KiB.toBytes(1))
    assert(conf.getSizeAsKb("fake", "1k") === ByteUnit.KiB.toKiB(1))
    assert(conf.getSizeAsMb("fake", "1k") === ByteUnit.KiB.toMiB(1))
    assert(conf.getSizeAsGb("fake", "1k") === ByteUnit.KiB.toGiB(1))
  }

  test("Test timeString conversion") {
    val conf = new SparkConf()
    // Simply exercise the API; a complete conversion test is handled in
    // UtilsSuite.scala.
    assert(conf.getTimeAsMs("fake", "1ms") === TimeUnit.MILLISECONDS.toMillis(1))
    assert(conf.getTimeAsSeconds("fake", "1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000))
  }
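
  // Illustrative sketch (not an original test in this suite): the time getters
  // also accept a default expressed as a time string, assuming the standard
  // getTimeAsMs/getTimeAsSeconds(key, default) overloads.
  test("timeString defaults (illustrative sketch)") {
    val conf = new SparkConf(false)
    assert(conf.getTimeAsMs("missing", "2s") === 2000L)
    assert(conf.getTimeAsSeconds("missing", "2s") === 2L)
  }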

  test("loading from system properties") {
    System.setProperty("spark.test.testProperty", "2")
    System.setProperty("nonspark.test.testProperty", "0")
    val conf = new SparkConf()
    assert(conf.get("spark.test.testProperty") === "2")
    assert(!conf.contains("nonspark.test.testProperty"))
  }

  test("initializing without loading defaults") {
    System.setProperty("spark.test.testProperty", "2")
    val conf = new SparkConf(false)
    assert(!conf.contains("spark.test.testProperty"))
  }

  test("named set methods") {
    val conf = new SparkConf(false)

    conf.setMaster("local[3]")
    conf.setAppName("My app")
    conf.setSparkHome("/path")
    conf.setJars(Seq("a.jar", "b.jar"))
    conf.setExecutorEnv("VAR1", "value1")
    conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3")))

    assert(conf.get("spark.master") === "local[3]")
    assert(conf.get("spark.app.name") === "My app")
    assert(conf.get("spark.home") === "/path")
    assert(conf.get("spark.jars") === "a.jar,b.jar")
    assert(conf.get("spark.executorEnv.VAR1") === "value1")
    assert(conf.get("spark.executorEnv.VAR2") === "value2")
    assert(conf.get("spark.executorEnv.VAR3") === "value3")

    // Test the Java-friendly versions of these too
    conf.setJars(Array("c.jar", "d.jar"))
    conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5")))
    assert(conf.get("spark.jars") === "c.jar,d.jar")
    assert(conf.get("spark.executorEnv.VAR4") === "value4")
    assert(conf.get("spark.executorEnv.VAR5") === "value5")
  }

  test("basic get and set") {
    val conf = new SparkConf(false)
    assert(conf.getAll.toSet === Set())
    conf.set("k1", "v1")
    conf.setAll(Seq(("k2", "v2"), ("k3", "v3")))
    assert(conf.getAll.toSet === Set(("k1", "v1"), ("k2", "v2"), ("k3", "v3")))
    conf.set("k1", "v4")
    conf.setAll(Seq(("k2", "v5"), ("k3", "v6")))
    assert(conf.getAll.toSet === Set(("k1", "v4"), ("k2", "v5"), ("k3", "v6")))
    assert(conf.contains("k1"), "conf did not contain k1")
    assert(!conf.contains("k4"), "conf contained k4")
    assert(conf.get("k1") === "v4")
    intercept[Exception] { conf.get("k4") }
    assert(conf.get("k4", "not found") === "not found")
    assert(conf.getOption("k1") === Some("v4"))
    assert(conf.getOption("k4") === None)
  }
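
  // Illustrative addition (not part of the original suite): a minimal sketch of
  // the typed getters plus setIfMissing/remove, assuming the standard SparkConf
  // API. All keys here are made up for illustration.
  test("typed getters and setIfMissing (illustrative sketch)") {
    val conf = new SparkConf(false)
    conf.set("k.int", "42")
    conf.set("k.bool", "true")
    assert(conf.getInt("k.int", 0) === 42)
    assert(conf.getBoolean("k.bool", defaultValue = false) === true)
    // setIfMissing only writes when the key is absent
    conf.setIfMissing("k.int", "7")
    assert(conf.get("k.int") === "42")
    // remove deletes the entry, so the default is returned again
    conf.remove("k.int")
    assert(conf.getInt("k.int", 0) === 0)
  }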

  test("creating SparkContext without master and app name") {
    val conf = new SparkConf(false)
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext without master") {
    val conf = new SparkConf(false).setAppName("My app")
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext without app name") {
    val conf = new SparkConf(false).setMaster("local")
    intercept[SparkException] { sc = new SparkContext(conf) }
  }

  test("creating SparkContext with both master and app name") {
    val conf = new SparkConf(false).setMaster("local").setAppName("My app")
    sc = new SparkContext(conf)
    assert(sc.master === "local")
    assert(sc.appName === "My app")
  }

  test("SparkContext property overriding") {
    val conf = new SparkConf(false).setMaster("local").setAppName("My app")
    sc = new SparkContext("local[2]", "My other app", conf)
    assert(sc.master === "local[2]")
    assert(sc.appName === "My other app")
  }

  test("nested property names") {
    // This wasn't supported by some external conf parsing libraries
    System.setProperty("spark.test.a", "a")
    System.setProperty("spark.test.a.b", "a.b")
    System.setProperty("spark.test.a.b.c", "a.b.c")
    val conf = new SparkConf()
    assert(conf.get("spark.test.a") === "a")
    assert(conf.get("spark.test.a.b") === "a.b")
    assert(conf.get("spark.test.a.b.c") === "a.b.c")
    conf.set("spark.test.a.b", "A.B")
    assert(conf.get("spark.test.a") === "a")
    assert(conf.get("spark.test.a.b") === "A.B")
    assert(conf.get("spark.test.a.b.c") === "a.b.c")
  }

  test("Thread safeness - SPARK-5425") {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val sf = executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
    }, 0, 1, TimeUnit.MILLISECONDS)

    try {
      val t0 = System.currentTimeMillis()
      while ((System.currentTimeMillis() - t0) < 1000) {
        val conf = Try(new SparkConf(loadDefaults = true))
        assert(conf.isSuccess)
      }
    } finally {
      executor.shutdownNow()
      val sysProps = System.getProperties
      for (key <- sysProps.stringPropertyNames().asScala if key.startsWith("spark.5425."))
        sysProps.remove(key)
    }
  }

  test("register kryo classes through registerKryoClasses") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

    conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName)

    conf.registerKryoClasses(Array(classOf[Class3]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)

    conf.registerKryoClasses(Array(classOf[Class2]))
    assert(conf.get("spark.kryo.classesToRegister") ===
      classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
    // blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new Class1())
    serializer.newInstance().serialize(new Class2())
    serializer.newInstance().serialize(new Class3())
  }

  test("register kryo classes through registerKryoClasses and custom registrator") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

    conf.registerKryoClasses(Array(classOf[Class1]))
    assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName)

    conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
    // blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new Class1())
    serializer.newInstance().serialize(new Class2())
  }

  test("register kryo classes through conf") {
    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
    conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
    conf.set("spark.serializer", classOf[KryoSerializer].getName)

    // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
    // blow up.
    val serializer = new KryoSerializer(conf)
    serializer.newInstance().serialize(new StringBuffer())
  }
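
  // Illustrative sketch (an assumption about Kryo's semantics, not an original
  // test): with spark.kryo.registrationRequired=true, serializing a class that
  // was never registered should fail rather than be silently registered by name.
  test("kryo registrationRequired rejects unregistered classes (sketch)") {
    val conf = new SparkConf(false).set("spark.kryo.registrationRequired", "true")
    val serializer = new KryoSerializer(conf)
    intercept[Exception] {
      serializer.newInstance().serialize(new Class3())
    }
  }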

  test("deprecated configs") {
    val conf = new SparkConf()
    val newName = "spark.history.fs.update.interval"

    assert(!conf.contains(newName))

    conf.set("spark.history.updateInterval", "1")
    assert(conf.get(newName) === "1")

    conf.set("spark.history.fs.updateInterval", "2")
    assert(conf.get(newName) === "2")

    conf.set("spark.history.fs.update.interval.seconds", "3")
    assert(conf.get(newName) === "3")

    conf.set(newName, "4")
    assert(conf.get(newName) === "4")

    val count = conf.getAll.count { case (k, v) => k.startsWith("spark.history.") }
    assert(count === 4)

    conf.set("spark.yarn.applicationMaster.waitTries", "42")
    assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)

    conf.set("spark.kryoserializer.buffer.mb", "1.1")
    assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
  }

  test("akka deprecated configs") {
    val conf = new SparkConf()

    assert(!conf.contains("spark.rpc.numRetries"))
    assert(!conf.contains("spark.rpc.retry.wait"))
    assert(!conf.contains("spark.rpc.askTimeout"))
    assert(!conf.contains("spark.rpc.lookupTimeout"))

    conf.set("spark.akka.num.retries", "1")
    assert(RpcUtils.numRetries(conf) === 1)

    conf.set("spark.akka.retry.wait", "2")
    assert(RpcUtils.retryWaitMs(conf) === 2L)

    conf.set("spark.akka.askTimeout", "3")
    assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))

    conf.set("spark.akka.lookupTimeout", "4")
    assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
  }

  test("SPARK-13727") {
    val conf = new SparkConf()
    // set the conf in the deprecated way
    conf.set("spark.io.compression.lz4.block.size", "12345")
    // get the conf in the recommended way
    assert(conf.get("spark.io.compression.lz4.blockSize") === "12345")
    // we can still get the conf in the deprecated way
    assert(conf.get("spark.io.compression.lz4.block.size") === "12345")
    // contains() also works as expected
    assert(conf.contains("spark.io.compression.lz4.block.size"))
    assert(conf.contains("spark.io.compression.lz4.blockSize"))
    assert(conf.contains("spark.io.unknown") === false)
  }

  val serializers = Map(
    "java" -> new JavaSerializer(new SparkConf()),
    "kryo" -> new KryoSerializer(new SparkConf()))

  serializers.foreach { case (name, ser) =>
    test(s"SPARK-17240: SparkConf should be serializable ($name)") {
      val conf = new SparkConf()
      conf.set(DRIVER_CLASS_PATH, "${" + DRIVER_JAVA_OPTIONS.key + "}")
      conf.set(DRIVER_JAVA_OPTIONS, "test")

      val serializer = ser.newInstance()
      val bytes = serializer.serialize(conf)
      val deser = serializer.deserialize[SparkConf](bytes)

      assert(conf.get(DRIVER_CLASS_PATH) === deser.get(DRIVER_CLASS_PATH))
    }
  }

}

class Class1 {}
class Class2 {}
class Class3 {}

class CustomRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Class2])
  }
}
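
// Hypothetical usage sketch (not part of the original suite): how a driver
// program would typically assemble a SparkConf before creating a context.
// The object name and config values here are purely illustrative.
object SparkConfUsageExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .setMaster("local[2]")
      .setAppName("Example app")
      .setIfMissing("spark.executor.memory", "1g")
    // toDebugString renders all explicitly-set entries, one per line
    println(conf.toDebugString)
  }
}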