发布于:2021-01-30 10:55:20
0
164
0
GPars是Groovy(和Java)中用于高级并发的开源库。如果您听过actors、dataflow或parallel collections之类的术语,并且希望用Java友好的语言尝试这些概念,那么现在您有机会了。在本文中,我计划对gpar中可用的抽象进行快速概述。然后我们将更详细地研究基于流的并发数据处理、并行化收集操作以及异步运行组合函数。
随着多核芯片逐渐成为主流电脑、平板电脑和手机的标准,并发编程越来越重要。不幸的是,我们从Java中了解到的广泛使用的基于线程的并发模型与人脑的工作方式并不匹配。线程和锁在代码中引入了太多的不确定性,这常常导致难以追踪和修复的细微错误。这样的代码不能被可靠地测试或分析。不可避免地,为了使并发编程有效,我们需要使用更自然的心智模型。
并发可以是Groovy
将直观的并发模型引入主流是gpar雄心勃勃的挑战。我们采用了众所周知的并发概念,如actors、CSP、dataflow等,并用Java实现了它们,使用了一种美味的Groovy DSL,使库具有流畅和易于使用的风格。尽管主要针对Groovy,但一些gpar抽象可以直接从Java使用。由于Groovy社区对并发的兴趣以及他们对项目的支持,gpar目前是Groovy发行版的标准部分。在Groovy中启动并运行并发不需要额外的设置。
被认为是致命的循环
我想让你停下来想一想计算机科学专业的学生在学习编程时通常被分配的一些琐碎的练习。例如,这样的任务之一是查找集合的最大值。可能会应用一些复杂的度量来增加解决方案的计算量。你首先想到的是什么算法?
你很有可能会提出一个迭代来遍历这个集合并记住迄今为止发现的最大元素。一旦我们到达集合的末尾,我们记住的迄今为止最大的元素必须是整个集合的全局最大值。清楚,简单-好吧,错了!如果你想知道原因,就继续读下去。
做你的选择
在并发空间中似乎没有一个一刀切的解决方案。多种范式已经逐渐出现,尽管有一些重叠,但它们都适用于不同类型的问题。GPars本身在2008年作为一个用于并行收集处理的小型Groovy库起步。不久之后,它增加了对actor模型的支持。随着时间的推移,其他并发抽象已经被集成。以下是GPars版本0.12中当前可用内容的快速列表:
并行集合提供了直观的方式来并行处理Java / Groovy集合,地图和一般所有几何可分解问题的处理
异步功能使Groovy闭包能够异步运行,同时又可以毫不费力地协调它们之间的相互通信。
Fork / Join使您能够同时处理递归的分治式算法
数据流变量(又名Promises)为线程间通信提供了一种轻量级的机制。
数据流通道和运算符使您可以将活动的数据转换元素组织到高度并发的数据处理管道和网络中
CSP是基于理论数学的著名并发模型,它使用通过同步通道进行通信的独立并发运行流程的抽象
Actor / Active对象为您提供了低礼仪事件驱动的主动组件的抽象,这些组件异步交换数据
顾名思义,代理可以保护多个线程需要同时访问的数据
您可以在GPars用户指南中查看每个模型的详细信息,并将它们与它们的典型适用范围进行并排比较。另外,Dierkönig即将出版的第二版书“Groovy in Action”对gpar进行了详细的介绍。
在本文中,我选择了三种最有可能向您展示直观并发好处的抽象:并行集合、异步函数和数据流操作符。我们就潜进去吧!
几何分解
现在,这是一个很好的地方来解释为什么我们前面描述的求最大值的顺序算法是一个错误的选择。并不是说这个解决方案是错误的。很明显,它给了你正确的答案,不是吗?它失败的地方在于它的有效性。它禁止随着工人人数的增加而扩大规模。它完全忽略了这样一种可能性,即系统可能会在这个问题上放置多个处理器。
超市几十年前就解决了这个难题。当收银台的队伍排得太长时,他们会另外叫一个出纳来为顾客服务,这样工作量就会分散,吞吐量也会增加。
回到寻找最大值的问题:利用Groovy函数集合API,GPars添加了每个流行迭代方法的并行版本,如eachParallel()、collectpallel()、findAllParallel()、maxpallel()和其他方法。这些方法对用户隐藏了实际的实现。在幕后,集合被划分成更小的块,可能是按层次组织的,每个块都将由不同的线程处理(图1)。
实际工作由用户必须提供的线程池中的线程执行。GPAR提供了两种类型的线程池:
GParsExecutorsPool 使用直接的Java 5 Executor
GParsPool 使用Fork / Join线程池
Parallel collections in use:GParsPool.withPool 16, { def myFavorite = programmingLanguages.collectParallel {it.grabSpec()} .maxParallel {it.spec.count(/parallel/)}}
在withPool代码块中,并行收集方法自动在周围线程池的线程之间分配工作。向池中添加的线程越多,获得的并行度就越高。如果没有明确的池大小要求,池将为运行时检测到的每个可用处理器创建一个线程,从而提供最大的计算能力。这样就不会有人为的上限限制算法内部的并行性。无论是在旧的单处理器机器上还是在未来的百核芯片上,代码都将全速运行。
GPars并行集合API提供了经常被称为循环并行问题或更一般的几何分解问题的解决方案。还有其他类型的挑战需要更具创造性的并发方法。我们将在文章的下一部分讨论其中的两个。
异步函数
在看到集合被并发处理之后,我们现在将重点讨论函数。Groovy对函数编程有很好的支持;毕竟,能够并行化方法和函数调用肯定会派上用场。为了接近Groovy所在的领域,我选择了软件项目构建编排问题作为下一步的主题。
注意:当并行化构建进程时,通常I/O限制比CPU限制更多,我们明显地增加了磁盘和网络带宽的利用率,而不是处理器的利用率。它不仅是CPU,而且是其他资源,通过并发代码可以提高它们的利用率。显然,所演示的原理可以完全以同样的方式应用于CPU受限的问题
让我们假设我们有一组函数,可能实现为shell脚本、gradle任务或GAnt方法,它们可以执行构建的不同部分。传统的构建脚本可以如下所示:
清单1:构建脚本的顺序版本
println "Starting the build process."def projectRoot = checkout('git@github.com:vaclav/GPars.git')def classes = compileSources(projectRoot)def api = generateAPIDoc(projectRoot)def guide = generateUserDocumentation(projectRoot)def result = deploy(packageProject(classes, api, guide))
我们现在的任务是尽可能地安全地并行化构建,而不需要过度的努力。您可能会看到compileSources()、generateAPIDoc()和generateUserGuide()可以安全地并行运行,因为它们没有相互依赖关系。他们只需要等待checkout()完成就可以开始工作。但是,脚本会连续运行它们。
我相信你能想象出比这个更复杂的构建场景。然而,如果没有一个好的抽象,如果我们的任务是并发地运行构建任务,即使使用这样一个人工简化的构建脚本,我们也要做相当多的工作(清单2)。你觉得这个怎么样?
清单2:使用异步函数的构建脚本的并发版本
withPool { /* We need asynchronous variants of all the individual build steps */ def aCheckout = checkout.asyncFun() def aCompileSources = compileSources.asyncFun() def aGenerateAPIDoc = generateAPIDoc.asyncFun() def aGenerateUserDocumentation = generateUserDocumentation.asyncFun() def aPackageProject = packageProject.asyncFun() def aDeploy = deploy.asyncFun() /* Here's the composition of asynchronous build steps to form a process */ Promise projectRoot = aCheckout('git@github.com:vaclav/GPars.git') Promise classes = aCompileSources(projectRoot) Promise api = aGenerateAPIDoc(projectRoot) Promise guide = aGenerateUserDocumentation(projectRoot) Promise result = aDeploy(aPackageProject(classes, api, guide)) /* Now we're setup and can wait for the build to finish */ println "Starting the build process. This line is quite likely to be printed first ..." println result.get()}
线路保持完全相同。我们只通过asyncFun()方法将原始函数转换为异步函数。另外,整个代码块现在被包装在GParsPool.with池()块,以便函数有一些线程用于它们的艰苦工作。
显然,在asyncFun()方法中发生了一些神奇的事情,允许函数异步运行,并且在它们需要彼此的数据时进行协作。
细节之美
本质上,asyncFun()将原始函数包装到新函数中。新函数的签名与原始函数的签名略有不同。例如,原始的compileSources()函数将字符串作为参数并返回字符串作为结果:
String compileSources = {String projectRoot -> ...}
新构造的aCompileSources()函数返回字符串的承诺,而不是字符串本身。此外,String和Promise
PromiseaCompileSources = {String | PromiseprojectRoot -> ...}
信守诺言
Promise接口是几个GPars api的基础。这有点类似于java.util.concurrent文件.Future,因为它表示一个正在进行的异步活动,可以通过其blocking get()方法等待和获取结果。两者之间最重要的区别是,与Java的未来不同,承诺允许非阻塞读取。
promise.then {println "Now we have a result: $it"}
这允许我们的异步函数仅在计算所需的所有值都可用时使用系统线程。因此,例如,只有在类、api和guide局部变量都绑定到结果值之后,打包才会开始。在此之前,aPackage()函数将在后台静默地等待,没有计划也没有活动(图2)。
现在,您应该能够看到构建块是多么完美地结合在一起。由于异步函数返回并接受承诺,因此它们可以按照与同步原始函数相同的方式进行组合。函数组合的第二个也是可能更突出的好处是,我们不必明确指定哪些任务可以并行运行。我敢肯定,如果我们继续将任务添加到构建过程中,我们很快就会失去关于哪些活动可以安全地与哪些其他活动并行运行的全局视图。幸运的是,我们的异步函数将在运行时自行发现并行性。在任何时候,所有准备好参数的任务都将从分配的线程池中获取一个线程并开始运行。通过限制池中的线程数,可以设置并行运行的任务数的上限。
数据流在哪里
现在,我们已经准备好进行今天的集合中最有趣的抽象—数据流并发。在上一个示例的基础上,我们现在继续。为了构建多个项目,我们将反复运行构建脚本。如果您愿意,可以将其视为未来构建服务器的初始阶段。各种项目的构建请求将通过管道传入,我们的构建服务器将在系统资源允许的情况下依次处理它们。
您可能会尝试最简单的解决方案—为每个传入请求运行上一个练习中基于异步函数的代码。然而,这在很大程度上是次优的可能性很高。由于请求队列中堆积了多个请求,我们有机会获得更大的并行性。不仅同一项目的独立部分可以并行构建,而且不同项目的不同部分也可以同时处理。简单地说,不同项目的处理可以在时间上重叠(图3)。
顺其自然
这种模型自然适合于此类问题,称为数据流网络。它通常用于并行数据处理,如加密和压缩、数据挖掘、图像处理等。基本上,数据流网络由活动的数据转换元素(称为操作符)组成,通过异步通道连接。每个操作符使用来自其输入通道的数据,并通过多个输出通道发布结果。它还有一个相关的转换功能,它将通过输入通道接收的数据转换为向下发送到输出通道的数据。在这种情况下,操作符共享一个线程池,因此没有数据可处理的非活动操作符不会占用系统线程。对于我们的简单构建服务器,网络可以如图4所示。
我们每一步都有一个操作员。这些通道表示构建任务之间的依赖关系,每个操作符只需要一个系统线程,并在其所有输入通道都有值可读取之后启动计算。
清单3:使用数据流操作符的并发构建服务器
/* We need channels to wire active elements together */def urls = new DataflowQueue()def checkedOutProjects = new DataflowBroadcast()def compiledProjects = new DataflowQueue()def apiDocs = new DataflowQueue()def userDocs = new DataflowQueue()def packages = new DataflowQueue()def done = new DataflowQueue()/* Here's the composition of individual build steps into a process */operator(inputs: [urls], outputs: [checkedOutProjects], maxForks: 3) {url ->bindAllOutputs checkout(url)}operator([checkedOutProjects.createReadChannel()], [compiledProjects]) {projectRoot ->bindOutput compileSources(projectRoot)}operator(checkedOutProjects.createReadChannel(), apiDocs) {projectRoot ->bindOutput generateAPIDoc(projectRoot)}operator(checkedOutProjects.createReadChannel(), userDocs) {projectRoot ->bindOutput generateUserDocumentation(projectRoot)}operator([compiledProjects, apiDocs, userDocs], [packages]) {classes, api, guide ->bindOutput packageProject(classes, api, guide)}def deployer = operator(packages, done) {packagedProject ->if (deploy(packagedProject) == 'success') bindOutput true else bindOutput false}/* Now we're setup and can wait for the build to finish */println "Starting the build process. This line is quite likely to be printed first ..."deployer.join() //Wait for the last operator in the network to finish
这个模型应用于我们的问题的好处是,例如,当第一个操作符执行项目签出并获取项目的源代码时,它可以立即获取队列中的下一个请求,并在编译、打包和部署前很久就开始获取其源代码。
通过更改分配给网络中特定任务的操作员数量,可以调整系统。例如,如果我们意识到获取源是一个瓶颈,并且假设硬件(网络带宽)仍然没有得到充分利用,那么我们可以通过增加源获取操作符的数量来提高服务器的吞吐量(图5)。
显然,您的调优选项比分叉大量使用的操作符要远得多。为了简单地说出一些其他的可能性,您可以考虑在网络的重复部分之间采用负载平衡方案,实施生产节流——可以通过同步通信通道,也可以使用类似看板的工作进度节流方案。对于某些问题,可以考虑数据缓存和推测性计算。
摘要
在深入了解Groovy并发之后,您可能处于一个很好的位置来进行更深入的研究。为了快速入门,请考虑遵循gpar的一个快速方法,我绝对建议您查看用户指南以了解更多详细信息。
如果你带着GPAR去兜风我会很高兴的。去享受并发性吧,因为并发性太棒了!
作者介绍