aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-08-17 13:31:34 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-08-17 13:31:34 -0700
commitd60af8f6aa53373de1333cc642cf2a9d7b39d912 (patch)
tree1c7a67060ab5a016bf8576af3719ff15614355e7 /streaming/src
parentcc97ea188e1d5b8e851d1a8438b8af092783ec04 (diff)
downloadspark-d60af8f6aa53373de1333cc642cf2a9d7b39d912.tar.gz
spark-d60af8f6aa53373de1333cc642cf2a9d7b39d912.tar.bz2
spark-d60af8f6aa53373de1333cc642cf2a9d7b39d912.zip
[SPARK-17096][SQL][STREAMING] Improve exception string reported through the StreamingQueryListener
## What changes were proposed in this pull request? Currently, the stackTrace (as `Array[StackTraceElements]`) reported through StreamingQueryListener.onQueryTerminated is useless as it has the stack trace of where StreamingQueryException is defined, not the stack trace of underlying exception. For example, if a streaming query fails because of a / by zero exception in a task, the `QueryTerminated.stackTrace` will have ``` org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:211) org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124) ``` This is basically useless, as it is location where the StreamingQueryException was defined. What we want is Here is the right way to reason about what should be posted as through StreamingQueryListener.onQueryTerminated - The actual exception could either be a SparkException, or an arbitrary exception. - SparkException reports the relevant executor stack trace of a failed task as a string in the the exception message. The `Array[StackTraceElements]` returned by `SparkException.stackTrace()` is mostly irrelevant. - For any arbitrary exception, the `Array[StackTraceElements]` returned by `exception.stackTrace()` may be relevant. - When there is an error in a streaming query, it's hard to reason whether the `Array[StackTraceElements]` is useful or not. In fact, it is not clear whether it is even useful to report the stack trace as this array of Java objects. It may be sufficient to report the strack trace as a string, along with the message. This is how Spark reported executor stra - Hence, this PR simplifies the API by removing the array `stackTrace` from `QueryTerminated`. Instead the `exception` returns a string containing the message and the stack trace of the actual underlying exception that failed the streaming query (i.e. not that of the StreamingQueryException). If anyone is interested in the actual stack trace as an array, can always access them through `streamingQuery.exception` which returns the exception object. With this change, if a streaming query fails because of a / by zero exception in a task, the `QueryTerminated.exception` will be ``` org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.ArithmeticException: / by zero at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcII$sp(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:226) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1429) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1417) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1416) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1416) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) ... ``` It contains the relevant executor stack trace. In a case non-SparkException, if the streaming source MemoryStream throws an exception, exception message will have the relevant stack trace. ``` java.lang.RuntimeException: this is the exception message at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:103) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:316) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:313) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:313) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124) ``` Note that this change in the public `QueryTerminated` class is okay as the APIs are still experimental. ## How was this patch tested? Unit tests that test whether the right information is present in the exception message reported through QueryTerminated object. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #14675 from tdas/SPARK-17096.
Diffstat (limited to 'streaming/src')
0 files changed, 0 insertions, 0 deletions