Sunday, July 22, 2012

BPEL SE (Open-ESB) Clustering Design

Architecture and Design Document of BPEL Service Engine Support for clustering & Failover

To achieve Scalability and High Availability for BPEL Service Engine running in JBI environment. A cluster is defined as a set of engines running in multiple JBI Environments running in Application Server Cluster. When a business process needs to be scaled to meet heavier processing needs, you can deploy Service Assemblies to BPEL Service Engine running on application server cluster to increase throughput.

To achieve transparent failover of running instances of the failed engine in cluster over to any live engine for continued processing.

How to Enable: When the app server is running in cluster mode, it sets some System properties indicating this fact. BPEL Service engine during initialization queries this property to discover engine running in cluster mode and starts behaving accordingly. There is no explicit flag on engine that user need to configure to enable BPEL SE Clusterig.
Engine Heartbeat Update Time: Each engine participating in cluster periodically updates database table to indicate its livliness. The perodicity of this update can be cofigured through property EngineExpiryInterval on BPELSE (refer this page  for engine configuratios).

Design Goals:
Enable installation of BPEL Service engines on Application Server Cluster.
Enable BPEL Service engine component installed on Application Server Cluster to be able to process incoming requests
The overall throughput of cluster should increase linearly (or almost close to) in relation to the number of engine instances in the cluster.
In the event of failure of any one the application server or engine, the other engines should continue processing new request. The in process (in-flight) requests of the failed engine should continue to be processed by any of the live engine.
Support business process with instance correlation.
Single database will be used for implementing this feature. Database will be common for all the BPEL service engines in cluster and all the service engines need to be configured to use same database
Database is considered to be highly available. The database failure will constitute single point of failure.
BPEL Service Engine uses Application Server System property com.sun.jbi.isClustered to determine if the engine is participating in cluster environment. This property is extended by Glassfish Application Server. For other application server user need to set this system property for clustered environment.
It is assumed that BPEL Service engines are running in multiple JBI runtime environments that in turn are running inside distinct application server instances which can be on the same machine or different servers. Clustering support is not available for Multiple BPEL Service engines within same JBI environment on one Application server.
Load balancing will be specific to protocol/binding component in use.
The load from the failed engine will not be distributed equally to the live engines. Instead, in the event of failure of one of the engine, any one of the live engines will take over all the instances of the failed engine. Since the current design does not distribute the load across all the live engines, there is possibility of engine overload in the event when huge number of instances that needs to be failed over.
The servers in a cluster as assumed to be on the same time-zone.
Current Limitations:
Cluster support is not available for business process that contains event handlers.
For reliability and recovery, BPEL Service engine persist the state of the instance as soon as the non-replay able activity is executed. In the event of crash, upon restart of the BPEL Service engine, the persisted state can be loaded back in the memory and the execution continues from the last persisted point. Clustering support of BPEL service engine is leveraging engine's persistence and recovery feature.

An important aspect of clustering support is high availability, which in essence means that in the event of failure of any of the application server instance/BPEL Service Engine of cluster, continue processing new and already in process requests by the live engines, and also transparent failover of in-process instances of the failed engine (by any of live engine) for further processing. Also, In order to support correlation based projects where in multiple requests need to participate in stateful interaction with same instance, mechanism needs to be created that the correlated message(s) are able to join the already created instance.

Multiple design approaches were considered primarily to solve the correlation feature of business process definition (see appendix for more details on other options considered) including Intelligent Message Router, Message Routing and Engine leasing with instance activation/passivation mechanism.

Current implementation is based Engine leasing and instance passivation/activation mechanism and was chosen for simplicity of design and implementation and also to leverage already robust persistence and recovery support for BPEL Service Engine. See Appendix A for details on other design considerations.

Design Details:

Clustering support for BPEL Service engine entails design for the following two main features and are discussed in the subsequent sections:

Failover Support
Correlation Support
Correlation Support is further discussed in following sub-sections for various types of Inbound Messaging Activities (IMA) as defined by the BPEL Specification and use case scenarios.

  • Support for Multiple Receives
  • Support for Out of Order correlated messages
  • Support for Pick Activity with no on-Alarm defined
  • Support for Pick Activity with Alarm defined
  • Support for Multiple Messaging Activities (IMAs and Invokes)in Same Flow Activity
  • Support for two way invoke to Sub Business Process containing Correlated Messaging Activities (Receive/On-Message)

The design of engine support for clustering is based on central common database used by all the active engines participating in a cluster. This database is assumed to be scalable and highly available. BPEL Service engine will use Application Server data source JNDI resource (therefore underlying connection pool) for its persistence and recovery and cluster related operations.

Before installation of BPEL Service engine on Application Server cluster, user needs to create a JNDI resource and pass this to engine configuration during installation. Please refer this page  for the details on how to configure data source for currently supported databases. During installation of BPEL service engine on Application server cluster engine will check if the persistence schema is created, if not, it will create the persistence schema and register itself in the ENGINES table. (For details on persistence schema refer this page ).

Failover Support
Each engine actively updates its lease using a heartbeat signal to ENGINE table of this common database. After updating the lease, active engines would query this table (ENGINES) to find out if any engine has not updated its lease in the specified (configurable) interval. If any such engine(s) is (are) found whose last lease is expired, that engine(s) is deemed failed and any active (running, in process) instance of that failed engine will be acquired by the first querying (live) engine. These instances will then be loaded in the memory for further execution.

BPEL Service engine does not create any separate (daemon) threads to updating its lease and recovery of the failed engines instances, but uses the same thread pool (configurable parameter MaxThreadCount) used for processing incoming requests on the DeliveryChannel. On no loaded engine (no in-coming requests) these thread block on the delivery channel for a period of next lease update time, which is currently set to 60% (non-configurable) of engine expiry interval (configurable).

As part of optimization, the process of identifying the failed engine and loading of instances in memory (if any, of failed engine, if found) is not done in one single SQL, but rather in following independent steps.

Overall the following steps are involved in lease update and assumed failover.

If the time elapsed since last update of lease is within 60% of engine expiry interval

Update engine lease.
Update the dangling instances (running instances of failed engine) of failed engine with this (querying) engine's id and return the update count.
If count is greater than 0 (which means, there were running instances of failed engine, updated with this engine's id), do the recovery.
In the recovery call, query all the running instances of this engine from database. From this list removing the instances in-memory would result in the remaining instances. Load the instance data and schedule them for recovery (further execution).

Correlation Support

Figure 1 : Instance Passivation and Activation. Correlated Messages (M2) arriving on different Engine (E2) at later time (T2) than the engine that created the instance (Engine E1 for Message M1 at Time T1)

As mentioned in the overview section, the current implementation of clustering support of BPEL Service engine does not use any intelligent routing of the messages. This in essence means that for a project defined with correlation, the correlated message(s) for the same instance may in fact end on different engine than the one that created the instance. Hence we have two options, either to route the correlated message(s) arriving on different engine to the engine which created the instance, or route the instance to the engine that received the correlated message. We chose the later, again for simplicity of design and implementation.

The design and implementation further explained using the following sub-sections

1. Support for Multiple Receives

a) The messages (first message and also the correlated message) get distributed randomly across the engines.

b) The IMA with defined attribute createinstance as true would create the instance and continue the execution till the execution hits the point where it reaches the correlated IMA, at this point one of two conditions can happen

i) The correlated message is already available at this engine, it will be consumed by the instance and further execution will continue.

ii) It is not. The instance will be *PASSIVATED* (see Figure 1 above), i.e. instance will be marked in the database, the correlated waiting IMA registered and the instance removed from memory. The correlated message arriving on any of the live engine will fist try to find the instance to correlate in memory based on the correlation value calculated (as per process definition). If instance is not available in memory, engine will query the database and if instance is available in the database for the correlation key values and also if message's IMA matches the waiting IMA registered for the instance during passivation, the instance will be loaded in the memory. This ensures that only the correct incoming message (of multiple correlated IMA that might be defined) is handled for processing and not one of other messages (IMA's defined down-steam in process definition) where the process execution has not yet reached. Once found, the instance is loaded in memory and the execution continues. We call this process of finding and loading the instance as ACTIVATION.

2. Support for Out of Order correlated messages

a) Since the messages arriving on the BPEL service engine might be out or order, i.e. the correlated message for an instance may in fact arrive before the message that actually created the instance on the same or different engine (another case when message arrives on other engine even if instance is created, but not persisted/passivated yet); these cases need to be handled as well.

b) Such (out-of order) messages are stored in a special data structure and this data structure is checked periodically to see if contains correlated message events, if it does, query the database to get matching instances (based on correlation id and matching IMA type on passivated instances). If any such instance(s) are found, engine would acquire ownership (see section Failover Support above for details), ACTIVATE the instance and process recovery for further execution.

c) At some point the message that create the instance arrives on some engine and the instance would be created and this instance upon reaching the correlated IMA will first check this data structure to find out-of-order correlated message, it still not found, would passivate the instance.

d) Such passivated instances, will be found by the above defined periodic poll (step 2.b) by the engine that got the out-of-order correlated message.

e) The polling for out-of-order message is not done by any special thread, but tied to the engine lease update thread (see failover support above).

3. Support for Pick Activity with no on-Alarm defined

The pick activity, as defined by BPEL 2.0 Spec, waits for occurrence of exactly one event from a set of event then executes the activity associated with that event. Pick activity is comprised of set of branches, each containing an event-activity pair. The event can be of on-Message type (similar to receive activity) or an on-Alarm event. An on Alarm is timer based event. Pick must have at least on on-Message activity defined. The on-Alarm activity on Pick is optional.

For a business process that does not have on-Alarm defined, the behavior of pick is same as that of multiple receives. The above design and discussion (Section 2) also applies for pick with no-on Alarm defined.

4. Support for Pick Activity with Alarm defined

A pick activity defined with on-Alarm poses special challenges to design and implementation. This is due to the fact that no only the correlated message can arrive on any of the engine, the time for on Alarm starts as soon as the pick activity enters execution. In the event that the time expires before any of the on-Message events were to happen, the execution will chose the on-Alarm branch. Hence, we need to keep track of the on-Alarm(s) timer(s) in conjunction with the messaging events. The design also takes care of the case where while the on-Alarm is active, the engine might crash.

a) During execution when a pick activity with correlated IMA (on-Message) is encountered a special timer type object3 is created for each on-Alarm defined and scheduled with the defined alarm duration. The instance is passivated (see discussion for section 1 for details)

b) If on-Message, if defined, were to arrive on any of the engine before the expiration of on-Alarm, the instance is activated and the processing continues.

c) The case where in the instance is passivated and correlated on message event has not arrived and also on-alarm not expired and the engine (that has on-Alarm timer) crashes is also handled. During of recovery of such passivated instance, all on-Alarm special timer type ojject3 are reconstructed for the remaining duration (non-expired) and scheduled in the memory such that the pre-crash state of the instance is constructed in memory.

5. Support for Multiple Messaging Activities (IMAs and Invokes)in Same Flow Activity

a) In the case of the business process containing multiple messaging activities (receives/onMessage or invokes) on different branches of same flow activity, the instance will not be passivated (during IMA execution, for the absence of messaging event) if any of the messaging activity (on another branch of same flow) is under active execution. Only when the messaging activity completes will the instance be allowed to passivate. Refer the following for the receive (Figure 2) and invoke (Figure 3) cases in flow.

b) The Figure 4 pertains to special Business Process Instance Thread created and put on ready to run queue for instance passivation when the instance cannot be passivated because of another flow branch in active execution of messaging activity. The IMA unit under execution of the flow branch is put on waiting state. This BPIT an 2000 d is is periodically checked by the engine execution threads and when the active IMA count is zero, once more a check is done to see if the message for the event(s) for this flow branch arrived on the engine. If message exists, the IMA unit (Receive or Pick) is played again. If not, then the instance is passivated and cleared from memory.

Figure 2 : Flow Diagram for Execution of ReceiveUnit and PickUnit inside a Flow Activity in Cluster (case 5)

Figure 3 : Flow Diagram for Execution of InvokeUnit inside a Flow Activity in Cluster (case 5)

Figure 4 : Special Business Process Instance Thread for Instance Passivation in Cluster (case 5)

6. Support for two way invoke to Sub Business Process containing Correlated Messaging Activities (Receive/On-Message)

In case the sub business process instance is executed in another engine (as a result of passivation and activation mechanism) when the reply is encountered, using the CRMP mechanism the response object is persisted in the database. The invoking business process (for clustered cases only) would create a special Business Process Instance Thread and would periodically (currently tied with the heartbeat updated thread) check for the response object for the two way invoke. When the response is available in the database, the response would be constructed directly from the database and further execution of the parent process will continue. The Sub BP can continue execution after persisting the response object.


A. Alternate Design Choices
Intelligent router design


Best design for clustering. The way it should be. Obviously overcomes all technical idiosyncrasies of any alternative solution.

Router sits outside of the BPEL SE and it is a dependency on some other teams and projects.
Load Balancer is specific to protocols, Messaging protocol, http, file protocol and so on. This could be an issue during deployments.
Deployment complexity involved. Can't foresee how much, but compared to the current design this will definitely be more.
Primary disadvantage is that we rely on other teams for this to work successfully which is the main deterrent here. If we can overcome this challenge and get all the teams to work towards this that is ideal (again).
Message routing

We have talked about this a few times but never ventured deep enough to see its feasibility (or its ugliness). We always averted this solution because of the lack of simple transactional support.

This design makes use of the existing DB as the message pass through. Jerry suggested similar design using a queue. Both of them are very similar but with different message pass through mechanisms.


In terms of execution of an instance it is similar to the intelligent router design, which is the way it should be in cluster.
Agnostic of an external router, from a project feasibility perspective this is good.

Transactional support for correlating messages is no longer simple if the correlating message needs to be routed. Also remember that we only support 1 ways and not ways in clustering. If we want to support 2ways we go for CRMP and in that case the transactional issue we talk about is no longer an issue.
Timeouts of transactions could be an issue here, depending on the implementation choices we make to enable the engine communication through DB.

Assumes another table say CLUSTER_MESG_INFO (clob, source, destination, status) Status can have values

- send = message sent from one engine to another but not yet consumed by the destination engine
- done = message consumed by destination and is successful in consumption
- error = message consumed by destination and is resulted in a failure
- closedSuccess = source engine closed the entire transaction as part of its transaction
- closedFailure = transaction commit failed and rolledback

Flow Chart

Different (Engine) Failure Scenarios

Sunny day case:
- A sunny scenario is possible to do without any issues. As part of E1's receive activity's (consuming M2) persistence we mark the completion of the communication by logical deletion of the row in CLUSTER_MESG_INFO.
Case 1: If Engine 2(E2) fails (system crash) during the cycle of M2 consumption

If E2 fails before the status in CLUSTER_MESG_INFO is "send", then there is no issue
If E2 fails when the status is "send", then the failover logic for engine will set the status to "closedFailure" after E1 updates the status as "done" or "error".
If E2 fails when the status is "error", then the failover logic kicks in to take over the instances run in E2. As part of the fail over logic, which ever engine takes over, it should update the status to "closedFailure".
If E2 fails when the status is "done", need to consider 3 scenarios based on when E2 crashed
If E2 didn't read the "done" or E2 did read but didn't update the record with "closedSuccess"
If E2 did read, updated the record with "closedSuccess". E2 crashed before TM called prepare on the transaction.
If E2 did read, updated the record with "closedSuccess". E2 crashed after TM prepared the transaction but before commit.
One way to address this would be implementing time out. - on time out the fail-over logic will update the status to "closedFailure".
If E2 fails after the status is updated to "closedSuccess"/"closedFailure", then there is no issue.
Case 2: If Engine 1 (E1) fails during the cycle of M2 consumption

If E1 fails after E2 decides to route it to E1 and before it can insert a row in CLUSTER_MESG_INFO with status "send"???
Maybe have one more column in CLUSTER_MESG_INFO which points to the instance ID. Whichever engine owns this instance at after whatever time, will then look at this column to know that this message is to be consumed by the new engine for that instance
If E1 fails after E2 inserts a row in CLUSTER_MESG_INFO with the status as "send". Then the failover logic kicks in and the new engine that owns the instances of E1 will also reflect that the message in the inserted row is to be routed to the new Engine.
If E1 crashes after the status is "done". Failover kicks in and replays M2 consumption (but doesn't update CLUSTER_MESG_INFO with the status "done") and waits for the status to be changed to "closedSuccess"/"closedFailure".
If E1 crashes after the status is "error", it is not an issue.
If E1 crashes after the status is "closedSuccess"/"closedFailure" then there is no issue. In the case of "closedSuccess" Failover will consume the message M2 and proceed further in processing the instance.
Case 3: If E1 and E2 crash

If E1 & E2 fail before status is "send", there is no issue
If E1 & E2 fail when the status is "send" or "error", then it falls into Case1's similar situation where E2 crashes in similar state.
If E1 & E2 crash after the status is "done". We should in this case use the trick like "rollback" or "rollforward". Most likely "roll forward" taking an optimistic approach.
If E1 & E2 crash after the status is "closedSuccess"/"closedFailure", then there is no issue. This falls into Case2's similar situation when E1 crashes in similar state.

No comments: