aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala40
1 files changed, 39 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 0f6933df9e..09075eeb53 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,12 +17,16 @@
package org.apache.spark.deploy.history
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File,
+ FileOutputStream, OutputStreamWriter}
import java.net.URI
import java.util.concurrent.TimeUnit
+import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.io.Source
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter
@@ -335,6 +339,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists())
}
+ test("Event log copy") {
+ val provider = new FsHistoryProvider(createTestConf())
+ val logs = (1 to 2).map { i =>
+ val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
+ writeFile(log, true, None,
+ SparkListenerApplicationStart(
+ "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")),
+ SparkListenerApplicationEnd(5001 * i)
+ )
+ log
+ }
+ provider.checkForLogs()
+
+ (1 to 2).foreach { i =>
+ val underlyingStream = new ByteArrayOutputStream()
+ val outputStream = new ZipOutputStream(underlyingStream)
+ provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
+ outputStream.close()
+ val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray))
+ var totalEntries = 0
+ var entry = inputStream.getNextEntry
+ entry should not be null
+ while (entry != null) {
+ val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8)
+ val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8)
+ actual should be (expected)
+ totalEntries += 1
+ entry = inputStream.getNextEntry
+ }
+ totalEntries should be (1)
+ inputStream.close()
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example: