前言:
如今同学们对“ray计算框架”大概比较珍视,朋友们都想要剖析一些“ray计算框架”的相关资讯。那么小编也在网摘上收集了一些有关“ray计算框架””的相关资讯,希望咱们能喜欢,同学们一起来了解一下吧!Ray 介绍
Ray 是 UC Berkeley RISELab 出品的机器学习分布式框架。UC Berkeley 教授 Ion Stoica 写了一篇文章:The Future of Computing is Distributed。里面详细说了 Ray 产生的原由。总结一下,就是由于 AI 和大数据的快速发展,对于应用和硬件能力的要求提出了更高的挑战。
Ion Stoica 同时也是 Spark 产品的公司 Databricks 的创始人,Apache Mesos、Alluxio、Clipper 的项目主导人
Ray 的特点
我们从了解 Ray 到实践上线,大概有1年半的时间。为什么会找到 Ray ,主要还是基于高性能计算的需求。我们的场景主要是做投资组合的实时归因分析,由于涉及的数据量很大,对于计算的要求也很高。还有一个关键的门槛,金融模型大量使用 Pandas 和 Numpy 来做矩阵计算,需要针对 Pandas/Numpy 有更好的支持。实践下来,我觉得 Ray 有如下特点:
分布式异步调用内存调度Pandas/Numpy 的分布式支持支持 Python整体性能出众
存在很多和 Ray 类似的框架,但是如果把范围缩小,针对 python 用户,类似的主要就是 Dask、Mars、Celery 等 。
和 Dask 对比
Dask 是 Anaconda 的产品,背后的主要贡献者是 Matthew Rocklin 。Dask 的 blog 和 Matthew Rocklin 的 Blog 可以经常去看看,更新很频繁,内容很不错。最近 Matthew Rocklin 加入了 Nvidia ,开始做Dask_cudf,开发基于 GPU 的 Dask,结合 Rapids cudf(基于 GPU 的 Pandas)。最新的 blog 里面,显式他准备建立一个 Dask 的公司,推进 python 的分布式数据平台
Dask 和核心是弥补 python 在数据科学中的不足,主要是性能上。Python 单机的能力不能够支持数据科学中大数据集的快速计算。
Dask 提供了基础的数据结构,底层是分布式计算架构。数据结构包括:Array 、Dataframe。Array 兼容 numpy 的ndarray,Dataframe 兼容 Pandas 的 Dataframe 。"兼容"是个相对的说法,毕竟 Pandas 和 Numpy 发展多年,本身也在发展,接口非常多。Dask 应该是在兼容这块做的非常好的,但是和 Pandas/Numpy 还是有差异,这一点要注意。
就像刚才提到的,Dask 的目标是为了弥补 Python 在数据科学上的不足。而 Ray 的出发点是为了加速机器学习的调优和训练的速度。Ray 除了基础的计算平台,还包括 Tune (超参数调节) 和 RLlib (增强学习)
因为数据科学和机器学习基础都是 Python 也是核心,所以分布式和对 Python Pandas/Numpy 的支持是这两个框架的基础。但是有一点不一样的,也是我们决定要采用 Ray 的核心原因。Ray 的底层内存数据结构的基础是 Apache Arrow,而 Dask 是 xarray/xray 。xray 是 NumFocus 赞助的开源类似 numpy 的数据结构。Apache Arrow 拥有更好的生态,被大部分的数据处理系统接受,非常有利于和其他系统的融合
Apache Arrow 和 Plasma
Apache Arrow 是列式内存数据结构,已经成为数据处理领域最通用的数据结构。Arrow 最突出的特点就是生态非常丰富和性能出众
Arrow 的背后还有一位 committer 和 co-creator,叫 Wes McKinney 。Wes McKinney 是 pandas 的作者,所以 Arrow 对 Pandas 的支持也非常好
Ray 团队基于 Arrow 开发了一个内存数据服务,叫做 Plasma 。Plasma 在 Linux 共享内存创建了 Arrow 封装的对象,单独作为一个进程运行。其他进程可以通过 Plasma Client Library 来访问这块共享内存里的 Arrow 存储。这个功能是 Ray 团队开发的,贡献给 Arrow 作为 Arrow 生态的一部分。
除了 Plasma ,Ray 团队还贡献了一个叫做 Modin 的功能,就是基于 Ray 的分布式能力,提供了 Pandas 的实现。类似于 Dask 的 Dataframe。这个功能已经从 Ray 独立,成为一个独立的项目
软件架构
Ray 的基础结构可以参考 Paper
经过几个版本的迭代,有些内容做了一些优化,主要的结构还是如上图。GCS 作为集中的服务端,是 Worker 之间传递消息的纽带。每个 Server 都有一个共用的 Object Store,也就是用 Apache Arrow/Plasma 构建的内存数据。 Local Scheduler 是 Server 内部的调度,同时通过 GCS 来和其他 Server 上的 Worker 通信。Object Store 时间也有通信,作用是传递 Worker 之间的数据。
在 Paper 里面描述了一个典型的远程调用流程:
可以看到,GCS 储存了代码、输入参数、返回值。Worker 通过 Local Scheduler 来和 GCS 通信。Local Scheduler 就是 Raylet, 是单机上的基础调度服务
Object > 100 kb 会通过 Object Store 之间的并行 RPC 来传输,而不通过任务调度 RPC 来实现。Apache Arrow 在 0.15 之后提供了一个 Apache Arrow Flight 的 RPC 框架,0.16 又做了强化。不知道 Ray 的 Object 的并行传递是不是采用 Arrow Flight。下图是一个 任务调度的 RPC 示例图:
以上两个示意图来自于How Ray Uses gRPC and Arrow to outperform gRPC
下面我们详细来了解了一下 Raylet
Raylet 本地调度的核心
我重新画了一个简单一点的 Worker 和 GCS 的关系图:
Raylet 在中间的作用非常关键,Raylet 包含了几个重要内容:
Node ManagerObject Managergcs_client 或者 gcs server
Node Manager 是基于 boost::asio 的异步通信模块,主要是通信的连接和消息处理管理;Object Manager 是 Object Store 的管理;gcs_client 是连接 GCS 客户端。如果设置RAY_GCS_SERVICE_ENABLED 为 True 的话 ,这个 Server 就是 作为 GCS 启动。
我们先看一下 Raylet 的启动过程:
Raylet 在中间的作用非常关键,Raylet 包含了几个重要内容:
Node ManagerObject Managergcs_client 或者 gcs server
Node Manager 是基于 boost::asio 的异步通信模块,主要是通信的连接和消息处理管理;Object Manager 是 Object Store 的管理;gcs_client 是连接 GCS 客户端。如果设置RAY_GCS_SERVICE_ENABLED 为 True 的话 ,这个 Server 就是 作为 GCS 启动。
我们先看一下 Raylet 的启动过程:
首先,要做 Raylet 的初始化,这里面包含很多参数,包括 Node Manager 和 gcs client 的初始化。然后 Raylet Start 之后,注册 GCS,准备接收消息。一旦有消息进来,就进入 Node Manager 的 ProcessClientMessage 过程。在解释 ProcessClientMessage 的操作之前,我们需要了解一下 Ray Worker 和 Raylet 的进程/线程和通信的模型
通信模型
Ray 采用的是 Boost::asio 的异步通信模型,这里有一个很丰富全面的关于 asio 的介绍
Asio 采用的是 Proactor 模型。一个操作经过 Initiator 之后分解为 Asynchronous Operation Processor(AOP) 、Asynchronouse Operation(AO) 和 Completion Hanlder(CH) 。AOP 做具体的工作,执行异步操作。执行完成之后,把结果放入Completion Event Queue(CEQ)。Asynchronous Event Demultiplexer(AED)等待 CEQ ,如果 CEQ 出现完成事件,则返回一个完成事件到 CH
Raylet 启动了一个 main_service , 是 boost::asio::io_service 。io_service 也是 asio 运转的核心组件。前面的AOP、AED 和 Proactor 都是由 io_service 串联起来的。io_service 内部实现了一个任务队列,队列的任务就是void(void) 函数
io_service 的接口有 run 、run_one、poll、poll_one、stop、reset 、 dispatch 、post 。run 方式就是轮询执行队列里面的所有任务,无任务执行的时候就 epoll_wait 上阻塞等待
// Initialize the node manager.boost::asio::io_service main_service;main_service.run();
Node Manager 在初始化的时候,会按照 num_initial_workers 的数量初始化 worker pool 。然后Node Manager 会按照 asio 的异步机制,分配任务到这些 worker pool 里面的进程
接下来我们看一下 Raylet 、Worker 和 GCS 的消息传递和调度机制
消息传递和调度
Ray 后面的公司 的 blog 有一篇文章,叫做 Fast Scheduling in Ray 0.8 。讲了怎么在 ray 0.8 里面优化调度
Worker 提交 task 到 raylet,raylet 分配 task 到其他 worker。同时 raylet 还需要把 task 、相关 worker 信息提交给 GCS。task 执行的参数和返回都需要通过 Object Store 来获取
接下来,我们看一下详细的消息传递和对应的一些执行过程
先看一下 Submit Task 这个操作: Worker 提交一个 Task ,就调用 SubmitTask 的任务到 Raylet 。Task 在 Raylet 内部有一个 Lineage 的机制。这个也是上面 Anyscale 图里面的 task lineage
我们先了解一下 Task Lineage 的机制
Task Lineage
Task Lineage 里面包含几个概念,Lineage Cache 、Lineage Entry 和 Lineage 。Lineage 是管理 Task 执行的 DAG (有向无环图) ;Lineage Entry 是对 Task 状态的一些管理;Lineage Cache 是对 Task 在本机执行缓存的管理。在上面 Fast Scheduling in Ray 0.8 文章里面,主要就是通过对 Lineage 的优化来提升 Ray 0.8 的调度性能
在 Ray 0.8 里面,把调用其他 Worker 的流程,从 Raylet 到 GCS 然后到 woker ,改为直接查询 Lineage Cache,如果 Worker 曾经调用过,就直接请求对应的 Worker。减少调用路径,提升效率。
回到我们对 Lineage 的分析
Task 在 GCS 里面有几个状态:None、Uncommitted、Committing、Committed 。None 意思是在 Lineage Cache 里面不存在;当任务从 Woker 提交之后,是 uncommited 状态了;当任务发生一些变化,经过一些操作或者重新提交,就是 Committing 状态。意思就是正在进行 Committing,等待返回状态;提交的任务得到了反馈,就是 Committed 状态。但是有一个不同,任务没有删除,当下一个任务还是调度这个 Worker 的时候,就可以直接调用这个 Task Entry 来实现。这就是上面说的优化过程
TaskEntry 保存 Task 的状态和相关的联系。主要包含这么几个内容:
GcsStatus:就是上面说的 Task 的状态parent_task_ids_:一个 Set ,保存了 Task 的父任务 ID 列表forwarded_to_:一个 Set,保存了任务明确提交到的 Node Manager 的 ID 列表
Lineage 维护了两个 map。一个是 Task 和 LineageEntry 的 map;一个是 TaskID 和 TaskID Set 的 Map。第二个的意思就是 Task 和它子 Task 组的映射
LineageCache 是 Task 的 Cache Table 。包含了 Task 的信息和状态。Lineage Cache 的策略是把所有的任务成为 Uncommitted 状态。为了安全起见,只有当 Task 的父任务都删除了,子任务才能删除
Lineage 的细节还很多,而且还处在优化的状态。我们先看看通过一个 Task 提交的过程来看看 Lineage 是怎么运转的
提交任务
Submit Task 之后,先记录增加了一个 Task。然后拿到需要提交的 Task Spec,就是 Task 详细信息。然后提交。Task 有几个状态:
Placeable:就绪的状态,可以分配到 Node, 可以是本地或者远端。分配的原则根据资源状况,例如本地的内存、是否超过 Task 最大数量等。如果本地资源不够,就会提交到其他的 Node ,也就是服务器。当然,如果其他 Node 资源也不够,就会继续分配。WaitForActoreCreation:这个转改是针对 Actore Task ,代表 Actor 方法等待 Actore 完成返回Waiting:Task 在等待它的参数的依赖关系满足要求。也就是 Task 的参数需要放到 local object storeReady:Task 可以运行,所有的参数已经传输到 local object store 了Running:Task 已经分配冰运行到一个 workerBlocked:Task 暂停。可能是因为 Task 正在等待启动其他 Task 并且等待结果返回Infeasible:Task 所需要的资源所有机器都不满足
在 design_docs/task_states.rst 文档里面有一个图:
在 SubmitTask 最后:
// if the task was forwarded. if (forwarded) { // Check for local dependencies and enqueue as waiting or ready for dispatch. EnqueuePlaceableTask(task); } else { // (See design_docs/task_states.rst for the state transition diagram.) local_queues_.QueueTasks({task}, TaskState::PLACEABLE); ScheduleTasks(cluster_resource_map_); }
如果要提交的 Task 需要 forward (在收到 HandleForwardTask 操作的时候),则进行 Task 如队列操作。入队的时候,如果参数都满足,也就是本地资源足够。Task 就入队列,成为 READY 状态,如果不满足,就是 WAITING 状态。同时改变 Task 状态为 Pending
if (args_ready) { local_queues_.QueueTasks({task}, TaskState::READY); DispatchTasks(MakeTasksByClass({task})); } else { local_queues_.QueueTasks({task}, TaskState::WAITING); }task_dependency_manager_.TaskPending(task);
调度策略
在 SubmitTask 之后,如果不是 forward ,则执行两个操作:
local_queues_.QueueTasks({task}, TaskState::PLACEABLE);ScheduleTasks(cluster_resource_map_);
第一个是把 Task 放到本地,并且把 Task 状态置为 Placeable;第二是把 Task 在集群进行调度
Ray 针对 Task 有两个 Queue:ReadyQueue、SchedulingQueue 。 ReadyQueue 是已经准备好的 Task 的队列;SchedulingQueue 是已经提交的 Task 的队列。这两个队列用来存储不同状态的 Task,实现上面说的 Task 状态变化过程。
调度任务的步骤是这样:
先尝试把 Tasks 放在 Local Node遍历集群的资源,查看这个 Task 需要的资源是不是可以用。把可以用的集群的资源都储存到 clients_keys 的 vector 里面如果 Local Node 有资源从资源里面按照均匀分布取出一个资源。均匀分布的算法用的是 C++ 的 uniform_int_distribution 。然后为集群的 Client 都准备好计算资源如果没有合适的计算资源,就采用硬分配的方式,给 Client 安排计算资源。
集群架构
按照上面的描述,Ray 集群有 Worker 、Gcs 和 Raylet 等模块。Worker 是一个执行单元。 Worker 的执行是通过 gRPC 来远程提交的。整个架构有点像 istio 的 service mesh 的结构
对应以上的粗粒度的组件,拆解开来就像下面这样:
这里面有几个关键组件。Raylet 是处理 Worker 和 GCS 的关键连接点,还有处理 Local Worker 之间的调度。Raylet 里面包含 Node Manager,这是处理消息传递和调度的基础模块;还有 Object Manager ,这是处理本机 Arrow 内存读取的组件,相对容易理解;Core worker 组件针对 Python Driver 提供支持,主要是完成 Task 的调度。就是 python 里面使用 ray 时候需要加的 remote 注解。这个是 Ray 的核心。Python Driver 主要是针对 python 提供支持,当然 Ray 也有 Java Driver ,这里没有列出
我们先从 Raylet 看起
Raylet
在 Raylet 初始化的时候,初始化了一个 main_service 。 这是一个boost::asio::io_service 实例。这个在上面的通信模型里面简单描述了一下 asio 的机制。main_service 在 main.cc 启动(main_service.run()),main_service 的引用传递到了 Raylet ,然后 Raylet 应用传递到了 Node Manager
Node Manager(下称 NM)是 Raylet 的一个负责通信的模块,处理 Raylet 和其他分布式节点(服务器)、Worker、Task 分配还有 GCS 的通信
从 Raylet 到 Node Manager 的入口在 HandlerAccept :
ClientHandler<local_stream_protocol> client_handler = [this](LocalClientConnection &client) { node_manager_.ProcessNewClient(client); }; MessageHandler<local_stream_protocol> message_handler = [this](std::shared_ptr<LocalClientConnection> client, int64_t message_type, const uint8_t *message) { node_manager_.ProcessClientMessage(client, message_type, message); }; // Accept a new local client and dispatch it to the node manager. auto new_connection = LocalClientConnection::Create( client_handler, message_handler, std::move(socket_), "worker", node_manager_message_enum, static_cast<int64_t>(protocol::MessageType::DisconnectClient));
client_handler 是处理连接请求,message_handler 是处理这个 Client 的消息。LocalClientConnection 是一个针对客户端请求到服务端的抽象,主要是基于 asio 机制把读写,和异步读写封装了一下
ProcessNewClient 主要是记录 Client 的一些信息 ProcessClientMessage 就是上面架构图里面的消息处理,对应不同的消息处理流程。可以看一下附录:《Node Manager 处理消息的列表》
这里面最重要的一个,是 SubmitTask。是针对 task 的处理,Task 作为主要任务调度的模块,贯穿 Ray 分布式任务调度的全过程。所以我们有必要从源头来了解和跟踪一下 Task 的发起到完成的整个过程。同时,我们也可以通过这个过程,了解从 Python Driver 到 Core Worker ,然后到 Raylet 的处理过程。
Submit Task
Task 是表示一个任务及其执行的资源等信息。Task 的发起是从 Python Driver
@ray.remotedef borrower(inner_ids): inner_id = inner_ids[0] ray.get(foo.remote(inner_id))inner_id = ray.put(1)outer_id = ray.put([inner_id])res = borrower.remote(outer_id)
例如以上的代码,@ray.remote 注解下面的函数,就是一个执行体。对应的是 RemoteFuntion Class 。在 _remote 这一段:
self._pickled_function = pickle.dumps(self._function)
_function 序列化为 _pickled_funtion,然后再 hash 为 pickled_function_hash
self._function_descriptor = PythonFunctionDescriptor.from_function( self._function, self._pickled_function)def from_function(cls, function, pickled_function): pickled_function_hash = hashlib.sha1(pickled_function).hexdigest()
然后就调用了 Core Worker 的 SubmitTask
Status SubmitTask(const RayFunction &function, const std::vector<TaskArg> &args,const TaskOptions &task_options, std::vector<ObjectID> *return_ids,int max_retries);
在 SubmitTask 里面。生成一个 task id ,然后通过 BuildCommonTaskSpec 函数,把 Task 所有信息封装成一个 TaskSpecification 实例。然后把这个 TaskSpecification 提交到 Task 的任务队列里面。
if (task_options.is_direct_call) { task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, max_retries); return direct_task_submitter_->SubmitTask(task_spec); } else { return local_raylet_client_->SubmitTask(task_spec); }
这里面 Task Manager 是对 Task 管理的一个封装。包含了对应的内存 in_memory_store_、引用计数 reference_counter_(主要用作对 ObjectID 的管理,用在 GC 上),任务的状态和 Retry 次数等。这里面 task_manager->AddPendingTask ,主要是针对 Task 提交前做了记录,记录 Task ID,为了 reference_manager_ 之后的 GC 用
is_direct_call 是针对 Actor worker 的直接调用。local_raylet_client_ 就是上面提到的 Raylet Client,Core worker 把接收到的 remote 调用提交到 Raylet ,Raylet 来做调度。就是下图红色的那一段:
在 Raylet 的 Node Manager 接收到 SubmitTask 消息,按照 Task 的依赖次序来提交 Task。意思就是,如果一个 Task B 依赖于另外一个 Task A,那就先提交 Task A
如果任务是提交到另外一个 Node(这个取决于 Lineage 调度,forwarded 是 True,forwarded 是 SubmitTask 的最后一个参数),则在 Lineage Cache 增加一个 UncommittedLineage
lineage_cache_.AddUncommittedLineage(task_id, uncommitted_lineage)
这里面第二个参数,是 SubmitTask 的时候,生成的一个 Lineage 的实例。
如果任务是提交到本地(forwarded 是 False,默认),则异步 commit task 到 GCS:
lineage_cache_.CommitTask(task)
调用的是 Lineage 的 CommitTask,然后调用 Lineage 的 FlushTask,接着调用 gcs_client_->Tasks().AsyncAdd 把 Task 状态提交到 GCS,然后根据返回状态更新本地 Lineage 的 Task 状态为 GcsStatus::COMMITTED。同时 Evict Task 和 UnSubscribeTask
整个过程流程图如下:
参考文档
Ray: A Distributed Framework for Emerging AI Applicationsarxiv.org/abs/1712.05889
Node Manager 处理消息的列表:
原文:
标签: #ray计算框架