author    Sean Owen <sowen@cloudera.com>    2016-03-03 09:54:09 +0000
committer Sean Owen <sowen@cloudera.com>    2016-03-03 09:54:09 +0000
commit    e97fc7f176f8bf501c9b3afd8410014e3b0e1602 (patch)
tree      23a11a3646b13195aaf50078a0f35fad96190618 /sql/core/src
parent    02b7677e9584f5ccd68869abdb0bf980dc847ce1 (diff)
[SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
## What changes were proposed in this pull request?

Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:

- Inner class should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Unchecked warnings
- Misuse of assert, vs junit.assert
- get(a) + getOrElse(b) -> getOrElse(a,b)
- Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == ) -> contains
- find + nonEmpty -> exists
- filter + size -> count
- reduce(_+_) -> sum
- map + flatten -> flatMap

The most controversial may be .size -> .length, simply because of the sheer number of call sites it touches. It is intended to avoid implicits that might be expensive in some places.

## How was this patch tested?

Existing Jenkins unit tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #11292 from srowen/SPARK-13423.
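For reference, here is a minimal standalone Scala sketch (not part of the patch; the object and method names are illustrative only) showing what a few of the rewrites listed above look like in isolation. The same idioms appear in context in the hunks below.

```scala
// Standalone illustration of several rewrite categories from this patch (sketch only, not Spark code).
object StaticAnalysisRewrites {

  // tailrec: ask the compiler to verify the recursion really is in tail position,
  // analogous to the annotation added to ColumnAccessor.apply and ColumnType.apply.
  @scala.annotation.tailrec
  def countDown(n: Int): Int = if (n <= 0) n else countDown(n - 1)

  def main(args: Array[String]): Unit = {
    val m = Map("a" -> 1)

    // get(a) + getOrElse(b) -> getOrElse(a, b): one lookup, no intermediate Option chaining.
    val v1 = m.get("b").getOrElse(0) // before
    val v2 = m.getOrElse("b", 0)     // after

    // Array .size -> .length: .size goes through an implicit ArrayOps wrapper,
    // while .length reads the JVM array length directly.
    val arr = Array(1, 2, 3)
    val n = arr.length

    // exists(_ == x) -> contains(x)
    val hasTwo = arr.contains(2)

    // map + flatten -> flatMap
    val expanded = Seq(1, 2, 3).flatMap(i => Seq(i, i * 10))

    // reduce(_ + _) -> sum
    val total = arr.sum

    println((v1, v2, n, hasTwo, expanded, total, countDown(5)))
  }
}
```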
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java |  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala |  2
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java | 16
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java | 21
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 15
13 files changed, 44 insertions(+), 45 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 8a0d7f8b12..2a78058838 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -251,7 +251,6 @@ public final class ColumnarBatch {
@Override
public Row next() {
- assert(hasNext());
while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
++rowId;
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index 13142d0e61..0a156ea99a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -55,9 +55,8 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
* @since 2.0.0
*/
def get(name: String): ContinuousQuery = activeQueriesLock.synchronized {
- activeQueries.get(name).getOrElse {
- throw new IllegalArgumentException(s"There is no active query with name $name")
- }
+ activeQueries.getOrElse(name,
+ throw new IllegalArgumentException(s"There is no active query with name $name"))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 68a251757c..d8af799d89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -94,7 +94,7 @@ private[r] object SQLUtils {
}
def createDF(rdd: RDD[Array[Byte]], schema: StructType, sqlContext: SQLContext): DataFrame = {
- val num = schema.fields.size
+ val num = schema.fields.length
val rowRDD = rdd.map(bytesToRow(_, schema))
sqlContext.createDataFrame(rowRDD, schema)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
index fee36f6023..78664baa56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.columnar
import java.nio.{ByteBuffer, ByteOrder}
+import scala.annotation.tailrec
+
import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
import org.apache.spark.sql.types._
@@ -120,6 +122,7 @@ private[columnar] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType)
with NullableColumnAccessor
private[columnar] object ColumnAccessor {
+ @tailrec
def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
val buf = buffer.order(ByteOrder.nativeOrder)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index 9c908b2877..3ec01185c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.columnar
import java.math.{BigDecimal, BigInteger}
import java.nio.ByteBuffer
+import scala.annotation.tailrec
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.catalyst.InternalRow
@@ -548,7 +549,7 @@ private[columnar] object LARGE_DECIMAL {
private[columnar] case class STRUCT(dataType: StructType)
extends ColumnType[UnsafeRow] with DirectCopyColumnType[UnsafeRow] {
- private val numOfFields: Int = dataType.fields.size
+ private val numOfFields: Int = dataType.fields.length
override def defaultSize: Int = 20
@@ -663,6 +664,7 @@ private[columnar] case class MAP(dataType: MapType)
}
private[columnar] object ColumnType {
+ @tailrec
def apply(dataType: DataType): ColumnType[_] = {
dataType match {
case NullType => NULL
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index 22d4278085..1f964b1fc1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -147,7 +147,7 @@ private[sql] case class InMemoryRelation(
// may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
// hard to decipher.
assert(
- row.numFields == columnBuilders.size,
+ row.numFields == columnBuilders.length,
s"Row column number mismatch, expected ${output.size} columns, " +
s"but got ${row.numFields}." +
s"\nRow content: $row")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index e9afee1cc5..d2d7996f56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -204,16 +204,16 @@ object CSVRelation extends Logging {
val rowArray = new Array[Any](safeRequiredIndices.length)
val requiredSize = requiredFields.length
tokenizedRDD.flatMap { tokens =>
- if (params.dropMalformed && schemaFields.length != tokens.size) {
+ if (params.dropMalformed && schemaFields.length != tokens.length) {
logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
None
- } else if (params.failFast && schemaFields.length != tokens.size) {
+ } else if (params.failFast && schemaFields.length != tokens.length) {
throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
s"${tokens.mkString(params.delimiter.toString)}")
} else {
- val indexSafeTokens = if (params.permissive && schemaFields.length > tokens.size) {
- tokens ++ new Array[String](schemaFields.length - tokens.size)
- } else if (params.permissive && schemaFields.length < tokens.size) {
+ val indexSafeTokens = if (params.permissive && schemaFields.length > tokens.length) {
+ tokens ++ new Array[String](schemaFields.length - tokens.length)
+ } else if (params.permissive && schemaFields.length < tokens.length) {
tokens.take(schemaFields.length)
} else {
tokens
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index ed02b3f95f..4dd3c50cdf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -212,14 +212,14 @@ private[sql] object JDBCRDD extends Logging {
// We can't compile Or filter unless both sub-filters are compiled successfully.
// It applies too for the following And filter.
// If we can make sure compileFilter supports all filters, we can remove this check.
- val or = Seq(f1, f2).map(compileFilter(_)).flatten
+ val or = Seq(f1, f2).flatMap(compileFilter(_))
if (or.size == 2) {
or.map(p => s"($p)").mkString(" OR ")
} else {
null
}
case And(f1, f2) =>
- val and = Seq(f1, f2).map(compileFilter(_)).flatten
+ val and = Seq(f1, f2).flatMap(compileFilter(_))
if (and.size == 2) {
and.map(p => s"($p)").mkString(" AND ")
} else {
@@ -304,7 +304,7 @@ private[sql] class JDBCRDD(
* `filters`, but as a WHERE clause suitable for injection into a SQL query.
*/
private val filterWhereClause: String =
- filters.map(JDBCRDD.compileFilter).flatten.mkString(" AND ")
+ filters.flatMap(JDBCRDD.compileFilter).mkString(" AND ")
/**
* A WHERE clause representing both `filters`, if any, and the current partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 835e7ba6c5..f9d10292f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -335,7 +335,7 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
taskEnd.taskInfo.accumulables.flatMap { a =>
// Filter out accumulators that are not SQL metrics
// For now we assume all SQL metrics are Long's that have been JSON serialized as String's
- if (a.metadata.exists(_ == SQLMetrics.ACCUM_IDENTIFIER)) {
+ if (a.metadata.contains(SQLMetrics.ACCUM_IDENTIFIER)) {
val newValue = new LongSQLMetricValue(a.update.map(_.toString.toLong).getOrElse(0L))
Some(a.copy(update = Some(newValue)))
} else {
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 0d4c128cb3..ee85626435 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -355,27 +355,27 @@ public class JavaDataFrameSuite {
DataFrame df = context.range(1000);
BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03);
- assert (filter1.expectedFpp() - 0.03 < 1e-3);
+ Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3);
for (int i = 0; i < 1000; i++) {
- assert (filter1.mightContain(i));
+ Assert.assertTrue(filter1.mightContain(i));
}
BloomFilter filter2 = df.stat().bloomFilter(col("id").multiply(3), 1000, 0.03);
- assert (filter2.expectedFpp() - 0.03 < 1e-3);
+ Assert.assertTrue(filter2.expectedFpp() - 0.03 < 1e-3);
for (int i = 0; i < 1000; i++) {
- assert (filter2.mightContain(i * 3));
+ Assert.assertTrue(filter2.mightContain(i * 3));
}
BloomFilter filter3 = df.stat().bloomFilter("id", 1000, 64 * 5);
- assert (filter3.bitSize() == 64 * 5);
+ Assert.assertTrue(filter3.bitSize() == 64 * 5);
for (int i = 0; i < 1000; i++) {
- assert (filter3.mightContain(i));
+ Assert.assertTrue(filter3.mightContain(i));
}
BloomFilter filter4 = df.stat().bloomFilter(col("id").multiply(3), 1000, 64 * 5);
- assert (filter4.bitSize() == 64 * 5);
+ Assert.assertTrue(filter4.bitSize() == 64 * 5);
for (int i = 0; i < 1000; i++) {
- assert (filter4.mightContain(i * 3));
+ Assert.assertTrue(filter4.mightContain(i * 3));
}
}
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 1181244c8a..e0e56f3fbf 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -304,15 +304,12 @@ public class JavaDatasetSuite implements Serializable {
Assert.assertEquals(Arrays.asList("abc", "abc"), subtracted.collectAsList());
}
- private <T> Set<T> toSet(List<T> records) {
- Set<T> set = new HashSet<T>();
- for (T record : records) {
- set.add(record);
- }
- return set;
+ private static <T> Set<T> toSet(List<T> records) {
+ return new HashSet<>(records);
}
- private <T> Set<T> asSet(T... records) {
+ @SafeVarargs
+ private static <T> Set<T> asSet(T... records) {
return toSet(Arrays.asList(records));
}
@@ -529,7 +526,7 @@ public class JavaDatasetSuite implements Serializable {
Encoders.kryo(PrivateClassTest.class);
}
- public class SimpleJavaBean implements Serializable {
+ public static class SimpleJavaBean implements Serializable {
private boolean a;
private int b;
private byte[] c;
@@ -612,7 +609,7 @@ public class JavaDatasetSuite implements Serializable {
}
}
- public class SimpleJavaBean2 implements Serializable {
+ public static class SimpleJavaBean2 implements Serializable {
private Timestamp a;
private Date b;
private java.math.BigDecimal c;
@@ -650,7 +647,7 @@ public class JavaDatasetSuite implements Serializable {
}
}
- public class NestedJavaBean implements Serializable {
+ public static class NestedJavaBean implements Serializable {
private SimpleJavaBean a;
public SimpleJavaBean getA() {
@@ -745,7 +742,7 @@ public class JavaDatasetSuite implements Serializable {
ds.collect();
}
- public class SmallBean implements Serializable {
+ public static class SmallBean implements Serializable {
private String a;
private int b;
@@ -780,7 +777,7 @@ public class JavaDatasetSuite implements Serializable {
}
}
- public class NestedSmallBean implements Serializable {
+ public static class NestedSmallBean implements Serializable {
private SmallBean f;
public SmallBean getF() {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 84f30c0aaf..a824759cb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -603,7 +603,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
assert(parquetDF.inputFiles.nonEmpty)
val unioned = jsonDF.unionAll(parquetDF).inputFiles.sorted
- val allFiles = (jsonDF.inputFiles ++ parquetDF.inputFiles).toSet.toArray.sorted
+ val allFiles = (jsonDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted
assert(unioned === allFiles)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 16e769feca..f59faa0dc2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1562,16 +1562,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
e.message.contains("Cannot save interval data type into external storage")
})
- def checkIntervalParseError(s: String): Unit = {
- val e = intercept[AnalysisException] {
- sql(s)
- }
- e.message.contains("at least one time unit should be given for interval literal")
+ val e1 = intercept[AnalysisException] {
+ sql("select interval")
}
-
- checkIntervalParseError("select interval")
+ assert(e1.message.contains("at least one time unit should be given for interval literal"))
// Currently we don't yet support nanosecond
- checkIntervalParseError("select interval 23 nanosecond")
+ val e2 = intercept[AnalysisException] {
+ sql("select interval 23 nanosecond")
+ }
+ assert(e2.message.contains("cannot recognize input near"))
}
test("SPARK-8945: add and subtract expressions for interval type") {