-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                   | 47
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala      | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                  | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala               |  4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala            | 12
-rw-r--r--  python/pyspark/context.py                                                  |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala        |  2
8 files changed, 70 insertions(+), 55 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3ddc0d5eee..ee5637371f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -427,9 +427,9 @@ class SparkContext(config: SparkConf) extends Logging {
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
- def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
+ def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
- minSplits).map(pair => pair._2.toString)
+ minPartitions).map(pair => pair._2.toString)
}
/**
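For reference, a minimal usage sketch of the renamed parameter; the SparkContext `sc`, the path, and the partition count below are illustrative and not part of this change:

    // Assumes an existing SparkContext `sc`; path and count are illustrative.
    val lines = sc.textFile("hdfs://namenode/data/input.txt", minPartitions = 4)
    // minPartitions is only a lower-bound hint; the actual count depends on the input splits.
    println(lines.partitions.size)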
@@ -457,9 +457,10 @@ class SparkContext(config: SparkConf) extends Logging {
*
* @note Small files are preferred; large files are also allowable, but may cause bad performance.
*
- * @param minSplits A suggestion value of the minimal splitting number for input data.
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
- def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+ def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
+ RDD[(String, String)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
@@ -469,7 +470,7 @@ class SparkContext(config: SparkConf) extends Logging {
classOf[String],
classOf[String],
updateConf,
- minSplits)
+ minPartitions)
}
/**
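A sketch of what this returns, namely (file path, file content) pairs; the directory used here is illustrative:

    // Assumes an existing SparkContext `sc`; the directory is illustrative.
    val files = sc.wholeTextFiles("hdfs://namenode/data/small-files", minPartitions = 2)
    // Each element is (file path, entire file content).
    files.take(2).foreach { case (path, content) =>
      println(path + " -> " + content.length + " chars")
    }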
@@ -481,7 +482,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
- * @param minSplits Minimum number of Hadoop Splits to generate.
+ * @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
@@ -493,11 +494,11 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -524,7 +525,7 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minPartitions)
}
/**
@@ -532,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging {
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
- * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
+ * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
@@ -541,13 +542,13 @@ class SparkContext(config: SparkConf) extends Logging {
* a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
- (path: String, minSplits: Int)
+ (path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
- minSplits)
+ minPartitions)
}
/**
@@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
- hadoopFile[K, V, F](path, defaultMinSplits)
+ hadoopFile[K, V, F](path, defaultMinPartitions)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -626,10 +627,10 @@ class SparkContext(config: SparkConf) extends Logging {
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minPartitions: Int
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+ hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging {
* */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
): RDD[(K, V)] =
- sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+ sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging {
* a `map` function.
*/
def sequenceFile[K, V]
- (path: String, minSplits: Int = defaultMinSplits)
+ (path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
@@ -674,7 +675,7 @@ class SparkContext(config: SparkConf) extends Logging {
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+ vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
@@ -688,9 +689,9 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def objectFile[T: ClassTag](
path: String,
- minSplits: Int = defaultMinSplits
+ minPartitions: Int = defaultMinPartitions
): RDD[T] = {
- sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
+ sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
}
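objectFile pairs with RDD.saveAsObjectFile; a round-trip sketch under the new parameter name, with an illustrative path:

    // Assumes an existing SparkContext `sc`; the path is illustrative.
    val nums = sc.parallelize(1 to 1000)
    nums.saveAsObjectFile("hdfs://namenode/tmp/nums")
    // Read the serialized objects back, asking for at least 4 partitions.
    val restored = sc.objectFile[Int]("hdfs://namenode/tmp/nums", minPartitions = 4)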
@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
def defaultParallelism: Int = taskScheduler.defaultParallelism
/** Default min number of partitions for Hadoop RDDs when not given by user */
+ @deprecated("use defaultMinPartitions", "1.0.0")
def defaultMinSplits: Int = math.min(defaultParallelism, 2)
+ /** Default min number of partitions for Hadoop RDDs when not given by user */
+ def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
+
private val nextShuffleId = new AtomicInteger(0)
private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
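Callers that reference the old name keep compiling but now get a deprecation warning; a migration sketch (the path is illustrative):

    // Assumes an existing SparkContext `sc`.
    val path = "hdfs://namenode/data/input.txt"
    val before = sc.textFile(path, sc.defaultMinSplits)      // still compiles, deprecated as of 1.0.0
    val after  = sc.textFile(path, sc.defaultMinPartitions)  // preferred spelling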
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e6a3f06b0e..cf30523ab5 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -109,9 +109,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: java.lang.Integer = sc.defaultParallelism
- /** Default min number of partitions for Hadoop RDDs when not given by user */
+ /**
+ * Default min number of partitions for Hadoop RDDs when not given by user.
+ * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use
+ * {@link #defaultMinPartitions()} instead
+ */
+ @Deprecated
def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits
+ /** Default min number of partitions for Hadoop RDDs when not given by user */
+ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
+
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
@@ -153,7 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
- def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
+ def textFile(path: String, minPartitions: Int): JavaRDD[String] =
+ sc.textFile(path, minPartitions)
/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
@@ -180,17 +189,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*
* @note Small files are preferred; large files are also allowable, but may cause bad performance.
*
- * @param minSplits A suggestion value of the minimal splitting number for input data.
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
- def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
- new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+ def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
+ new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))
/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
- * @see `wholeTextFiles(path: String, minSplits: Int)`.
+ * @see `wholeTextFiles(path: String, minPartitions: Int)`.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))
@@ -205,11 +214,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
+ new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions))
}
/** Get an RDD for a Hadoop SequenceFile.
@@ -233,9 +242,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* slow if you use the default serializer (Java serialization), though the nice thing about it is
* that there's very little effort required to save arbitrary objects.
*/
- def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
+ def objectFile[T](path: String, minPartitions: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
- sc.objectFile(path, minSplits)(ctag)
+ sc.objectFile(path, minPartitions)(ctag)
}
/**
@@ -265,11 +274,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
inputFormatClass: Class[F],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
+ new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
}
/**
@@ -304,11 +313,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
inputFormatClass: Class[F],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minPartitions: Int
): JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
+ new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
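The Java wrapper mirrors the same rename and deprecation; a small sketch (JavaSparkContext is an ordinary class, so this can be exercised from Scala as well as Java; `sc` and the path are illustrative):

    import org.apache.spark.api.java.JavaSparkContext

    // Wrap an existing SparkContext `sc` in the Java-friendly API.
    val jsc = new JavaSparkContext(sc)
    // defaultMinPartitions replaces the now-deprecated defaultMinSplits.
    val lines = jsc.textFile("hdfs://namenode/data/input.txt", jsc.defaultMinPartitions)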
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 80d055a895..4cb4505777 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -48,14 +48,15 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
}
/**
- * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API.
+ * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
*/
- def setMaxSplitSize(context: JobContext, minSplits: Int) {
+ def setMaxSplitSize(context: JobContext, minPartitions: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
}.sum
- val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+ val maxSplitSize = Math.ceil(totalLen * 1.0 /
+ (if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
}
}
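The renamed parameter only feeds the maximum-split-size calculation above; a worked sketch with made-up numbers:

    // Reproduces the formula above: 96 MB of input and minPartitions = 4
    // give a maximum combined-split size of 24 MB.
    val totalLen = 96L * 1024 * 1024
    val minPartitions = 4
    val maxSplitSize =
      Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong
    println(maxSplitSize)  // 25165824 bytes, i.e. 24 MB per split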
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6811e1abb8..6547755764 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -87,7 +87,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
- * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
*/
@DeveloperApi
class HadoopRDD[K, V](
@@ -97,7 +97,7 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int)
+ minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
def this(
@@ -106,7 +106,7 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int) = {
+ minPartitions: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@@ -115,7 +115,7 @@ class HadoopRDD[K, V](
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minPartitions)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -169,7 +169,7 @@ class HadoopRDD[K, V](
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
- val inputSplits = inputFormat.getSplits(jobConf, minSplits)
+ val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
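HadoopRDDs are normally constructed through SparkContext rather than directly; a sketch using the old-API TextInputFormat (the path and partition count are illustrative):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // Assumes an existing SparkContext `sc`.
    val records = sc.hadoopFile("hdfs://namenode/data/logs",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions = 8)
    // Copy out of Hadoop's reused Writable before caching, per the note in the scaladoc above.
    val lines = records.map(pair => pair._2.toString)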
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 8684b645bc..ac1ccc06f2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -158,7 +158,7 @@ private[spark] class WholeTextFileRDD(
keyClass: Class[String],
valueClass: Class[String],
@transient conf: Configuration,
- minSplits: Int)
+ minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
override def getPartitions: Array[Partition] = {
@@ -169,7 +169,7 @@ private[spark] class WholeTextFileRDD(
case _ =>
}
val jobContext = newJobContext(conf, jobId)
- inputFormat.setMaxSplitSize(jobContext, minSplits)
+ inputFormat.setMaxSplitSize(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 2f3ac10397..3d6e7e0d5c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -57,7 +57,7 @@ object MLUtils {
* @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise
* @param numFeatures number of features, which will be determined from the input data if a
* negative value is given. The default value is -1.
- * @param minSplits min number of partitions, default: sc.defaultMinSplits
+ * @param minPartitions min number of partitions, default: sc.defaultMinPartitions
* @return labeled data stored as an RDD[LabeledPoint]
*/
def loadLibSVMData(
@@ -65,8 +65,8 @@ object MLUtils {
path: String,
labelParser: LabelParser,
numFeatures: Int,
- minSplits: Int): RDD[LabeledPoint] = {
- val parsed = sc.textFile(path, minSplits)
+ minPartitions: Int): RDD[LabeledPoint] = {
+ val parsed = sc.textFile(path, minPartitions)
.map(_.trim)
.filter(!_.isEmpty)
.map(_.split(' '))
@@ -101,7 +101,7 @@ object MLUtils {
* with number of features determined automatically and the default number of partitions.
*/
def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
- loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinSplits)
+ loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinPartitions)
/**
* Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
@@ -112,7 +112,7 @@ object MLUtils {
sc: SparkContext,
path: String,
labelParser: LabelParser): RDD[LabeledPoint] =
- loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits)
+ loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinPartitions)
/**
* Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
@@ -124,7 +124,7 @@ object MLUtils {
path: String,
labelParser: LabelParser,
numFeatures: Int): RDD[LabeledPoint] =
- loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits)
+ loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinPartitions)
/**
* :: Experimental ::
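A usage sketch of the convenience overload, which now defaults to defaultMinPartitions internally; `sc` and the path are illustrative:

    import org.apache.spark.mllib.util.MLUtils

    // This overload falls back to sc.defaultMinPartitions rather than the
    // deprecated sc.defaultMinSplits.
    val points = MLUtils.loadLibSVMData(sc, "hdfs://namenode/data/sample.libsvm")
    println(points.count())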
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index d8667e84fe..f63cc4a55f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -248,14 +248,14 @@ class SparkContext(object):
jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
return RDD(jrdd, self, serializer)
- def textFile(self, name, minSplits=None):
+ def textFile(self, name, minPartitions=None):
"""
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.
"""
- minSplits = minSplits or min(self.defaultParallelism, 2)
- return RDD(self._jsc.textFile(name, minSplits), self,
+ minPartitions = minPartitions or min(self.defaultParallelism, 2)
+ return RDD(self._jsc.textFile(name, minPartitions), self,
UTF8Deserializer())
def wholeTextFiles(self, path):
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 0da5eb754c..8cfde46186 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -52,7 +52,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
// Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless
// it is smaller than what Spark suggests.
private val _minSplitsPerRDD = math.max(
- sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinSplits)
+ sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
// TODO: set aws s3 credentials.