From e97fc7f176f8bf501c9b3afd8410014e3b0e1602 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 3 Mar 2016 09:54:09 +0000 Subject: [SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x ## What changes were proposed in this pull request? Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly: - Inner class should be static - Mismatched hashCode/equals - Overflow in compareTo - Unchecked warnings - Misuse of assert, vs junit.assert - get(a) + getOrElse(b) -> getOrElse(a,b) - Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions - Dead code - tailrec - exists(_ == ) -> contains find + nonEmpty -> exists filter + size -> count - reduce(_+_) -> sum map + flatten -> map The most controversial may be .size -> .length simply because of its size. It is intended to avoid implicits that might be expensive in some places. ## How was the this patch tested? Existing Jenkins unit tests. Author: Sean Owen Closes #11292 from srowen/SPARK-13423. --- .../sql/execution/vectorized/ColumnarBatch.java | 1 - .../apache/spark/sql/ContinuousQueryManager.scala | 5 ++--- .../scala/org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../sql/execution/columnar/ColumnAccessor.scala | 3 +++ .../spark/sql/execution/columnar/ColumnType.scala | 4 +++- .../columnar/InMemoryColumnarTableScan.scala | 2 +- .../sql/execution/datasources/csv/CSVRelation.scala | 10 +++++----- .../sql/execution/datasources/jdbc/JDBCRDD.scala | 6 +++--- .../apache/spark/sql/execution/ui/SQLListener.scala | 2 +- .../org/apache/spark/sql/JavaDataFrameSuite.java | 16 ++++++++-------- .../test/org/apache/spark/sql/JavaDatasetSuite.java | 21 +++++++++------------ .../scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 15 +++++++-------- 13 files changed, 44 insertions(+), 45 deletions(-) (limited to 'sql/core/src') diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 8a0d7f8b12..2a78058838 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -251,7 +251,6 @@ public final class ColumnarBatch { @Override public Row next() { - assert(hasNext()); while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) { ++rowId; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 13142d0e61..0a156ea99a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -55,9 +55,8 @@ class ContinuousQueryManager(sqlContext: SQLContext) { * @since 2.0.0 */ def get(name: String): ContinuousQuery = activeQueriesLock.synchronized { - activeQueries.get(name).getOrElse { - throw new IllegalArgumentException(s"There is no active query with name $name") - } + activeQueries.getOrElse(name, + throw new IllegalArgumentException(s"There is no active query with name $name")) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 68a251757c..d8af799d89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -94,7 +94,7 @@ private[r] object SQLUtils { } def createDF(rdd: RDD[Array[Byte]], schema: StructType, sqlContext: SQLContext): DataFrame = { - val num = schema.fields.size + val num = schema.fields.length val rowRDD = rdd.map(bytesToRow(_, schema)) sqlContext.createDataFrame(rowRDD, schema) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala index fee36f6023..78664baa56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.columnar import java.nio.{ByteBuffer, ByteOrder} +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow} import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor import org.apache.spark.sql.types._ @@ -120,6 +122,7 @@ private[columnar] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType) with NullableColumnAccessor private[columnar] object ColumnAccessor { + @tailrec def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = { val buf = buffer.order(ByteOrder.nativeOrder) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala index 9c908b2877..3ec01185c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.columnar import java.math.{BigDecimal, BigInteger} import java.nio.ByteBuffer +import scala.annotation.tailrec import scala.reflect.runtime.universe.TypeTag import org.apache.spark.sql.catalyst.InternalRow @@ -548,7 +549,7 @@ private[columnar] object LARGE_DECIMAL { private[columnar] case class STRUCT(dataType: StructType) extends ColumnType[UnsafeRow] with DirectCopyColumnType[UnsafeRow] { - private val numOfFields: Int = dataType.fields.size + private val numOfFields: Int = dataType.fields.length override def defaultSize: Int = 20 @@ -663,6 +664,7 @@ private[columnar] case class MAP(dataType: MapType) } private[columnar] object ColumnType { + @tailrec def apply(dataType: DataType): ColumnType[_] = { dataType match { case NullType => NULL diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala index 22d4278085..1f964b1fc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala @@ -147,7 +147,7 @@ private[sql] case class InMemoryRelation( // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat // hard to decipher. assert( - row.numFields == columnBuilders.size, + row.numFields == columnBuilders.length, s"Row column number mismatch, expected ${output.size} columns, " + s"but got ${row.numFields}." + s"\nRow content: $row") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index e9afee1cc5..d2d7996f56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -204,16 +204,16 @@ object CSVRelation extends Logging { val rowArray = new Array[Any](safeRequiredIndices.length) val requiredSize = requiredFields.length tokenizedRDD.flatMap { tokens => - if (params.dropMalformed && schemaFields.length != tokens.size) { + if (params.dropMalformed && schemaFields.length != tokens.length) { logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}") None - } else if (params.failFast && schemaFields.length != tokens.size) { + } else if (params.failFast && schemaFields.length != tokens.length) { throw new RuntimeException(s"Malformed line in FAILFAST mode: " + s"${tokens.mkString(params.delimiter.toString)}") } else { - val indexSafeTokens = if (params.permissive && schemaFields.length > tokens.size) { - tokens ++ new Array[String](schemaFields.length - tokens.size) - } else if (params.permissive && schemaFields.length < tokens.size) { + val indexSafeTokens = if (params.permissive && schemaFields.length > tokens.length) { + tokens ++ new Array[String](schemaFields.length - tokens.length) + } else if (params.permissive && schemaFields.length < tokens.length) { tokens.take(schemaFields.length) } else { tokens diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index ed02b3f95f..4dd3c50cdf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -212,14 +212,14 @@ private[sql] object JDBCRDD extends Logging { // We can't compile Or filter unless both sub-filters are compiled successfully. // It applies too for the following And filter. // If we can make sure compileFilter supports all filters, we can remove this check. - val or = Seq(f1, f2).map(compileFilter(_)).flatten + val or = Seq(f1, f2).flatMap(compileFilter(_)) if (or.size == 2) { or.map(p => s"($p)").mkString(" OR ") } else { null } case And(f1, f2) => - val and = Seq(f1, f2).map(compileFilter(_)).flatten + val and = Seq(f1, f2).flatMap(compileFilter(_)) if (and.size == 2) { and.map(p => s"($p)").mkString(" AND ") } else { @@ -304,7 +304,7 @@ private[sql] class JDBCRDD( * `filters`, but as a WHERE clause suitable for injection into a SQL query. */ private val filterWhereClause: String = - filters.map(JDBCRDD.compileFilter).flatten.mkString(" AND ") + filters.flatMap(JDBCRDD.compileFilter).mkString(" AND ") /** * A WHERE clause representing both `filters`, if any, and the current partition. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 835e7ba6c5..f9d10292f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -335,7 +335,7 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) taskEnd.taskInfo.accumulables.flatMap { a => // Filter out accumulators that are not SQL metrics // For now we assume all SQL metrics are Long's that have been JSON serialized as String's - if (a.metadata.exists(_ == SQLMetrics.ACCUM_IDENTIFIER)) { + if (a.metadata.contains(SQLMetrics.ACCUM_IDENTIFIER)) { val newValue = new LongSQLMetricValue(a.update.map(_.toString.toLong).getOrElse(0L)) Some(a.copy(update = Some(newValue))) } else { diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 0d4c128cb3..ee85626435 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -355,27 +355,27 @@ public class JavaDataFrameSuite { DataFrame df = context.range(1000); BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03); - assert (filter1.expectedFpp() - 0.03 < 1e-3); + Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3); for (int i = 0; i < 1000; i++) { - assert (filter1.mightContain(i)); + Assert.assertTrue(filter1.mightContain(i)); } BloomFilter filter2 = df.stat().bloomFilter(col("id").multiply(3), 1000, 0.03); - assert (filter2.expectedFpp() - 0.03 < 1e-3); + Assert.assertTrue(filter2.expectedFpp() - 0.03 < 1e-3); for (int i = 0; i < 1000; i++) { - assert (filter2.mightContain(i * 3)); + Assert.assertTrue(filter2.mightContain(i * 3)); } BloomFilter filter3 = df.stat().bloomFilter("id", 1000, 64 * 5); - assert (filter3.bitSize() == 64 * 5); + Assert.assertTrue(filter3.bitSize() == 64 * 5); for (int i = 0; i < 1000; i++) { - assert (filter3.mightContain(i)); + Assert.assertTrue(filter3.mightContain(i)); } BloomFilter filter4 = df.stat().bloomFilter(col("id").multiply(3), 1000, 64 * 5); - assert (filter4.bitSize() == 64 * 5); + Assert.assertTrue(filter4.bitSize() == 64 * 5); for (int i = 0; i < 1000; i++) { - assert (filter4.mightContain(i * 3)); + Assert.assertTrue(filter4.mightContain(i * 3)); } } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 1181244c8a..e0e56f3fbf 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -304,15 +304,12 @@ public class JavaDatasetSuite implements Serializable { Assert.assertEquals(Arrays.asList("abc", "abc"), subtracted.collectAsList()); } - private Set toSet(List records) { - Set set = new HashSet(); - for (T record : records) { - set.add(record); - } - return set; + private static Set toSet(List records) { + return new HashSet<>(records); } - private Set asSet(T... records) { + @SafeVarargs + private static Set asSet(T... records) { return toSet(Arrays.asList(records)); } @@ -529,7 +526,7 @@ public class JavaDatasetSuite implements Serializable { Encoders.kryo(PrivateClassTest.class); } - public class SimpleJavaBean implements Serializable { + public static class SimpleJavaBean implements Serializable { private boolean a; private int b; private byte[] c; @@ -612,7 +609,7 @@ public class JavaDatasetSuite implements Serializable { } } - public class SimpleJavaBean2 implements Serializable { + public static class SimpleJavaBean2 implements Serializable { private Timestamp a; private Date b; private java.math.BigDecimal c; @@ -650,7 +647,7 @@ public class JavaDatasetSuite implements Serializable { } } - public class NestedJavaBean implements Serializable { + public static class NestedJavaBean implements Serializable { private SimpleJavaBean a; public SimpleJavaBean getA() { @@ -745,7 +742,7 @@ public class JavaDatasetSuite implements Serializable { ds.collect(); } - public class SmallBean implements Serializable { + public static class SmallBean implements Serializable { private String a; private int b; @@ -780,7 +777,7 @@ public class JavaDatasetSuite implements Serializable { } } - public class NestedSmallBean implements Serializable { + public static class NestedSmallBean implements Serializable { private SmallBean f; public SmallBean getF() { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 84f30c0aaf..a824759cb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -603,7 +603,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(parquetDF.inputFiles.nonEmpty) val unioned = jsonDF.unionAll(parquetDF).inputFiles.sorted - val allFiles = (jsonDF.inputFiles ++ parquetDF.inputFiles).toSet.toArray.sorted + val allFiles = (jsonDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted assert(unioned === allFiles) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 16e769feca..f59faa0dc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1562,16 +1562,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { e.message.contains("Cannot save interval data type into external storage") }) - def checkIntervalParseError(s: String): Unit = { - val e = intercept[AnalysisException] { - sql(s) - } - e.message.contains("at least one time unit should be given for interval literal") + val e1 = intercept[AnalysisException] { + sql("select interval") } - - checkIntervalParseError("select interval") + assert(e1.message.contains("at least one time unit should be given for interval literal")) // Currently we don't yet support nanosecond - checkIntervalParseError("select interval 23 nanosecond") + val e2 = intercept[AnalysisException] { + sql("select interval 23 nanosecond") + } + assert(e2.message.contains("cannot recognize input near")) } test("SPARK-8945: add and subtract expressions for interval type") { -- cgit v1.2.3