Diffstat (limited to 'core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala  28
1 file changed, 16 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
index f41fc38be2..8e3436f134 100644
--- a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
+++ b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
@@ -18,14 +18,13 @@ package org.apache.spark.security
import java.io.{InputStream, OutputStream}
import java.util.Properties
+import javax.crypto.KeyGenerator
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
import org.apache.commons.crypto.random._
import org.apache.commons.crypto.stream._
-import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -33,10 +32,6 @@ import org.apache.spark.internal.config._
* A util class for manipulating IO encryption and decryption streams.
*/
private[spark] object CryptoStreamUtils extends Logging {
- /**
- * Constants and variables for spark IO encryption
- */
- val SPARK_IO_TOKEN = new Text("SPARK_IO_TOKEN")
// The initialization vector length in bytes.
val IV_LENGTH_IN_BYTES = 16
@@ -50,12 +45,11 @@ private[spark] object CryptoStreamUtils extends Logging {
*/
def createCryptoOutputStream(
os: OutputStream,
- sparkConf: SparkConf): OutputStream = {
+ sparkConf: SparkConf,
+ key: Array[Byte]): OutputStream = {
val properties = toCryptoConf(sparkConf)
val iv = createInitializationVector(properties)
os.write(iv)
- val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
- val key = credentials.getSecretKey(SPARK_IO_TOKEN)
val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
new CryptoOutputStream(transformationStr, properties, os,
new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
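
With this change the caller supplies the AES key directly instead of the stream helper fetching it from Hadoop credentials. A minimal write-path sketch (hedged: it assumes code living under the org.apache.spark namespace, since CryptoStreamUtils is private[spark], and it uses the createKey helper introduced further down in this diff):

    import java.io.ByteArrayOutputStream
    import java.nio.charset.StandardCharsets

    import org.apache.spark.SparkConf
    import org.apache.spark.security.CryptoStreamUtils

    val conf = new SparkConf()
    val key = CryptoStreamUtils.createKey(conf)   // raw key bytes, now owned by the caller
    val out = new ByteArrayOutputStream()
    val cos = CryptoStreamUtils.createCryptoOutputStream(out, conf, key)
    cos.write("some payload".getBytes(StandardCharsets.UTF_8))
    cos.close()   // out now holds the 16-byte IV followed by the ciphertext
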
@@ -66,12 +60,11 @@ private[spark] object CryptoStreamUtils extends Logging {
*/
def createCryptoInputStream(
is: InputStream,
- sparkConf: SparkConf): InputStream = {
+ sparkConf: SparkConf,
+ key: Array[Byte]): InputStream = {
val properties = toCryptoConf(sparkConf)
val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
is.read(iv, 0, iv.length)
- val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
- val key = credentials.getSecretKey(SPARK_IO_TOKEN)
val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
new CryptoInputStream(transformationStr, properties, is,
new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
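
The read path mirrors it: createCryptoInputStream consumes the 16-byte IV from the head of the stream, and the caller must pass the same key that was used for writing. Continuing the sketch above:

    import java.io.ByteArrayInputStream

    val in = new ByteArrayInputStream(out.toByteArray)
    val cis = CryptoStreamUtils.createCryptoInputStream(in, conf, key)
    val buf = new Array[Byte](1024)
    val n = cis.read(buf)   // a single read suffices for this tiny payload
    val roundTripped = new String(buf, 0, n, StandardCharsets.UTF_8)
    cis.close()
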
@@ -92,6 +85,17 @@ private[spark] object CryptoStreamUtils extends Logging {
}
/**
+ * Creates a new encryption key.
+ */
+ def createKey(conf: SparkConf): Array[Byte] = {
+ val keyLen = conf.get(IO_ENCRYPTION_KEY_SIZE_BITS)
+ val ioKeyGenAlgorithm = conf.get(IO_ENCRYPTION_KEYGEN_ALGORITHM)
+ val keyGen = KeyGenerator.getInstance(ioKeyGenAlgorithm)
+ keyGen.init(keyLen)
+ keyGen.generateKey().getEncoded()
+ }
+
+ /**
* Generates an IV (Initialization Vector) using secure random.
*/
private[this] def createInitializationVector(properties: Properties): Array[Byte] = {
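
createKey is the replacement source of key material for the removed credentials lookup: it drives a JCE KeyGenerator off the existing key-size and keygen-algorithm config entries. A hedged sketch of requesting a larger key, assuming IO_ENCRYPTION_KEY_SIZE_BITS maps to the spark.io.encryption.keySizeBits config key (the keygen algorithm is left at its configured default):

    import org.apache.spark.SparkConf
    import org.apache.spark.security.CryptoStreamUtils

    // Assumed config key name; depending on the JRE, 256-bit AES may also
    // require the unlimited-strength JCE policy files.
    val conf = new SparkConf().set("spark.io.encryption.keySizeBits", "256")
    val key = CryptoStreamUtils.createKey(conf)   // 32 raw key bytes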