aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authormwws <wei.mao@intel.com>2016-03-10 15:45:06 +0000
committerSean Owen <sowen@cloudera.com>2016-03-10 15:45:06 +0000
commit74267beb3546d316c659499a9ff577437541f072 (patch)
treeac281e3b1b01d6c4be20f0bd4b1918f3f7ec2d88 /sql/core/src
parent927e22eff894cb9b4fe8e5de3578517997292a8a (diff)
downloadspark-74267beb3546d316c659499a9ff577437541f072.tar.gz
spark-74267beb3546d316c659499a9ff577437541f072.tar.bz2
spark-74267beb3546d316c659499a9ff577437541f072.zip
[SPARK-13758][STREAMING][CORE] enhance exception message to avoid misleading
We have a recoverable Spark streaming job with checkpoint enabled, it could be executed correctly at first time, but throw following exception when restarted and recovered from checkpoint. ``` org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87) at org.apache.spark.rdd.RDD.withScope(RDD.scala:352) at org.apache.spark.rdd.RDD.union(RDD.scala:565) at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23) at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) ``` According to exception, it shows I invoked transformations and actions in other transformations, but I did not. The real reason is that I used external RDD in DStream operation. External RDD data is not stored in checkpoint, so that during recovering, the initial value of _sc in this RDD is assigned to null and hit above exception. But you can find the error message is misleading, it indicates nothing about the real issue Here is the code to reproduce it. ```scala object Repo { def createContext(ip: String, port: Int, checkpointDirectory: String):StreamingContext = { println("Creating new context") val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint(checkpointDirectory) var cached = ssc.sparkContext.parallelize(Seq("apple, banana")) val words = ssc.socketTextStream(ip, port).flatMap(_.split(" ")) words.foreachRDD((rdd: RDD[String]) => { val res = rdd.map(word => (word, word.length)).collect() println("words: " + res.mkString(", ")) cached = cached.union(rdd) cached.checkpoint() println("cached words: " + cached.collect.mkString(", ")) }) ssc } def main(args: Array[String]) { val ip = "localhost" val port = 9999 val dir = "/home/maowei/tmp" val ssc = StreamingContext.getOrCreate(dir, () => { createContext(ip, port, dir) }) ssc.start() ssc.awaitTermination() } } ``` Author: mwws <wei.mao@intel.com> Closes #11595 from mwws/SPARK-MissleadingLog.
Diffstat (limited to 'sql/core/src')
0 files changed, 0 insertions, 0 deletions