author     Shixiong Zhu <shixiong@databricks.com>   2015-12-28 15:01:51 -0800
committer  Shixiong Zhu <shixiong@databricks.com>   2015-12-28 15:01:51 -0800
commit     710b41172958a0b3a2b70c48821aefc81893731b
tree       d09bedea3873c481883574c4abea6baf1ed1e845
parent     fb572c6e4b0645c8084aa013d0c93bb21a79977b
[SPARK-12489][CORE][SQL][MLLIB] Fix minor issues found by FindBugs
Include the following changes:

1. Close `java.sql.Statement` after use.
2. Fix an incorrect `asInstanceOf`.
3. Remove unnecessary `synchronized` and `ReentrantLock`.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10440 from zsxwing/findbugs.
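The recurring fix in this patch is closing a java.sql.Statement in a finally block so it is released even when the query throws. A minimal loan-pattern sketch of that idea in Scala; the withStatement helper is illustrative and not part of the patch:

import java.sql.{Connection, Statement}

// Loan pattern: the caller borrows a Statement and it is always closed,
// even if the body throws. Hypothetical helper, not in the patch.
def withStatement[T](conn: Connection)(body: Statement => T): T = {
  val statement = conn.createStatement()
  try {
    body(statement)
  } finally {
    statement.close()
  }
}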
Diffstat (lines changed per file):
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala    3
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java                        4
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/Main.java                                  2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala                                    4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                          7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala      47
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala    16
7 files changed, 51 insertions(+), 32 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index a6d9374eb9..16815d51d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
-import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Date, List => JList}
import scala.collection.JavaConverters._
@@ -126,7 +125,7 @@ private[spark] class MesosClusterScheduler(
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
private val schedulerState = engineFactory.createEngine("scheduler")
- private val stateLock = new ReentrantLock()
+ private val stateLock = new Object()
private val finishedDrivers =
new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
private var frameworkId: String = null
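The FindBugs finding here is a ReentrantLock whose lock() and unlock() are never called; the field is only ever used with Scala's synchronized blocks, and any plain Object monitor supports those. A small sketch of the idiom, with illustrative names:

// Every JVM object carries an intrinsic monitor, so a bare Object is
// enough when the lock is only used via synchronized { ... } blocks.
class Counter {
  private val stateLock = new Object()
  private var count = 0

  def increment(): Unit = stateLock.synchronized {
    count += 1 // all reads and writes go through the same monitor
  }

  def current: Int = stateLock.synchronized(count)
}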
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index d099ee9aa9..414ffc2c84 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -293,9 +293,7 @@ class LauncherServer implements Closeable {
protected void handle(Message msg) throws IOException {
try {
if (msg instanceof Hello) {
- synchronized (timeout) {
- timeout.cancel();
- }
+ timeout.cancel();
timeout = null;
Hello hello = (Hello) msg;
ChildProcAppHandle handle = pending.remove(hello.secret);
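FindBugs flags synchronized blocks that lock a field the surrounding code reassigns (timeout is set to null right after this block): each thread locks whatever object the field currently references, so two threads may lock different objects and get no mutual exclusion at all. A contrived Scala sketch of the hazard, with hypothetical names:

class Flaky {
  private var guard = new Object()

  def touch(): Unit = guard.synchronized {
    // Reassigning the lock field means the next caller synchronizes on a
    // different object, so this block no longer excludes that caller.
    guard = new Object()
  }
}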
diff --git a/launcher/src/main/java/org/apache/spark/launcher/Main.java b/launcher/src/main/java/org/apache/spark/launcher/Main.java
index a4e3acc674..e751e948e3 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/Main.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/Main.java
@@ -151,7 +151,7 @@ class Main {
@Override
protected boolean handle(String opt, String value) {
- if (opt == CLASS) {
+ if (CLASS.equals(opt)) {
className = value;
}
return false;
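Java's == on two String references compares identity, so opt == CLASS only holds if both sides are the same interned object; CLASS.equals(opt) compares contents and never throws when opt is null. Scala splits the two notions into separate operators, which makes the distinction easy to see in a sketch:

val interned = "--class"          // string literals are interned
val fresh = new String("--class") // forces a distinct object

fresh eq interned // false: reference identity, like Java's == on objects
fresh == interned // true:  value equality, like Java's equals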
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala
index d89682611e..9cfd466294 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala
@@ -386,9 +386,9 @@ private[tree] object LearningNode {
var levelsToGo = indexToLevel(nodeIndex)
while (levelsToGo > 0) {
if ((nodeIndex & (1 << levelsToGo - 1)) == 0) {
- tmpNode = tmpNode.leftChild.asInstanceOf[LearningNode]
+ tmpNode = tmpNode.leftChild.get
} else {
- tmpNode = tmpNode.rightChild.asInstanceOf[LearningNode]
+ tmpNode = tmpNode.rightChild.get
}
levelsToGo -= 1
}
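leftChild and rightChild in Node.scala are Option[LearningNode] values, so casting the Option itself with asInstanceOf can never yield a LearningNode; it fails with a ClassCastException at runtime. Option.get unwraps the value instead, failing with a clearer NoSuchElementException only when the child is truly absent. A self-contained sketch, using an illustrative Tree type:

case class Tree(left: Option[Tree], right: Option[Tree])

val t = Tree(Some(Tree(None, None)), None)
// t.left.asInstanceOf[Tree]  // ClassCastException: Some is not a Tree
val child: Tree = t.left.get  // unwraps the Option; NoSuchElementException
                              // only if left is actually None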
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 03867beb78..ab362539e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -297,7 +297,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
if (!tableExists) {
val schema = JdbcUtils.schemaString(df, url)
val sql = s"CREATE TABLE $table ($schema)"
- conn.createStatement.executeUpdate(sql)
+ val statement = conn.createStatement
+ try {
+ statement.executeUpdate(sql)
+ } finally {
+ statement.close()
+ }
}
} finally {
conn.close()
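This hunk is the same close-in-finally pattern written out by hand; with a helper like the withStatement sketch above (hypothetical, not part of the patch) the fix would collapse to:

withStatement(conn)(_.executeUpdate(sql))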
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index fc0f86cb18..4e2f5059be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -122,30 +122,35 @@ private[sql] object JDBCRDD extends Logging {
val dialect = JdbcDialects.get(url)
val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)()
try {
- val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
+ val statement = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0")
try {
- val rsmd = rs.getMetaData
- val ncols = rsmd.getColumnCount
- val fields = new Array[StructField](ncols)
- var i = 0
- while (i < ncols) {
- val columnName = rsmd.getColumnLabel(i + 1)
- val dataType = rsmd.getColumnType(i + 1)
- val typeName = rsmd.getColumnTypeName(i + 1)
- val fieldSize = rsmd.getPrecision(i + 1)
- val fieldScale = rsmd.getScale(i + 1)
- val isSigned = rsmd.isSigned(i + 1)
- val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
- val metadata = new MetadataBuilder().putString("name", columnName)
- val columnType =
- dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
- getCatalystType(dataType, fieldSize, fieldScale, isSigned))
- fields(i) = StructField(columnName, columnType, nullable, metadata.build())
- i = i + 1
+ val rs = statement.executeQuery()
+ try {
+ val rsmd = rs.getMetaData
+ val ncols = rsmd.getColumnCount
+ val fields = new Array[StructField](ncols)
+ var i = 0
+ while (i < ncols) {
+ val columnName = rsmd.getColumnLabel(i + 1)
+ val dataType = rsmd.getColumnType(i + 1)
+ val typeName = rsmd.getColumnTypeName(i + 1)
+ val fieldSize = rsmd.getPrecision(i + 1)
+ val fieldScale = rsmd.getScale(i + 1)
+ val isSigned = rsmd.isSigned(i + 1)
+ val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+ val metadata = new MetadataBuilder().putString("name", columnName)
+ val columnType =
+ dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
+ getCatalystType(dataType, fieldSize, fieldScale, isSigned))
+ fields(i) = StructField(columnName, columnType, nullable, metadata.build())
+ i = i + 1
+ }
+ return new StructType(fields)
+ } finally {
+ rs.close()
}
- return new StructType(fields)
} finally {
- rs.close()
+ statement.close()
}
} finally {
conn.close()
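The rewrite nests two try/finally blocks so the ResultSet is closed before its Statement, and a failure at any depth still releases the outer resources. Spark at the time targeted Scala versions that predate scala.util.Using; on Scala 2.13+ the same reverse-order cleanup could be sketched as follows (not the patch's code):

import java.sql.Connection
import scala.util.Using

def columnCount(conn: Connection, table: String) =
  Using.Manager { use =>
    // Resources registered with use() are closed in reverse order:
    // the ResultSet first, then the PreparedStatement.
    val statement = use(conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0"))
    val rs = use(statement.executeQuery())
    rs.getMetaData.getColumnCount
  } // returns scala.util.Try[Int]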
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 28cd688ef7..46f2670eee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -49,14 +49,26 @@ object JdbcUtils extends Logging {
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems using JDBC meta data calls, considering "table" could also include
// the database name. The query used to check whether a table exists can be overridden by the dialects.
- Try(conn.prepareStatement(dialect.getTableExistsQuery(table)).executeQuery()).isSuccess
+ Try {
+ val statement = conn.prepareStatement(dialect.getTableExistsQuery(table))
+ try {
+ statement.executeQuery()
+ } finally {
+ statement.close()
+ }
+ }.isSuccess
}
/**
* Drops a table from the JDBC database.
*/
def dropTable(conn: Connection, table: String): Unit = {
- conn.createStatement.executeUpdate(s"DROP TABLE $table")
+ val statement = conn.createStatement
+ try {
+ statement.executeUpdate(s"DROP TABLE $table")
+ } finally {
+ statement.close()
+ }
}
/**
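A subtlety in the tableExists change: the finally clause runs before Try observes the outcome, so the Statement is closed on both paths, and Try(...).isSuccess only converts the table-missing exception into false instead of letting it propagate. A condensed sketch with illustrative names:

import java.sql.Connection
import scala.util.Try

def tableExists(conn: Connection, existsQuery: String): Boolean =
  Try {
    val statement = conn.prepareStatement(existsQuery)
    try statement.executeQuery() // throws if the table is absent
    finally statement.close()    // runs on both paths, inside the Try
  }.isSuccess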