Commit message | Author | Age | Files | Lines
...
* [SPARK-19611][SQL] Preserve metastore field order when merging inferred schema | Budde | 2017-03-10 | 2 files | -4/+22

## What changes were proposed in this pull request?

The `HiveMetastoreCatalog.mergeWithMetastoreSchema()` method added in #16944 may not preserve the same field order as the metastore schema in some cases, which can cause queries to fail. This change ensures that the metastore field order is preserved.

## How was this patch tested?

A test ensuring that metastore order is preserved was added to `HiveSchemaInferenceSuite`. The particular failure use case from #16944 was also tested manually.

Author: Budde <budde@amazon.com>

Closes #17249 from budde/PreserveMetastoreFieldOrder.
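
A minimal Java sketch of the ordering idea, not Spark's actual implementation: walk the metastore fields in their own order and, for each one, take the case-preserving name from the inferred schema when a case-insensitive match exists. The class and method names (`SchemaMergeSketch`, `mergePreservingOrder`) are hypothetical, and a schema is simplified here to a list of field names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class SchemaMergeSketch {
    // Hypothetical helper: the result always follows the order of metastoreFields.
    static List<String> mergePreservingOrder(List<String> metastoreFields,
                                             List<String> inferredFields) {
        // Index inferred names by their lower-cased form for case-insensitive lookup.
        Map<String, String> inferredByLowerName = new HashMap<>();
        for (String name : inferredFields) {
            inferredByLowerName.put(name.toLowerCase(Locale.ROOT), name);
        }
        List<String> merged = new ArrayList<>();
        for (String metastoreName : metastoreFields) {
            String key = metastoreName.toLowerCase(Locale.ROOT);
            // Fall back to the metastore name when nothing was inferred for this field.
            merged.add(inferredByLowerName.getOrDefault(key, metastoreName));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> metastore = Arrays.asList("userid", "eventtime", "payload");
        List<String> inferred = Arrays.asList("eventTime", "userId");
        System.out.println(mergePreservingOrder(metastore, inferred));
    }
}
```

Iterating over the metastore list, rather than over the inferred one, is what keeps the order stable regardless of how the data files happen to order their columns.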
* [SPARK-17979][SPARK-14453] Remove deprecated SPARK_YARN_USER_ENV and SPARK_JAVA_OPTS | Yong Tang | 2017-03-10 | 12 files | -131/+3

This fix removes deprecated support for config `SPARK_YARN_USER_ENV`, as mentioned in SPARK-17979. It also removes deprecated support for the following:

```
SPARK_YARN_USER_ENV
SPARK_JAVA_OPTS
SPARK_CLASSPATH
SPARK_WORKER_INSTANCES
```

Related JIRA:

[SPARK-14453]: https://issues.apache.org/jira/browse/SPARK-14453
[SPARK-12344]: https://issues.apache.org/jira/browse/SPARK-12344
[SPARK-15781]: https://issues.apache.org/jira/browse/SPARK-15781

Existing tests should pass.

Author: Yong Tang <yong.tang.github@outlook.com>

Closes #17212 from yongtang/SPARK-17979.
* [SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physical plan | Carson Wang | 2017-03-10 | 1 file | -1/+1

## What changes were proposed in this pull request?

When adaptive execution is enabled, an exchange coordinator is used in the Exchange operators. For a Join, the same exchange coordinator is used for its two Exchanges, but the physical plan shows two different coordinator ids, which is confusing. This PR fixes the incorrect exchange coordinator id in the physical plan: the coordinator object, rather than the `Option[ExchangeCoordinator]`, should be used to generate the identity hash code of the same coordinator.

## How was this patch tested?

Before the patch, the physical plan shows two different exchange coordinator ids for a Join.

```
== Physical Plan ==
*Project [key1#3L, value2#12L]
+- *SortMergeJoin [key1#3L], [key2#11L], Inner
   :- *Sort [key1#3L ASC NULLS FIRST], false, 0
   :  +- Exchange(coordinator id: 1804587700) hashpartitioning(key1#3L, 10), coordinator[target post-shuffle partition size: 67108864]
   :     +- *Project [(id#0L % 500) AS key1#3L]
   :        +- *Filter isnotnull((id#0L % 500))
   :           +- *Range (0, 1000, step=1, splits=Some(10))
   +- *Sort [key2#11L ASC NULLS FIRST], false, 0
      +- Exchange(coordinator id: 793927319) hashpartitioning(key2#11L, 10), coordinator[target post-shuffle partition size: 67108864]
         +- *Project [(id#8L % 500) AS key2#11L, id#8L AS value2#12L]
            +- *Filter isnotnull((id#8L % 500))
               +- *Range (0, 1000, step=1, splits=Some(10))
```

After the patch, the two exchange coordinator ids are the same.

Author: Carson Wang <carson.wang@intel.com>

Closes #16952 from carsonwang/FixCoordinatorId.
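
The wrapper-versus-object distinction described in this commit can be illustrated with a small Java sketch. It is an analogy only: `Coordinator` and `Wrapper` are hypothetical stand-ins for `ExchangeCoordinator` and the Scala `Option` around it, and the sketch assumes each Exchange holds its own wrapper around the one shared coordinator.

```java
public class CoordinatorIdSketch {
    /** Hypothetical stand-in for a coordinator shared by two Exchanges. */
    static final class Coordinator {}

    /** Hypothetical stand-in for the Option that wraps the coordinator. */
    static final class Wrapper {
        final Coordinator value;
        Wrapper(Coordinator value) { this.value = value; }
    }

    // Id taken from the wrapper: two wrappers are two distinct objects,
    // so their identity hash codes are unrelated to each other.
    static int idFromWrapper(Wrapper wrapper) {
        return System.identityHashCode(wrapper);
    }

    // Id taken from the wrapped coordinator: the same object always yields the same id.
    static int idFromCoordinator(Wrapper wrapper) {
        return System.identityHashCode(wrapper.value);
    }

    public static void main(String[] args) {
        Coordinator shared = new Coordinator();
        Wrapper left = new Wrapper(shared);
        Wrapper right = new Wrapper(shared);
        System.out.println("wrappers are distinct objects: " + (left != right));
        System.out.println("coordinator ids match: "
            + (idFromCoordinator(left) == idFromCoordinator(right)));
    }
}
```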
* [SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range() | Kazuaki Ishizaki | 2017-03-10 | 5 files | -7/+55

## What changes were proposed in this pull request?

This PR improves the performance of operations with `range()` by changing the Java code generated by Catalyst. This PR is inspired by the [blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).

This PR changes the generated code in the following two points.

1. Replace a while-loop that uses long instance variables with a for-loop that uses int local variables
2. Suppress generation of the `shouldStop()` call if it is unnecessary (e.g. `append()` is not generated).

These changes facilitate optimizations in a JIT compiler by feeding it simplified Java code. The performance is improved by 7.6x.
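
The two loop shapes from point 1 can be sketched in plain Java as follows. This is a simplified illustration, not the Catalyst-generated code: the class and method names are hypothetical, and the for-loop version assumes the batch length fits in an `int` (the generated code processes batches of at most 1000 elements).

```java
public class RangeLoopSketch {
    // Loop state kept in instance fields, as in the code generated before the change.
    private long number;
    private long batchEnd;
    private long count;

    // while-loop driven by long instance variables.
    long countWithFieldLoop(long start, long end) {
        number = start;
        batchEnd = end;
        count = 0L;
        while (number != batchEnd) {
            number += 1L;
            count += 1L;
        }
        return count;
    }

    // for-loop driven by an int local induction variable with a fixed trip count.
    long countWithLocalLoop(long start, long end) {
        long total = 0L;
        int localEnd = (int) (end - start); // assumption: the batch fits in an int
        for (int localIdx = 0; localIdx < localEnd; localIdx++) {
            total += 1L;
        }
        return total;
    }

    public static void main(String[] args) {
        RangeLoopSketch sketch = new RangeLoopSketch();
        System.out.println(sketch.countWithFieldLoop(0L, 1000L));
        System.out.println(sketch.countWithLocalLoop(0L, 1000L));
    }
}
```

Both methods compute the same count; the second form gives a JIT compiler a counted loop over locals, which is the shape the commit message says is easier to optimize.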

Benchmark program:

```scala
val N = 1 << 29
val iters = 2
val benchmark = new Benchmark("range.count", N * iters)
benchmark.addCase(s"with this PR") { i =>
  var n = 0
  var len = 0
  while (n < iters) {
    len += sparkSession.range(N).selectExpr("count(id)").collect.length
    n += 1
  }
}
benchmark.run
```

Performance result without this PR

```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o this PR                                    1349 / 1356        796.2           1.3       1.0X
```

Performance result with this PR

```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
with this PR                                    177 /  271       6065.3           0.2       1.0X
```

Here is a comparison between the generated code without and with this PR. Only the method `agg_doAggregateWithoutKey` is changed.

Generated code without this PR

```java
final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private scala.collection.Iterator[] inputs;
  private boolean agg_initAgg;
  private boolean agg_bufIsNull;
  private long agg_bufValue;
  private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
  private boolean range_initRange;
  private long range_number;
  private TaskContext range_taskContext;
  private InputMetrics range_inputMetrics;
  private long range_batchEnd;
  private long range_numElementsTodo;
  private scala.collection.Iterator range_input;
  private UnsafeRow range_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
  private UnsafeRow agg_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator[] inputs) {
    partitionIndex = index;
    this.inputs = inputs;
    agg_initAgg = false;

    this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    range_initRange = false;
    range_number = 0L;
    range_taskContext = TaskContext.get();
    range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
    range_batchEnd = 0;
    range_numElementsTodo = 0L;
    range_input = inputs[0];
    range_result = new UnsafeRow(1);
    this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
    this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
    this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    agg_result = new UnsafeRow(1);
    this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
    this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);

  }

  private void agg_doAggregateWithoutKey() throws java.io.IOException {
    // initialize aggregation buffer
    agg_bufIsNull = false;
    agg_bufValue = 0L;

    // initialize Range
    if (!range_initRange) {
      range_initRange = true;
      initRange(partitionIndex);
    }

    while (true) {
      while (range_number != range_batchEnd) {
        long range_value = range_number;
        range_number += 1L;

        // do aggregate
        // common sub-expressions

        // evaluate aggregate function
        boolean agg_isNull1 = false;

        long agg_value1 = -1L;
        agg_value1 = agg_bufValue + 1L;
        // update aggregation buffer
        agg_bufIsNull = false;
        agg_bufValue = agg_value1;

        if (shouldStop()) return;
      }

      if (range_taskContext.isInterrupted()) {
        throw new TaskKilledException();
      }

      long range_nextBatchTodo;
      if (range_numElementsTodo > 1000L) {
        range_nextBatchTodo = 1000L;
        range_numElementsTodo -= 1000L;
      } else {
        range_nextBatchTodo = range_numElementsTodo;
        range_numElementsTodo = 0;
        if (range_nextBatchTodo == 0) break;
      }
      range_numOutputRows.add(range_nextBatchTodo);
      range_inputMetrics.incRecordsRead(range_nextBatchTodo);

      range_batchEnd += range_nextBatchTodo * 1L;
    }

  }

  private void initRange(int idx) {
    java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
    java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
    java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
    java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
    java.math.BigInteger start = java.math.BigInteger.valueOf(0L);

    java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
    if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      range_number = Long.MAX_VALUE;
    } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      range_number = Long.MIN_VALUE;
    } else {
      range_number = st.longValue();
    }
    range_batchEnd = range_number;

    java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
      .multiply(step).add(start);
    if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      partitionEnd = Long.MAX_VALUE;
    } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      partitionEnd = Long.MIN_VALUE;
    } else {
      partitionEnd = end.longValue();
    }

    java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
      java.math.BigInteger.valueOf(range_number));
    range_numElementsTodo = startToEnd.divide(step).longValue();
    if (range_numElementsTodo < 0) {
      range_numElementsTodo = 0;
    } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
      range_numElementsTodo++;
    }
  }

  protected void processNext() throws java.io.IOException {
    while (!agg_initAgg) {
      agg_initAgg = true;
      long agg_beforeAgg = System.nanoTime();
      agg_doAggregateWithoutKey();
      agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);

      // output the result

      agg_numOutputRows.add(1);
      agg_rowWriter.zeroOutNullBytes();

      if (agg_bufIsNull) {
        agg_rowWriter.setNullAt(0);
      } else {
        agg_rowWriter.write(0, agg_bufValue);
      }
      append(agg_result);
    }
  }
}
```

Generated code with this PR

```java
final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private scala.collection.Iterator[] inputs;
  private boolean agg_initAgg;
  private boolean agg_bufIsNull;
  private long agg_bufValue;
  private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
  private boolean range_initRange;
  private long range_number;
  private TaskContext range_taskContext;
  private InputMetrics range_inputMetrics;
  private long range_batchEnd;
  private long range_numElementsTodo;
  private scala.collection.Iterator range_input;
  private UnsafeRow range_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
  private UnsafeRow agg_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator[] inputs) {
    partitionIndex = index;
    this.inputs = inputs;
    agg_initAgg = false;

    this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    range_initRange = false;
    range_number = 0L;
    range_taskContext = TaskContext.get();
    range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
    range_batchEnd = 0;
    range_numElementsTodo = 0L;
    range_input = inputs[0];
    range_result = new UnsafeRow(1);
    this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
    this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
    this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    agg_result = new UnsafeRow(1);
    this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
    this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);

  }

  private void agg_doAggregateWithoutKey() throws java.io.IOException {
    // initialize aggregation buffer
    agg_bufIsNull = false;
    agg_bufValue = 0L;

    // initialize Range
    if (!range_initRange) {
      range_initRange = true;
      initRange(partitionIndex);
    }

    while (true) {
      long range_range = range_batchEnd - range_number;
      if (range_range != 0L) {
        int range_localEnd = (int)(range_range / 1L);
        for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) {
          long range_value = ((long)range_localIdx * 1L) + range_number;

          // do aggregate
          // common sub-expressions

          // evaluate aggregate function
          boolean agg_isNull1 = false;

          long agg_value1 = -1L;
          agg_value1 = agg_bufValue + 1L;
          // update aggregation buffer
          agg_bufIsNull = false;
          agg_bufValue = agg_value1;

          // shouldStop check is eliminated
        }
        range_number = range_batchEnd;
      }

      if (range_taskContext.isInterrupted()) {
        throw new TaskKilledException();
      }

      long range_nextBatchTodo;
      if (range_numElementsTodo > 1000L) {
        range_nextBatchTodo = 1000L;
        range_numElementsTodo -= 1000L;
      } else {
        range_nextBatchTodo = range_numElementsTodo;
        range_numElementsTodo = 0;
        if (range_nextBatchTodo == 0) break;
      }
      range_numOutputRows.add(range_nextBatchTodo);
      range_inputMetrics.incRecordsRead(range_nextBatchTodo);

      range_batchEnd += range_nextBatchTodo * 1L;
    }

  }

  private void initRange(int idx) {
    java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
    java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
    java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
    java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
    java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
    long partitionEnd;

    java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
    if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      range_number = Long.MAX_VALUE;
    } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      range_number = Long.MIN_VALUE;
    } else {
      range_number = st.longValue();
    }
    range_batchEnd = range_number;

    java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
      .multiply(step).add(start);
    if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      partitionEnd = Long.MAX_VALUE;
    } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      partitionEnd = Long.MIN_VALUE;
    } else {
      partitionEnd = end.longValue();
    }

    java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
      java.math.BigInteger.valueOf(range_number));
    range_numElementsTodo = startToEnd.divide(step).longValue();
    if (range_numElementsTodo < 0) {
      range_numElementsTodo = 0;
    } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
      range_numElementsTodo++;
    }
  }

  protected void processNext() throws java.io.IOException {
    while (!agg_initAgg) {
      agg_initAgg = true;
      long agg_beforeAgg = System.nanoTime();
      agg_doAggregateWithoutKey();
      agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);

      // output the result

      agg_numOutputRows.add(1);
      agg_rowWriter.zeroOutNullBytes();

      if (agg_bufIsNull) {
        agg_rowWriter.setNullAt(0);
      } else {
        agg_rowWriter.write(0, agg_bufValue);
      }
      append(agg_result);
    }
  }
}
```

A part of suppressing `shouldStop()` was originally developed by inouehrs

## How was this patch tested?

Add new tests into `DataFrameRangeSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17122 from kiszk/SPARK-19786.
* [SPARK-19891][SS] Await Batch Lock notified on stream execution exit | Tyson Condie | 2017-03-09 | 1 file | -0/+7

## What changes were proposed in this pull request?

We need to notify the await batch lock when the stream exits early, e.g., when an exception has been thrown.

## How was this patch tested?

Current tests that throw exceptions at runtime will finish faster as a result of this update.

zsxwing

Author: Tyson Condie <tcondie@gmail.com>

Closes #17231 from tcondie/kafka-writer.
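
A rough Java sketch of the pattern described here, under the assumption that waiters block on a condition variable and must be woken when the stream stops for any reason. The names (`AwaitOnExitSketch`, `runStream`, `awaitTermination`) are hypothetical and are not the identifiers used in Spark's stream execution code.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitOnExitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition stateChanged = lock.newCondition();
    private boolean terminated = false;

    // Runs the stream body; the finally block wakes waiters even if the body throws.
    void runStream(Runnable body) {
        try {
            body.run();
        } finally {
            lock.lock();
            try {
                terminated = true;
                stateChanged.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    // Waits until the stream has exited or the timeout elapses; returns true on exit.
    boolean awaitTermination(long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (!terminated) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                remainingNanos = stateChanged.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

Without the signal in the `finally` block, a waiter would only return once its timeout expired, which matches the commit's note that tests throwing exceptions at runtime finish faster after the change.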
* [SPARK-19008][SQL] Improve performance of Dataset.map by eliminating boxing/unboxing | Kazuaki Ishizaki | 2017-03-09 | 4 files | -9/+208

## What changes were proposed in this pull request?

This PR improves the performance of `Dataset.map()` for primitive types by removing boxing/unboxing operations. This is based on [the discussion](https://github.com/apache/spark/pull/16391#discussion_r93788919) with cloud-fan.

Current Catalyst generates a method call to an `apply()` method of an anonymous function written in Scala. The types of the argument and return value are `java.lang.Object`. As a result, each method call for a primitive value involves a pair of unboxing and boxing for calling this `apply()` method and a pair of boxing and unboxing for returning from it.

This PR directly calls a specialized version of the `apply()` method without boxing and unboxing. For example, if the types of the argument and return value are `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`.

The following benchmark results, obtained using [this program](https://github.com/apache/spark/pull/16391/files), show a 4.7x improvement for the Dataset case. The Dataset part of the program is listed after the results.

Without this PR

```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD                                            1923 / 1952         52.0          19.2       1.0X
DataFrame                                       526 /  548        190.2           5.3       3.7X
Dataset                                        3094 / 3154         32.3          30.9       0.6X
```

With this PR

```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD                                            1883 / 1892         53.1          18.8       1.0X
DataFrame                                       502 /  642        199.1           5.0       3.7X
Dataset                                         657 /  784        152.2           6.6       2.9X
```

```scala
def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
  import spark.implicits._
  val rdd = spark.sparkContext.range(0, numRows)
  val ds = spark.range(0, numRows)
  val func = (l: Long) => l + 1
  val benchmark = new Benchmark("back-to-back map", numRows)
  ...
  benchmark.addCase("Dataset") { iter =>
    var res = ds.as[Long]
    var i = 0
    while (i < numChains) {
      res = res.map(func)
      i += 1
    }
    res.queryExecution.toRdd.foreach(_ => Unit)
  }
  benchmark
}
```

A motivating example

```scala
Seq(1, 2, 3).toDS.map(i => i * 7).show
```

Generated code without this PR

```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           Object mapelements_funcResult = null;
/* 054 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
/* 055 */           if (mapelements_funcResult == null) {
/* 056 */             mapelements_isNull = true;
/* 057 */           } else {
/* 058 */             mapelements_value = (Integer) mapelements_funcResult;
/* 059 */           }
/* 060 */
/* 061 */         }
/* 062 */
/* 063 */       }
/* 064 */
/* 065 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 066 */
/* 067 */       if (mapelements_isNull) {
/* 068 */         serializefromobject_rowWriter.setNullAt(0);
/* 069 */       } else {
/* 070 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 071 */       }
/* 072 */       append(serializefromobject_result);
/* 073 */       if (shouldStop()) return;
/* 074 */     }
/* 075 */   }
/* 076 */ }
```

Generated code with this PR (lines 48-56 are changed)

```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue);
/* 054 */         }
/* 055 */
/* 056 */       }
/* 057 */
/* 058 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 059 */
/* 060 */       if (mapelements_isNull) {
/* 061 */         serializefromobject_rowWriter.setNullAt(0);
/* 062 */       } else {
/* 063 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 064 */       }
/* 065 */       append(serializefromobject_result);
/* 066 */       if (shouldStop()) return;
/* 067 */     }
/* 068 */   }
/* 069 */ }
```

Java bytecode for methods for `i => i * 7`

```
$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final int apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #18  // Method apply$mcII$sp:(I)I
       5: ireturn

  public int apply$mcII$sp(int);
    Code:
       0: iload_1
       1: bipush 7
       3: imul
       4: ireturn

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic #29   // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #31  // Method apply:(I)I
       8: invokestatic #35   // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
      11: areturn

  public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
    Code:
       0: aload_0
       1: invokespecial #42  // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
       4: return
}
```

## How was this patch tested?

Added new test suites to `DatasetPrimitiveSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17172 from kiszk/SPARK-19008.
* [SPARK-19886] Fix reportDataLoss if statement in SS KafkaSourceBurak Yavuz2017-03-092-13/+54
## What changes were proposed in this pull request?

Fix the `if` statement that guards the `throw new IllegalStateException` in `reportDataLoss`.

## How was this patch tested?

Regression test.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17228 from brkyvz/kafka-cause-fix.
* [SPARK-19611][SQL] Introduce configurable table schema inferenceBudde2017-03-0911-159/+489
## Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when schema can't successfully be read from Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

## How was this patch tested?

The tests in `HiveSchemaInferenceSuite` should verify that schema inference is working as expected. `ExternalCatalogSuite` has also been extended to cover the new `alterTableSchema()` API.

Author: Budde <budde@amazon.com>

Closes #16944 from budde/SPARK-19611.
* [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc ↵Jeff Zhang2017-03-094-9/+25
file in DataFrameReader.orc

Besides the issue in the Spark API, this also fixes 2 minor issues in PySpark:

- support reading from multiple input paths for orc
- support reading from multiple input paths for text

Author: Jeff Zhang <zjffdu@apache.org>

Closes #10307 from zjffdu/SPARK-12334.
* [SPARK-19861][SS] watermark should not be a negative time.uncleGen2017-03-092-1/+26
## What changes were proposed in this pull request?

`watermark` should not be negative. A negative value is invalid, so it is now checked before the query actually runs.

## How was this patch tested?

Added a new unit test.

Author: uncleGen <hustyugm@gmail.com>
Author: dylon <hustyugm@gmail.com>

Closes #17202 from uncleGen/SPARK-19861.
* [SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSourceLiwei Lin2017-03-094-21/+72
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this would cause false negatives in the case where the path has changed in a cosmetic way (i.e. changing `s3n` to `s3a`). This patch adds an option `fileNameOnly` that causes the new file check to be based only on the filename (but still store the whole path in the log). ## Usage ```scala spark .readStream .option("fileNameOnly", true) .text("s3n://bucket/dir1/dir2") .writeStream ... ``` ## How was this patch tested? Added a test case Author: Liwei Lin <lwlin7@gmail.com> Closes #17120 from lw-lin/filename-only.
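The `fileNameOnly` behaviour described above can be illustrated with a small standalone sketch. This is not the FileSource implementation; the `SeenFiles` class and its methods are hypothetical names chosen for illustration. It assumes only what the commit message states: the new-file check uses the file name alone, while the full path is still written to the log.

```python
import posixpath
from urllib.parse import urlparse


class SeenFiles:
    """Hypothetical tracker for files a streaming source has already seen."""

    def __init__(self, file_name_only=False):
        self.file_name_only = file_name_only
        self._seen = set()
        self.log = []  # the whole path is always recorded, as in the commit text

    def _key(self, path):
        # With file_name_only, compare only the last path component,
        # so a scheme change such as s3n -> s3a does not matter.
        if self.file_name_only:
            return posixpath.basename(urlparse(path).path)
        return path

    def is_new(self, path):
        return self._key(path) not in self._seen

    def add(self, path):
        self._seen.add(self._key(path))
        self.log.append(path)
```

With the default setting, `s3a://bucket/dir1/a.txt` is treated as new even after `s3n://bucket/dir1/a.txt` was processed; with `file_name_only=True` it is not. The trade-off is that two different files with the same name in different directories would be treated as the same file.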
* [SPARK-19793] Use clock.getTimeMillis when mark task as finished in ↵jinxing2017-03-094-7/+17
TaskSetManager.

## What changes were proposed in this pull request?

`TaskSetManager` currently uses `System.currentTimeMillis` when marking a task as finished in `handleSuccessfulTask` and `handleFailedTask`, so developers cannot set a task's finishing time in unit tests. In `handleSuccessfulTask`, the task's duration is computed as `System.currentTimeMillis` - launchTime (which can be set by `clock`), so the result is not correct.

## How was this patch tested?

Existing tests.

Author: jinxing <jinxing6042@126.com>

Closes #17133 from jinxing64/SPARK-19793.
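The reason for routing both timestamps through one clock can be shown with a minimal sketch. The names below (`ManualClock`, `TaskInfo`, `mark_finished`) are illustrative stand-ins, not the Spark classes; the only assumption taken from the commit message is that launch time and finish time should come from the same injectable clock.

```python
import time


class SystemClock:
    """Wall-clock time in milliseconds."""

    def get_time_millis(self):
        return int(time.time() * 1000)


class ManualClock:
    """Test clock whose time only moves when advance() is called."""

    def __init__(self, now_ms=0):
        self._now = now_ms

    def get_time_millis(self):
        return self._now

    def advance(self, ms):
        self._now += ms


class TaskInfo:
    """Hypothetical task record holding launch and finish times."""

    def __init__(self, launch_time):
        self.launch_time = launch_time
        self.finish_time = None

    def mark_finished(self, finish_time):
        # The caller passes the time in, so a test can control it.
        self.finish_time = finish_time

    @property
    def duration(self):
        return self.finish_time - self.launch_time
```

In a test, `ManualClock` supplies both values, so the duration is deterministic; mixing a manual launch time with a wall-clock finish time would give a meaningless difference.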
* [SPARK-19757][CORE] DriverEndpoint#makeOffers race against ↵Jimmy Xiang2017-03-091-12/+26
CoarseGrainedSchedulerBackend#killExecutors

## What changes were proposed in this pull request?

While some executors are being killed due to idleness, if new tasks come in, the driver could assign them to executors that are being killed. These tasks will fail later when the executors are lost. This patch makes sure CoarseGrainedSchedulerBackend#killExecutors and DriverEndpoint#makeOffers are properly synchronized.

## How was this patch tested?

Manual tests.

Author: Jimmy Xiang <jxiang@apache.org>

Closes #17091 from jxiang/spark-19757.
* [SPARK-19561][SQL] add int case handling for TimestampTypeJason White2017-03-092-0/+10
| | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int. These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range. Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3. ## How was this patch tested? Added a new PySpark-side test that fails without the change. The contribution is my original work and I license the work to the project under the project’s open source license. Resubmission of https://github.com/apache/spark/pull/16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks. Author: Jason White <jason.white@shopify.com> Closes #17200 from JasonMWhite/SPARK-19561.
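The "roughly half an hour" figure can be checked with a short calculation. This is a worked example under one assumption that the commit message does not spell out: timestamp values cross the Py4J boundary as microseconds since the epoch.

```python
from datetime import datetime, timedelta

MAX_INT = 2**31 - 1                  # largest value serialized as a 32-bit Int
MICROS_PER_MINUTE = 60 * 1_000_000   # assumption: values are in microseconds

# How far from the epoch the Int range reaches, in minutes.
minutes_covered = MAX_INT / MICROS_PER_MINUTE

# The last instant whose microsecond value still fits in an Int.
last_int_instant = datetime(1970, 1, 1) + timedelta(microseconds=MAX_INT)
```

Under that assumption the Int range covers a little under 36 minutes on either side of 1970-01-01 00:00:00, which is the window in which timestamps could not be created before this change.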
* [SPARK-19763][SQL] qualified external datasource table location stored in ↵windpiger2017-03-098-31/+64
catalog

## What changes were proposed in this pull request?

If we create an external datasource table with a non-qualified location, we should qualify the location before storing it in the catalog.

```
CREATE TABLE t(a string) USING parquet LOCATION '/path/xx'
CREATE TABLE t1(a string, b string) USING parquet PARTITIONED BY(b) LOCATION '/path/xx'
```

When we get the table from the catalog, the location should be qualified, e.g. 'file:/path/xxx'.

## How was this patch tested?

Unit test added.

Author: windpiger <songjun@outlook.com>

Closes #17095 from windpiger/tablepathQualified.
* [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.uncleGen2017-03-082-9/+19
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? A follow up to SPARK-19859: - extract the calculation of `delayMs` and reuse it. - update EventTimeWatermarkExec - use the correct `delayMs` in EventTimeWatermark ## How was this patch tested? Jenkins. Author: uncleGen <hustyugm@gmail.com> Closes #17221 from uncleGen/SPARK-19859.
* [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internalShixiong Zhu2017-03-081-0/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The API docs should not include the "org.apache.spark.sql.internal" package because they are internal private APIs. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #17217 from zsxwing/SPARK-19874.
* [SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive MetastoreXiao Li2017-03-083-273/+345
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ### What changes were proposed in this pull request? So far, the test cases in DDLSuites only verify the behaviors of InMemoryCatalog. That means, they do not cover the scenarios using HiveExternalCatalog. Thus, we need to improve the existing test suite to run these cases using Hive metastore. When porting these test cases, a bug of `SET LOCATION` is found. `path` is not set when the location is changed. After this PR, a few changes are made, as summarized below, - `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and `HiveCatalogedDDLSuite` extend it. `InMemoryCatalogedDDLSuite` is using `InMemoryCatalog`. `HiveCatalogedDDLSuite` is using `HiveExternalCatalog`. - `InMemoryCatalogedDDLSuite` contains all the existing test cases in `DDLSuite`. - `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following test cases are excluded: 1. The following test cases only make sense for `InMemoryCatalog`: ``` test("desc table for parquet data source table using in-memory catalog") test("create a managed Hive source table") { test("create an external Hive source table") test("Create Hive Table As Select") ``` 2. The following test cases are unable to be ported because we are unable to alter table provider when using Hive metastore. 
In the future PRs we need to improve the test cases so that altering table provider is not needed: ``` test("alter table: set location (datasource table)") test("alter table: set properties (datasource table)") test("alter table: unset properties (datasource table)") test("alter table: set serde (datasource table)") test("alter table: set serde partition (datasource table)") test("alter table: change column (datasource table)") test("alter table: add partition (datasource table)") test("alter table: drop partition (datasource table)") test("alter table: rename partition (datasource table)") test("drop table - data source table") ``` **TODO** : in the future PRs, we need to remove `HiveDDLSuite` and move the test cases to either `DDLSuite`, `InMemoryCatalogedDDLSuite` or `HiveCatalogedDDLSuite`. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Author: gatorsmile <gatorsmile@gmail.com> Closes #16592 from gatorsmile/refactorDDLSuite.
* [MINOR][SQL] The analyzer rules are fired twice for cases when ↵Dilip Biswal2017-03-081-2/+7
AnalysisException is raised from analyzer.

## What changes were proposed in this pull request?

In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However, we can also throw AnalysisException from a few analyzer rules such as ResolveSubquery.

I found that the analyzer rules are fired twice for queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix, and we don't strictly have to fix it. I just got confused seeing the rule fired twice when I was not expecting it.

## How was this patch tested?

Tested manually.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #17214 from dilipbiswal/analyis_twice.
* [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in ↵Burak Yavuz2017-03-083-38/+63
combination with maxFileAge in FileStreamSource

## What changes were proposed in this pull request?

**The Problem**

There is a file stream source option called maxFileAge which limits how old the files can be, relative to the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This value is 7 days by default.

This causes a problem when `latestFirst = true` and `maxFilesPerTrigger` is set to fewer than the total number of files to be processed. Here is what happens in all combinations:

1) latestFirst = false - Since files are processed in order, there won't be any unprocessed file older than the latest processed file. All files will be processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is not set, then all old files get processed in the first batch, and so no file is left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch processes the latest X files. That sets the threshold to (latest file - maxFileAge), so files older than this threshold will never be considered for processing.

The bug is with case 3.

**The Solution**

Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.

## How was this patch tested?

Regression test in `FileStreamSourceSuite`.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17153 from brkyvz/maxFileAge.
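Case 3 above can be reproduced with a small simulation. This is a toy model rather than the FileStreamSource code: the function `run_batches` and its parameters are hypothetical, file timestamps are plain numbers (days), and the only behaviour modelled is latest-first selection, a per-batch file limit, and the age threshold relative to the latest file seen.

```python
def run_batches(file_times, max_files_per_trigger, max_file_age, ignore_max_file_age):
    """Return the set of file timestamps that eventually get processed.

    Files are picked latest-first, at most max_files_per_trigger per batch.
    Unless ignore_max_file_age is set, files older than
    (latest seen - max_file_age) are dropped once a batch has run.
    """
    processed = set()
    latest_seen = None
    while True:
        candidates = [t for t in file_times if t not in processed]
        if latest_seen is not None and not ignore_max_file_age:
            threshold = latest_seen - max_file_age
            candidates = [t for t in candidates if t >= threshold]
        if not candidates:
            return processed
        batch = sorted(candidates, reverse=True)[:max_files_per_trigger]
        processed.update(batch)
        newest = max(batch)
        latest_seen = newest if latest_seen is None else max(latest_seen, newest)
```

With files at days 0, 10, 20 and 30, one file per trigger and a 7-day age limit, only the day-30 file is processed when the threshold is applied; ignoring the threshold, as the solution proposes for this combination of options, processes all four.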
* [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSVhyukjinkwon2017-03-085-30/+121
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This PR proposes to add an API that loads `DataFrame` from `Dataset[String]` storing csv. It allows pre-processing before loading into CSV, which means allowing a lot of workarounds for many narrow cases, for example, as below: - Case 1 - pre-processing ```scala val df = spark.read.text("...") // Pre-processing with this. spark.read.csv(df.as[String]) ``` - Case 2 - use other input formats ```scala val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text]) val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength)) spark.read.csv(stringRdd.toDS) ``` ## How was this patch tested? Added tests in `CSVSuite` and build with Scala 2.10. ``` ./dev/change-scala-version.sh 2.10 ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #16854 from HyukjinKwon/SPARK-15463.
* [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session ↵Kunal Khamar2017-03-0820-236/+981
has an identical copy of the SessionState

Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions etc.). This change adds a method cloneSession() which creates a new SparkSession with a copy of the parent's SessionState.

Subsequent changes to the base session are not propagated to the cloned session; the clone is independent after creation. If the base is changed after the clone has been created, say the user registers a new UDF, then the new UDF will not be available inside the clone. The same goes for configs and temp tables.

Unit tests

Author: Kunal Khamar <kkhamar@outlook.com>
Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16826 from kunalkhamar/fork-sparksession.
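The difference between a fresh session and a cloned one can be sketched in a few lines. The classes below are illustrative stand-ins, not the Spark API; `new_session` and `clone_session` only mirror the semantics described in the commit message (a clone starts with a copy of the parent's state and is independent afterwards).

```python
import copy


class SessionState:
    """Hypothetical container for per-session state."""

    def __init__(self):
        self.conf = {}
        self.temp_views = {}
        self.functions = {}


class Session:
    def __init__(self, state=None):
        self.state = state if state is not None else SessionState()

    def new_session(self):
        # Starts from empty state: nothing is inherited from the parent.
        return Session()

    def clone_session(self):
        # Starts from a deep copy, so later changes on either side
        # are not visible to the other.
        return Session(copy.deepcopy(self.state))
```

A config set on the base before cloning is visible in the clone, while a function registered on the base after cloning is not.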
* [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow ↵Shixiong Zhu2017-03-0813-107/+485
invalid cases

## What changes were proposed in this pull request?

Add an output mode parameter to `flatMapGroupsWithState` and just define `mapGroupsWithState` as `flatMapGroupsWithState(Update)`.

`UnsupportedOperationChecker` is modified to disallow unsupported cases.

- Batch mapGroupsWithState or flatMapGroupsWithState is always allowed.
- For streaming (map/flatMap)GroupsWithState, see the following table:

| Operators | Supported Query Output Mode |
| ------------- | ------------- |
| flatMapGroupsWithState(Update) without aggregation | Update |
| flatMapGroupsWithState(Update) with aggregation | None |
| flatMapGroupsWithState(Append) without aggregation | Append |
| flatMapGroupsWithState(Append) before aggregation | Append, Update, Complete |
| flatMapGroupsWithState(Append) after aggregation | None |
| Multiple flatMapGroupsWithState(Append)s | Append |
| Multiple mapGroupsWithStates | None |
| Mixing mapGroupsWithStates and flatMapGroupsWithStates | None |
| Other cases of multiple flatMapGroupsWithState | None |

## How was this patch tested?

The added unit tests.
Here are the tests related to (map/flatMap)GroupsWithState: ``` [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds) [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are in append mode: supported (2 milliseconds) [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some are not in append mode: not supported (7 milliseconds) [info] - streaming plan - 
mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in append mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in complete mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Append mode: not supported (6 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported (3 milliseconds) [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Complete mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are in append mode: not supported (4 milliseconds) [info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation: not supported (4 milliseconds) ``` Author: Shixiong Zhu <shixiong@databricks.com> Closes #17197 from zsxwing/mapgroups-check.
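The single-operator rows of the support table above can be encoded as a lookup, which makes the rule easy to check against the test names in the log. This is only a sketch of the table, not `UnsupportedOperationChecker`; the `SUPPORTED` mapping and `is_supported` function are hypothetical, and the rows about multiple operators are left out.

```python
# (operator output mode, position relative to aggregation) -> allowed query output modes
SUPPORTED = {
    ("Update", "without aggregation"): {"Update"},
    ("Update", "with aggregation"): set(),
    ("Append", "without aggregation"): {"Append"},
    ("Append", "before aggregation"): {"Append", "Update", "Complete"},
    ("Append", "after aggregation"): set(),
}


def is_supported(operator_mode, placement, query_mode):
    """Return True if a streaming query with this shape is allowed by the table."""
    return query_mode in SUPPORTED.get((operator_mode, placement), set())
```

For example, `is_supported("Append", "before aggregation", "Complete")` is true, while `is_supported("Update", "without aggregation", "Append")` is false, matching the "supported" and "not supported" entries in the test output.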
* [SPARK-19727][SQL] Fix for round function that modifies original columnWojtek Szymanski2017-03-087-25/+54
## What changes were proposed in this pull request?

Fix for SQL round function that modifies original column when underlying data frame is created from a local product.

    import org.apache.spark.sql.functions._

    case class NumericRow(value: BigDecimal)

    val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789"))))

    df.show()
    +--------------------+
    |               value|
    +--------------------+
    |1.234567890000000000|
    +--------------------+

    df.withColumn("value_rounded", round('value)).show()

    // before
    +--------------------+-------------+
    |               value|value_rounded|
    +--------------------+-------------+
    |1.000000000000000000|            1|
    +--------------------+-------------+

    // after
    +--------------------+-------------+
    |               value|value_rounded|
    +--------------------+-------------+
    |1.234567890000000000|            1|
    +--------------------+-------------+

## How was this patch tested?

New unit test added to existing suite `org.apache.spark.sql.MathFunctionsSuite`

Author: Wojtek Szymanski <wk.szymanski@gmail.com>

Closes #17075 from wojtek-szymanski/SPARK-19727.
* [SPARK-19864][SQL][TEST] provide a makeQualifiedPath functions to optimize ↵windpiger2017-03-087-49/+32
some code

## What changes were proposed in this pull request?

Currently many places in the code qualify a path by hand. It is better to provide a single function for this, which makes the code simpler.

## How was this patch tested?

N/A

Author: windpiger <songjun@outlook.com>

Closes #17204 from windpiger/addQualifiledPathUtil.
* [SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper`Tejas Patil2017-03-081-4/+15
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? This is as per suggestion by rxin at : https://github.com/apache/spark/pull/17184#discussion_r104841735 ## How was this patch tested? NA as this is a documentation change Author: Tejas Patil <tejasp@fb.com> Closes #17205 from tejasapatil/SPARK-19843_followup.
* [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled ↵Xiao Li2017-03-087-49/+178
Repartition

### What changes were proposed in this pull request?

Observed by felixcheung in https://github.com/apache/spark/pull/16739: when users call the shuffle-enabled `repartition` API, they expect the number of partitions to be exactly the number they provided, even if they call the shuffle-disabled `coalesce` later. Currently, the `CollapseRepartition` rule does not consider whether shuffle is enabled or not, which produces the following unexpected result.

```Scala
val df = spark.range(0, 10000, 1, 5)
val df2 = df.repartition(10)
assert(df2.coalesce(13).rdd.getNumPartitions == 5)
assert(df2.coalesce(7).rdd.getNumPartitions == 5)
assert(df2.coalesce(3).rdd.getNumPartitions == 3)
```

This PR fixes the issue by preserving the shuffle-enabled Repartition.

### How was this patch tested?

Added a test case.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #16933 from gatorsmile/CollapseRepartition.
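The unexpected numbers in the snippet above follow from a simple model of the two operations. The sketch below is a toy model, not the optimizer rule: it assumes that `repartition(n)` always yields `n` partitions, that `coalesce(n)` can only reduce the count, and that the pre-fix rule dropped the inner `repartition` when a `coalesce` followed it. The function name and flag are hypothetical.

```python
def num_partitions(initial, ops, collapse_ignoring_shuffle):
    """Model the partition count after a chain of ("repartition" | "coalesce", n) ops."""
    ops = list(ops)
    if (collapse_ignoring_shuffle and len(ops) == 2
            and ops[0][0] == "repartition" and ops[1][0] == "coalesce"):
        # Pre-fix behaviour in this model: the shuffle-enabled child is dropped.
        ops = [ops[1]]
    n = initial
    for kind, target in ops:
        if kind == "repartition":
            n = target              # shuffle: exact target
        elif kind == "coalesce":
            n = min(n, target)      # no shuffle: can only shrink
    return n
```

With 5 initial partitions and `repartition(10)` followed by `coalesce(13)`, `coalesce(7)` or `coalesce(3)`, the collapsing model gives 5, 5 and 3, as in the asserts above. Keeping the repartition gives 10, 7 and 3 in this model.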
* [SPARK-19865][SQL] remove the view identifier in SubqueryAliasjiangxingbo2017-03-0816-46/+42
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Since we have a `View` node now, we can remove the view identifier in `SubqueryAlias`, which was used to indicate a view node before. ## How was this patch tested? Update the related test cases. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #17210 from jiangxb1987/SubqueryAlias.
* [SPARK-17080][SQL] join reorderwangzhenhua2017-03-088-4/+521
| | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Reorder the joins using a dynamic programming algorithm (Selinger paper): First we put all items (basic joined nodes) into level 1, then we build all two-way joins at level 2 from plans at level 1 (single items), then build all 3-way joins from plans at previous levels (two-way joins and single items), then 4-way joins ... etc, until we build all n-way joins and pick the best plan among them. When building m-way joins, we only keep the best plan (with the lowest cost) for the same set of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, the plans maintained for each level when reordering four items A, B, C, D are as follows: ``` level 1: p({A}), p({B}), p({C}), p({D}) level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D}) level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D}) level 4: p({A, B, C, D}) ``` where p({A, B, C, D}) is the final output plan. For cost evaluation, since physical costs for operators are not available currently, we use cardinalities and sizes to compute costs. ## How was this patch tested? add test cases Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #17138 from wzhfy/joinReorder.
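The level-by-level construction described above can be sketched as a short dynamic program. This is an illustration only, not the Spark implementation: the cost model (sum of intermediate row counts, with one fixed hypothetical `selectivity` for every join) and the function name `reorder` are assumptions made for the example.

```python
from itertools import combinations


def reorder(cards, selectivity=0.1):
    """Pick a join order for all items in cards (item name -> row count).

    best maps each set of items to (plan, output_rows, cost) and keeps only
    the cheapest plan per set, as described for each level above.
    """
    items = sorted(cards)
    # Level 1: single items, no join cost yet.
    best = {frozenset([i]): (i, cards[i], 0.0) for i in items}
    for level in range(2, len(items) + 1):
        for subset in combinations(items, level):
            whole = frozenset(subset)
            # Split the set into two non-empty parts built at earlier levels.
            for k in range(1, level // 2 + 1):
                for left_items in combinations(subset, k):
                    left = frozenset(left_items)
                    right = whole - left
                    l_plan, l_rows, l_cost = best[left]
                    r_plan, r_rows, r_cost = best[right]
                    rows = l_rows * r_rows * selectivity
                    cost = l_cost + r_cost + rows
                    if whole not in best or cost < best[whole][2]:
                        best[whole] = ((l_plan, r_plan), rows, cost)
    plan, _, cost = best[frozenset(items)]
    return plan, cost
```

The returned plan is a nested tuple such as `(("B", "D"), "C")`. Because only one plan is kept per item set, the table holds one entry for each non-empty subset of the items, which matches the per-level listing in the commit message.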
* [SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even in case ↵Anthony Truchet2017-03-081-3/+15
of exception

## What changes were proposed in this pull request?

Ensure broadcasted variables are destroyed even in case of exception.

## How was this patch tested?

Word2VecSuite was run locally.

Author: Anthony Truchet <a.truchet@criteo.com>

Closes #14299 from AnthonyTruchet/SPARK-16440.
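The general pattern for releasing a resource on both the normal and the exceptional path is a `try`/`finally` block. The sketch below shows that pattern with a stand-in `Broadcast` class; it is not the MLlib code, and `with_broadcast` is a hypothetical helper.

```python
class Broadcast:
    """Stand-in for a broadcast variable that must be released explicitly."""

    def __init__(self, value):
        self.value = value
        self.destroyed = False

    def destroy(self):
        self.destroyed = True


def with_broadcast(value, body):
    """Run body(broadcast) and destroy the broadcast even if body raises."""
    broadcast = Broadcast(value)
    try:
        return body(broadcast)
    finally:
        # Runs on success and on exception alike.
        broadcast.destroy()
```

If `body` raises, the exception still propagates to the caller, but the broadcast has already been destroyed by the time it does.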
* [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically ↵Yuming Wang2017-03-083-0/+33
converted to spark.sql.shuffle.partitions

## What changes were proposed in this pull request?

Automatically convert `SET mapreduce.job.reduces` to `spark.sql.shuffle.partitions`, in the same way as `SET mapred.reduce.tasks`.

## How was this patch tested?

Unit tests.

Author: Yuming Wang <wgyumg@gmail.com>

Closes #17020 from wangyum/SPARK-19693.
* [SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression supports ↵Yanbo Liang2017-03-083-12/+77
| | | | | | | | | | | | | | tweedie distribution. ## What changes were proposed in this pull request? PySpark ```GeneralizedLinearRegression``` supports tweedie distribution. ## How was this patch tested? Add unit tests. Author: Yanbo Liang <ybliang8@gmail.com> Closes #17146 from yanboliang/spark-19806.
* [ML][MINOR] Separate estimator and model params for read/write test.Yanbo Liang2017-03-0823-54/+59
## What changes were proposed in this pull request?

Since `Estimator` and `Model` do not always share the same params (see `ALSParams` and `ALSModelParams`), we should pass in test params for the estimator and the model separately in the function `testEstimatorAndModelReadWrite`.

## How was this patch tested?

Existing tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #17151 from yanboliang/test-rw.
* [SPARK-18055][SQL] Use correct mirror in ExpresionEncoderMichael Armbrust2017-03-082-2/+13
| | | | | | | | | | Previously, we were using the mirror of passed in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on root classloader) but inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library. This patch changes us to always reflect based on a mirror created using the context classloader. Author: Michael Armbrust <michael@databricks.com> Closes #17201 from marmbrus/replSeqEncoder.
* [SPARK-17629][ML] methods to return synonyms directlyAsher Krim2017-03-072-12/+45
## What changes were proposed in this pull request?

Provide methods to return synonyms directly, without wrapping them in a dataframe.

In performance-sensitive applications (such as user-facing APIs) the round trip to and from dataframes is costly and unnecessary.

The methods are named `findSynonymsArray` to make the return type clear, which also implies a local data structure.

## How was this patch tested?

Updated word2vec tests.

Author: Asher Krim <akrim@hubspot.com>

Closes #16811 from Krimit/w2vFindSynonymsLocal.
* [SPARK-19859][SS] The new watermark should override the old oneShixiong Zhu2017-03-072-0/+21
## What changes were proposed in this pull request?

The new watermark should override the old one. Otherwise, we just pick up the first column which has a watermark, which may be unexpected.

## How was this patch tested?

The new test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17199 from zsxwing/SPARK-19859.
* [SPARK-19841][SS] watermarkPredicate should filter based on keysShixiong Zhu2017-03-072-8/+39
## What changes were proposed in this pull request?

`StreamingDeduplicateExec.watermarkPredicate` should filter based on keys. Otherwise, it may generate a wrong answer if the watermark column in `keyExpression` has a different position in the row.

`StateStoreSaveExec` has the same code, but its parent can make sure the watermark column positions in `keyExpression` and `row` are the same.

## How was this patch tested?

The added test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17183 from zsxwing/SPARK-19841.
* [SPARK-18389][SQL] Disallow cyclic view referencejiangxingbo2017-03-072-6/+90
## What changes were proposed in this pull request?

Disallow cyclic view references. A cyclic view reference may be created by the following queries:

```
CREATE VIEW testView AS SELECT id FROM tbl
CREATE VIEW testView2 AS SELECT id FROM testView
ALTER VIEW testView AS SELECT * FROM testView2
```

In the above example, a reference cycle (testView -> testView2 -> testView) exists.

We disallow cyclic view references by checking, in the ALTER VIEW command, whether the `analyzedPlan` contains the same `View` node as the altered view; if it does, we prevent the change and throw an AnalysisException.

## How was this patch tested?

Test by `SQLViewSuite.test("correctly handle a cyclic view reference")`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #17152 from jiangxb1987/cyclic-view.
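The check described above amounts to asking whether the altered view is reachable from its own new definition. The sketch below shows that idea on a plain dictionary of view references; it is not the Spark implementation, and `alter_view`, `ViewCycleError` and the `definitions` mapping are hypothetical names for illustration.

```python
class ViewCycleError(Exception):
    """Raised when an ALTER VIEW would make a view reference itself."""


def alter_view(definitions, name, new_refs):
    """Point view `name` at `new_refs` unless that creates a reference cycle.

    definitions maps each view name to the set of names it selects from;
    names missing from the mapping are treated as base tables.
    """
    stack = list(new_refs)
    visited = set()
    while stack:
        current = stack.pop()
        if current == name:
            raise ViewCycleError(f"recursive view {name} detected")
        if current in visited:
            continue
        visited.add(current)
        stack.extend(definitions.get(current, ()))
    definitions[name] = set(new_refs)
```

For the three statements in the commit message, altering `testView` to select from `testView2` walks `testView2 -> testView` and raises, leaving the existing definition unchanged.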
* [SPARK-19843][SQL] UTF8String => (int / long) conversion expensive for ↵  Tejas Patil  2017-03-07  3  -82/+247
invalid inputs

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-19843

Created wrapper classes (`IntWrapper`, `LongWrapper`) to wrap the result of parsing (which are primitive types). The parsing methods return a boolean that reports whether parsing succeeded.

## How was this patch tested?

- Added new unit tests
- Ran a prod job which had conversion from string -> int and verified the outputs

## Performance

Tiny regression when all strings are valid integers

```
conversion to int:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------
trunk                        502 /  522         33.4          29.9       1.0X
SPARK-19843                  493 /  503         34.0          29.4       1.0X
```

Huge gain when all strings are invalid integers

```
conversion to int:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------
trunk                      33913 / 34219          0.5        2021.4       1.0X
SPARK-19843                  154 /  162         108.8           9.2     220.0X
```

Author: Tejas Patil <tejasp@fb.com>

Closes #17184 from tejasapatil/SPARK-19843_is_numeric_maybe.
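The pattern in SPARK-19843 (write the parsed value into a wrapper and report success as a boolean) can be sketched in Python as follows. This is a simplified illustration rather than the actual `UTF8String` code: the Python `IntWrapper` class and the `to_int` function are hypothetical analogues, and the sketch accepts only an optional sign followed by ASCII digits.

```python
class IntWrapper:
    """Mutable holder for a parsed value (hypothetical Python analogue)."""

    def __init__(self):
        self.value = 0


def to_int(text, result):
    """Parse `text` as a base-10 integer into `result.value`.

    Returns True on success and False on invalid input. No exception is
    raised for invalid input, so a failed parse stays cheap.
    """
    s = text.strip()
    if not s:
        return False
    sign = 1
    if s[0] in "+-":
        sign = -1 if s[0] == "-" else 1
        s = s[1:]
    # Reject anything that is not a non-empty run of ASCII digits.
    if not s or not all("0" <= ch <= "9" for ch in s):
        return False
    result.value = sign * int(s)
    return True


wrapper = IntWrapper()
if to_int("42", wrapper):
    print(wrapper.value)  # 42
print(to_int("not a number", wrapper))  # False
```

The caller checks the boolean before reading `wrapper.value`; after a failed parse the wrapper keeps its previous contents in this sketch.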
* Revert "[SPARK-19561] [PYTHON] cast TimestampType.toInternal output to long"  Wenchen Fan  2017-03-07  2  -7/+1
| | | | This reverts commit 711addd46e98e42deca97c5b9c0e55fddebaa458.
* [SPARK-19857][YARN] Correctly calculate next credential update time.  Marcelo Vanzin  2017-03-07  1  -3/+4
| | | | | | | | | | | | Add parentheses so that both lines form a single statement; also add a log message so that the issue becomes more explicit if it shows up again. Tested manually with integration test that exercises the feature. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #17198 from vanzin/SPARK-19857.
* [SPARK-19702][MESOS] Increase default refuse_seconds timeout in the Mesos ↵  Michael Gummelt  2017-03-07  7  -105/+187
| | | | | | | | | | | | | | | | | | Spark Dispatcher ## What changes were proposed in this pull request? Increase default refuse_seconds timeout, and make it configurable. See JIRA for details on how this reduces the risk of starvation. ## How was this patch tested? Unit tests, Manual testing, and Mesos/Spark integration test suite cc susanxhuynh skonto jmlvanre Author: Michael Gummelt <mgummelt@mesosphere.io> Closes #17031 from mgummelt/SPARK-19702-suppress-revive.
* [SPARK-19561] [PYTHON] cast TimestampType.toInternal output to long  Jason White  2017-03-07  2  -1/+7
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Cast the output of `TimestampType.toInternal` to long to allow for proper Timestamp creation in DataFrames near the epoch. ## How was this patch tested? Added a new test that fails without the change. dongjoon-hyun davies Mind taking a look? The contribution is my original work and I license the work to the project under the project’s open source license. Author: Jason White <jason.white@shopify.com> Closes #16896 from JasonMWhite/SPARK-19561.
* [SPARK-19803][TEST] flaky BlockManagerReplicationSuite test failure  uncleGen  2017-03-07  1  -5/+5
| | | | | | | | | | | | | | | ## What changes were proposed in this pull request? 200ms may be too short. Give more time for replication to happen and for new blocks to be reported to the master. ## How was this patch tested? Tested manually. Author: uncleGen <hustyugm@gmail.com> Author: dylon <hustyugm@gmail.com> Closes #17144 from uncleGen/SPARK-19803.
* [SPARK-19516][DOC] update public doc to use SparkSession instead of SparkContext  Wenchen Fan  2017-03-07  2  -103/+76
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? After Spark 2.0, `SparkSession` becomes the new entry point for Spark applications. We should update the public documents to reflect this. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #16856 from cloud-fan/doc.
* [SPARK-17498][ML] StringIndexer enhancement for handling unseen labels  VinceShieh  2017-03-07  4  -30/+95
## What changes were proposed in this pull request?

This PR is an enhancement to ML StringIndexer. Before this PR, StringIndexer only supported the "skip" and "error" options for dealing with unseen records. Those unseen records might still be useful, and users may want to keep the unseen labels in certain use cases. This PR enables StringIndexer to keep unseen labels by assigning them the index [numLabels].

'''Before
StringIndexer().setHandleInvalid("skip")
StringIndexer().setHandleInvalid("error")
'''After support the third option "keep"
StringIndexer().setHandleInvalid("keep")

## How was this patch tested?

Test added in StringIndexerSuite

Signed-off-by: VinceShieh <vincent.xieintel.com>

Author: VinceShieh <vincent.xie@intel.com>

Closes #16883 from VinceShieh/spark-17498.
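The three `handleInvalid` modes described in SPARK-17498 can be illustrated with a small pure-Python sketch. It models only the indexing semantics stated in the commit message (unseen labels map to index numLabels under "keep"); the `index_labels` function is a hypothetical stand-in and does not use or reproduce the Spark ML API.

```python
def index_labels(labels, values, handle_invalid="error"):
    """Map each value to the index of its label, handling unseen values.

    labels: the labels seen during fitting, in index order.
    handle_invalid: "error" raises, "skip" drops the value,
    "keep" assigns the extra index len(labels).
    """
    if handle_invalid not in ("error", "skip", "keep"):
        raise ValueError("unknown handle_invalid mode: " + handle_invalid)
    index = {label: i for i, label in enumerate(labels)}
    num_labels = len(labels)
    out = []
    for v in values:
        if v in index:
            out.append(index[v])
        elif handle_invalid == "keep":
            # All unseen labels share one extra bucket at index numLabels.
            out.append(num_labels)
        elif handle_invalid == "skip":
            continue
        else:
            raise ValueError("unseen label: " + str(v))
    return out


print(index_labels(["a", "b"], ["a", "c", "b"], "keep"))  # [0, 2, 1]
print(index_labels(["a", "b"], ["a", "c", "b"], "skip"))  # [0, 1]
```

With "keep", every unseen label lands in the same bucket, so downstream stages see one extra category rather than losing the rows.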
* [SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached ↵  Wenchen Fan  2017-03-07  9  -98/+119
plans that refer to this table

## What changes were proposed in this pull request?

When un-caching a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to this table.

This PR also includes some refactors:

1. use `java.util.LinkedList` to store the cache entries, so that it's safer to remove elements while iterating
2. rename `invalidateCache` to `recacheByPlan`, which is more obvious about what it does.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17097 from cloud-fan/cache.
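The behavior described in SPARK-19765 (dropping every cached plan that refers to the table, not only the table's own entry) can be sketched as below. This is an illustration under stated assumptions, not Spark's cache manager: plans are modeled as hypothetical nested tuples of the form `(operator, child, ...)` with leaves `("table", name)`, and both helper names are invented for the example.

```python
def references(plan, table):
    """Return True if the plan tree reads from `table`.

    A plan is a nested tuple: ("table", name) for a leaf, or
    (operator, child, ...) for an inner node (hypothetical model).
    """
    if plan[0] == "table":
        return plan[1] == table
    return any(
        references(child, table) for child in plan[1:] if isinstance(child, tuple)
    )


def uncache_table(cached_plans, table):
    """Return the cache entries that survive un-caching `table`."""
    return [plan for plan in cached_plans if not references(plan, table)]


cached = [
    ("table", "t1"),
    ("project", ("filter", ("table", "t1"))),  # derived from t1
    ("table", "t2"),
]
print(uncache_table(cached, "t1"))  # [('table', 't2')]
```

Building a new list sidesteps the remove-while-iterating problem that the commit addresses with `java.util.LinkedList`; that substitution is a choice made for this sketch only.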
* [SPARK-19637][SQL] Add to_json in FunctionRegistry  Takeshi Yamamuro  2017-03-07  5  -2/+136
| | | | | | | | | | | | ## What changes were proposed in this pull request? This pr added entries in `FunctionRegistry` and supported `to_json` in SQL. ## How was this patch tested? Added tests in `JsonFunctionsSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #16981 from maropu/SPARK-19637.
* [SPARK-17075][SQL][FOLLOWUP] fix filter estimation issues  wangzhenhua  2017-03-06  2  -274/+297
## What changes were proposed in this pull request?

1. support boolean type in binary expression estimation.
2. deal with compound Not conditions.
3. avoid converting BigInt/BigDecimal directly to double unless it's within range (0, 1).
4. reorganize test code.

## How was this patch tested?

modify related test cases.

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #17148 from wzhfy/fixFilter.
* [SPARK-19832][SQL] DynamicPartitionWriteTask get partitionPath should escape ↵  windpiger  2017-03-06  3  -2/+58
the partition name

## What changes were proposed in this pull request?

Currently in DynamicPartitionWriteTask, when we get the partitionPath of a partition, we escape only the partition value, not the partition name. This causes problems for partition names that contain special characters, for example:

1) If the partition name contains '%' etc., two partition paths are created in the filesystem: one for the escaped path, like '/path/a%25b=1', and another for the unescaped path, like '/path/a%b=1'. The inserted data is stored in the unescaped path, while show partitions returns 'a%25b=1', in which the partition name is escaped, so the two are inconsistent. I think the data should be stored in the escaped path in the filesystem, which is also what Hive 2.0.0 does.

2) If the partition name contains ':', `new Path("/path","a:b")` throws an exception, because a colon in the relative path is illegal.

```
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: a:b
  at org.apache.hadoop.fs.Path.initialize(Path.java:205)
  at org.apache.hadoop.fs.Path.<init>(Path.java:171)
  at org.apache.hadoop.fs.Path.<init>(Path.java:88)
  ... 48 elided
Caused by: java.net.URISyntaxException: Relative path in absolute URI: a:b
  at java.net.URI.checkPath(URI.java:1823)
  at java.net.URI.<init>(URI.java:745)
  at org.apache.hadoop.fs.Path.initialize(Path.java:202)
  ... 50 more
```

## How was this patch tested?

unit test added

Author: windpiger <songjun@outlook.com>

Closes #17173 from windpiger/fixDatasourceSpecialCharPartitionName.
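The fix in SPARK-19832 amounts to escaping both sides of `name=value` when building the partition directory. The Python sketch below shows the idea under stated assumptions: `SPECIAL_CHARS` is a deliberately short, assumed set of characters (the list Spark actually escapes is longer), and `escape_path_name` and `partition_dir` are hypothetical helpers, not Spark functions.

```python
# Assumed, simplified set of characters to escape; not Spark's actual list.
SPECIAL_CHARS = set("%:/=#*?")


def escape_path_name(name):
    """Percent-encode special characters as %XX with uppercase hex digits."""
    return "".join(
        "%{:02X}".format(ord(ch)) if ch in SPECIAL_CHARS else ch for ch in name
    )


def partition_dir(name, value):
    """Build the directory name, escaping the partition name and the value."""
    return escape_path_name(name) + "=" + escape_path_name(value)


print(partition_dir("a%b", "1"))  # a%25b=1
print(partition_dir("a:b", "1"))  # a%3Ab=1
```

Escaping the name as well yields the single path 'a%25b=1' from case 1 and removes the bare colon that triggers the URISyntaxException in case 2.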