author     Hari Shreedharan <hshreedharan@apache.org>  2015-06-03 13:43:13 -0500
committer  Imran Rashid <irashid@cloudera.com>  2015-06-03 13:43:13 -0500
commit     d2a86eb8f0fcc02304604da56c589ea58c77587a (patch)
tree       78d95e02f898f2925f0e5553bc6ba7adedfd31f0 /core
parent     d053a31be93d789e3f26cf55d747ecf6ca386c29 (diff)
[SPARK-7161] [HISTORY SERVER] Provide REST api to download event logs from History Server

This PR adds a new API that allows the user to download event logs for an application as a zip file. APIs have been added to download all logs for a given application or just for a specific attempt. This also adds an additional method to the ApplicationHistoryProvider to get the raw files, zipped.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #5792 from harishreedharan/eventlog-download and squashes the following commits:

221cc26 [Hari Shreedharan] Update docs with new API information.
a131be6 [Hari Shreedharan] Fix style issues.
5528bd8 [Hari Shreedharan] Merge branch 'master' into eventlog-download
6e8156e [Hari Shreedharan] Simplify tests, use Guava stream copy methods.
d8ddede [Hari Shreedharan] Remove unnecessary case in EventLogDownloadResource.
ffffb53 [Hari Shreedharan] Changed interface to use zip stream. Added more tests.
1100b40 [Hari Shreedharan] Ensure that `Path` does not appear in interfaces, by refactoring interfaces.
5a5f3e2 [Hari Shreedharan] Fix test ordering issue.
0b66948 [Hari Shreedharan] Minor formatting/import fixes.
4fc518c [Hari Shreedharan] Fix rat failures.
a48b91f [Hari Shreedharan] Refactor to make attemptId optional in the API. Also added tests.
0fc1424 [Hari Shreedharan] File download now works for individual attempts and the entire application.
350d7e8 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into eventlog-download
fd6ab00 [Hari Shreedharan] Fix style issues
32b7662 [Hari Shreedharan] Use UIRoot directly in ApiRootResource. Also, use `Response` class to set headers.
7b362b2 [Hari Shreedharan] Almost working.
3d18ebc [Hari Shreedharan] [WIP] Try getting the event log download to work.
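For illustration only (not part of this patch): once the history server is running, a client can fetch the zipped logs from the new endpoints over plain HTTP. A minimal Scala sketch follows; the host, port (18080 is the history server's conventional default), and output file name are assumptions:

import java.io.{BufferedOutputStream, FileOutputStream}
import java.net.{HttpURLConnection, URL}

import com.google.common.io.ByteStreams

object DownloadEventLogs {
  def main(args: Array[String]): Unit = {
    val appId = "local-1430917381535"
    // All attempts:  /api/v1/applications/{appId}/logs
    // One attempt:   /api/v1/applications/{appId}/{attemptId}/logs
    val url = new URL(s"http://localhost:18080/api/v1/applications/$appId/logs")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    val out = new BufferedOutputStream(new FileOutputStream(s"eventLogs-$appId.zip"))
    try {
      // The server streams the archive, so it can be copied straight to disk.
      ByteStreams.copy(conn.getInputStream, out)
    } finally {
      out.close()
      conn.disconnect()
    }
  }
}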
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 63
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala | 70
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json | 16
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json | 16
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json | 34
-rw-r--r--  core/src/test/resources/spark-events/local-1430917381535_1 | 5
-rw-r--r--  core/src/test/resources/spark-events/local-1430917381535_2 | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala | 40
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala | 88
12 files changed, 357 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 298a820196..5f5e0fe1c3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -17,6 +17,9 @@
package org.apache.spark.deploy.history
+import java.util.zip.ZipOutputStream
+
+import org.apache.spark.SparkException
import org.apache.spark.ui.SparkUI
private[spark] case class ApplicationAttemptInfo(
@@ -62,4 +65,12 @@ private[history] abstract class ApplicationHistoryProvider {
*/
def getConfig(): Map[String, String] = Map()
+ /**
+ * Writes out the event logs to the output stream provided. The logs will be compressed into a
+ * single zip file and written out.
+ * @throws SparkException if the logs for the app id cannot be found.
+ */
+ @throws(classOf[SparkException])
+ def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit
+
}
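For illustration only, a minimal sketch of the contract this new method defines; the knownLogs map is invented here to stand in for a real provider's storage. The caller supplies an open ZipOutputStream, the provider writes one entry per log, and an unknown app id surfaces as SparkException:

import java.util.zip.{ZipEntry, ZipOutputStream}

import org.apache.spark.SparkException

object WriteEventLogsSketch {
  // knownLogs maps "appId" or "appId_attemptId" to raw log bytes (illustrative).
  def writeEventLogs(
      appId: String,
      attemptId: Option[String],
      zipStream: ZipOutputStream,
      knownLogs: Map[String, Array[Byte]]): Unit = {
    val key = attemptId.map(id => s"${appId}_$id").getOrElse(appId)
    knownLogs.get(key) match {
      case Some(bytes) =>
        zipStream.putNextEntry(new ZipEntry(key))
        zipStream.write(bytes)
        zipStream.closeEntry()
      case None => throw new SparkException(s"Logs for $appId not found.")
    }
  }
}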
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 45c2be34c8..52b149b273 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,16 +17,18 @@
package org.apache.spark.deploy.history
-import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.mutable
+import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.AccessControlException
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
@@ -59,7 +61,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.map { d => Utils.resolveURI(d).toString }
.getOrElse(DEFAULT_LOG_DIR)
- private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
+ private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ private val fs = Utils.getHadoopFileSystem(logDir, hadoopConf)
// Used by check event thread and clean log thread.
// Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -219,6 +222,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
+ override def writeEventLogs(
+ appId: String,
+ attemptId: Option[String],
+ zipStream: ZipOutputStream): Unit = {
+
+ /**
+ * This method compresses the files passed in, and writes the compressed data out into the
+ * [[OutputStream]] passed in. Each file is written as a new [[ZipEntry]] with its name being
+ * the name of the file being compressed.
+ */
+ def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
+ val fs = FileSystem.get(hadoopConf)
+ val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
+ try {
+ outputStream.putNextEntry(new ZipEntry(entryName))
+ ByteStreams.copy(inputStream, outputStream)
+ outputStream.closeEntry()
+ } finally {
+ inputStream.close()
+ }
+ }
+
+ applications.get(appId) match {
+ case Some(appInfo) =>
+ try {
+ // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+ appInfo.attempts.filter { attempt =>
+ attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
+ }.foreach { attempt =>
+ val logPath = new Path(logDir, attempt.logPath)
+ // If this is a legacy directory, then add the directory to the zipStream and add
+ // each file to that directory.
+ if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
+ val files = fs.listFiles(logPath, false)
+ zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
+ zipStream.closeEntry()
+ while (files.hasNext) {
+ val file = files.next().getPath
+ zipFileToStream(file, attempt.logPath + Path.SEPARATOR + file.getName, zipStream)
+ }
+ } else {
+ zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
+ }
+ }
+ } finally {
+ zipStream.close()
+ }
+ case None => throw new SparkException(s"Logs for $appId not found.")
+ }
+ }
+
+
/**
* Replay the log files in the list and merge the list of old applications with new ones
*/
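The attempt filter in writeEventLogs above is terse; as a standalone sketch (the Attempt type and helper are invented for illustration), an attempt is selected when the application stores no attempt ids at all, when the caller asked for every attempt, or when the ids agree:

object AttemptSelection {
  case class Attempt(attemptId: Option[String])

  def selectAttempts(attempts: Seq[Attempt], requested: Option[String]): Seq[Attempt] =
    attempts.filter { a =>
      a.attemptId.isEmpty || requested.isEmpty || a.attemptId == requested
    }
}

// selectAttempts(Seq(Attempt(Some("1")), Attempt(Some("2"))), Some("2")) keeps
// only attempt "2"; passing None keeps both, i.e. a whole-application download.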
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 5a0eb585a9..10638afb74 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.history
import java.util.NoSuchElementException
+import java.util.zip.ZipOutputStream
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import com.google.common.cache._
@@ -173,6 +174,13 @@ class HistoryServer(
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}
+ override def writeEventLogs(
+ appId: String,
+ attemptId: Option[String],
+ zipStream: ZipOutputStream): Unit = {
+ provider.writeEventLogs(appId, attemptId, zipStream)
+ }
+
/**
* Returns the provider configuration to show in the listing page.
*
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index f73c742732..9af90ee5ec 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.status.api.v1
+import java.util.zip.ZipOutputStream
import javax.servlet.ServletContext
import javax.ws.rs._
import javax.ws.rs.core.{Context, Response}
@@ -164,6 +165,18 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
}
}
+ @Path("applications/{appId}/logs")
+ def getEventLogs(
+ @PathParam("appId") appId: String): EventLogDownloadResource = {
+ new EventLogDownloadResource(uiRoot, appId, None)
+ }
+
+ @Path("applications/{appId}/{attemptId}/logs")
+ def getEventLogs(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
+ new EventLogDownloadResource(uiRoot, appId, Some(attemptId))
+ }
}
private[spark] object ApiRootResource {
@@ -193,6 +206,13 @@ private[spark] trait UIRoot {
def getSparkUI(appKey: String): Option[SparkUI]
def getApplicationInfoList: Iterator[ApplicationInfo]
+ def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit = {
+ Response.serverError()
+ .entity("Event logs are only available through the history server.")
+ .status(Response.Status.SERVICE_UNAVAILABLE)
+ .build()
+ }
+
/**
* Get the spark UI with the given appID, and apply a function
* to it. If there is no such app, throw an appropriate exception
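The two getEventLogs methods above are JAX-RS sub-resource locators: annotated with @Path but no HTTP verb, each returns a resource object, and the request is then dispatched to that object's own @GET method (here, EventLogDownloadResource.getEventLogs()). A minimal sketch of the pattern, with invented names:

import javax.ws.rs.{GET, Path, PathParam, Produces}
import javax.ws.rs.core.MediaType

@Path("/demo")
class RootResource {
  // Locator: no HTTP-method annotation here; dispatch continues into the
  // returned object.
  @Path("items/{id}")
  def item(@PathParam("id") id: String): ItemResource = new ItemResource(id)
}

class ItemResource(id: String) {
  @GET
  @Produces(Array(MediaType.TEXT_PLAIN))
  def get(): String = s"item $id"
}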
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
new file mode 100644
index 0000000000..d416dba832
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.io.OutputStream
+import java.util.zip.ZipOutputStream
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+
+@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
+private[v1] class EventLogDownloadResource(
+ val uIRoot: UIRoot,
+ val appId: String,
+ val attemptId: Option[String]) extends Logging {
+ val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf)
+
+ @GET
+ def getEventLogs(): Response = {
+ try {
+ val fileName = {
+ attemptId match {
+ case Some(id) => s"eventLogs-$appId-$id.zip"
+ case None => s"eventLogs-$appId.zip"
+ }
+ }
+
+ val stream = new StreamingOutput {
+ override def write(output: OutputStream) = {
+ val zipStream = new ZipOutputStream(output)
+ try {
+ uIRoot.writeEventLogs(appId, attemptId, zipStream)
+ } finally {
+ zipStream.close()
+ }
+
+ }
+ }
+
+ Response.ok(stream)
+ .header("Content-Disposition", s"attachment; filename=$fileName")
+ .header("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
+ .build()
+ } catch {
+ case NonFatal(e) =>
+ Response.serverError()
+ .entity(s"Event logs are not available for app: $appId.")
+ .status(Response.Status.SERVICE_UNAVAILABLE)
+ .build()
+ }
+ }
+}
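A note on the response built above: StreamingOutput defers all writing until the JAX-RS container pulls the entity, so the zip is produced incrementally while the client downloads it rather than being buffered in memory first. A minimal sketch of the same pattern, with invented content:

import java.io.OutputStream
import javax.ws.rs.core.{Response, StreamingOutput}

object StreamingSketch {
  def build(): Response = {
    val stream = new StreamingOutput {
      // Invoked by the container as the client reads the response body.
      override def write(output: OutputStream): Unit = {
        output.write("generated on the fly".getBytes("UTF-8"))
      }
    }
    Response.ok(stream)
      .header("Content-Disposition", "attachment; filename=example.txt")
      .build()
  }
}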
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index ce4fe80b66..d575bf2f28 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -8,6 +8,22 @@
"completed" : true
} ]
}, {
+ "id" : "local-1430917381535",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "attemptId" : "2",
+ "startTime" : "2015-05-06T13:03:00.893GMT",
+ "endTime" : "2015-05-06T13:03:00.950GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ }, {
+ "attemptId" : "1",
+ "startTime" : "2015-05-06T13:03:00.880GMT",
+ "endTime" : "2015-05-06T13:03:00.890GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
+}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
"attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index ce4fe80b66..d575bf2f28 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -8,6 +8,22 @@
"completed" : true
} ]
}, {
+ "id" : "local-1430917381535",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "attemptId" : "2",
+ "startTime" : "2015-05-06T13:03:00.893GMT",
+ "endTime" : "2015-05-06T13:03:00.950GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ }, {
+ "attemptId" : "1",
+ "startTime" : "2015-05-06T13:03:00.880GMT",
+ "endTime" : "2015-05-06T13:03:00.890GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
+}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
"attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index dca86fe5f7..15c2de8ef9 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -7,6 +7,22 @@
"sparkUser" : "irashid",
"completed" : true
} ]
+}, {
+ "id" : "local-1430917381535",
+ "name" : "Spark shell",
+ "attempts" : [ {
+ "attemptId" : "2",
+ "startTime" : "2015-05-06T13:03:00.893GMT",
+ "endTime" : "2015-05-06T13:03:00.950GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ }, {
+ "attemptId" : "1",
+ "startTime" : "2015-05-06T13:03:00.880GMT",
+ "endTime" : "2015-05-06T13:03:00.890GMT",
+ "sparkUser" : "irashid",
+ "completed" : true
+ } ]
}, {
"id" : "local-1426533911241",
"name" : "Spark shell",
@@ -24,12 +40,14 @@
"completed" : true
} ]
}, {
- "id" : "local-1425081759269",
- "name" : "Spark shell",
- "attempts" : [ {
- "startTime" : "2015-02-28T00:02:38.277GMT",
- "endTime" : "2015-02-28T00:02:46.912GMT",
- "sparkUser" : "irashid",
- "completed" : true
- } ]
+ "id": "local-1425081759269",
+ "name": "Spark shell",
+ "attempts": [
+ {
+ "startTime": "2015-02-28T00:02:38.277GMT",
+ "endTime": "2015-02-28T00:02:46.912GMT",
+ "sparkUser": "irashid",
+ "completed": true
+ }
+ ]
} ]
\ No newline at end of file
diff --git a/core/src/test/resources/spark-events/local-1430917381535_1 b/core/src/test/resources/spark-events/local-1430917381535_1
new file mode 100644
index 0000000000..d5a1303344
--- /dev/null
+++ b/core/src/test/resources/spark-events/local-1430917381535_1
@@ -0,0 +1,5 @@
+{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917380880}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle 
Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380880,"User":"irashid","App Attempt ID":"1"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1430917380890} \ No newline at end of file
diff --git a/core/src/test/resources/spark-events/local-1430917381535_2 b/core/src/test/resources/spark-events/local-1430917381535_2
new file mode 100644
index 0000000000..abb637a22e
--- /dev/null
+++ b/core/src/test/resources/spark-events/local-1430917381535_2
@@ -0,0 +1,5 @@
+{"Event":"SparkListenerLogStart","Spark Version":"1.4.0-SNAPSHOT"}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"localhost","Port":61103},"Maximum Memory":278019440,"Timestamp":1430917380893}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","Java Version":"1.8.0_25 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.driver.host":"192.168.1.102","spark.eventLog.enabled":"true","spark.driver.port":"61101","spark.repl.class.uri":"http://192.168.1.102:61100","spark.jars":"","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.executor.id":"driver","spark.master":"local[*]","spark.eventLog.dir":"/Users/irashid/github/kraps/core/src/test/resources/spark-events","spark.fileserver.uri":"http://192.168.1.102:61102","spark.tachyonStore.folderName":"spark-aaaf41b3-d1dd-447f-8951-acf51490758b","spark.app.id":"local-1430917381534"},"System Properties":{"java.io.tmpdir":"/var/folders/36/m29jw1z95qv4ywb1c4n0rz000000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/irashid","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib","user.dir":"/Users/irashid/github/spark","java.library.path":"/Users/irashid/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.25-b02","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_25-b17","java.vm.info":"mixed mode","java.ext.dirs":"/Users/irashid/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.9.5","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle 
Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"irashid","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=/Users/irashid/github/kraps/core/src/test/resources/spark-events --class org.apache.spark.repl.Main spark-shell","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/jre","java.version":"1.8.0_25","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/etc/hadoop":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/Users/irashid/github/spark/conf/":"System Classpath","/Users/irashid/github/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.0.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-core-3.2.10.jar":"System Classpath","/Users/irashid/github/spark/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath"}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1430917381535","Timestamp":1430917380893,"User":"irashid","App Attempt ID":"2"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1430917380950} \ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 0f6933df9e..09075eeb53 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,12 +17,16 @@
package org.apache.spark.deploy.history
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File,
+ FileOutputStream, OutputStreamWriter}
import java.net.URI
import java.util.concurrent.TimeUnit
+import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.io.Source
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter
@@ -335,6 +339,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists())
}
+ test("Event log copy") {
+ val provider = new FsHistoryProvider(createTestConf())
+ val logs = (1 to 2).map { i =>
+ val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
+ writeFile(log, true, None,
+ SparkListenerApplicationStart(
+ "downloadApp1", Some("downloadApp1"), 5000 * i, "test", Some(s"attempt$i")),
+ SparkListenerApplicationEnd(5001 * i)
+ )
+ log
+ }
+ provider.checkForLogs()
+
+ (1 to 2).foreach { i =>
+ val underlyingStream = new ByteArrayOutputStream()
+ val outputStream = new ZipOutputStream(underlyingStream)
+ provider.writeEventLogs("downloadApp1", Some(s"attempt$i"), outputStream)
+ outputStream.close()
+ val inputStream = new ZipInputStream(new ByteArrayInputStream(underlyingStream.toByteArray))
+ var totalEntries = 0
+ var entry = inputStream.getNextEntry
+ entry should not be null
+ while (entry != null) {
+ val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8)
+ val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8)
+ actual should be (expected)
+ totalEntries += 1
+ entry = inputStream.getNextEntry
+ }
+ totalEntries should be (1)
+ inputStream.close()
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
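The test above round-trips the archive entirely in memory; a standalone sketch of that idiom, using only java.util.zip, writes entries into a ByteArrayOutputStream-backed zip and then walks them back with getNextEntry until it returns null:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

object ZipRoundTrip {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val zos = new ZipOutputStream(buf)
    zos.putNextEntry(new ZipEntry("eventLog-1"))
    zos.write("{\"Event\":\"SparkListenerLogStart\"}".getBytes("UTF-8"))
    zos.closeEntry()
    zos.close()

    val zis = new ZipInputStream(new ByteArrayInputStream(buf.toByteArray))
    var entry = zis.getNextEntry
    while (entry != null) {
      println(entry.getName) // prints: eventLog-1
      entry = zis.getNextEntry
    }
    zis.close()
  }
}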
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 14f2d1a589..e5b5e1bb65 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -16,10 +16,13 @@
*/
package org.apache.spark.deploy.history
-import java.io.{File, FileInputStream, FileWriter, IOException}
+import java.io.{File, FileInputStream, FileWriter, InputStream, IOException}
import java.net.{HttpURLConnection, URL}
+import java.util.zip.ZipInputStream
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import com.google.common.base.Charsets
+import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.mockito.Mockito.when
import org.scalatest.{BeforeAndAfter, Matchers}
@@ -147,6 +150,70 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
}
+ test("download all logs for app with multiple attempts") {
+ doDownloadTest("local-1430917381535", None)
+ }
+
+ test("download one log for app with multiple attempts") {
+ (1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) }
+ }
+
+ test("download legacy logs - all attempts") {
+ doDownloadTest("local-1426533911241", None, legacy = true)
+ }
+
+ test("download legacy logs - single attempts") {
+ (1 to 2). foreach {
+ attemptId => doDownloadTest("local-1426533911241", Some(attemptId), legacy = true)
+ }
+ }
+
+ // Test that the files are downloaded correctly, and validate them.
+ def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = {
+
+ val url = attemptId match {
+ case Some(id) =>
+ new URL(s"${generateURL(s"applications/$appId")}/$id/logs")
+ case None =>
+ new URL(s"${generateURL(s"applications/$appId")}/logs")
+ }
+
+ val (code, inputStream, error) = HistoryServerSuite.connectAndGetInputStream(url)
+ code should be (HttpServletResponse.SC_OK)
+ inputStream should not be None
+ error should be (None)
+
+ val zipStream = new ZipInputStream(inputStream.get)
+ var entry = zipStream.getNextEntry
+ entry should not be null
+ val totalFiles = {
+ if (legacy) {
+ attemptId.map { x => 3 }.getOrElse(6)
+ } else {
+ attemptId.map { x => 1 }.getOrElse(2)
+ }
+ }
+ var filesCompared = 0
+ while (entry != null) {
+ if (!entry.isDirectory) {
+ val expectedFile = {
+ if (legacy) {
+ val splits = entry.getName.split("/")
+ new File(new File(logDir, splits(0)), splits(1))
+ } else {
+ new File(logDir, entry.getName)
+ }
+ }
+ val expected = Files.toString(expectedFile, Charsets.UTF_8)
+ val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)
+ actual should be (expected)
+ filesCompared += 1
+ }
+ entry = zipStream.getNextEntry
+ }
+ filesCompared should be (totalFiles)
+ }
+
test("response codes on bad paths") {
val badAppId = getContentAndCode("applications/foobar")
badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND)
@@ -202,7 +269,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
def getUrl(path: String): String = {
- HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/api/v1/$path"))
+ HistoryServerSuite.getUrl(generateURL(path))
+ }
+
+ def generateURL(path: String): URL = {
+ new URL(s"http://localhost:$port/api/v1/$path")
}
def generateExpectation(name: String, path: String): Unit = {
@@ -233,13 +304,18 @@ object HistoryServerSuite {
}
def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = {
+ val (code, in, errString) = connectAndGetInputStream(url)
+ val inString = in.map(IOUtils.toString)
+ (code, inString, errString)
+ }
+
+ def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod("GET")
connection.connect()
val code = connection.getResponseCode()
- val inString = try {
- val in = Option(connection.getInputStream())
- in.map(IOUtils.toString)
+ val inStream = try {
+ Option(connection.getInputStream())
} catch {
case io: IOException => None
}
@@ -249,7 +325,7 @@ object HistoryServerSuite {
} catch {
case io: IOException => None
}
- (code, inString, errString)
+ (code, inStream, errString)
}