Commit message | Author | Date | Files | Lines (-/+)
* [SPARK-20146][SQL] fix comment missing issue for thrift server | bomeng | 2017-03-29 | 2 files | -2/+14
## What changes were proposed in this pull request?
The column comment was missing while constructing the Hive TableSchema. This fix will preserve the original comment.
## How was this patch tested?
I have added a new test case to test the column with/without comment.
Author: bomeng <bmeng@us.ibm.com>
Closes #17470 from bomeng/SPARK-20146.
* [SPARK-19088][SQL] Fix 2.10 build. | Takuya UESHIN | 2017-03-29 | 1 file | -1/+2
## What changes were proposed in this pull request?
Commit 6c70a38 broke the build for scala 2.10. The commit uses some reflections which are not available in Scala 2.10. This PR fixes them.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes #17473 from ueshin/issues/SPARK-19088.
* [SPARK-20120][SQL] spark-sql support silent mode | Yuming Wang | 2017-03-29 | 1 file | -0/+5
## What changes were proposed in this pull request?
It is similar to Hive silent mode: just show the query result. See [Hive LanguageManual+Cli](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli) and [the implementation of Hive silent mode](https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L948-L950). This PR sets the Logger level to `WARN` to get a similar result.
## How was this patch tested?
Manual tests: ![manual test spark sql silent mode](https://cloud.githubusercontent.com/assets/5399861/24390165/989b7780-13b9-11e7-8496-6e68f55757e3.gif)
Author: Yuming Wang <wgyumg@gmail.com>
Closes #17449 from wangyum/SPARK-20120.
* [SPARK-17075][SQL][FOLLOWUP] Add Estimation of Constant Literal | Xiao Li | 2017-03-29 | 2 files | -2/+124
### What changes were proposed in this pull request?
`FalseLiteral` and `TrueLiteral` should have been eliminated by the optimizer rule `BooleanSimplification`, but null literals might be added by the optimizer rule `NullPropagation`. For safety, our filter estimation should handle all the eligible literal cases.
Our optimizer rule BooleanSimplification is unable to remove the null literal in many cases, for example `a < 0 or null`. Thus, we need to handle null literals in filter estimation.
`Not` can be pushed down below `And` and `Or`. Then we could see two consecutive `Not`, which need to be collapsed into one. Because of the limited expression support for filter estimation, we just need to handle the case `Not(null)` to avoid incorrect results from boolean operations on null. For details, see the matrix below.
```
not NULL = NULL
NULL or false = NULL
NULL or true = true
NULL or NULL = NULL
NULL and false = false
NULL and true = NULL
NULL and NULL = NULL
```
### How was this patch tested?
Added the test cases.
Author: Xiao Li <gatorsmile@gmail.com>
Closes #17446 from gatorsmile/constantFilterEstimation.
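To make the scenario above concrete, here is a minimal sketch (assuming an active SparkSession named `spark`; the column name is made up) of a filter whose condition still carries a null literal:
```scala
// A filter condition of the shape discussed above: "a < 0 OR null".
// NullPropagation can leave the bare null literal in the optimized plan.
val df = spark.range(10).toDF("a")
df.filter("a < 0 OR null").explain(true)  // inspect the optimized plan for the null literal
```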
* [SPARK-20009][SQL] Support DDL strings for defining schema in functions.from_json | Takeshi Yamamuro | 2017-03-29 | 5 files | -25/+90
## What changes were proposed in this pull request?
This PR added `StructType.fromDDL` to convert a DDL format string into `StructType` for defining schemas in `functions.from_json`.
## How was this patch tested?
Added tests in `JsonFunctionsSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes #17406 from maropu/SPARK-20009.
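For illustration, a hedged sketch of how the helper described above could be used; the input DataFrame `df` and its `payload` string column are assumptions, not part of the PR:
```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Build a schema from a DDL-format string, then use it with from_json.
val schema = StructType.fromDDL("a INT, b STRING")
val parsed = df.select(from_json(col("payload"), schema).as("parsed"))
```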
* [SPARK-20048][SQL] Cloning SessionState does not clone query execution listeners | Kunal Khamar | 2017-03-29 | 9 files | -56/+111
## What changes were proposed in this pull request?
Bugfix from [SPARK-19540.](https://github.com/apache/spark/pull/16826) Cloning SessionState does not clone query execution listeners, so a cloned session is unable to listen to events on queries.
## How was this patch tested?
- Unit test
Author: Kunal Khamar <kkhamar@outlook.com>
Closes #17379 from kunalkhamar/clone-bugfix.
* [SPARK-19955][PYSPARK] Jenkins Python Conda based test. | Holden Karau | 2017-03-29 | 3 files | -28/+47
## What changes were proposed in this pull request?
Allow Jenkins Python tests to use the installed conda to test Python 2.7 support & test pip installability.
## How was this patch tested?
Updated shell scripts, ran tests locally with installed conda, ran tests in Jenkins.
Author: Holden Karau <holden@us.ibm.com>
Closes #17355 from holdenk/SPARK-19955-support-python-tests-with-conda.
* [SPARK-20059][YARN] Use the correct classloader for HBaseCredentialProvider | jerryshao | 2017-03-29 | 3 files | -4/+15
## What changes were proposed in this pull request?
Currently we use the system classloader to find HBase jars; if they are specified with `--jars`, this fails with a ClassNotFound issue. So here we change to use the child classloader. Also, added jars and the main jar are now put into the classpath of the submitted application in yarn cluster mode; otherwise HBase jars specified with `--jars` will never be honored in cluster mode, and fetching tokens on the client side will always fail.
## How was this patch tested?
Unit test and local verification.
Author: jerryshao <sshao@hortonworks.com>
Closes #17388 from jerryshao/SPARK-20059.
* [SPARK-19556][CORE] Do not encrypt block manager data in memory. | Marcelo Vanzin | 2017-03-29 | 20 files | -270/+710
This change modifies the way block data is encrypted to make the more common cases faster, while penalizing an edge case. As a side effect of the change, all data that goes through the block manager is now encrypted only when needed, including the previous path (broadcast variables) where that did not happen.
The way the change works is by not encrypting data that is stored in memory; so if a serialized block is in memory, it will only be encrypted once it is evicted to disk.
The penalty comes when transferring that encrypted data from disk. If the data ends up in memory again, it is as efficient as before; but if the evicted block needs to be transferred directly to a remote executor, then there's now a performance penalty, since the code now uses a custom FileRegion implementation to decrypt the data before transferring.
This also means that block data transferred between executors now is not encrypted (and thus relies on the network library encryption support for secrecy). Shuffle blocks are still transferred in encrypted form, since they're handled in a slightly different way by the code. This also keeps compatibility with existing external shuffle services, which transfer encrypted shuffle blocks, and avoids having to make the external service aware of encryption at all.
The serialization and deserialization APIs in the SerializerManager now do not do encryption automatically; callers need to explicitly wrap their streams with an appropriate crypto stream before using those.
As a result of these changes, some of the workarounds added in SPARK-19520 are removed here.
Testing: a new trait ("EncryptionFunSuite") was added that provides an easy way to run a test twice, with encryption on and off; broadcast, block manager and caching tests were modified to use this new trait so that the existing tests exercise both encrypted and non-encrypted paths. I also ran some applications with encryption turned on to verify that they still work, including streaming tests that failed without the fix for SPARK-19520.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #17295 from vanzin/SPARK-19556.
* [SPARK-20134][SQL] SQLMetrics.postDriverMetricUpdates to simplify driver side metric updates | Reynold Xin | 2017-03-29 | 5 files | -17/+34
## What changes were proposed in this pull request?
It is not super intuitive how to update SQLMetric on the driver side. This patch introduces a new SQLMetrics.postDriverMetricUpdates function to do that, and adds documentation to make it more obvious.
## How was this patch tested?
Updated a test case to use this method.
Author: Reynold Xin <rxin@databricks.com>
Closes #17464 from rxin/SPARK-20134.
* [SPARK-20040][ML][PYTHON] pyspark wrapper for ChiSquareTest | Bago Amirbekian | 2017-03-28 | 5 files | -12/+127
## What changes were proposed in this pull request?
A pyspark wrapper for spark.ml.stat.ChiSquareTest.
## How was this patch tested?
Unit tests, doctests.
Author: Bago Amirbekian <bago@databricks.com>
Closes #17421 from MrBago/chiSquareTestWrapper.
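For context, a small sketch of the underlying Scala `spark.ml.stat.ChiSquareTest` API that this Python wrapper mirrors; the toy DataFrame and an active SparkSession named `spark` are assumptions:
```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.stat.ChiSquareTest
import spark.implicits._

// A tiny labeled dataset with a vector "features" column and a "label" column.
val data = Seq(
  (0.0, Vectors.dense(0.5, 10.0)),
  (1.0, Vectors.dense(1.5, 20.0)),
  (1.0, Vectors.dense(1.5, 30.0))
).toDF("label", "features")

// Pearson's chi-squared test of independence for every feature against the label.
val result = ChiSquareTest.test(data, "features", "label")
result.select("pValues", "degreesOfFreedom", "statistics").show(truncate = false)
```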
* [SPARK-20043][ML] DecisionTreeModel: ImpurityCalculator builder fails for uppercase impurity type Gini | 颜发才(Yan Facai) | 2017-03-28 | 2 files | -1/+15
Fix bug: DecisionTreeModel can't recognize Impurity "Gini" when loading.
TODO:
+ [x] add unit test
+ [x] fix the bug
Author: 颜发才(Yan Facai) <facai.yan@gmail.com>
Closes #17407 from facaiy/BUG/decision_tree_loader_failer_with_Gini_impurity.
* [SPARK-19868] conflict TasksetManager lead to spark stopped | liujianhui | 2017-03-28 | 2 files | -8/+34
## What changes were proposed in this pull request?
We must set the taskset to zombie before the DAGScheduler handles the taskEnded event. It's possible the taskEnded event will cause the DAGScheduler to launch a new stage attempt (this happens when map output data was lost), and if this happens before the taskSet has been set to zombie, it will appear that we have conflicting task sets.
Author: liujianhui <liujianhui@didichuxing>
Closes #17208 from liujianhuiouc/spark-19868.
* [SPARK-20125][SQL] Dataset of type option of map does not work | Wenchen Fan | 2017-03-28 | 2 files | -0/+11
## What changes were proposed in this pull request?
When we build the deserializer expression for map type, we will use `StaticInvoke` to call `ArrayBasedMapData.toScalaMap`, and declare the return type as `scala.collection.immutable.Map`. If the map is inside an Option, we will wrap this `StaticInvoke` with `WrapOption`, which requires the input to be `scala.collection.Map`. Ideally this should be fine, as `scala.collection.immutable.Map` extends `scala.collection.Map`, but our `ObjectType` is too strict about this; this PR fixes it.
## How was this patch tested?
New regression test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #17454 from cloud-fan/map.
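A minimal sketch of the failing shape described above (assuming an active SparkSession named `spark`):
```scala
import spark.implicits._

// A Dataset whose element type is an Option of a Map.
val ds = Seq(Some(Map("k" -> 1)), None).toDS()
ds.collect()  // building the deserializer for Option[Map[String, Int]] previously failed here
```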
* [SPARK-19995][YARN] Register tokens to current UGI to avoid re-issuing of tokens in yarn client mode | jerryshao | 2017-03-28 | 1 file | -0/+3
## What changes were proposed in this pull request?
In the current Spark on YARN code, we obtain tokens from provided services, but we do not add these tokens to the current user's credentials. This makes all the following operations to these services still require a TGT rather than delegation tokens. This is unnecessary since we already got the tokens, and it also leads to failure in the user impersonation scenario, because the TGT is granted by the real user, not the proxy user. So here we change to put all the tokens into the current UGI, so that following operations to these services will honor tokens rather than the TGT, and this will further handle the proxy user issue mentioned above.
## How was this patch tested?
Locally verified in a secure cluster. vanzin tgravescs mridulm dongjoon-hyun please help to review, thanks a lot.
Author: jerryshao <sshao@hortonworks.com>
Closes #17335 from jerryshao/SPARK-19995.
* [SPARK-20126][SQL] Remove HiveSessionState | Herman van Hovell | 2017-03-28 | 14 files | -193/+104
## What changes were proposed in this pull request?
Commit https://github.com/apache/spark/commit/ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of the `HiveSessionState` redundant. This PR removes the `HiveSessionState`.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #17457 from hvanhovell/SPARK-20126.
* [SPARK-20124][SQL] Join reorder should keep the same order of final project attributes | wangzhenhua | 2017-03-28 | 3 files | -10/+31
## What changes were proposed in this pull request?
The join reorder algorithm should keep exactly the same order of output attributes in the top project. For example, if the user wants to select a, b, c, then after reordering we should output a, b, c in the same order as specified by the user, instead of b, a, c or some other order.
## How was this patch tested?
A new test case is added in `JoinReorderSuite`.
Author: wangzhenhua <wangzhenhua@huawei.com>
Closes #17453 from wzhfy/keepOrderInProject.
* [SPARK-20094][SQL] Preventing push down of IN subquery to Join operator | wangzhenhua | 2017-03-28 | 2 files | -0/+26
## What changes were proposed in this pull request?
TPCDS q45 fails because `ReorderJoin` collects all predicates and tries to put them into the join condition when creating an ordered join. If a predicate with an IN subquery (`ListQuery`) is in a join condition instead of a filter condition, `RewritePredicateSubquery.rewriteExistentialExpr` would fail to convert the subquery to an `ExistenceJoin`, and thus result in an error. We should prevent push down of IN subqueries to the Join operator.
## How was this patch tested?
Add a new test case in `FilterPushdownSuite`.
Author: wangzhenhua <wangzhenhua@huawei.com>
Closes #17428 from wzhfy/noSubqueryInJoinCond.
* [SPARK-20119][TEST-MAVEN] Fix the test case fail in DataSourceScanExecRedactionSuite | Xiao Li | 2017-03-28 | 1 file | -1/+1
### What changes were proposed in this pull request?
Changed the pattern to match the first n characters in the location field so that the string truncation does not affect it.
### How was this patch tested?
N/A
Author: Xiao Li <gatorsmile@gmail.com>
Closes #17448 from gatorsmile/fixTestCAse.
* [SPARK-19088][SQL] Optimize sequence type deserialization codegen | Michal Senkyr | 2017-03-28 | 3 files | -69/+54
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Optimization of arbitrary Scala sequence deserialization introduced by #16240. The previous implementation constructed an array which was then converted by `to`. This required two passes in most cases. This implementation attempts to remedy that by using `Builder`s provided by the `newBuilder` method on every Scala collection's companion object to build the resulting collection directly. Example codegen for simple `List` (obtained using `Seq(List(1)).toDS().map(identity).queryExecution.debug.codegen`): Before: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean deserializetoobject_resultIsNull; /* 010 */ private java.lang.Object[] deserializetoobject_argValue; /* 011 */ private boolean MapObjects_loopIsNull1; /* 012 */ private int MapObjects_loopValue0; /* 013 */ private boolean deserializetoobject_resultIsNull1; /* 014 */ private scala.collection.generic.CanBuildFrom deserializetoobject_argValue1; /* 015 */ private UnsafeRow deserializetoobject_result; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 018 */ private scala.collection.immutable.List mapelements_argValue; /* 019 */ private UnsafeRow mapelements_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 022 */ private scala.collection.immutable.List serializefromobject_argValue; /* 023 */ private UnsafeRow serializefromobject_result; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 025 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 027 */ /* 028 */ public GeneratedIterator(Object[] references) { /* 029 */ this.references = references; /* 030 */ } /* 031 */ /* 032 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 033 */ partitionIndex = index; /* 034 */ this.inputs = inputs; /* 035 */ inputadapter_input = inputs[0]; /* 036 */ /* 037 */ 
deserializetoobject_result = new UnsafeRow(1); /* 038 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 039 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 040 */ /* 041 */ mapelements_result = new UnsafeRow(1); /* 042 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 043 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 044 */ /* 045 */ serializefromobject_result = new UnsafeRow(1); /* 046 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 047 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 048 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 049 */ /* 050 */ } /* 051 */ /* 052 */ protected void processNext() throws java.io.IOException { /* 053 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 054 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 055 */ ArrayData inputadapter_value = inputadapter_row.getArray(0); /* 056 */ /* 057 */ deserializetoobject_resultIsNull = false; /* 058 */ /* 059 */ if (!deserializetoobject_resultIsNull) { /* 060 */ ArrayData deserializetoobject_value3 = null; /* 061 */ /* 062 */ if (!false) { /* 063 */ Integer[] deserializetoobject_convertedArray = null; /* 064 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 065 */ deserializetoobject_convertedArray = new Integer[deserializetoobject_dataLength]; /* 066 */ /* 067 */ int deserializetoobject_loopIndex = 0; /* 068 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 069 */ MapObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex)); /* 070 */ MapObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 071 */ /* 072 */ if (MapObjects_loopIsNull1) { /* 073 */ throw new RuntimeException(((java.lang.String) references[0])); /* 074 */ } /* 075 */ if (false) { /* 076 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 077 */ } else { /* 078 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue0; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_loopIndex += 1; /* 082 */ } /* 083 */ /* 084 */ deserializetoobject_value3 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /* 085 */ } /* 086 */ boolean deserializetoobject_isNull2 = true; /* 087 */ java.lang.Object[] deserializetoobject_value2 = null; /* 088 */ if (!false) { /* 089 */ deserializetoobject_isNull2 = false; /* 090 */ if (!deserializetoobject_isNull2) { /* 091 */ Object deserializetoobject_funcResult = null; /* 092 */ deserializetoobject_funcResult = deserializetoobject_value3.array(); /* 093 */ if (deserializetoobject_funcResult == null) { /* 094 */ deserializetoobject_isNull2 = true; /* 095 */ } else { /* 096 */ deserializetoobject_value2 = (java.lang.Object[]) deserializetoobject_funcResult; /* 097 */ } /* 098 */ /* 099 */ } /* 100 */ deserializetoobject_isNull2 = deserializetoobject_value2 == null; /* 101 */ } /* 
102 */ deserializetoobject_resultIsNull = deserializetoobject_isNull2; /* 103 */ deserializetoobject_argValue = deserializetoobject_value2; /* 104 */ } /* 105 */ /* 106 */ boolean deserializetoobject_isNull1 = deserializetoobject_resultIsNull; /* 107 */ final scala.collection.Seq deserializetoobject_value1 = deserializetoobject_resultIsNull ? null : scala.collection.mutable.WrappedArray.make(deserializetoobject_argValue); /* 108 */ deserializetoobject_isNull1 = deserializetoobject_value1 == null; /* 109 */ boolean deserializetoobject_isNull = true; /* 110 */ scala.collection.immutable.List deserializetoobject_value = null; /* 111 */ if (!deserializetoobject_isNull1) { /* 112 */ deserializetoobject_resultIsNull1 = false; /* 113 */ /* 114 */ if (!deserializetoobject_resultIsNull1) { /* 115 */ boolean deserializetoobject_isNull6 = false; /* 116 */ final scala.collection.generic.CanBuildFrom deserializetoobject_value6 = false ? null : scala.collection.immutable.List.canBuildFrom(); /* 117 */ deserializetoobject_isNull6 = deserializetoobject_value6 == null; /* 118 */ deserializetoobject_resultIsNull1 = deserializetoobject_isNull6; /* 119 */ deserializetoobject_argValue1 = deserializetoobject_value6; /* 120 */ } /* 121 */ /* 122 */ deserializetoobject_isNull = deserializetoobject_resultIsNull1; /* 123 */ if (!deserializetoobject_isNull) { /* 124 */ Object deserializetoobject_funcResult1 = null; /* 125 */ deserializetoobject_funcResult1 = deserializetoobject_value1.to(deserializetoobject_argValue1); /* 126 */ if (deserializetoobject_funcResult1 == null) { /* 127 */ deserializetoobject_isNull = true; /* 128 */ } else { /* 129 */ deserializetoobject_value = (scala.collection.immutable.List) deserializetoobject_funcResult1; /* 130 */ } /* 131 */ /* 132 */ } /* 133 */ deserializetoobject_isNull = deserializetoobject_value == null; /* 134 */ } /* 135 */ /* 136 */ boolean mapelements_isNull = true; /* 137 */ scala.collection.immutable.List mapelements_value = null; /* 138 */ if (!false) { /* 139 */ mapelements_argValue = deserializetoobject_value; /* 140 */ /* 141 */ mapelements_isNull = false; /* 142 */ if (!mapelements_isNull) { /* 143 */ Object mapelements_funcResult = null; /* 144 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 145 */ if (mapelements_funcResult == null) { /* 146 */ mapelements_isNull = true; /* 147 */ } else { /* 148 */ mapelements_value = (scala.collection.immutable.List) mapelements_funcResult; /* 149 */ } /* 150 */ /* 151 */ } /* 152 */ mapelements_isNull = mapelements_value == null; /* 153 */ } /* 154 */ /* 155 */ if (mapelements_isNull) { /* 156 */ throw new RuntimeException(((java.lang.String) references[2])); /* 157 */ } /* 158 */ serializefromobject_argValue = mapelements_value; /* 159 */ /* 160 */ final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue); /* 161 */ serializefromobject_holder.reset(); /* 162 */ /* 163 */ // Remember the current cursor so that we can calculate how many bytes are /* 164 */ // written later. /* 165 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 166 */ /* 167 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 168 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 169 */ // grow the global buffer before writing data. 
/* 170 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 171 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 172 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 173 */ /* 174 */ } else { /* 175 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 176 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 177 */ /* 178 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 179 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 180 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index); /* 181 */ } else { /* 182 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index); /* 183 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 184 */ } /* 185 */ } /* 186 */ } /* 187 */ /* 188 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 189 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 190 */ append(serializefromobject_result); /* 191 */ if (shouldStop()) return; /* 192 */ } /* 193 */ } /* 194 */ } ``` After: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean CollectObjects_loopIsNull1; /* 010 */ private int CollectObjects_loopValue0; /* 011 */ private UnsafeRow deserializetoobject_result; /* 012 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 013 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 014 */ private scala.collection.immutable.List mapelements_argValue; /* 015 */ private UnsafeRow mapelements_result; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 018 */ private scala.collection.immutable.List serializefromobject_argValue; /* 019 */ private UnsafeRow serializefromobject_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 023 */ /* 024 */ public GeneratedIterator(Object[] references) { /* 025 */ this.references = references; /* 026 */ } /* 027 */ /* 028 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 029 */ partitionIndex = index; /* 030 */ this.inputs = inputs; /* 031 */ inputadapter_input = inputs[0]; /* 032 */ /* 033 */ deserializetoobject_result = new UnsafeRow(1); /* 034 */ this.deserializetoobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 035 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 036 */ /* 037 */ mapelements_result = new UnsafeRow(1); /* 038 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 039 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 040 */ /* 041 */ serializefromobject_result = new UnsafeRow(1); /* 042 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 043 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 044 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 045 */ /* 046 */ } /* 047 */ /* 048 */ protected void processNext() throws java.io.IOException { /* 049 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 050 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 051 */ ArrayData inputadapter_value = inputadapter_row.getArray(0); /* 052 */ /* 053 */ scala.collection.immutable.List deserializetoobject_value = null; /* 054 */ /* 055 */ if (!false) { /* 056 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 057 */ scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder(); /* 058 */ CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength); /* 059 */ /* 060 */ int deserializetoobject_loopIndex = 0; /* 061 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 062 */ CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex)); /* 063 */ CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 064 */ /* 065 */ if (CollectObjects_loopIsNull1) { /* 066 */ throw new RuntimeException(((java.lang.String) references[0])); /* 067 */ } /* 068 */ if (false) { /* 069 */ CollectObjects_builderValue2.$plus$eq(null); /* 070 */ } else { /* 071 */ CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0); /* 072 */ } /* 073 */ /* 074 */ deserializetoobject_loopIndex += 1; /* 075 */ } /* 076 */ /* 077 */ deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result(); /* 078 */ } /* 079 */ /* 080 */ boolean mapelements_isNull = true; /* 081 */ scala.collection.immutable.List mapelements_value = null; /* 082 */ if (!false) { /* 083 */ mapelements_argValue = deserializetoobject_value; /* 084 */ /* 085 */ mapelements_isNull = false; /* 086 */ if (!mapelements_isNull) { /* 087 */ Object mapelements_funcResult = null; /* 088 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue); /* 089 */ if (mapelements_funcResult == null) { /* 090 */ mapelements_isNull = true; /* 091 */ } else { /* 092 */ mapelements_value = (scala.collection.immutable.List) mapelements_funcResult; /* 093 */ } /* 094 */ /* 095 */ } /* 096 */ mapelements_isNull = mapelements_value == null; /* 097 */ } /* 098 */ /* 099 */ if (mapelements_isNull) { /* 100 */ throw new RuntimeException(((java.lang.String) references[2])); /* 101 */ } /* 102 */ serializefromobject_argValue 
= mapelements_value; /* 103 */ /* 104 */ final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue); /* 105 */ serializefromobject_holder.reset(); /* 106 */ /* 107 */ // Remember the current cursor so that we can calculate how many bytes are /* 108 */ // written later. /* 109 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 110 */ /* 111 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 112 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 113 */ // grow the global buffer before writing data. /* 114 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 115 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 116 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 117 */ /* 118 */ } else { /* 119 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 120 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 121 */ /* 122 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 123 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 124 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index); /* 125 */ } else { /* 126 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index); /* 127 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 128 */ } /* 129 */ } /* 130 */ } /* 131 */ /* 132 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 133 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 134 */ append(serializefromobject_result); /* 135 */ if (shouldStop()) return; /* 136 */ } /* 137 */ } /* 138 */ } ``` Benchmark results before: ``` OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH AMD A10-4600M APU with Radeon(tm) HD Graphics collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Seq 269 / 370 0.0 269125.8 1.0X List 154 / 176 0.0 154453.5 1.7X mutable.Queue 210 / 233 0.0 209691.6 1.3X ``` Benchmark results after: ``` OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH AMD A10-4600M APU with Radeon(tm) HD Graphics collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ Seq 255 / 316 0.0 254697.3 1.0X List 152 / 177 0.0 152410.0 1.7X mutable.Queue 213 / 235 0.0 213470.0 1.2X ``` ## How was this patch tested? ```bash ./build/mvn -DskipTests clean package && ./dev/run-tests ``` Additionally in Spark Shell: ```scala case class QueueClass(q: scala.collection.immutable.Queue[Int]) spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect ``` Author: Michal Senkyr <mike.senkyr@gmail.com> Closes #16541 from michalsenkyr/dataset-seq-builder.
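For readers skimming the generated code above, the plain-Scala equivalent of the builder pattern the new codegen emits (size-hint, append, then `result()`) looks roughly like this:
```scala
// Obtain a Builder from the collection's companion object, hint the size,
// append the elements, and build the collection in a single pass.
val builder = List.newBuilder[Int]
builder.sizeHint(3)
builder += 1
builder += 2
builder += 3
val result: List[Int] = builder.result()
```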
* [SPARK-20100][SQL] Refactor SessionState initialization | Herman van Hovell | 2017-03-28 | 13 files | -572/+547
## What changes were proposed in this pull request?
The current SessionState initialization code path is quite complex. A part of the creation is done in the SessionState companion objects, a part of the creation is done inside the SessionState class, and a part is done by passing functions.
This PR refactors this code path and consolidates SessionState initialization into a builder class. This SessionState will not do any initialization and just becomes a placeholder for the various Spark SQL internals. This also lays the groundwork for two future improvements:
1. This provides us with a start for removing the `HiveSessionState`. Removing the `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move metadata hive.
2. This makes it easier to customize the Spark Session. Currently you will need to create a custom version of the builder. I have added hooks to facilitate this. A future step will be to create a semi-stable API on top of this.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #17433 from hvanhovell/SPARK-20100.
* [SPARK-19876][SS] Follow up: Refactored BatchCommitLog to simplify logic | Tathagata Das | 2017-03-27 | 3 files | -12/+19
## What changes were proposed in this pull request?
Existing logic seemingly writes null to the BatchCommitLog, even though it does additional checks to write '{}' (valid json) to the log. This PR simplifies the logic by disallowing use of `log.add(batchId, metadata)` and instead using `log.add(batchId)`. No question of specifying metadata, so no confusion related to null.
## How was this patch tested?
Existing tests pass.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #17444 from tdas/SPARK-19876-1.
* [SPARK-19803][CORE][TEST] Proactive replication test failures | Shubham Chopra | 2017-03-28 | 3 files | -12/+29
## What changes were proposed in this pull request?
Executors cache a list of their peers that is refreshed by default every minute. The cached stale references were randomly being used for replication. Since those executors were removed from the master, they did not occur in the block locations as reported by the master. This was fixed by:
1. Refreshing the peer cache in the block manager before trying to pro-actively replicate. This way the probability of replicating to a failed executor is eliminated.
2. Explicitly stopping the block manager in the tests. This shuts down the RPC endpoint used by the block manager. This way, even if a block manager tries to replicate using a stale reference, the replication logic should take care of refreshing the list of peers after failure.
## How was this patch tested?
Tested manually.
Author: Shubham Chopra <schopra31@bloomberg.net>
Author: Kay Ousterhout <kayousterhout@gmail.com>
Author: Shubham Chopra <shubhamchopra@users.noreply.github.com>
Closes #17325 from shubhamchopra/SPARK-19803.
* [MINOR][SPARKR] Move 'Data type mapping between R and Spark' to right place in SparkR doc. | Yanbo Liang | 2017-03-27 | 1 file | -69/+69
Section ```Data type mapping between R and Spark``` was put in the wrong place in the SparkR doc; we should move it to a separate section.
## What changes were proposed in this pull request?
Before this PR: ![image](https://cloud.githubusercontent.com/assets/1962026/24340911/bc01a532-126a-11e7-9a08-0d60d13a547c.png)
After this PR: ![image](https://cloud.githubusercontent.com/assets/1962026/24340938/d9d32a9a-126a-11e7-8891-d2f5b46e0c71.png)
Author: Yanbo Liang <ybliang8@gmail.com>
Closes #17440 from yanboliang/sparkr-doc.
* [SPARK-20105][TESTS][R] Add tests for checkType and type string in structField in R | hyukjinkwon | 2017-03-27 | 1 file | -0/+53
## What changes were proposed in this pull request?
It seems `checkType` and the type string in `structField` are not being tested closely. This string format currently seems SparkR-specific (see https://github.com/apache/spark/blob/d1f6c64c4b763c05d6d79ae5497f298dc3835f3e/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L93-L131) but resembles SQL type definition. Therefore, it seems nicer if we test positive/negative cases on the R side.
## How was this patch tested?
Unit tests in `test_sparkSQL.R`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #17439 from HyukjinKwon/r-typestring-tests.
* [SPARK-20102] Fix nightly packaging and RC packaging scripts w/ two minor build fixes | Josh Rosen | 2017-03-27 | 2 files | -5/+4
## What changes were proposed in this pull request?
The master snapshot publisher builds are currently broken due to two minor build issues:
1. For unknown reasons, the LFTP `mkdir -p` command began throwing errors when the remote directory already exists. This change of behavior might have been caused by configuration changes in the ASF's SFTP server, but I'm not entirely sure of that. To work around this problem, this patch updates the script to ignore errors from the `lftp mkdir -p` commands.
2. The PySpark `setup.py` file references a non-existent `pyspark.ml.stat` module, causing Python packaging to fail by complaining about a missing directory. The fix is to simply drop that line from the setup script.
## How was this patch tested?
The LFTP fix was tested by manually running the failing commands on AMPLab Jenkins against the ASF SFTP server. The PySpark fix was tested locally.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #17437 from JoshRosen/spark-20102.
* [SPARK-20088] Do not create new SparkContext in SparkR createSparkContext | Hossein | 2017-03-27 | 1 file | -1/+1
## What changes were proposed in this pull request?
Instead of creating a new `JavaSparkContext`, we use `SparkContext.getOrCreate`.
## How was this patch tested?
Existing tests.
Author: Hossein <hossein@databricks.com>
Closes #17423 from falaki/SPARK-20088.
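A short sketch of the pattern adopted here (the app name is illustrative):
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse an already-running context instead of constructing a second one.
val conf = new SparkConf().setAppName("sparkr-backend")
val sc = SparkContext.getOrCreate(conf)  // returns the active SparkContext if one exists
```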
* [SPARK-20104][SQL] Don't estimate IsNull or IsNotNull predicates for non-leaf node | wangzhenhua | 2017-03-27 | 2 files | -4/+33
## What changes were proposed in this pull request?
At the current stage, we don't have advanced statistics such as sketches or histograms. As a result, some operators can't estimate `nullCount` accurately. E.g. left outer join estimation does not accurately update `nullCount` currently. So for `IsNull` and `IsNotNull` predicates, we only estimate them when the child is a leaf node, whose `nullCount` is accurate.
## How was this patch tested?
A new test case is added in `FilterEstimationSuite`.
Author: wangzhenhua <wangzhenhua@huawei.com>
Closes #17438 from wzhfy/nullEstimation.
* [MINOR][DOCS] Match several documentation changes in Scala to R/Python | hyukjinkwon | 2017-03-26 | 3 files | -7/+15
## What changes were proposed in this pull request?
This PR proposes to match minor documentation changes in https://github.com/apache/spark/pull/17399 and https://github.com/apache/spark/pull/17380 to R/Python.
## How was this patch tested?
Manual tests in Python, Python tests via `./python/run-tests.py --module=pyspark-sql`, and lint-checks for Python/R.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #17429 from HyukjinKwon/minor-match-doc.
* [SPARK-19281][PYTHON][ML] spark.ml Python API for FPGrowth | zero323 | 2017-03-26 | 4 files | -9/+273
## What changes were proposed in this pull request?
- Add `HasSupport` and `HasConfidence` `Params`.
- Add new module `pyspark.ml.fpm`.
- Add `FPGrowth` / `FPGrowthModel` wrappers.
- Provide tests for new features.
## How was this patch tested?
Unit tests.
Author: zero323 <zero323@users.noreply.github.com>
Closes #17218 from zero323/SPARK-19281.
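For context, a hedged sketch of the Scala `spark.ml.fpm` estimator that these Python wrappers mirror; the toy DataFrame, its `items` column, the parameter values, and the active SparkSession named `spark` are assumptions:
```scala
import org.apache.spark.ml.fpm.FPGrowth
import spark.implicits._

// Transactions are rows with an array-typed "items" column.
val transactions = Seq(
  Seq("a", "b", "c"),
  Seq("a", "b"),
  Seq("a")
).toDF("items")

val model = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.5)
  .setMinConfidence(0.6)
  .fit(transactions)

model.freqItemsets.show()      // frequent itemsets with their counts
model.associationRules.show()  // rules derived from the frequent itemsets
```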
* [SPARK-20086][SQL] CollapseWindow should not collapse dependent adjacent windows | Herman van Hovell | 2017-03-26 | 2 files | -3/+16
## What changes were proposed in this pull request?
The `CollapseWindow` rule is currently too aggressive when collapsing adjacent windows. It also collapses windows in which the parent produces a column that is consumed by the child; this creates an invalid window which will fail at runtime. This PR fixes this by adding a check for dependent adjacent windows to the `CollapseWindow` rule.
## How was this patch tested?
Added a new test case to `CollapseWindowSuite`.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #17432 from hvanhovell/SPARK-20086.
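A sketch of the dependent-adjacent-windows shape described above; the DataFrame `df` and column names are made up:
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// The second window expression consumes a column produced by the first,
// so the two window operators must not be merged into one.
val w = Window.partitionBy("grp")
val out = df
  .withColumn("b", sum("a").over(w))
  .withColumn("c", sum("b").over(w))  // depends on "b" from the previous window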
* logging improvements | Juan Rodriguez Hortala | 2017-03-26 | 3 files | -3/+5
## What changes were proposed in this pull request?
Adding additional information to existing logging messages:
- YarnAllocator: log the executor ID together with the container id when a container for an executor is launched.
- NettyRpcEnv: log the receiver address when there is a timeout waiting for an answer to a remote call.
- ExecutorAllocationManager: fix a typo in the logging message for the list of executors to be removed.
## How was this patch tested?
Build spark and submit the word count example to a YARN cluster using cluster mode.
Author: Juan Rodriguez Hortala <hortala@amazon.com>
Closes #17411 from juanrh/logging-improvements.
* [SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding ↵Kazuaki Ishizaki2017-03-261-4/+14
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | sqlContext.read.parquet() ## What changes were proposed in this pull request? This PR improves performance of operations with `sqlContext.read.parquet()` by changing Java code generated by Catalyst. This PR is inspired by [the blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html) and [this stackoverflow entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark). This PR changes generated code in the following two points. 1. Replace a while-loop with long instance variables a for-loop with int local variables 2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated). These points facilitates compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance of `sqlContext.read.parquet().count` is improved by 1.09x. Benchmark program: ```java val dir = "/dev/shm/parquet" val N = 1000 * 1000 * 40 val iters = 20 val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds) sparkSession.range(n).write.mode("overwrite").parquet(dir) benchmark.addCase("count") { i: Int => var n = 0 var len = 0L while (n < iters) { len += sparkSession.read.parquet(dir).count n += 1 } } benchmark.run ``` Performance result without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Parquet: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ w/o this PR 1152 / 1211 694.7 1.4 1.0X ``` Performance result with this PR ``` OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Parquet: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ with this PR 1053 / 1121 760.0 1.3 1.0X ``` Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed. 
Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private scala.collection.Iterator scan_input; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows; /* 013 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime; /* 014 */ private long scan_scanTime1; /* 015 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch; /* 016 */ private int scan_batchIdx; /* 017 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 019 */ private UnsafeRow agg_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 022 */ /* 023 */ public GeneratedIterator(Object[] references) { /* 024 */ this.references = references; /* 025 */ } /* 026 */ /* 027 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 028 */ partitionIndex = index; /* 029 */ this.inputs = inputs; /* 030 */ agg_initAgg = false; /* 031 */ /* 032 */ scan_input = inputs[0]; /* 033 */ this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 034 */ this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 035 */ scan_scanTime1 = 0; /* 036 */ scan_batch = null; /* 037 */ scan_batchIdx = 0; /* 038 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 039 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 040 */ agg_result = new UnsafeRow(1); /* 041 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 042 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 043 */ /* 044 */ } /* 045 */ /* 046 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 047 */ // initialize aggregation buffer /* 048 */ agg_bufIsNull = false; /* 049 */ agg_bufValue = 0L; /* 050 */ /* 051 */ if (scan_batch == null) { /* 052 */ scan_nextBatch(); /* 053 */ } /* 054 */ while (scan_batch != null) { /* 055 */ int numRows = scan_batch.numRows(); /* 056 */ while (scan_batchIdx < numRows) { /* 057 */ int scan_rowIdx = scan_batchIdx++; /* 058 */ // do aggregate /* 059 */ // common sub-expressions /* 060 */ /* 061 */ // evaluate aggregate function /* 062 */ boolean agg_isNull1 = false; /* 063 */ /* 064 */ long agg_value1 = -1L; /* 065 */ agg_value1 = agg_bufValue + 1L; /* 066 */ // update aggregation buffer /* 067 */ agg_bufIsNull = false; /* 068 */ agg_bufValue = agg_value1; /* 069 */ if (shouldStop()) return; /* 070 */ } /* 071 */ scan_batch = null; /* 072 */ scan_nextBatch(); /* 073 */ } /* 074 */ scan_scanTime.add(scan_scanTime1 / (1000 * 1000)); /* 075 */ scan_scanTime1 = 0; /* 076 */ /* 077 */ } /* 078 */ /* 079 */ private void scan_nextBatch() throws java.io.IOException { /* 080 */ long getBatchStart = System.nanoTime(); /* 081 */ if (scan_input.hasNext()) { /* 082 */ scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next(); /* 083 */ 
scan_numOutputRows.add(scan_batch.numRows()); /* 084 */ scan_batchIdx = 0; /* 085 */ /* 086 */ } /* 087 */ scan_scanTime1 += System.nanoTime() - getBatchStart; /* 088 */ } /* 089 */ /* 090 */ protected void processNext() throws java.io.IOException { /* 091 */ while (!agg_initAgg) { /* 092 */ agg_initAgg = true; /* 093 */ long agg_beforeAgg = System.nanoTime(); /* 094 */ agg_doAggregateWithoutKey(); /* 095 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 096 */ /* 097 */ // output the result /* 098 */ /* 099 */ agg_numOutputRows.add(1); /* 100 */ agg_rowWriter.zeroOutNullBytes(); /* 101 */ /* 102 */ if (agg_bufIsNull) { /* 103 */ agg_rowWriter.setNullAt(0); /* 104 */ } else { /* 105 */ agg_rowWriter.write(0, agg_bufValue); /* 106 */ } /* 107 */ append(agg_result); /* 108 */ } /* 109 */ } /* 110 */ } ``` Generated code with this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private scala.collection.Iterator scan_input; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows; /* 013 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime; /* 014 */ private long scan_scanTime1; /* 015 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch; /* 016 */ private int scan_batchIdx; /* 017 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 019 */ private UnsafeRow agg_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 022 */ /* 023 */ public GeneratedIterator(Object[] references) { /* 024 */ this.references = references; /* 025 */ } /* 026 */ /* 027 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 028 */ partitionIndex = index; /* 029 */ this.inputs = inputs; /* 030 */ agg_initAgg = false; /* 031 */ /* 032 */ scan_input = inputs[0]; /* 033 */ this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 034 */ this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 035 */ scan_scanTime1 = 0; /* 036 */ scan_batch = null; /* 037 */ scan_batchIdx = 0; /* 038 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 039 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 040 */ agg_result = new UnsafeRow(1); /* 041 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 042 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 043 */ /* 044 */ } /* 045 */ /* 046 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 047 */ // initialize aggregation buffer /* 048 */ agg_bufIsNull = false; /* 049 */ agg_bufValue = 0L; /* 050 */ /* 051 */ if (scan_batch == null) { /* 052 */ scan_nextBatch(); /* 053 */ } /* 054 */ while (scan_batch != null) { /* 055 */ int numRows = scan_batch.numRows(); /* 056 */ int scan_localEnd = numRows - scan_batchIdx; /* 057 */ for (int scan_localIdx = 0; scan_localIdx < 
scan_localEnd; scan_localIdx++) { /* 058 */ int scan_rowIdx = scan_batchIdx + scan_localIdx; /* 059 */ // do aggregate /* 060 */ // common sub-expressions /* 061 */ /* 062 */ // evaluate aggregate function /* 063 */ boolean agg_isNull1 = false; /* 064 */ /* 065 */ long agg_value1 = -1L; /* 066 */ agg_value1 = agg_bufValue + 1L; /* 067 */ // update aggregation buffer /* 068 */ agg_bufIsNull = false; /* 069 */ agg_bufValue = agg_value1; /* 070 */ // shouldStop check is eliminated /* 071 */ } /* 072 */ scan_batchIdx = numRows; /* 073 */ scan_batch = null; /* 074 */ scan_nextBatch(); /* 075 */ } /* 079 */ } /* 080 */ /* 081 */ private void scan_nextBatch() throws java.io.IOException { /* 082 */ long getBatchStart = System.nanoTime(); /* 083 */ if (scan_input.hasNext()) { /* 084 */ scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next(); /* 085 */ scan_numOutputRows.add(scan_batch.numRows()); /* 086 */ scan_batchIdx = 0; /* 087 */ /* 088 */ } /* 089 */ scan_scanTime1 += System.nanoTime() - getBatchStart; /* 090 */ } /* 091 */ /* 092 */ protected void processNext() throws java.io.IOException { /* 093 */ while (!agg_initAgg) { /* 094 */ agg_initAgg = true; /* 095 */ long agg_beforeAgg = System.nanoTime(); /* 096 */ agg_doAggregateWithoutKey(); /* 097 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 098 */ /* 099 */ // output the result /* 100 */ /* 101 */ agg_numOutputRows.add(1); /* 102 */ agg_rowWriter.zeroOutNullBytes(); /* 103 */ /* 104 */ if (agg_bufIsNull) { /* 105 */ agg_rowWriter.setNullAt(0); /* 106 */ } else { /* 107 */ agg_rowWriter.write(0, agg_bufValue); /* 108 */ } /* 109 */ append(agg_result); /* 110 */ } /* 111 */ } /* 112 */ } ``` ## How was this patch tested? Tested existing test suites Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17378 from kiszk/SPARK-20046.
* [SPARK-20092][R][PROJECT INFRA] Add the detection for Scala codes dedicated for R in AppVeyor tests | hyukjinkwon | 2017-03-25 | 1 file | -0/+3
## What changes were proposed in this pull request?
We are currently detecting changes in the `R/` directory only and then triggering AppVeyor tests. It seems we also need to run the tests when there are Scala codes dedicated for R in `core/src/main/scala/org/apache/spark/api/r/`, `sql/core/src/main/scala/org/apache/spark/sql/api/r/` and `mllib/src/main/scala/org/apache/spark/ml/r/`. This will enable the tests, for example, for SPARK-20088.
## How was this patch tested?
Tests with manually created PRs.
- Changes in `sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala`: https://github.com/spark-test/spark/pull/13
- Changes in `core/src/main/scala/org/apache/spark/api/r/SerDe.scala`: https://github.com/spark-test/spark/pull/12
- Changes in `README.md`: https://github.com/spark-test/spark/pull/14
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #17427 from HyukjinKwon/SPARK-20092.
* [SPARK-19949][SQL][FOLLOW-UP] move FailureSafeParser from catalyst to sql coreWenchen Fan2017-03-255-18/+39
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The `FailureSafeParser` is only used in sql core, so it doesn't make sense to put it in the catalyst module. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #17408 from cloud-fan/minor.
* [SPARK-17137][ML][WIP] Compress logistic regression coefficientssethah2017-03-252-37/+49
| | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Use the new `compressed` method on matrices to store the logistic regression coefficients as sparse or dense - whichever requires less memory. Marked as WIP so we can add some performance test results. Basically, we should see if prediction is slower because of using a sparse matrix over a dense one. This can happen since sparse matrices do not use native BLAS operations when computing the margins. ## How was this patch tested? Unit tests added. Author: sethah <seth.hendrickson16@gmail.com> Closes #17426 from sethah/SPARK-17137.
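A minimal sketch of the trade-off described above, assuming only an active SparkSession `spark` and a training DataFrame `training` with the usual `label`/`features` columns (those names are assumptions, not part of the PR):

```scala
// Sketch only: check whether a fitted model's coefficients ended up sparse or dense.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{DenseMatrix, SparseMatrix}

val model = new LogisticRegression().setRegParam(0.1).fit(training)
model.coefficientMatrix match {
  case m: SparseMatrix => println(s"sparse coefficients with ${m.numNonzeros} non-zeros")
  case m: DenseMatrix  => println(s"dense coefficients, ${m.numRows} x ${m.numCols}")
}
```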
* [SPARK-20078][MESOS] Mesos executor configurability for task name and labelsKalvin Chau2017-03-252-1/+13
| | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Adding configurable Mesos executor names and labels using `spark.mesos.task.name` and `spark.mesos.task.labels`. Labels are defined as `k1:v1,k2:v2`. mgummelt ## How was this patch tested? Added unit tests to verify labels were added correctly, with incorrect labels being ignored, and added a test for the executor name. Tested with: `./build/sbt -Pmesos mesos/test` Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Kalvin Chau <kalvin.chau@viasat.com> Closes #17404 from kalvinnchau/mesos-config.
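A small configuration sketch using the two properties named above; the application name and label values are illustrative:

```scala
// Sketch only: configure the Mesos executor task name and labels described above.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("mesos-task-labels-example")
  .set("spark.mesos.task.name", "my-spark-task")          // configurable executor task name
  .set("spark.mesos.task.labels", "team:data,env:prod")   // labels in the k1:v1,k2:v2 form
```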
* [HOTFIX][SQL] Fix the failed test cases in GeneratorFunctionSuiteXiao Li2017-03-241-6/+6
| | | | | | | | | | | | | | ### What changes were proposed in this pull request? Multiple tests failed. Revert the changes on `supportCodegen` of `GenerateExec`. For example, - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75194/testReport/ ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #17425 from gatorsmile/turnOnCodeGenGenerateExec.
* [SPARK-20070][SQL] Fix 2.10 buildHerman van Hovell2017-03-252-3/+3
| | | | | | | | | | | | ## What changes were proposed in this pull request? Commit https://github.com/apache/spark/commit/91fa80fe8a2480d64c430bd10f97b3d44c007bcc broke the build for Scala 2.10. The commit uses the `Regex.regex` field, which is not available in Scala 2.10. This PR fixes this. ## How was this patch tested? Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17420 from hvanhovell/SPARK-20070-2.0.
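The actual fix is in the PR; purely as an illustration of the kind of incompatibility (an assumed workaround, not the change that was made), the pattern string can be reached through the underlying `java.util.regex.Pattern`, which works on both Scala versions:

```scala
import scala.util.matching.Regex

val r: Regex = "[a-z]+\\d".r
// `r.regex` is the field reported missing on Scala 2.10; going through the
// underlying java.util.regex.Pattern yields the same string on 2.10 and 2.11.
val patternString: String = r.pattern.pattern()
```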
* [DOCS] Clarify round mode for format_number & round functionsRoxanne Moslehi2017-03-251-5/+5
| | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Updated the description of `format_number` to indicate that it uses `HALF_EVEN` rounding. Updated the description of `round` to indicate that it uses `HALF_UP` rounding. ## How was this patch tested? This just changes the two function comments, so no testing is involved. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Roxanne Moslehi <rmoslehi@palantir.com> Author: roxannemoslehi <rmoslehi@berkeley.edu> Closes #17399 from roxannemoslehi/patch-1.
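A quick way to see the two modes side by side; the query is illustrative and the expected results simply follow from the rounding modes described above:

```scala
// Sketch: HALF_UP (round) vs HALF_EVEN (format_number) on a tie value.
// Assumes an active SparkSession named `spark`.
spark.sql("SELECT round(2.5) AS half_up, format_number(2.5, 0) AS half_even").show()
// Expected: half_up   = 3  (ties round away from zero)
//           half_even = 2  (ties round to the nearest even digit)
```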
* [SPARK-19846][SQL] Add a flag to disable constraint propagationLiang-Chi Hsieh2017-03-2513-20/+158
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? Constraint propagation can be computationally expensive and block the driver execution for a long time. For example, the benchmark below needs 30 minutes. Compared with the previous PRs #16998 and #16785, this is a much simpler option: add a flag to disable constraint propagation. ### Benchmark Run the following code locally:

    import org.apache.spark.ml.{Pipeline, PipelineStage}
    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
    import org.apache.spark.sql.internal.SQLConf

    spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

    val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0"))

    val indexers = df.columns.tail.map(c => new StringIndexer()
      .setInputCol(c)
      .setOutputCol(s"${c}_indexed")
      .setHandleInvalid("skip"))

    val encoders = indexers.map(indexer => new OneHotEncoder()
      .setInputCol(indexer.getOutputCol)
      .setOutputCol(s"${indexer.getOutputCol}_encoded")
      .setDropLast(true))

    val stages: Array[PipelineStage] = indexers ++ encoders
    val pipeline = new Pipeline().setStages(stages)

    val startTime = System.nanoTime
    pipeline.fit(df).transform(df).show
    val runningTime = System.nanoTime - startTime

Before this patch: 1786001 ms ~= 30 mins. After this patch: 26392 ms = less than half a minute. Related PRs: #16998, #16785. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #17186 from viirya/add-flag-disable-constraint-propagation.
* Disable generate codegen since it fails my workload.Reynold Xin2017-03-242-29/+1
|
* [SPARK-20070][SQL] Redact DataSourceScanExec treeStringHerman van Hovell2017-03-246-24/+138
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? The explain output of `DataSourceScanExec` can contain sensitive information (like Amazon keys). Such information should not end up in logs, or be exposed to non-privileged users. This PR addresses this by adding a redaction facility for `DataSourceScanExec.treeString`. A user can enable this by setting a regex in the `spark.redaction.string.regex` configuration. ## How was this patch tested? Added a unit test to check the output of DataSourceScanExec. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17397 from hvanhovell/SPARK-20070.
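A minimal sketch of turning the redaction on, assuming only the `spark.redaction.string.regex` property named above; the pattern itself is illustrative:

```scala
// Sketch only: mask anything that looks like a credential in plan output.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("redaction-example")
  .master("local[*]")
  .config("spark.redaction.string.regex", "(?i)secret|password|accesskey")
  .getOrCreate()
// With the regex set, matching fragments of DataSourceScanExec's treeString/explain
// output are expected to be redacted rather than printed verbatim.
```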
* [SPARK-17471][ML] Add compressed method to ML matricessethah2017-03-244-46/+673
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? This patch adds a `compressed` method to ML `Matrix` class, which returns the minimal storage representation of the matrix - either sparse or dense. Because the space occupied by a sparse matrix is dependent upon its layout (i.e. column major or row major), this method must consider both cases. It may also be useful to force the layout to be column or row major beforehand, so an overload is added which takes in a `columnMajor: Boolean` parameter. The compressed implementation relies upon two new abstract methods `toDense(columnMajor: Boolean)` and `toSparse(columnMajor: Boolean)`, similar to the compressed method implemented in the `Vector` class. These methods also allow the layout of the resulting matrix to be specified via the `columnMajor` parameter. More detail on the new methods is given below. ## How was this patch tested? Added many new unit tests ## New methods (summary, not exhaustive list) **Matrix trait** - `private[ml] def toDenseMatrix(columnMajor: Boolean): DenseMatrix` (abstract) - converts the matrix (either sparse or dense) to dense format - `private[ml] def toSparseMatrix(columnMajor: Boolean): SparseMatrix` (abstract) - converts the matrix (either sparse or dense) to sparse format - `def toDense: DenseMatrix = toDense(true)` - converts the matrix (either sparse or dense) to dense format in column major layout - `def toSparse: SparseMatrix = toSparse(true)` - converts the matrix (either sparse or dense) to sparse format in column major layout - `def compressed: Matrix` - finds the minimum space representation of this matrix, considering both column and row major layouts, and converts it - `def compressed(columnMajor: Boolean): Matrix` - finds the minimum space representation of this matrix considering only column OR row major, and converts it **DenseMatrix class** - `private[ml] def toDenseMatrix(columnMajor: Boolean): DenseMatrix` - converts the dense matrix to a dense matrix, optionally changing the layout (data is NOT duplicated if the layouts are the same) - `private[ml] def toSparseMatrix(columnMajor: Boolean): SparseMatrix` - converts the dense matrix to sparse matrix, using the specified layout **SparseMatrix class** - `private[ml] def toDenseMatrix(columnMajor: Boolean): DenseMatrix` - converts the sparse matrix to a dense matrix, using the specified layout - `private[ml] def toSparseMatrix(columnMajors: Boolean): SparseMatrix` - converts the sparse matrix to sparse matrix. If the sparse matrix contains any explicit zeros, they are removed. If the layout requested does not match the current layout, data is copied to a new representation. If the layouts match and no explicit zeros exist, the current matrix is returned. Author: sethah <seth.hendrickson16@gmail.com> Closes #15628 from sethah/matrix_compress.
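A short usage sketch of the public methods summarized above; the example matrix is arbitrary:

```scala
// Sketch only: ask a matrix for its minimal-storage representation.
import org.apache.spark.ml.linalg.{Matrices, Matrix}

// A mostly-zero 3x3 matrix held in dense, column-major storage.
val m: Matrix = Matrices.dense(3, 3, Array(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0))

val compact: Matrix = m.compressed // picks sparse or dense, whichever occupies less space
val sparse = m.toSparse            // force the sparse, column-major form
val dense  = compact.toDense       // and back to a dense, column-major layout
println(s"compressed form: ${compact.getClass.getSimpleName}")
```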
* [SPARK-19911][STREAMING] Add builder interface for Kinesis DStreamsAdam Budde2017-03-2411-149/+749
| | | | | | | | | | | | | | | | | | ## What changes were proposed in this pull request? - Add new KinesisDStream.scala containing KinesisDStream.Builder class - Add KinesisDStreamBuilderSuite test suite - Make KinesisInputDStream ctor args package private for testing - Add JavaKinesisDStreamBuilderSuite test suite - Add args to KinesisInputDStream and KinesisReceiver for optional service-specific auth (Kinesis, DynamoDB and CloudWatch) ## How was this patch tested? Added ```KinesisDStreamBuilderSuite``` to verify builder class works as expected Author: Adam Budde <budde@amazon.com> Closes #17250 from budde/KinesisStreamBuilder.
* [SQL][MINOR] Fix for typo in AnalyzerJacek Laskowski2017-03-241-1/+1
| | | | | | | | | | | | | | ## What changes were proposed in this pull request? Fix for typo in Analyzer ## How was this patch tested? local build Author: Jacek Laskowski <jacek@japila.pl> Closes #17409 from jaceklaskowski/analyzer-typo.
* [SPARK-15040][ML][PYSPARK] Add Imputer to PySparkNick Pentreath2017-03-243-5/+175
| | | | | | | | | | | | Add Python wrapper for `Imputer` feature transformer. ## How was this patch tested? New doc tests and tweak to PySpark ML `tests.py` Author: Nick Pentreath <nickp@za.ibm.com> Closes #17316 from MLnick/SPARK-15040-pyspark-imputer.
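The commit itself adds the Python wrapper; as a rough sketch of the transformer being wrapped (assuming an active SparkSession `spark`; the column names are illustrative and the setters follow the Scala `Imputer` the wrapper mirrors):

```scala
// Sketch only: replace missing values in numeric columns with the column mean.
import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, 3.0),
  (Double.NaN, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("a_imputed", "b_imputed"))
  .setStrategy("mean") // or "median"

imputer.fit(df).transform(df).show()
```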
* [SPARK-19970][SQL][FOLLOW-UP] Table owner should be USER instead of ↵Xiao Li2017-03-241-13/+13
| | | | | | | | | | | | | | | | | PRINCIPAL in kerberized clusters #17311 ### What changes were proposed in this pull request? This is a follow-up for the PR: https://github.com/apache/spark/pull/17311 - For safety, use `sessionState` to get the user name, instead of calling `SessionState.get()` in the function `toHiveTable`. - Passing `user names` instead of `conf` when calling `toHiveTable`. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #17405 from gatorsmile/user.
* [SPARK-19820][CORE] Add interface to kill tasks w/ a reasonEric Liang2017-03-2343-115/+289
| | | | | | | | | | | | | | | | | | | | | | | | This commit adds a killTaskAttempt method to SparkContext, to allow users to kill tasks so that they can be re-scheduled elsewhere. This also refactors the task kill path to allow specifying a reason for the task kill. The reason is propagated opaquely through events, and will show up in the UI automatically as `(N killed: $reason)` and `TaskKilled: $reason`. Without this change, there is no way to provide the user feedback through the UI. Currently used reasons are "stage cancelled", "another attempt succeeded", and "killed via SparkContext.killTask". The user can also specify a custom reason through `SparkContext.killTask`. cc rxin In the stage overview UI the reasons are summarized: ![1](https://cloud.githubusercontent.com/assets/14922/23929209/a83b2862-08e1-11e7-8b3e-ae1967bbe2e5.png) Within the stage UI you can see individual task kill reasons: ![2](https://cloud.githubusercontent.com/assets/14922/23929200/9a798692-08e1-11e7-8697-72b27ad8a287.png) Existing tests, tried killing some stages in the UI and verified the messages are as expected. Author: Eric Liang <ekl@databricks.com> Author: Eric Liang <ekl@google.com> Closes #17166 from ericl/kill-reason.
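A hedged sketch of the new hook; the method name and the quoted reason come from the description above, while the exact optional parameters are assumptions:

```scala
// Sketch only: ask the scheduler to kill a specific task attempt so it is re-scheduled.
// The task ID would normally come from the UI or a SparkListener.
import org.apache.spark.SparkContext

def killStraggler(sc: SparkContext, taskId: Long): Unit = {
  sc.killTaskAttempt(taskId, interruptThread = false, reason = "killed via SparkContext.killTask")
}
```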
* [SPARK-16929] Improve performance when check speculatable tasks.jinxing2017-03-235-6/+176
| | | | | | | | | | | | | ## What changes were proposed in this pull request? 1. Use a MedianHeap to record the durations of successful tasks. When checking for speculatable tasks, we can then get the median duration with O(1) time complexity. 2. `checkSpeculatableTasks` synchronizes on `TaskSchedulerImpl`. If `checkSpeculatableTasks` doesn't finish within 100ms, then the possibility exists for that thread to release and then immediately re-acquire the lock. Change `scheduleAtFixedRate` to `scheduleWithFixedDelay` when scheduling calls to `checkSpeculatableTasks`. ## How was this patch tested? Added MedianHeapSuite. Author: jinxing <jinxing6042@126.com> Closes #16867 from jinxing64/SPARK-16929.
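The `MedianHeap` itself lives in the patch; as a rough illustration of the underlying idea (two heaps balanced around the median, not Spark's actual class), a sketch might look like this:

```scala
import scala.collection.mutable

// Illustrative two-heap median tracker: O(log n) insert, O(1) median lookup.
// This sketches the general technique only, not Spark's MedianHeap implementation.
class MedianTracker {
  // Max-heap for the smaller half, min-heap (reversed ordering) for the larger half.
  private val lower = mutable.PriorityQueue.empty[Double]
  private val upper = mutable.PriorityQueue.empty[Double](Ordering[Double].reverse)

  def insert(x: Double): Unit = {
    if (lower.isEmpty || x <= lower.head) lower.enqueue(x) else upper.enqueue(x)
    // Keep the two halves within one element of each other.
    if (lower.size > upper.size + 1) upper.enqueue(lower.dequeue())
    else if (upper.size > lower.size + 1) lower.enqueue(upper.dequeue())
  }

  def median: Double = {
    require(lower.nonEmpty || upper.nonEmpty, "median of an empty tracker")
    if (lower.size > upper.size) lower.head
    else if (upper.size > lower.size) upper.head
    else (lower.head + upper.head) / 2.0
  }
}

// Example: inserting durations 3.0, 1.0, 2.0 gives a median of 2.0.
// val t = new MedianTracker; Seq(3.0, 1.0, 2.0).foreach(t.insert); t.median
```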