/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.history

import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.zip.{ZipInputStream, ZipOutputStream}

import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, spy, verify}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.io._
import org.apache.spark.scheduler._
import org.apache.spark.security.GroupMappingServiceProvider
import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}

class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {

  private var testDir: File = null

  before {
    testDir = Utils.createTempDir(namePrefix = "a b%20c+d")
  }

  after {
    Utils.deleteRecursively(testDir)
  }

  /** Create a fake log file using the new log format used in Spark 1.3+ */
  private def newLogFile(
      appId: String,
      appAttemptId: Option[String],
      inProgress: Boolean,
      codec: Option[String] = None): File = {
    val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
    val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
    val logPath = new URI(logUri).getPath + ip
    new File(logPath)
  }

  test("Parse application logs") {
    val clock = new ManualClock(12345678)
    val provider = new FsHistoryProvider(createTestConf(), clock)

    // Write a new-style application log.
    val newAppComplete = newLogFile("new1", None, inProgress = false)
    writeFile(newAppComplete, true, None,
      SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
        None),
      SparkListenerApplicationEnd(5L)
      )

    // Write a new-style application log, compressed with LZF.
    val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
      Some("lzf"))
    writeFile(newAppCompressedComplete, true, None,
      SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
        1L, "test", None),
      SparkListenerApplicationEnd(4L))

    // Write an unfinished app, new-style.
    val newAppIncomplete = newLogFile("new2", None, inProgress = true)
    writeFile(newAppIncomplete, true, None,
      SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
        None)
      )

    // Force a reload of data from the log directory, and check that logs are loaded.
    // Take the opportunity to check that the offset checks work as expected.
    updateAndCheck(provider) { list =>
      list.size should be (3)
      list.count(_.attempts.head.completed) should be (2)

      def makeAppInfo(
          id: String,
          name: String,
          start: Long,
          end: Long,
          lastMod: Long,
          user: String,
          completed: Boolean): ApplicationHistoryInfo = {
        ApplicationHistoryInfo(id, name,
          List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
      }

      // For completed files, lastUpdated is the file's last modified time.
      list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
        newAppComplete.lastModified(), "test", true))
      list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
        1L, 4L, newAppCompressedComplete.lastModified(), "test", true))

      // For in-progress files, lastUpdated is the current loading time.
      list(2) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
        clock.getTimeMillis(), "test", false))

      // Make sure the UI can be rendered.
      list.foreach { info =>
        val appUi = provider.getAppUI(info.id, None)
        appUi should not be null
        appUi should not be None
      }
    }
  }

  test("SPARK-3697: ignore files that cannot be read.") {
    // setReadable(...) does not work on Windows. Please refer to JDK-6728842.
    assume(!Utils.isWindows)

    class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
      var mergeApplicationListingCall = 0
      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
        super.mergeApplicationListing(fileStatus)
        mergeApplicationListingCall += 1
      }
    }
    val provider = new TestFsHistoryProvider

    val logFile1 = newLogFile("new1", None, inProgress = false)
    writeFile(logFile1, true, None,
      SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
      SparkListenerApplicationEnd(2L)
      )
    val logFile2 = newLogFile("new2", None, inProgress = false)
    writeFile(logFile2, true, None,
      SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
      SparkListenerApplicationEnd(2L)
      )
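    // Make the second log unreadable so the provider has to skip it.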
    logFile2.setReadable(false, false)

    updateAndCheck(provider) { list =>
      list.size should be (1)
    }

    provider.mergeApplicationListingCall should be (1)
  }

  test("history file is renamed from inprogress to completed") {
    val provider = new FsHistoryProvider(createTestConf())

    val logFile1 = newLogFile("app1", None, inProgress = true)
    writeFile(logFile1, true, None,
      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
      SparkListenerApplicationEnd(2L)
    )
    updateAndCheck(provider) { list =>
      list.size should be (1)
      list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should
        endWith(EventLoggingListener.IN_PROGRESS)
    }

    logFile1.renameTo(newLogFile("app1", None, inProgress = false))
    updateAndCheck(provider) { list =>
      list.size should be (1)
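      // Keep this assertion on one line: a line break after `should not` lets the compiler
      // split it into two statements, silently making the check vacuous.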
      val logPath = list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath
      logPath should not endWith (EventLoggingListener.IN_PROGRESS)
    }
  }

  test("Parse logs where the application has not started") {
    val provider = new FsHistoryProvider(createTestConf())

    val logFile1 = newLogFile("app1", None, inProgress = true)
    writeFile(logFile1, true, None,
      SparkListenerLogStart("1.4")
    )
    updateAndCheck(provider) { list =>
      list.size should be (0)
    }
  }

  test("SPARK-5582: empty log directory") {
    val provider = new FsHistoryProvider(createTestConf())

    val logFile1 = newLogFile("app1", None, inProgress = true)
    writeFile(logFile1, true, None,
      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
      SparkListenerApplicationEnd(2L))

    val oldLog = new File(testDir, "old1")
    oldLog.mkdir()

    provider.checkForLogs()
    val appList = provider.getListing()
    appList.size should be (1)
  }

  test("apps with multiple attempts with order") {
    val provider = new FsHistoryProvider(createTestConf())

    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
    writeFile(attempt1, true, None,
      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
      )

    updateAndCheck(provider) { list =>
      list.size should be (1)
      list.head.attempts.size should be (1)
    }

    val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
    writeFile(attempt2, true, None,
      SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
      )

    updateAndCheck(provider) { list =>
      list.size should be (1)
      list.head.attempts.size should be (2)
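      // Attempts are sorted most recent first, so attempt2 comes first.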
      list.head.attempts.head.attemptId should be (Some("attempt2"))
    }

    val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
    writeFile(attempt3, true, None,
      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
      SparkListenerApplicationEnd(4L)
      )

    updateAndCheck(provider) { list =>
      list should not be (null)
      list.size should be (1)
      list.head.attempts.size should be (3)
      list.head.attempts.head.attemptId should be (Some("attempt3"))
    }

    val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
    writeFile(app2Attempt1, true, None,
      SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
      SparkListenerApplicationEnd(6L)
      )

    updateAndCheck(provider) { list =>
      list.size should be (2)
      list.head.attempts.size should be (1)
      list.last.attempts.size should be (3)
      list.head.attempts.head.attemptId should be (Some("attempt1"))

      list.foreach { app =>
        app.attempts.foreach { attempt =>
          val appUi = provider.getAppUI(app.id, attempt.attemptId)
          appUi should not be null
        }
      }
    }
  }

  test("log cleaner") {
    val maxAge = TimeUnit.SECONDS.toMillis(10)
    val clock = new ManualClock(maxAge / 2)
    val provider = new FsHistoryProvider(
      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)

    val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
    writeFile(log1, true, None,
      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
      SparkListenerApplicationEnd(2L)
      )
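    // Backdate log1 so that it exceeds the max age once the clock advances.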
    log1.setLastModified(0L)

    val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
    writeFile(log2, true, None,
      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
      SparkListenerApplicationEnd(4L)
      )
    log2.setLastModified(clock.getTimeMillis())

    updateAndCheck(provider) { list =>
      list.size should be (1)
      list.head.attempts.size should be (2)
    }

    // Move the clock forward so log1 exceeds the max age.
    clock.advance(maxAge)

    updateAndCheck(provider) { list =>
      list.size should be (1)
      list.head.attempts.size should be (1)
      list.head.attempts.head.attemptId should be (Some("attempt2"))
    }
    assert(!log1.exists())

    // Do the same for the other log.
    clock.advance(maxAge)

    updateAndCheck(provider) { list =>
      list.size should be (0)
    }
    assert(!log2.exists())
  }

  test("log cleaner for inProgress files") {
    val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
    val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
    val maxAge = TimeUnit.SECONDS.toMillis(40)
    val clock = new ManualClock(0)
    val provider = new FsHistoryProvider(
      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)

    val log1 = newLogFile("inProgressApp1", None, inProgress = true)
    writeFile(log1, true, None,
      SparkListenerApplicationStart(
        "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
    )

    clock.setTime(firstFileModifiedTime)
    provider.checkForLogs()

    val log2 = newLogFile("inProgressApp2", None, inProgress = true)
    writeFile(log2, true, None,
      SparkListenerApplicationStart(
        "inProgressApp2", Some("inProgressApp2"), 23L, "test2", Some("attempt2"))
    )

    clock.setTime(secondFileModifiedTime)
    provider.checkForLogs()

    // This should not trigger any cleanup
    updateAndCheck(provider)(list => list.size should be(2))

    // Should trigger cleanup for first file but not second one
    clock.setTime(firstFileModifiedTime + maxAge + 1)
    updateAndCheck(provider)(list => list.size should be(1))
    assert(!log1.exists())
    assert(log2.exists())

    // Should cleanup the second file as well.
    clock.setTime(secondFileModifiedTime + maxAge + 1)
    updateAndCheck(provider)(list => list.size should be(0))
    assert(!log1.exists())
    assert(!log2.exists())
  }

  test("Event log copy") {
    val provider = new FsHistoryProvider(createTestConf())
    val logs = (1 to 2).map { i =>
      val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
      writeFile(log, true, None,
        SparkListenerApplicationStart(
          "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")),
        SparkListenerApplicationEnd(5001 * i)
      )
      log
    }
    provider.checkForLogs()

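    // Zip each attempt via writeEventLogs and check that the archive contains exactly
    // the original event log file.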
    (1 to 2).foreach { i =>
      val underlyingStream = new ByteArrayOutputStream()
      val outputStream = new ZipOutputStream(underlyingStream)
      provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
      outputStream.close()
      val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray))
      var totalEntries = 0
      var entry = inputStream.getNextEntry
      entry should not be null
      while (entry != null) {
        val actual = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8)
        val expected =
          Files.toString(logs.find(_.getName == entry.getName).get, StandardCharsets.UTF_8)
        actual should be (expected)
        totalEntries += 1
        entry = inputStream.getNextEntry
      }
      totalEntries should be (1)
      inputStream.close()
    }
  }

  test("SPARK-8372: new logs with no app ID are ignored") {
    val provider = new FsHistoryProvider(createTestConf())

    // Write a new log file without an app id, to make sure it's ignored.
    val logFile1 = newLogFile("app1", None, inProgress = true)
    writeFile(logFile1, true, None,
      SparkListenerLogStart("1.4")
    )

    updateAndCheck(provider) { list =>
      list.size should be (0)
    }
  }

  test("provider correctly checks whether fs is in safe mode") {
    val provider = spy(new FsHistoryProvider(createTestConf()))
    val dfs = mock(classOf[DistributedFileSystem])
    // Asserts that safe mode is false because we can't really control the return value of the
    // mock, since the API is different between hadoop 1 and 2.
    assert(!provider.isFsInSafeMode(dfs))
  }

  test("provider waits for safe mode to finish before initializing") {
    val clock = new ManualClock()
    val provider = new SafeModeTestProvider(createTestConf(), clock)
    val initThread = provider.initialize()
    try {
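      // While the FS is in safe mode, the provider surfaces that state through getConfig().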
      provider.getConfig().keys should contain ("HDFS State")

      clock.setTime(5000)
      provider.getConfig().keys should contain ("HDFS State")

      provider.inSafeMode = false
      clock.setTime(10000)

      eventually(timeout(1 second), interval(10 millis)) {
        provider.getConfig().keys should not contain ("HDFS State")
      }
    } finally {
      provider.stop()
    }
  }

  test("provider reports error after FS leaves safe mode") {
    testDir.delete()
    val clock = new ManualClock()
    val provider = new SafeModeTestProvider(createTestConf(), clock)
    val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
    val initThread = provider.startSafeModeCheckThread(Some(errorHandler))
    try {
      provider.inSafeMode = false
      clock.setTime(10000)

      eventually(timeout(1 second), interval(10 millis)) {
        verify(errorHandler).uncaughtException(any(), any())
      }
    } finally {
      provider.stop()
    }
  }

  test("ignore hidden files") {

    // FsHistoryProvider should ignore hidden files.  (It even writes out a hidden file itself
    // that should be ignored).

    // write out one totally bogus hidden file
    val hiddenGarbageFile = new File(testDir, ".garbage")
    val out = new PrintWriter(hiddenGarbageFile)
    // scalastyle:off println
    out.println("GARBAGE")
    // scalastyle:on println
    out.close()

    // also write out one real event log file, but since it's a hidden file, we shouldn't read it
    val tmpNewAppFile = newLogFile("hidden", None, inProgress = false)
    val hiddenNewAppFile = new File(tmpNewAppFile.getParentFile, "." + tmpNewAppFile.getName)
    tmpNewAppFile.renameTo(hiddenNewAppFile)

    // and write one real file, which should still get picked up just fine
    val newAppComplete = newLogFile("real-app", None, inProgress = false)
    writeFile(newAppComplete, true, None,
      SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
        None),
      SparkListenerApplicationEnd(5L)
    )

    val provider = new FsHistoryProvider(createTestConf())
    updateAndCheck(provider) { list =>
      list.size should be (1)
      list(0).name should be ("real-app")
    }
  }

  test("support history server ui admin acls") {
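    // Writes an event log that carries the given Spark properties, reloads the provider, and
    // runs checkFn against the SecurityManager of the resulting app UI.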
    def createAndCheck(conf: SparkConf, properties: (String, String)*)
      (checkFn: SecurityManager => Unit): Unit = {
      // Empty the testDir for each test.
      if (testDir.exists() && testDir.isDirectory) {
        testDir.listFiles().foreach { f => if (f.isFile) f.delete() }
      }

      var provider: FsHistoryProvider = null
      try {
        provider = new FsHistoryProvider(conf)
        val log = newLogFile("app1", Some("attempt1"), inProgress = false)
        writeFile(log, true, None,
          SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(),
            "test", Some("attempt1")),
          SparkListenerEnvironmentUpdate(Map(
            "Spark Properties" -> properties.toSeq,
            "JVM Information" -> Seq.empty,
            "System Properties" -> Seq.empty,
            "Classpath Entries" -> Seq.empty
          )),
          SparkListenerApplicationEnd(System.currentTimeMillis()))

        provider.checkForLogs()
        val appUi = provider.getAppUI("app1", Some("attempt1"))

        assert(appUi.nonEmpty)
        val securityManager = appUi.get.ui.securityManager
        checkFn(securityManager)
      } finally {
        if (provider != null) {
          provider.stop()
        }
      }
    }

    // Test the case where both history UI admin ACLs and application ACLs are configured.
    val conf1 = createTestConf()
      .set("spark.history.ui.acls.enable", "true")
      .set("spark.history.ui.admin.acls", "user1,user2")
      .set("spark.history.ui.admin.acls.groups", "group1")
      .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)

    createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {
      securityManager =>
        // Test whether user has permission to access UI.
        securityManager.checkUIViewPermissions("user1") should be (true)
        securityManager.checkUIViewPermissions("user2") should be (true)
        securityManager.checkUIViewPermissions("user") should be (true)
        securityManager.checkUIViewPermissions("abc") should be (false)

        // Test whether user with admin group has permission to access UI.
        securityManager.checkUIViewPermissions("user3") should be (true)
        securityManager.checkUIViewPermissions("user4") should be (true)
        securityManager.checkUIViewPermissions("user5") should be (true)
        securityManager.checkUIViewPermissions("user6") should be (false)
    }

    // Test the case where only history UI admin ACLs are configured.
    val conf2 = createTestConf()
      .set("spark.history.ui.acls.enable", "true")
      .set("spark.history.ui.admin.acls", "user1,user2")
      .set("spark.history.ui.admin.acls.groups", "group1")
      .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
    createAndCheck(conf2) { securityManager =>
      // Test whether user has permission to access UI.
      securityManager.checkUIViewPermissions("user1") should be (true)
      securityManager.checkUIViewPermissions("user2") should be (true)
      // The unknown user "user" should be denied access.
      securityManager.checkUIViewPermissions("user") should be (false)

      // Test whether user with admin group has permission to access UI.
      securityManager.checkUIViewPermissions("user3") should be (true)
      securityManager.checkUIViewPermissions("user4") should be (true)
      // "user5" has no group mapping, so access should be denied.
      securityManager.checkUIViewPermissions("user5") should be (false)
    }

    // Test the case where neither history UI admin ACLs nor application ACLs are configured.
    val conf3 = createTestConf()
      .set("spark.history.ui.acls.enable", "true")
      .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
    createAndCheck(conf3) { securityManager =>
      // Test whether user has permission to access UI.
      securityManager.checkUIViewPermissions("user1") should be (false)
      securityManager.checkUIViewPermissions("user2") should be (false)
      securityManager.checkUIViewPermissions("user") should be (false)

      // Test whether user with admin group has permission to access UI.
      // These checks should fail since no ACL groups are configured.
      securityManager.checkUIViewPermissions("user3") should be (false)
      securityManager.checkUIViewPermissions("user4") should be (false)
      securityManager.checkUIViewPermissions("user5") should be (false)
    }
  }

  /**
   * Asks the provider to check for logs and calls a function to perform checks on the updated
   * app list. Example:
   *
   *     updateAndCheck(provider) { list =>
   *       // asserts
   *     }
   */
  private def updateAndCheck(provider: FsHistoryProvider)
      (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
    provider.checkForLogs()
    provider.cleanLogs()
    checkFn(provider.getListing().toSeq)
  }

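  /** Writes the given events to `file` as JSON, one event per line, optionally compressed. */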
  private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
    events: SparkListenerEvent*) = {
    val fstream = new FileOutputStream(file)
    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
    val bstream = new BufferedOutputStream(cstream)
    if (isNewFormat) {
      val newFormatStream = new FileOutputStream(file)
      Utils.tryWithSafeFinally {
        EventLoggingListener.initEventLog(newFormatStream)
      } {
        newFormatStream.close()
      }
    }

    val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
    Utils.tryWithSafeFinally {
      events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
    } {
      writer.close()
    }
  }

  private def createEmptyFile(file: File) = {
    new FileOutputStream(file).close()
  }

  private def createTestConf(): SparkConf = {
    new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
  }

  private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
    extends FsHistoryProvider(conf, clock) {

    @volatile var inSafeMode = true

    // Skip initialization so that we can manually start the safe mode check thread.
    private[history] override def initialize(): Thread = null

    private[history] override def isFsInSafeMode(): Boolean = inSafeMode

  }

}

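/** A simple user-to-group mapping service used by the admin ACL tests above. */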
class TestGroupsMappingProvider extends GroupMappingServiceProvider {
  private val mappings = Map(
    "user3" -> "group1",
    "user4" -> "group1",
    "user5" -> "group")

  override def getGroups(username: String): Set[String] = {
    mappings.get(username).map(Set(_)).getOrElse(Set.empty)
  }
}