前言:
此时姐妹们对“fifo调度算法程序”大概比较注意,你们都想要知道一些“fifo调度算法程序”的相关文章。那么小编在网络上网罗了一些关于“fifo调度算法程序””的相关知识,希望朋友们能喜欢,姐妹们一起来了解一下吧!TaskSchedulerImpl对Task的调度依赖于调度池Pool,所有需要被调度的TaskSet都被至于调度池中。调度池Pool通过调度算法对每个TaskSet进行调度,并将调度的TaskSet交给TaskSchedulerImpl进行资源调度。
调度算法
调度池对TaskSet的调度取决于调度算法。SchedulingAlgorithm定义了调度算法的规范。
/** * An interface for sort algorithm * FIFO: FIFO algorithm between TaskSetManagers * FS: FS algorithm between Pools, and FIFO or FS within Pools */ private[spark] trait SchedulingAlgorithm { def comparator(s1: Schedulable, s2: Schedulable): Boolean }
SchedulingAlgorithm有两个实现类,分别为实现了先进先出(First In First Out,FIFO)算法的FIFOSchedulingAlgorithm和公平调度算法的FairSchedulingAlgorithm。
FIFOSchedulingAlgorithm
FIFOSchedulingAlgorithm实现了FIFO调度算法,
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val priority1 = s1.priority val priority2 = s2.priority var res = math.signum(priority1 - priority2) if (res == 0) { val stageId1 = s1.stageId val stageId2 = s2.stageId res = math.signum(stageId1 - stageId2) } res < 0 } }
重写的comparator方法的执行步骤如下:
对s1和s2两个Schedulable的优先级(值越小,优先级越高)进行比较如果两个Schedulable的优先级相同,则对s1和s2所属的Stage的身份标识进行比较如果结果小于0,则优先调度s1,否则优先调度s2。FairSchedulingAlgorithm
FairSchedulingAlgorithm代码如下:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val minShare1 = s1.minShare val minShare2 = s2.minShare val runningTasks1 = s1.runningTasks val runningTasks2 = s2.runningTasks val s1Needy = runningTasks1 < minShare1 val s2Needy = runningTasks2 < minShare2 val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0) val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0) val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble var compare = 0 if (s1Needy && !s2Needy) { return true } else if (!s1Needy && s2Needy) { return false } else if (s1Needy && s2Needy) { compare = minShareRatio1.compareTo(minShareRatio2) } else { compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) } if (compare < 0) { true } else if (compare > 0) { false } else { s1.name < s2.name } } }
重写的comparator方法的执行步骤如下:
如果s1中处于运行状态的Task的数量小于s1的minShare,并且s2中处于运行状态的Task的数量大于等于s2的minShare,那么优先调度s1。如果s1中处于运行状态的Task的数量大于等于minShare,并且s2中处于运行状态的Task的数量小于s2的minShare,那么优先调度s2。如果s1中处于运行状态的Task数量小于s1的minShare,并且s2中处于运行状态的Task的数量小于s2的minShare,那么在对minShareRatio1和minShareRatio2进行比较。如果minShareRatio1小于minShareRatio2,则优先调度s1;如果minShareRatio2小于minShareRatio1,则优先调度s2.如果minShareRatio1和minShareRatio2相等,则对s1和s2的名字进行比较。如果s1中处于运行状态的Task数量大于等于s1的minShare,并且s2中处于运行状态的Task的数量大于等于s2的minShare,那么在对taskToWeightRatio1和taskToWeightRatio2进行比较。如果taskToWeightRatio1和taskToWeightRatio2相等, 则对s1和s2的名称进行比较。taskToWeightRatio是正在运行的任务数量与权重(weight)之间的比值。Pool
TaskScheduler对任务的调度是借助于调度池实现的,Pool是对Task集合进行调度的调度池。调度池内部有一个根调度队列,跟调度队列中包含了多个子调度池。子调度池自身的调度队列中还可以包含其他的调度池或者TaskSetManager,所以调度池是一个多层次的调度队列。
Pool实现了Schedulable的特质,其中包含的属性如下:
parent:当前Pool的父Pool。name:当前Pool的名称,构造器属性之一schedulingMode:Pool的构造器属性之一,表示调度模式(SchedulingMode)。枚举类型SchedulingMode共有FAIR、FIFO、NONE三种枚举值。initMinShare:minShare的初始值initWeight:weight的初始值weight:用于公平调度算法的权重minShare:用于公平调度算法的参考值schedulableQueue:类型是ConcurrentLinkedQueue[Schedulable],用于存储Schedulable。由于Schedulable只有Pool和TaskSetManager两个实现类,所以schedulableQueue是一个可以嵌套的层次结构。schedulableNameToSchedulable:调度名称与Schedulable的对应关系。runningTasks:当前正在运行的任务数量。stageId:调度池或TaskSetManager所属Stage的身份标识taskSetSchedulingAlgorithm:任务集合的调度算法,默认是FairSchedulingAlgorithm。
标签: #fifo调度算法程序 #fifo调度算法例题