Sunday, July 22, 2012

BPEL SE (Open-ESB) Scalability Support

Scalability Support for BPEL Service Engine

The complete solution for scalability of system is co-coordinated work of all the constituent parts; service engines, binding components and NMR. When the system starts maxing out heap space, the full GC kicks in and ‘pauses’ the JVM. This has debilitating effect on the system performance. A properly designed system is expected to know expected usage and provide enough resources for capacity utilization.
The specific problem in the context of business process orchestration involving interactions with external systems is the in-determinism in terms of responses for the outbound requests. When that happen the business process instances continue to hog memory and new requests for such processes would only aggravate the memory situation.
Since BPEL SE is the only component currently in the open-esb that can be configured to be used with database, we can leverage this to provide scalability for long running processes. Also the BPEL SE scalability solution needs to be thought in conjunction with its recovery and monitor capabilities. In my opinion, the best solution is to have very large heap space and nothing else would be required. But, if that cannot happen, at least with 32-bit boxes, and also for other reason s scalability solution as outlined here for BPELSE can help.
The scalability solution can be called in phases based on early detection and also following some patterns of business process behavior. The complete solution may be complex, but taking a trivial approach may not solve most (if not all) the use cases. Since the primary mechanism for scalability solution of BPEL Service Engine is passivation, there will be some overhead associated with that happens. Also, in order to minimize the overhead and not be overly aggressive about it, attempt will be made to do the passivation in phases with each phase providing incremental relief.

Figure 1: Phase 1 (Variable) and Phase 2 (Instance) Passivation for Phase 2 Scalability Solution

Configurations and Factory Defaults

Whenever the persistence is on, the Scalability Solution (Phase-1 and Phase-2) is enabled by default. Using system properties the solution can be turned off. Also, the phase-1 and Phase -2 solution can be selectively turned off. In addition there are other system properties that allows for configuring the memory levels and the age criterion used by the scalability for memory release. Please note that default values for all these properties are already supplied. Here are the System Properties to use and the explanation thereof.

Scalability Levels (Default=phase2)(Valid options - none, phase1, phase2) For example to turn off Scalability: com.sun.jbi.bpelse.scalabilityLevel=none
Upper Memory Threshold (% of Total Heap)(Default 75%). For Example to set it to 85% of heap size: com.sun.jbi.bpelse.scalability.upperMemoryThreshold=0.85
Lower Memory Threashold (% of Total Heap)(Default 50%). For Example to set it to 60% of heap size: com.sun.jbi.bpelse.scalability.lowerMemoryThreshold=0.60
To Configure memory release age criterion to start with (Default 2 Seconds). For example to set it to 1 hour (enter value in seconds): com.sun.jbi.bpelse.scalability.idleThreshold=3600
Recovery Batch Size (Scalability need to be turned on)(Default=50). For recovering large number of instances in batches. For example to set it to 25 : com.sun.jbi.bpelse.recoveryBatchSize=25
Experiments to Analyze BPEL SE Scalability Issues also post Phase-1 and Phase-2 Improvement Measurements
Details can be found here.

Design Overview

Phase 1: Variable Passivation

When the free memory falls to certain levels of max heap size, identify all the instances that are waiting for any of the massaging activity (response, status) and wait activity for some defined period of time (hard coded value of 10 minutes, can be made configurable). Scan through these qualifying instances and if the variables are dirty, store them in the database (same variable table with special mark, if not dirty, they are already persisted due to last persistable activity) and de-reference all the variables. Such instances (in-memory) will be marked scalability passivated to avoid the scalability thread from visiting them again. When the messaging event or timer goes off for waits, the instance during its execution will load the variable as and when needed, if the variable is marked as passivated. When that happens, the instance would be marked back as non-scalability passivated.
If the available free memory does not meet this threshold limit, then halve the time duration for identifying the waiting/held instances and do the 1 above again. Repeat this till the wait time is zero.

Phase 2: Instance Passivation

If Phase 1 does not yield the required free memory (i.e. all the variables of paused instances are knocked off), Phase 2 scalability solution will be called.

Start removing the instances that are not not moving forward (due to defined wait, waiting on status/response or correlating receive) completely from memory. Note that these instances are already persisted in the database. Before, removing these instances, appropriate information (please refer design details below) is stored in database. These instances can be resurrected back, when needed.
Again, as for Phase 1, the passivation will happen on time inverse scale with the instances waiting for most of the time will be taken out first before removing the newer such instance. The only exception to this rule is for waiting instances (waiting for the defined wait) where there is predictability of wait, hence the instances that are going to wait the most (based on the alarm expiration) will be taken out first.
In addition to providing support for scalability for BPEL engine, we need to take care of the scalability of recover/Monitor and Fail-over in the case of cluster.

Recovery Scalability

Without the scalability solution, during recovery the bpel service engine loads all the instances in the memory, and try to recover them all. If there are very large number of instances to be recovered, engine may run into out of memory issues.

When there are very large number of instances to be recovered, the instances will be recovered in batches with check on available memory before scheduling next batch. If the available memory falls below as defined by phase/level 1 solution (refer related integration notice), the level 1 scalability solution will be called to free up memory. If enough memory is freed, the recovery will continue, otherwise the recovery will paused and will be periodically visited to finish up the recovery work.

Fail-over Scalability (Cluster Case)
With this solution in place, BPELSE in cluster performs fail-over by acquiring all the instances of the failed engine(s). When the engine is running with low available memory (heap space), this could be result in destabilizing engine/system. The scalability solution enables the engine in cluster to fail-over the instances without causing memory issues and also better distribute the fail-over work amongst the live engines.

Monitor Scalability

BPELSE Monitoring API provides BPEL process instance query that may return thousands of instances and runs a risk of memory exhaustion. The scalability design for monitoring takes it into account and imposes maximum instances restriction (1000) in conjunction with with sorting and ordering to help user identify the relevant instances.User can specify a lower maximum instance count (<1000) and sort on startTime, endTime, and updatedTime in either ascending or descending orders.
Limitations of current solution
If the defined process is synchronous process (receive-reply), the instances cannot be passivated while in the middle of execution of receive-reply. Hence, if the BPEL engine runs into memory issues do to inordinate wait of request-reply, this solution cannot help. The reason why we cannot passivate the instances in the middle request-reply is due to fact that by passivating the requester reference (Message Exchange) will be lost (currently, the ME cannot be passivated).
This solution assumes that the synchronous processes do not take long time to complete. One way to ensure scalability for Synchronous outbound invocations to external web service (that can take long time to complete), is by configuring throttling property on the connection. Throttling can limit the number of simultaneous outstanding requests at a given time, hence limiting the memory usage.

Design Details:

During BPEL Service Engine initialization an instance of BPELMemoryMonitorImpl class is set on the engine. This class provide utility methods to do the memory levels checks and also defines the Phase1 Upper and Lower Free Memory ratio limits.
When the free memory falls below the threshold as defined by Lower Free Memory Ratio, the phase 1 solution will be called first. If phase 1 solution does not succeed in releasing enough memory to meet this threshold, phase 2 will be employed.

Phase 1 Scalability Solution

When scalability is enabled, the main entry point for Scalability Solution is method doMemoryMangement() on EngineImpl. This gets called during processing of any request by BPEL SE.
The current memory levels are checked; if enough free memory (as defined by Phase1 Free Memory Ratio) is available, this method returns.
If the memory level check fails, recursing through all the instances of deployed processes, a call is made to passivate variables of the instances not moving forward (due to wait/corelateable receive/pending status/pending response).
This variable passivation is also done in phases, where in during the initial call only instances that are waiting for 10 minutes or more is picked for variable passivation.
Even during variable passivation for a given time criterion for a business process, the memory levels are checked while iterating though the identified instances. This ensures that just enough variable passivation happen and we don't aggressively do this passivation.
During execution of process instance, it pass through various persistence points. At these persistence points the state of the process instance along with all the dirty variables gets persisted. In order to take advantage of this fact, the variables used by the process instance will maintain the information regarding the fact that the variable is dirty (whenever it gets changed) or not (no change since last persistence).
The persistence schema is changed to add a new column SCALABILITYPASSIVATED for the variables table.
During phase 1 scalability passivation of instance variables, if the variable is found dirty, it will be saved in the database, with SCALABILITYPASSIVATED marked as 'Y', otherwise the variable data (payload) will be de-referenced (and hence removed from memory) and variable marked as Scalability Passivated.
When all the business processes are recursed and variable passivation for time value of 10 minutes returns, another memory check is made. If still enough memory is not available, the time interval is halved (5 minutes) and step 2 is called again. This process repeats till the time period is zero.
If even after de-referencing all the in-memory variables of the held process instances, still Phase 1 free memory criterion is not met, phase 2 solution will be called.
As a result of Phase 1 Scalability Solution call interleaving the regular persistence calls, process variables can have various states. Each variable maintain three state flags - namely:

Inserted - When the variable is created this is FALSE. If the variable is inserted into database due to regular persistence point during process execution (not Scalability Passivation) this changes to TRUE.
Persisted - When the variable value is changed (dirty), this is FALSE. When the variable is persisted, it is marked TRUE.
Passivated - By default this is FALSE. When the variable is scalability passivated, this changes to TRUE.
Also, the persistence and scalability call on the variable would result in altering the above state of the variable. Following figure explains the various states and shows all the possible combinations of the before and after state as a result of scalability/persistence and in-memory read/update calls.

Figure 2: Variable States for Phase 1 Scalability Solution

Phase 2 Scalability Solution

Since the phase 1 solution (above) could not make enough free memory available, more aggressive action will be attempted to free up memory. In this phase all those instances that are not moving forward (same as identified by phase 1) will be progressively passivated, starting with most aged ones.

Passivation of instances held due to wait activity

  1. Recurse through all the waiting instances and passivate them.
  2. The passivation will be gradual starting with time criterion of instances that need to wait for 10 minutes or more. If this does not produce enough free memory, this time will be reduced by half and the processes will be repeated again repeatedly with memory check between each iteration.
  3. The idea is not to be aggressive with passivation and do only enough passivation.

Passivation of Instances held due to pending response for outstanding two way invokes


  1. When the scalability call is made to passivate instances pending on response, persist the msg ex id to inst id in a new table, OUTSTANDINGMSGEX which will keep this info.
  2. When the response/status happens, the instance will be identified by this table and scheduled for recovery.
  3. When the execution reaches invoke, it will construct CRMPID, at this time this CRMPID will be checked to see if there is any response available for this. For this case there will be one, hence no new invoke will be made and the invoke unit with further process the response message.

Passivation of Instances held due to absence of message for correlating IMA (receive/pick)

This relates to passivation of instances that are held in mRequestPendingInstances queue for waiting for the correlatable message for further execution.

  1. The instances waiting on mRequestPendingInstances that qualify for phase 2 passivation would be removed from memory.
  2. Currently, the correlation values for the IMA are not persisted along-with persistence of the IMA. The persistence point for correlation values being next persistence point. Since the instance is scalability passivated, these correlation values will be persisted before getting rid of the instance.
  3. Also, if the engine is running in cluster mode, the cluster logic will passivate the instance when the execution reaches correlatable IMA and if the message for such IMA is missing. Since the could be chance that the scalability logic to be kicking same time, the scalability passivation (for engines in cluster) will use the same passivate instance to do scalability passivation. This ensures that the scalability logic will not interfere with cluster logic of passivation and that the passivated instance can be executed by any engine. The cluster passivation api also persists the correlation values. For single engine case, only the correlation values will be persisted. Also note that for single engine case the waiting IMA table need not be populated (as done for cluster)
  4. When the correlating message arrives on the engine, the instance will be activated (recovered from the database). A new api on state manager will return the instance id for the given correlation values. Note for cluster activation uses different api that in addition to finding instance for recovery, does the ownership transfer of the instance as well.

Not to scalability passivate instance if there exists active (outstanding) request for in-out message

If the instance execution is in the middle of active request-reply, the instance should not be passivated (even if the instance qualifies for scalability passivation, due to wait/outstanding response/status) as in doing so the requester context (message exchange) will be lost. In order to scalability passivate such processes (that are defined with receive-reply) user need to configure the throttling property on the service connection (to be available on CASA editor, work under progress)


  1. Currently the outstanding requests are maintained as part of the context object (IMAScope). One such IMAScope is constructed when the process instance is created and also for each scope thereafter.
  2. For scalability support, such IMAScope's will be added to a list maintained at process instance level.
  3. Two new apis on process instance would allow the IMAScopeBride to be added to process instance.
  4. Whenever an instance need to be scalability passivated, a check using the new api on ProcessInstance would return existence of such outstanding request at any given point during execution, and for positive result the instance would not be scalability passivated.

BPEL SE Monitoring in conjunction with Scalability (instance suspend and resume functionality)

Splitting of ready to run queue introduced issue of scheduling of instance even if marked suspended. After the split the instances on waiting queue are not checked for suspended status causing this issue. Also, to support the instance suspend and resume in conjunction with scalability passivation (phase 2) following design will be used:

  1. When a waiting instance (due to defined wait) is identified for scalability passivation, the suspended flag is saved in the bpit (along-with the instance id) and the callframe is dereferenced (for releasing instance and associated memory).
  2. During normal execution the locally cached waiting bpit is first checked for timer expiration. But, before this check is made, the instance is verified that is not marked for suspension. If it is marked for suspension, the next expired and non-suspended waiting bpit is picked from the waiting bpit collection maintained at ReadyToRunQueue class.
  3. When an instance (which is suspended earlier) is called resume from monitor, also if the instance is scalability passivated, the same is activated (recovered) from database and scheduled again for execution.

Splitting ready to run queue - providing a special collection for waiting business process instances

At various points during the execution of a process instance a special instance wrapper Business Process Instance Thread (BPIT) is created and put on a queue (LinkedList in ReadyToRunQueue class). This same queue is also used for adding BPITs waiting for wait activity in the process definition. This current design has following pitfalls:

  1. All the engine need threads need to do a full scan of this queue to get the least wait time before call accept on delivery channel with wait time (if any such instance(s) exist on the queue). This will happen even though there may not be any waiting BPIT on the queue. This means additional processing and would have performance implications for highly loaded engine with very large number of concurrent instances under execution.
  2. During execution for wait type BPIT the first expired BPIT is picked for execution, even though there might be another wait BPIT which happen to have expired even earlier.

When instances that are waiting on ready to run queue (due to wait activity) are scalability passivated, keeping them on the regular queue will have performance implications. Although this is strictly required for regular execution, but found benefit of have two queues, hence added for regular execution also.


A separate data structure (a HashSet) is used to store the waiting BPITs.
The class ReadyToRunqueue will have local member variable which will store the BPIT with most recently expiring time.
Whenever a new wait type BPIT is to be added, a comparison will be made with this local object and if the new BPIT has more recent timeout, the new BPIT will be locally cached and the previously cached BPIT will be added to the set, otherwise the new BPIT will be added to the set.
Whenever a ready instance is requested for execution, this locally cached BPIT will be checked for expiry, if expired the same will be returned. There is no need to scan the Waiting BPITs Set.

Failover Scalability Solution (BPELSE in Cluster)

NOTE: This support is only added for Oracle as the equivalent of rownum function (that allows limiting the resultset for a update query) is not available for Derby at this moment. Also note that this feature for Derby is being worked on and planned for release early next year. Once this is available support will be added for Derby. Clustering will still work with Derby though, only scalability of failover support will not be available.


Currently, the engine used GUID for engine id and every time the engine restarts it acquires a new id value. It will be changed to use static engine id. The cluster’s instance id will be used as engine id. The advantage of using static engine id is that prevent the engine entries from growing in the engine table and also it will also allow engine to load its instances in the event of crash followed by restart (Note: It is not necessarily required that the engine upon restart load its instance, but wont hurt if it does). Since, currently we don’t support multiple bpelse in same cluster instances, using cluster’s instance id as engine id should not cause any issue.
During engine start, engine will update the heartbeat using the static id, success of the call indicate that id already exists, otherwise, this engine will be
registered in the database against the static id (instance id of cluster).
Currently in cluster mode, each engine after heartbeat update queries for the failed engine(s), and if found, acquires the running instances of such engine(s).
The change would be to pass the BATCH_RECOVERY_SIZE (default value 50, Can be changed using System property ‘com.sun.jbi.bpelse.recoveryBatchSize’; also used for batch recovery for single engine case) and the engine during this check would only acquire batch number of such instances of the failed engine(s).
The update return count for the above call to update the dangling instances will provide hint that the failover of instances completed or not (a return count equal to BATCH_RECOVERY_SIZE indicate there could be more).
After acquiring the instances for failover, engine would call recover methods to load the instance data (from database) into memory. If the return count was equal
to BATCH_RECOVERY_SIZE, the update dangling instances call will be made recursively till the return count is less than BATCH_RECOVERY_SIZE.
If the failed engine were to come back up, it will be able to acquire its remaining running instances prior to fail over (assuming not all the instances of this failed engine are already taken by other live engines in the interim).

No comments: