diff options
author | Patrick Wendell <patrick@databricks.com> | 2015-04-29 00:35:08 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-29 00:35:08 -0700 |
commit | 1fd6ed9a56ac4671f4a3d25a42823ba3bf01f60f (patch) | |
tree | 22c720bcc9e24ffbdc6724bd5489225c4e9c3643 | |
parent | fe917f5ec9be8c8424416f7b5423ddb4318e03a0 (diff) | |
download | spark-1fd6ed9a56ac4671f4a3d25a42823ba3bf01f60f.tar.gz spark-1fd6ed9a56ac4671f4a3d25a42823ba3bf01f60f.tar.bz2 spark-1fd6ed9a56ac4671f4a3d25a42823ba3bf01f60f.zip |
[SPARK-7204] [SQL] Fix callSite for DataFrame and SQL operations
This patch adds SQL to the set of excluded libraries when
generating a callSite. This makes the callSite mechanism work
properly for the DataFrame API. I also added a small improvement for
JDBC queries where we just use the string "Spark JDBC Server Query"
instead of trying to give a callSite that doesn't make any sense
to the user.
Before (DF):
![screen shot 2015-04-28 at 1 29 26 pm](https://cloud.githubusercontent.com/assets/320616/7380170/ef63bfb0-edae-11e4-989c-f88a5ba6bbee.png)
After (DF):
![screen shot 2015-04-28 at 1 34 58 pm](https://cloud.githubusercontent.com/assets/320616/7380181/fa7f6d90-edae-11e4-9559-26f163ed63b8.png)
After (JDBC):
![screen shot 2015-04-28 at 2 00 10 pm](https://cloud.githubusercontent.com/assets/320616/7380185/02f5b2a4-edaf-11e4-8e5b-99bdc3df66dd.png)
Author: Patrick Wendell <patrick@databricks.com>
Closes #5757 from pwendell/dataframes and squashes the following commits:
0d931a4 [Patrick Wendell] Attempting to fix PySpark tests
85bf740 [Patrick Wendell] [SPARK-7204] Fix callsite for dataframe operations.
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 28 | ||||
-rw-r--r-- | python/pyspark/sql/dataframe.py | 3 |
2 files changed, 21 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 4c028c06a5..4b5a5df5ef 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1299,16 +1299,18 @@ private[spark] object Utils extends Logging { } /** Default filtering function for finding call sites using `getCallSite`. */ - private def coreExclusionFunction(className: String): Boolean = { - // A regular expression to match classes of the "core" Spark API that we want to skip when - // finding the call site of a method. + private def sparkInternalExclusionFunction(className: String): Boolean = { + // A regular expression to match classes of the internal Spark API's + // that we want to skip when finding the call site of a method. val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r + val SPARK_SQL_CLASS_REGEX = """^org\.apache\.spark\.sql.*""".r val SCALA_CORE_CLASS_PREFIX = "scala" - val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined + val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined || + SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX) // If the class is a Spark internal class or a Scala class, then exclude. - isSparkCoreClass || isScalaClass + isSparkClass || isScalaClass } /** @@ -1318,7 +1320,7 @@ private[spark] object Utils extends Logging { * * @param skipClass Function that is used to exclude non-user-code classes. */ - def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = { + def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = { // Keep crawling up the stack trace until we find the first function not inside of the spark // package. We track the last (shallowest) contiguous Spark method. 
This might be an RDD // transformation, a SparkContext function (such as parallelize), or anything else that leads @@ -1357,9 +1359,17 @@ private[spark] object Utils extends Logging { } val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt - CallSite( - shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine", - longForm = callStack.take(callStackDepth).mkString("\n")) + val shortForm = + if (firstUserFile == "HiveSessionImpl.java") { + // To be more user friendly, show a nicer string for queries submitted from the JDBC + // server. + "Spark JDBC Server Query" + } else { + s"$lastSparkMethod at $firstUserFile:$firstUserLine" + } + val longForm = callStack.take(callStackDepth).mkString("\n") + + CallSite(shortForm, longForm) } /** Return a string containing part of a file from byte 'start' to 'end'. */ diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 4759f5fe78..6879fe0805 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -237,7 +237,8 @@ class DataFrame(object): :param extended: boolean, default ``False``. If ``False``, prints only the physical plan. >>> df.explain() - PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:... + PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\ + NativeMethodAccessorImpl.java:... >>> df.explain(True) == Parsed Logical Plan == |