aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorAla Luszczak <ala@databricks.com>2017-02-09 19:07:06 +0100
committerReynold Xin <rxin@databricks.com>2017-02-09 19:07:06 +0100
commit4064574d031215fcfdf899a1ee9f3b6fecb1bfc9 (patch)
tree73e2eb131378c3bb24a3f0a32f86fcfe491e44c9 /sql/catalyst
parent3fc8e8caf81d0049daf9b776ad4059b0df81630f (diff)
downloadspark-4064574d031215fcfdf899a1ee9f3b6fecb1bfc9.tar.gz
spark-4064574d031215fcfdf899a1ee9f3b6fecb1bfc9.tar.bz2
spark-4064574d031215fcfdf899a1ee9f3b6fecb1bfc9.zip
[SPARK-19514] Making range interruptible.
## What changes were proposed in this pull request? Previously range operator could not be interrupted. For example, using DAGScheduler.cancelStage(...) on a query with range might have been ineffective. This change adds periodic checks of TaskContext.isInterrupted to codegen version, and InterruptibleOperator to non-codegen version. I benchmarked the performance of codegen version on a sample query `spark.range(1000L * 1000 * 1000 * 10).count()` and there is no measurable difference. ## How was this patch tested? Adds a unit test. Author: Ala Luszczak <ala@databricks.com> Closes #16872 from ala/SPARK-19514b.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 8
1 file changed, 5 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 04b812e79e..374d714ad5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -23,14 +23,14 @@ import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
import scala.util.control.NonFatal
import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
import org.codehaus.janino.util.ClassFile
-import scala.language.existentials
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.CodegenMetrics
import org.apache.spark.sql.catalyst.InternalRow
@@ -933,7 +933,9 @@ object CodeGenerator extends Logging {
classOf[UnsafeArrayData].getName,
classOf[MapData].getName,
classOf[UnsafeMapData].getName,
- classOf[Expression].getName
+ classOf[Expression].getName,
+ classOf[TaskContext].getName,
+ classOf[TaskKilledException].getName
))
evaluator.setExtendedClass(classOf[GeneratedClass])