diff options
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r-- | core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 40 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala | 88 |
2 files changed, 121 insertions, 7 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 0f6933df9e..09075eeb53 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -17,12 +17,16 @@ package org.apache.spark.deploy.history -import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter} +import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File, + FileOutputStream, OutputStreamWriter} import java.net.URI import java.util.concurrent.TimeUnit +import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.io.Source +import com.google.common.base.Charsets +import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfter @@ -335,6 +339,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc assert(!log2.exists()) } + test("Event log copy") { + val provider = new FsHistoryProvider(createTestConf()) + val logs = (1 to 2).map { i => + val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false) + writeFile(log, true, None, + SparkListenerApplicationStart( + "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")), + SparkListenerApplicationEnd(5001 * i) + ) + log + } + provider.checkForLogs() + + (1 to 2).foreach { i => + val underlyingStream = new ByteArrayOutputStream() + val outputStream = new ZipOutputStream(underlyingStream) + provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream) + outputStream.close() + val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray)) + var totalEntries = 0 + var entry = inputStream.getNextEntry + entry should not be null + while (entry != null) { + val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8) + val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8) + actual should be (expected) + totalEntries += 1 + entry = inputStream.getNextEntry + } + totalEntries should be (1) + inputStream.close() + } + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 14f2d1a589..e5b5e1bb65 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -16,10 +16,13 @@ */ package org.apache.spark.deploy.history -import java.io.{File, FileInputStream, FileWriter, IOException} +import java.io.{File, FileInputStream, FileWriter, InputStream, IOException} import java.net.{HttpURLConnection, URL} +import java.util.zip.ZipInputStream import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import com.google.common.base.Charsets +import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.{FileUtils, IOUtils} import org.mockito.Mockito.when import org.scalatest.{BeforeAndAfter, Matchers} @@ -147,6 +150,70 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } + test("download all logs for app with multiple attempts") { + doDownloadTest("local-1430917381535", None) + } + + test("download one log for app with multiple attempts") { + (1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) } + } + + test("download legacy logs - all attempts") { + doDownloadTest("local-1426533911241", None, legacy = true) + } + + test("download legacy logs - single attempts") { + (1 to 2). foreach { + attemptId => doDownloadTest("local-1426533911241", Some(attemptId), legacy = true) + } + } + + // Test that the files are downloaded correctly, and validate them. + def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = { + + val url = attemptId match { + case Some(id) => + new URL(s"${generateURL(s"applications/$appId")}/$id/logs") + case None => + new URL(s"${generateURL(s"applications/$appId")}/logs") + } + + val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url) + code should be (HttpServletResponse.SC_OK) + inputStream should not be None + error should be (None) + + val zipStream = new ZipInputStream(inputStream.get) + var entry = zipStream.getNextEntry + entry should not be null + val totalFiles = { + if (legacy) { + attemptId.map { x => 3 }.getOrElse(6) + } else { + attemptId.map { x => 1 }.getOrElse(2) + } + } + var filesCompared = 0 + while (entry != null) { + if (!entry.isDirectory) { + val expectedFile = { + if (legacy) { + val splits = entry.getName.split("/") + new File(new File(logDir, splits(0)), splits(1)) + } else { + new File(logDir, entry.getName) + } + } + val expected = Files.toString(expectedFile, Charsets.UTF_8) + val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8) + actual should be (expected) + filesCompared += 1 + } + entry = zipStream.getNextEntry + } + filesCompared should be (totalFiles) + } + test("response codes on bad paths") { val badAppId = getContentAndCode("applications/foobar") badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND) @@ -202,7 +269,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } def getUrl(path: String): String = { - HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/api/v1/$path")) + HistoryServerSuite.getUrl(generateURL(path)) + } + + def generateURL(path: String): URL = { + new URL(s"http://localhost:$port/api/v1/$path") } def generateExpectation(name: String, path: String): Unit = { @@ -233,13 +304,18 @@ object HistoryServerSuite { } def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = { + val (code, in, errString) = connectAndGetInputStream(url) + val inString = in.map(IOUtils.toString) + (code, inString, errString) + } + + def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = { val connection = url.openConnection().asInstanceOf[HttpURLConnection] connection.setRequestMethod("GET") connection.connect() val code = connection.getResponseCode() - val inString = try { - val in = Option(connection.getInputStream()) - in.map(IOUtils.toString) + val inStream = try { + Option(connection.getInputStream()) } catch { case io: IOException => None } @@ -249,7 +325,7 @@ object HistoryServerSuite { } catch { case io: IOException => None } - (code, inString, errString) + (code, inStream, errString) } |