Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 40
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala     | 88
2 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 0f6933df9e..09075eeb53 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,12 +17,16 @@
package org.apache.spark.deploy.history
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File,
+ FileOutputStream, OutputStreamWriter}
import java.net.URI
import java.util.concurrent.TimeUnit
+import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.io.Source
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter
@@ -335,6 +339,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists())
}
+ test("Event log copy") {
+ val provider = new FsHistoryProvider(createTestConf())
+ val logs = (1 to 2).map { i =>
+ val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
+ writeFile(log, true, None,
+ SparkListenerApplicationStart(
+ "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")),
+ SparkListenerApplicationEnd(5001 * i)
+ )
+ log
+ }
+ provider.checkForLogs()
+
+ (1 to 2).foreach { i =>
+ val underlyingStream = new ByteArrayOutputStream()
+ val outputStream = new ZipOutputStream(underlyingStream)
+ provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
+ outputStream.close()
+ val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray))
+ var totalEntries = 0
+ var entry = inputStream.getNextEntry
+ entry should not be null
+ while (entry != null) {
+ val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8)
+ val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8)
+ actual should be (expected)
+ totalEntries += 1
+ entry = inputStream.getNextEntry
+ }
+ totalEntries should be (1)
+ inputStream.close()
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 14f2d1a589..e5b5e1bb65 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -16,10 +16,13 @@
*/
package org.apache.spark.deploy.history
-import java.io.{File, FileInputStream, FileWriter, IOException}
+import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
import java.net.{HttpURLConnection, URL}
+import java.util.zip.ZipInputStream
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.mockito.Mockito.when
import org.scalatest.{BeforeAndAfter, Matchers}
@@ -147,6 +150,70 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
}
+ test("download all logs for app with multiple attempts") {
+ doDownloadTest("local-1430917381535", None)
+ }
+
+ test("download one log for app with multiple attempts") {
+ (1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) }
+ }
+
+ test("download legacy logs - all attempts") {
+ doDownloadTest("local-1426533911241", None, legacy = true)
+ }
+
+ test("download legacy logs - single attempts") {
+    (1 to 2).foreach {
+ attemptId => doDownloadTest("local-1426533911241", Some(attemptId), legacy = true)
+ }
+ }
+
+  // Test that the files are downloaded correctly and that their contents match the originals.
+ def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = {
+
+ val url = attemptId match {
+ case Some(id) =>
+ new URL(s"${generateURL(s"applications/$appId")}/$id/logs")
+ case None =>
+ new URL(s"${generateURL(s"applications/$appId")}/logs")
+ }
+
+ val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url)
+ code should be (HttpServletResponse.SC_OK)
+ inputStream should not be None
+ error should be (None)
+
+ val zipStream = new ZipInputStream(inputStream.get)
+ var entry = zipStream.getNextEntry
+ entry should not be null
+ val totalFiles = {
+ if (legacy) {
+ attemptId.map { x => 3 }.getOrElse(6)
+ } else {
+ attemptId.map { x => 1 }.getOrElse(2)
+ }
+ }
+ var filesCompared = 0
+ while (entry != null) {
+ if (!entry.isDirectory) {
+ val expectedFile = {
+ if (legacy) {
+ val splits = entry.getName.split("/")
+ new File(new File(logDir, splits(0)), splits(1))
+ } else {
+ new File(logDir, entry.getName)
+ }
+ }
+ val expected = Files.toString(expectedFile, Charsets.UTF_8)
+ val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)
+ actual should be (expected)
+ filesCompared += 1
+ }
+ entry = zipStream.getNextEntry
+ }
+ filesCompared should be (totalFiles)
+ }
+
test("response codes on bad paths") {
val badAppId = getContentAndCode("applications/foobar")
badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND)
@@ -202,7 +269,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
def getUrl(path: String): String = {
- HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/api/v1/$path"))
+ HistoryServerSuite.getUrl(generateURL(path))
+ }
+
+ def generateURL(path: String): URL = {
+ new URL(s"http://localhost:$port/api/v1/$path")
}
def generateExpectation(name: String, path: String): Unit = {
@@ -233,13 +304,18 @@ object HistoryServerSuite {
}
def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = {
+ val (code, in, errString) = connectAndGetInputStream(url)
+ val inString = in.map(IOUtils.toString)
+ (code, inString, errString)
+ }
+
+ def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod("GET")
connection.connect()
val code = connection.getResponseCode()
- val inString = try {
- val in = Option(connection.getInputStream())
- in.map(IOUtils.toString)
+ val inStream = try {
+ Option(connection.getInputStream())
} catch {
case io: IOException => None
}
@@ -249,7 +325,7 @@ object HistoryServerSuite {
} catch {
case io: IOException => None
}
- (code, inString, errString)
+ (code, inStream, errString)
}
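
For reference, here is a minimal client-side sketch (not part of the patch) of the REST endpoint these tests exercise: it fetches an application's event logs as a zip archive from the history server and lists the entries. The host, port (18080 is the history server's default UI port), and application id are assumptions to adjust for a real deployment; Guava's ByteStreams is used for reading, as in the tests above.

import java.net.{HttpURLConnection, URL}
import java.util.zip.ZipInputStream

import com.google.common.io.ByteStreams

object DownloadEventLogs {
  def main(args: Array[String]): Unit = {
    // Assumed values: adjust the host, port, and application id for your deployment.
    val appId = "local-1430917381535"
    val url = new URL(s"http://localhost:18080/api/v1/applications/$appId/logs")

    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    conn.connect()
    require(conn.getResponseCode == 200, s"unexpected response code: ${conn.getResponseCode}")

    // The response body is a zip archive with one entry per attempt's event log.
    val zip = new ZipInputStream(conn.getInputStream)
    try {
      var entry = zip.getNextEntry
      while (entry != null) {
        if (!entry.isDirectory) {
          val bytes = ByteStreams.toByteArray(zip)
          println(s"${entry.getName}: ${bytes.length} bytes")
        }
        entry = zip.getNextEntry
      }
    } finally {
      zip.close()
    }
  }
}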