• -------------------------------------------------------------
  • ====================================

Activiti 浅谈并发处理

技能 dewbay 5年前 (2019-04-12) 2647次浏览 已收录 0个评论 扫描二维码

Activiti 版本 5.10

使用activiti 有一段时间了,目前使用activiti 的大部分公司都是用来做类似于 OA 等以用户任务为主的流程,
这我没什么好说的,因为我们的流程是以 ServiceTask + UserTask 结合来处理定时调度等数据处理任务。
ServiceTask 以主,采用 class 和 Spring bean 的方式。废话补多少,切入正题:

Activiti 5.10 设计器是支持 delegateExpression 的注入参数的,但activiti 引擎的解析器却未能将参数注入从 Spring 得到的 bean,该问题在 5.11 的版本上是得到解决了的,但 5.11 版本目前还没有发布,有兴趣的同学可以去下载正在开发中的代码进行研究,或者修改源码

然后我们来谈谈 Activiti 对于并发的处理以及其中的问题(以 ServiceTask 为例):
当我们将 serviceTask 设置 async = “true” (关于 isExclusive 后续会提到) 的时候,流程引擎采用 JobExecutor 来异步执行,执行顺序为引擎首先会将该任务实例化一条 job 记录,插 act_ru_job 表,然后 JobExecutor 扫描该表并加锁执行该 job,这里就涉及到定义 ServiceTask 的另外一个属性 isExclusive,这个属性默认为 true,即同一流程中当前存在于 act_ru_job 且 is_exclusive 为 true 的会一起取出来,放入一个 AcquireJobsCmd,然后放入一个线程执行,这样做用来保证该批任务时串行执行的,使用相同的 context,这样做没有什么问题,
但不能达到真正并行的目的。
附上我们的流程图:

Activiti 浅谈并发处理

目标,处于 parallelGateWay 后面的任务并行执行,即任务完成的时间为单个任务完成的最大时间。
每个 ServiceTask 的 acitiviti:async = “true” activiti:exclusive=”false”
运行,这时你会遇到一个 ActivitiOptimisticLockingException 异常
   (toString(updatedObject)+” was updated by another transaction concurrently”);

为什么会这样呢?因为每个 ServiceTask 对应一条 act_ru_execution 表的记录,当该任务完后后,会去跟新其
parent_id 对应的 execution 将其版本 +1 ,
  update ${prefix}ACT_RU_EXECUTION set
      REV_ = #{revisionNext, jdbcType=INTEGER},
      PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR},
      ACT_ID_ = #{activityId, jdbcType=VARCHAR},
      IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN},
      IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN},
      IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN},
      IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN},
      PARENT_ID_ = #{parentId, jdbcType=VARCHAR},
      SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR},
      SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER},
      CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER}
     where ID_ = #{id, jdbcType=VARCHAR}
      and REV_ = #{revision, jdbcType=INTEGER}

跟踪该异常的抛出原因是 在 serviceTask 完成后,更新的 ExecutionEntity 是同一条记录,而每一个 serviceTask 此时处于
两个不同的线程和事务当中,两个事务彼此不可见,任务开始时获取的 ExecutionEntity 完成相同,当一个事务成功更新后,
另一个事务就会失败。这样保证了流程的准确执行,当该任务失败后,会在下一个 JobExecutor 扫描时重新执行。此时获取的
execution 的版本已经加 1,此时任务正常结束。Activiti 引擎如此做有一定的道理,但这不是我要的。为什么这样说呢?
假设我两个 serviceTask,每个执行都需要 30 分钟,仅仅因为这样,我就需要花费 1 个小时的时间才能完成,天啊,饶了我吧!

有木有办法,我想是有的,只要你够大胆。跟踪代码,更新父 execution 的时候,唯一变了的就是version,其他值都没变化,
那么我们是否可以将 update 语句更改一下,如下:
update ${prefix}ACT_RU_EXECUTION set
      REV_ = REV_ + 1,
      PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR},
      ACT_ID_ = #{activityId, jdbcType=VARCHAR},
      IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN},
      IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN},
      IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN},
      IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN},
      PARENT_ID_ = #{parentId, jdbcType=VARCHAR},
      SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR},
      SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER},
      CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER}
     where ID_ = #{id, jdbcType=VARCHAR}

注意到这里的变化,where 条件只剩 id 去掉了 version ,然后 REV_ 使用表里面的数据直接+1 经测试完成可行,有兴趣的朋友可以自己试试。这里的关口过了,但后面仍然存在危险,所以修改源码是有风险的(%>_<%)。问题出在哪儿?有时候你会发现 两个 serviceTask 运行完了,Execution 表的记录也更新了,流程停滞不前了,这是神马原因???

最后终于让我问题出现的地方,ParallelGatewayActivityBehavior,我们前面说过,当每个 ServiceTask 执行完成之后,事务并没有到结束的地方,根据 ServiceTask 的流程指向来到了第二个 ParallelGateway,ParallelGatewayActivityBehavior 的作用就是判断前面的任务是否完成,是否继续执行,当每个 ServiceTask 所在的事务到达此处时,他们都只能看见自己完成的部分,而不能看见与他并行的事务里面的状态。所以当到达是否执行下一步的判断条件时 

if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {      // Fork      log.fine("parallel gateway '"+activity.getId()+"' activates: "+nbrOfExecutionsJoined+" of "+nbrOfExecutionsToJoin+" joined");      execution.takeAll(outgoingTransitions, joinedExecutions);        } else if (log.isLoggable(Level.FINE)){     log.fine("parallel gateway '"+activity.getId()+"' does not activate: "+nbrOfExecutionsJoined+" of "+nbrOfExecutionsToJoin+" joined");  }

都会告诉 ParallelGatewayActivityBehavior 我已经完成了,其他的还没有完成。
有人可能会说,为什么我的程序没有遇到这种情况呢?
第一 如果你不是 ServiceTask 任务很难遇到这种情况
第二 如果你的 ServiceTask 没有设置为 async = “true” 和 exclusive=”false” 也就不是真正的并发,当然也不会遇到
     还有其他原因我就不赘述了。

这种问题也是可以解决的,因为针对同一个流程,每一个事务都是通过同一个 ParallelGatewayActivityBehavior 实例来进行判断的,
我们只要记录每一个通过该 GateWay 的事务的完成情况,然后汇总起来就 OK 了,另外在完成的那一步需要将 executio 的 parent 的
executions 对应的更新,否则 execution 会有记录不能删除,但流程是可以完整的执行完成,给出我的完整处理方式:

public class ParallelGatewayActivityBehavior extends GatewayActivityBehavior {    private static Logger log = Logger.getLogger(ParallelGatewayActivityBehavior.class.getName());  private Map<String,ActivityExecution> activityJoinedExecutions = new ConcurrentHashMap<String,ActivityExecution>();    public void execute(ActivityExecution execution) throws Exception {         // Join    PvmActivity activity = execution.getActivity();    List<PvmTransition> outgoingTransitions = execution.getActivity().getOutgoingTransitions();        execution.inactivate();    lockConcurrentRoot(execution);        List<ActivityExecution> joinedExecutions = execution.findInactiveConcurrentExecutions(activity);    int nbrOfExecutionsToJoin = execution.getActivity().getIncomingTransitions().size();    int nbrOfExecutionsJoined = joinedExecutions.size();    if(nbrOfExecutionsToJoin!=nbrOfExecutionsJoined){	for(ActivityExecution e:joinedExecutions){	    activityJoinedExecutions.put(e.getId(), e);	}	nbrOfExecutionsJoined = activityJoinedExecutions.size();	if(nbrOfExecutionsJoined == nbrOfExecutionsToJoin && execution.getParentId()!=null 		&& execution instanceof ExecutionEntity){	    ExecutionEntity et = (ExecutionEntity)execution;	    while(joinedExecutions.size()!=nbrOfExecutionsToJoin ){		Thread.sleep(10000);		for(int i = 0 ; i < et.getParent().getExecutions().size(); i++){		    ExecutionEntity ct = et.getParent().getExecutions().get(i);		    if(activityJoinedExecutions.containsKey(ct.getId()) ){			et.getParent().getExecutions().set(i, (ExecutionEntity)activityJoinedExecutions.get(ct.getId()));		    }		    		}		joinedExecutions = execution.findInactiveConcurrentExecutions(activity);	    }	}    }        if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {	activityJoinedExecutions.clear();      // Fork      log.fine("parallel gateway '"+activity.getId()+"' activates: "+nbrOfExecutionsJoined+" of "+nbrOfExecutionsToJoin+" joined");      execution.takeAll(outgoingTransitions, joinedExecutions);          } else if (log.isLoggable(Level.FINE)){      log.fine("parallel gateway '"+activity.getId()+"' does not activate: "+nbrOfExecutionsJoined+" of "+nbrOfExecutionsToJoin+" joined");    }  }}

露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Activiti 浅谈并发处理
喜欢 (3)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址