aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala13
-rw-r--r--sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt4
-rw-r--r--sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala42
4 files changed, 63 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 3eff8d952b..2424586431 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -72,6 +72,10 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
} catch {
+ case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
+ // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
+ // It's safe since no place uses them.
+ logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
case jpe: JsonParseException =>
// We can only ignore exception from last line of the file that might be truncated
// the last entry may not be the very last line in the event log, but we treat it
@@ -102,4 +106,13 @@ private[spark] object ReplayListenerBus {
// utility filter that selects all event logs during replay
val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
+
+ /**
+ * Classes that were removed. Structured Streaming doesn't use them any more. However, parsing
+ * old json may fail and we can just ignore these failures.
+ */
+ val KNOWN_REMOVED_CLASSES = Set(
+ "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
+ "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
+ )
}
diff --git a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
new file mode 100644
index 0000000000..aa7e9a8c20
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt
@@ -0,0 +1,4 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}},"exception":null,"stackTrace":[]}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@514502dc","offsetDesc":"[-]"}},"exception":"Query hello terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:85)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:","stackTrace":[{"methodName":"org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches","fileName":"StreamExecution.scala","lineNumber":208,"className":"org.apache.spark.sql.execution.streaming.StreamExecution","nativeMethod":false},{"methodName":"run","fileName":"StreamExecution.scala","lineNumber":120,"className":"org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1","nativeMethod":false}]}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1477593059313}
diff --git a/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
new file mode 100644
index 0000000000..646cf10718
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt
@@ -0,0 +1,4 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@10e5ec94","offsetDesc":"[#0]"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@10e5ec94","offsetDesc":"[#0]"}},"exception":null}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@70c61dc8","offsetDesc":"[-]"}},"exception":"org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)\n\tat org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)\n\tat org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)\n\tat org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset.collect(Dataset.scala:2173)\n\tat org.apache.spark.sql.execution.streaming.MemorySink.addBatch(memory.scala:154)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:366)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197)\n\tat org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)\nCaused by: java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1477701734609}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index ff843865a0..cebb32a0a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -17,11 +17,14 @@
package org.apache.spark.sql.streaming
+import scala.collection.mutable
+
import org.scalactic.TolerantNumerics
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester._
import org.apache.spark.SparkException
+import org.apache.spark.scheduler._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
@@ -206,6 +209,45 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}
+ test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+ // query-event-logs-version-2.0.0.txt has all types of events generated by
+ // Structured Streaming in Spark 2.0.0.
+ // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+ // to verify that we can skip broken jsons generated by Structured Streaming.
+ testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
+ }
+
+ test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+ // query-event-logs-version-2.0.1.txt has all types of events generated by
+ // Structured Streaming in Spark 2.0.1.
+ // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+ // to verify that we can skip broken jsons generated by Structured Streaming.
+ testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt")
+ }
+
+ private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
+ val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
+ val events = mutable.ArrayBuffer[SparkListenerEvent]()
+ try {
+ val replayer = new ReplayListenerBus() {
+ // Redirect all parsed events to `events`
+ override def doPostEvent(
+ listener: SparkListenerInterface,
+ event: SparkListenerEvent): Unit = {
+ events += event
+ }
+ }
+ // Add a dummy listener so that "doPostEvent" will be called.
+ replayer.addListener(new SparkListener {})
+ replayer.replay(input, fileName)
+ // SparkListenerApplicationEnd is the only valid event
+ assert(events.size === 1)
+ assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
+ } finally {
+ input.close()
+ }
+ }
+
private def assertStreamingQueryInfoEquals(
expected: StreamingQueryStatus,
actual: StreamingQueryStatus): Unit = {