18 changes: 15 additions & 3 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -36,6 +36,7 @@ import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
def setNumPartitions(numPartitions: Int) = {}
}

object Partitioner {
@@ -75,14 +76,18 @@ object Partitioner {
* so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
* produce an unexpected or incorrect result.
*/
class HashPartitioner(partitions: Int) extends Partitioner {
class HashPartitioner(var partitions: Int) extends Partitioner {
def numPartitions = partitions

def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

override def setNumPartitions(numPartitions: Int) {
partitions = numPartitions
}

override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
@@ -102,7 +107,7 @@ class HashPartitioner(partitions: Int) {
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
@transient partitions: Int,
@transient var partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
private var ascending: Boolean = true)
extends Partitioner {
@@ -113,7 +118,9 @@ class RangePartitioner[K : Ordering : ClassTag, V](
private var ordering = implicitly[Ordering[K]]

// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
private var rangeBounds: Array[K] = getRangeBounds

private def getRangeBounds(): Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
@@ -184,6 +191,11 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
}

override def setNumPartitions(numPartitions: Int) {
partitions = numPartitions
rangeBounds = getRangeBounds
}

override def equals(other: Any): Boolean = other match {
case r: RangePartitioner[_,_] =>
r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
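For context, a self-contained sketch of the mutable-partitioner contract the change above introduces: setNumPartitions resizes a partitioner in place, and getPartition immediately reflects the new count. nonNegativeMod mirrors Utils.nonNegativeMod; everything else is simplified for illustration and is not the exact Spark code.

object MutablePartitionerSketch {
  // mirrors org.apache.spark.util.Utils.nonNegativeMod
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  abstract class Partitioner extends Serializable {
    def numPartitions: Int
    def getPartition(key: Any): Int
    def setNumPartitions(numPartitions: Int): Unit = {} // no-op default, as in the patch
  }

  class HashPartitioner(var partitions: Int) extends Partitioner {
    def numPartitions: Int = partitions
    def getPartition(key: Any): Int = key match {
      case null => 0
      case _ => nonNegativeMod(key.hashCode, numPartitions)
    }
    override def setNumPartitions(numPartitions: Int): Unit = {
      partitions = numPartitions
    }
  }

  def main(args: Array[String]): Unit = {
    val p = new HashPartitioner(4)
    println(p.getPartition("key-42")) // some id in [0, 4)
    p.setNumPartitions(8)             // resized in place, as the scheduler would do
    println(p.getPartition("key-42")) // now an id in [0, 8)
  }
}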
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1289,17 +1289,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
resultHandler: (Int, U) => Unit): Array[U] = {
if (dagScheduler == null) {
throw new SparkException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
val result = dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
result
}

/**
@@ -1315,7 +1316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
results
//results
}

/**
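Because the low-level runJob overload now returns Array[U], a caller can take the per-partition results straight from the return value instead of accumulating them in the handler. A hedged usage sketch against this branch, assuming an active SparkContext sc:

import org.apache.spark.TaskContext

val rdd = sc.parallelize(1 to 100, 4)
val perPartitionSums: Array[Int] = sc.runJob(
  rdd,
  (ctx: TaskContext, it: Iterator[Int]) => it.sum,
  0 until rdd.partitions.length,
  false,                        // allowLocal
  (index: Int, res: Int) => ()) // a side-effecting handler is no longer required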
core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -32,10 +32,12 @@ import org.apache.spark.util.Utils
*/

private[spark] class PythonPartitioner(
override val numPartitions: Int,
var partitions: Int,
val pyPartitionFunctionId: Long)
extends Partitioner {

override def numPartitions = partitions

override def getPartition(key: Any): Int = key match {
case null => 0
// we don't trust the Python partition function to return valid partition ID's so
@@ -44,6 +46,10 @@ private[spark] class PythonPartitioner(
case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
}

override def setNumPartitions(numPartitions: Int) = {
partitions = numPartitions
}

override def equals(other: Any): Boolean = other match {
case h: PythonPartitioner =>
h.numPartitions == numPartitions && h.pyPartitionFunctionId == pyPartitionFunctionId
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -114,6 +114,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
array
}

override def resetPartitions(numPartitions: Int): Array[Partition] = {
part.setNumPartitions(numPartitions)
this.partitions_ = getPartitions
this.partitions_
}

override val partitioner: Some[Partitioner] = Some(part)

override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -190,6 +190,18 @@ class HadoopRDD[K, V](
newInputFormat
}

override def getInputSize: Long = {
var totalSize: Long = 0L
this.partitions_.foreach {
case hadoopPartition: HadoopPartition =>
totalSize += hadoopPartition.inputSplit.value.getLength
case _ =>
}
logInfo("Input Size: " + totalSize)
totalSize
}

override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
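The override above is the base case of the stage-size estimate: the total length of the underlying Hadoop InputSplits. A worked example with assumed split sizes:

// illustrative only: three 128 MiB splits reported by the InputFormat
val splitLengths = Seq.fill(3)(128L * 1024 * 1024)
val inputSize = splitLengths.sum // 402653184 bytes (384 MiB)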
24 changes: 22 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -178,7 +178,7 @@ abstract class RDD[T: ClassTag](
// Our dependencies and partitions will be gotten by calling subclass's methods below, and will
// be overwritten when we're checkpointed
private var dependencies_ : Seq[Dependency[_]] = null
@transient private var partitions_ : Array[Partition] = null
@transient protected var partitions_ : Array[Partition] = null

/** An Option holding our checkpoint RDD, if we are checkpointed */
private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
@@ -209,6 +209,22 @@
}
}

def getInputSize: Long = {
dependencies.map(_.rdd.getInputSize).sum
}

def resetPartitions(numPartitions: Int): Array[Partition] = {
dependencies.foreach(_.rdd.resetPartitions(numPartitions))
this.partitions_ = getPartitions
this.partitions_
}

/**
* Get the preferred locations of a partition, taking into account whether the
* RDD is checkpointed.
@@ -843,8 +859,12 @@ abstract class RDD[T: ClassTag](
// Our partitioner knows how to handle T (which, since we have a partitioner, is
// really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
val p2 = new Partitioner() {
override def numPartitions = p.numPartitions
var partitions: Int = p.numPartitions
override def numPartitions = partitions
override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
override def setNumPartitions(numPartitions: Int) = {
// also resize the wrapped partitioner, so getPartition stays within the new range
p.setNumPartitions(numPartitions)
partitions = numPartitions
}
}
// Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
// anyway, and when calling .keys, will not have a partitioner set, even though
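The default getInputSize and resetPartitions walk the dependency graph recursively until an overriding RDD terminates the walk (HadoopRDD for input sizes, the shuffle RDDs below for partitions). A toy model of that recursion, with illustrative names only:

// leaf nodes play the role of HadoopRDD.getInputSize; inner nodes use the RDD default
final case class Node(selfInput: Long, deps: Seq[Node]) {
  def inputSize: Long =
    if (deps.isEmpty) selfInput else deps.map(_.inputSize).sum
}

val scan   = Node(512L * 1024 * 1024, Nil) // 512 MiB of input splits
val mapped = Node(0L, Seq(scan))
println(mapped.inputSize)                  // 536870912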
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -86,6 +86,12 @@ class ShuffledRDD[K, V, C](
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}

override def resetPartitions(numPartitions: Int): Array[Partition] = {
part.setNumPartitions(numPartitions)
this.partitions_ = getPartitions
this.partitions_
}

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
19 changes: 16 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -29,12 +29,25 @@ private[spark] class ActiveJob(
val jobId: Int,
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
var partitions: Array[Int],
val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {

val numPartitions = partitions.length
val finished = Array.fill[Boolean](numPartitions)(false)
var numPartitions = partitions.length
var finished = Array.fill[Boolean](numPartitions)(false)
var numFinished = 0

def resetPartition(numPartitions: Int) {
this.numPartitions = numPartitions
partitions = (0 until numPartitions).toArray
finished = Array.fill[Boolean](numPartitions)(false)
if (listener.isInstanceOf[JobWaiter[_]]) {
val waiter: JobWaiter[_] = listener.asInstanceOf[JobWaiter[_]]
val results = new Array[Any](numPartitions)
waiter.resultHandler = (index, res) => results(index) = res
waiter.totalTasks = numPartitions
waiter.result = results
}
}
}
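resetPartition rewires a live JobWaiter: the result buffer is reallocated and the handler re-pointed at it, which is why resultHandler, totalTasks, and result must be vars on this branch. A self-contained sketch of that rewiring idea (the names are illustrative, not Spark's actual API):

class ResultWaiter[T](var totalTasks: Int) {
  var result: Array[Any] = new Array[Any](totalTasks)
  var resultHandler: (Int, T) => Unit = (i, r) => result(i) = r

  // mirrors what ActiveJob.resetPartition does to the waiter
  def reset(newTotal: Int): Unit = {
    totalTasks = newTotal
    result = new Array[Any](newTotal)
    resultHandler = (i, r) => result(i) = r
  }
}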
99 changes: 96 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -89,6 +89,7 @@ class DAGScheduler(
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val stageIdToChildStage = new HashMap[Int, Stage]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
@@ -127,6 +128,23 @@
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

private var isAutoPartition = sc.getConf.getBoolean("spark.reduce.autoPartition", false)
private var minNumPartitions = sc.getConf.getInt("spark.reduce.partition.min", 2)
private var maxNumPartitions = sc.getConf.getInt("spark.reduce.partition.max", 2)
private var bytesPerPartition = sc.getConf.getInt("spark.reduce.per.partition.bytes",
256 * 1024 * 1024)
private var isAutoPartitionForTest = false

def setAutoPartitionForTest(isAutoPartitionForTest: Boolean, maxNumPartitions: Int,
bytesPerPartition: Int) {
this.isAutoPartitionForTest = isAutoPartitionForTest
this.isAutoPartition = isAutoPartitionForTest
if (isAutoPartitionForTest) {
this.maxNumPartitions = maxNumPartitions
this.bytesPerPartition = bytesPerPartition
}
}
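// Example driver-side configuration for the knobs above. These keys are
// introduced by this patch rather than stock Spark, and the values are
// illustrative:
//   val conf = new SparkConf()
//     .set("spark.reduce.autoPartition", "true")
//     .set("spark.reduce.partition.min", "2")
//     .set("spark.reduce.partition.max", "200")
//     .set("spark.reduce.per.partition.bytes", String.valueOf(256 * 1024 * 1024))
//   val sc = new SparkContext(conf)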

private def initializeEventProcessActor() {
// blocking the thread until supervisor is started, which ensures eventProcessActor is
// not null before any job is submitted
@@ -490,12 +508,13 @@

val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
return new JobWaiter[U](this, jobId, 0, resultHandler)
return new JobWaiter[U](this, jobId, 0, new Array[Any](partitions.size), resultHandler)
}

assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
val waiter = new JobWaiter[U](this, jobId, partitions.size, new Array[Any](partitions.size),
resultHandler)
eventProcessActor ! JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
waiter
@@ -508,14 +527,21 @@
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null)
properties: Properties = null): Array[U] =
{
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded => {
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
val result = new Array[U](waiter.result.size)
var i = 0
while (i < waiter.result.size) {
result(i) = waiter.result(i).asInstanceOf[U]
i += 1
}
result
}
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format
@@ -779,6 +805,9 @@
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
if (isAutoPartition) {
stageIdToChildStage(parent.id) = stage
}
submitStage(parent)
}
waitingStages += stage
@@ -789,12 +818,76 @@
}
}

private def estimateStageInputSize(stage: Stage): Long = {
var inputSize: Long = 0L
if (stage.parents.isEmpty) { // a first-level stage reads input directly, so use the RDD's size
inputSize += stage.rdd.getInputSize
} else {
for (parentStage <- stage.parents) {
if (parentStage.isAvailable) {
inputSize += parentStage.getOutputSize
} else {
inputSize += estimateStageInputSize(parentStage)
}
}
}
logInfo("estimate " + stage + "'s inputSize=" + Utils.bytesToString(inputSize))
inputSize
}

private def estimateNumPartitionsOfStage(childStage: Stage): Int = {
var inputSize: Long = 0L
// a stage's input size is the sum of its parent stages' output sizes
for (parentStage <- childStage.parents) {
inputSize += estimateStageInputSize(parentStage)
}
val estimateNumPartitions = ((inputSize + bytesPerPartition - 1) / bytesPerPartition).toInt
val numPartitions: Int = Math.min(Math.max(estimateNumPartitions, minNumPartitions),
maxNumPartitions)
logInfo("estimate " + childStage + "'s inputSize=" + Utils.bytesToString(inputSize) +
", bytesPerPartition=" + Utils.bytesToString(bytesPerPartition) + ", numPartitions=" +
numPartitions)
numPartitions
}
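// Worked example of the clamp above, with illustrative numbers: for 10 GiB of
// estimated parent output and 256 MiB per partition,
//   estimateNumPartitions = ceil(10 GiB / 256 MiB) = 40
// which lies inside [minNumPartitions, maxNumPartitions] = [2, 200], so the
// child stage would be resized to 40 reduce partitions.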

private def updateNumPartitionsOfStage(stage: Stage, numPartitions: Int) {
logInfo("update " + stage + "'s numPartitions=" + numPartitions)
for (parentStage <- stage.parents) {
if (parentStage.shuffleDep.isDefined) {
logInfo("update " + parentStage + "'s shuffleDep's numPartitions=" + numPartitions)
val partitioner: Partitioner = parentStage.shuffleDep.get.partitioner
if (!isAutoPartitionForTest) {
partitioner.setNumPartitions(numPartitions)
}
}
}

stage.resetNumPartitions(numPartitions)
logInfo("update " + stage + "'s numPartitions=" + numPartitions + " finished.")
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingTasks.clear()

if (isAutoPartition) {
if (stageIdToChildStage.contains(stage.id)) {
val childStage = stageIdToChildStage.get(stage.id)
if (!childStage.get.isResetNumPartitions && childStage.get.numPartitions > 1) {
updateNumPartitionsOfStage(childStage.get, estimateNumPartitionsOfStage(childStage.get))
} else {
logInfo(stage + "'s child " + childStage + " has already been estimated, numPartitions=" +
childStage.get.numPartitions)
}
stageIdToChildStage.remove(stage.id)
} else {
logInfo(stage + " has no child stage")
}
}

// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = {
if (stage.isShuffleMap) {