author: Ala Luszczak <ala@databricks.com>  2017-02-09 19:07:06 +0100
committer: Reynold Xin <rxin@databricks.com>  2017-02-09 19:07:06 +0100
commit: 4064574d031215fcfdf899a1ee9f3b6fecb1bfc9 (patch)
tree: 73e2eb131378c3bb24a3f0a32f86fcfe491e44c9 /sql/catalyst/src
parent: 3fc8e8caf81d0049daf9b776ad4059b0df81630f (diff)
[SPARK-19514] Making range interruptible.
## What changes were proposed in this pull request?
Previously, the range operator could not be interrupted. For example, calling DAGScheduler.cancelStage(...) on a query containing a range might have been ineffective.
This change adds periodic checks of TaskContext.isInterrupted to the codegen version, and wraps the non-codegen version in an InterruptibleIterator.
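The codegen strategy described above can be sketched as follows. This is a minimal, self-contained illustration with hypothetical names (`FakeTaskContext`, `rangeSum`), not Spark's actual generated code: the loop polls a kill flag every `batchSize` iterations so the check stays off the hot path, which is why the benchmark below shows no measurable overhead.

```scala
// Stand-in for TaskContext.isInterrupted: a flag flipped by the scheduler
// from another thread (hence @volatile).
class FakeTaskContext {
  @volatile var interrupted: Boolean = false
  def isInterrupted(): Boolean = interrupted
}

class TaskKilledException extends RuntimeException("task killed")

// Codegen-style loop over a range: check the flag only every `batchSize`
// rows rather than on every row, keeping the common path branch-cheap.
def rangeSum(ctx: FakeTaskContext, n: Long, batchSize: Int = 1000): Long = {
  var sum = 0L
  var i = 0L
  while (i < n) {
    if (i % batchSize == 0 && ctx.isInterrupted()) throw new TaskKilledException
    sum += i
    i += 1
  }
  sum
}
```

Throwing TaskKilledException from inside the loop is what lets a cancelled stage actually tear the task down instead of spinning until the range is exhausted.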
I benchmarked the codegen version on a sample query `spark.range(1000L * 1000 * 1000 * 10).count()` and found no measurable performance difference.
## How was this patch tested?
Adds a unit test.
Author: Ala Luszczak <ala@databricks.com>
Closes #16872 from ala/SPARK-19514b.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 8 |
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 04b812e79e..374d714ad5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -23,14 +23,14 @@ import java.util.{Map => JavaMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
 import scala.util.control.NonFatal
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
 import org.codehaus.janino.util.ClassFile
-import scala.language.existentials
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.CodegenMetrics
 import org.apache.spark.sql.catalyst.InternalRow
@@ -933,7 +933,9 @@ object CodeGenerator extends Logging {
       classOf[UnsafeArrayData].getName,
       classOf[MapData].getName,
       classOf[UnsafeMapData].getName,
-      classOf[Expression].getName
+      classOf[Expression].getName,
+      classOf[TaskContext].getName,
+      classOf[TaskKilledException].getName
     ))
     evaluator.setExtendedClass(classOf[GeneratedClass])
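The diff above only registers TaskContext and TaskKilledException with the code generator; the non-codegen path instead wraps the row iterator. The shape of that wrapper can be sketched as follows: a self-contained illustration with hypothetical names (`KillFlag`), mirroring the idea of Spark's InterruptibleIterator rather than reproducing its actual source.

```scala
// A flag the scheduler would flip from another thread when the task is killed.
class KillFlag { @volatile var killed: Boolean = false }

class TaskKilledException extends RuntimeException("task killed")

// Iterator wrapper: every hasNext call re-checks the kill flag, so a
// cancelled task stops at the next row boundary instead of draining
// the whole underlying iterator.
class InterruptibleIterator[T](flag: KillFlag, underlying: Iterator[T])
    extends Iterator[T] {
  override def hasNext: Boolean = {
    if (flag.killed) throw new TaskKilledException
    underlying.hasNext
  }
  override def next(): T = underlying.next()
}
```

Checking in hasNext rather than next() means the consumer's normal pull loop hits the check once per row with no extra bookkeeping in the wrapped iterator.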