Introduction
Entry Processors (EP) allow you to create/update/delete a single cache entry at a time in a guaranteed atomic operation. What if you want to write an application that mutates multiple cache entries at a time in an atomic operation via an EP, and additionally you want to do this over multiple caches?
This is now possible in Coherence 3.7, which introduced the concept of ‘Partition Level Transactions’. This allows you to insert/update/mutate multiple cache entries in the same partition, via an Entry Processor (EP), in a single atomic operation.
When you run a “normal” Entry Processor (EP) against an entry you get a couple of things for “free”:
- Guaranteed once and only once execution of the EP against the data. If a node fails during execution and the EP did not complete, it will be re-executed on the primary copy of the data when the data is recovered.
- Implicit concurrency control while executing the EP. This ensures that only one EP can ever be executing against a particular key at a time. Lock-free processing, very powerful.
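The per-key guarantee can be pictured with a plain-Java analogy. The sketch below is not Coherence code: it assumes a single JVM and uses `ConcurrentHashMap.compute()`, which runs its function atomically for one key, as a stand-in for an EP. The class name `PerKeyAtomicityDemo` and the key `"customer-1"` are made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Single-JVM analogy only: this is NOT Coherence code.
public class PerKeyAtomicityDemo {

    /** Increments one key from several threads and returns the final value. */
    public static int run(int threads, int updatesPerThread) {
        ConcurrentHashMap<String, Integer> cache = new ConcurrentHashMap<>();
        cache.put("customer-1", 0);

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < updatesPerThread; i++) {
                    // compute() applies the function atomically for this key,
                    // so no explicit lock is needed and no update is lost
                    cache.compute("customer-1", (key, value) -> value + 1);
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("updates did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return cache.get("customer-1");
    }

    public static void main(String[] args) {
        System.out.println("Final value: " + run(4, 1000)); // prints "Final value: 4000"
    }
}
```

Because every update to the key is serialized, four threads doing 1000 increments each always end at 4000. An EP gives a similar per-key guarantee, but across a cluster and with re-execution on failover, which this analogy does not attempt to model.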
Using Partition Level Transactions
In 3.7 you can now use BackingMapContext.getBackingMapEntry() to access another cache and any mutations will be added to the EP’s ‘sandbox’ of changes that will be guaranteed to be saved as an atomic operation. The pre-requisites for this are:
- The caches must belong to the same service.
- They must exist in the same partition. This is achieved by using data affinity. See Here for more information.
- You must ensure you do not access the keys out of order and cause a deadlock, although the sandbox will detect this and throw an exception.
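To see why data affinity puts related entries in the same partition, here is a minimal plain-Java sketch. It assumes, purely for illustration, that a partition is chosen by hashing the associated key modulo a partition count; the class `AffinityDemo`, the method names and the count of 257 are hypothetical, and the real partitioning strategy is internal to Coherence.

```java
import java.util.Objects;

// Illustrative model only: the hashing scheme here is an assumption,
// not the actual Coherence partitioning algorithm.
public class AffinityDemo {

    // Illustrative partition count (an assumption for this sketch)
    static final int PARTITION_COUNT = 257;

    /** Hypothetical partition assignment: hash of the associated key. */
    public static int partitionFor(Object associatedKey) {
        return Math.floorMod(Objects.hashCode(associatedKey), PARTITION_COUNT);
    }

    /** A customer key is associated with its own customer id. */
    public static int customerPartition(int customerId) {
        return partitionFor(Integer.valueOf(customerId));
    }

    /** An order key is associated with the owning customer id, not the order id. */
    public static int orderPartition(int orderId, int customerId) {
        return partitionFor(Integer.valueOf(customerId));
    }

    public static void main(String[] args) {
        System.out.println("Customer 1 partition: " + customerPartition(1));
        System.out.println("Order 42 partition:   " + orderPartition(42, 1));
        System.out.println("Order 43 partition:   " + orderPartition(43, 1));
    }
}
```

Because the order key delegates to the customer id, every order for a customer lands in the same (simulated) partition as the customer itself, which is the property the EP relies on.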
It was possible to update multiple entries prior to 3.7 by accessing the BackingMap directly, but the atomic operation only covered the key you were updating, not the direct accesses to a different BackingMap.
See this video, which has a good explanation of this feature.
The Example
Consider the use-case where we have a Customer object with an order total attribute and an Order object that belongs to a customer and has an order total. What we want to be able to do is to insert the new order and at the same time ensure that the order total on the Customer object is always updated to reflect the new total.
Firstly we need our Order and Customer objects. As specified above we need to use data affinity (KeyAssociation) to ensure that all orders for a given customer exist in the same partition.
public class Customer implements Serializable {
public static final String CACHENAME = "Customer";
private int customerId;
private String customerName;
private String address;
private float balance;
// getters/setters etc
// Key class
public static class Key implements Serializable, KeyAssociation {
private int customerId;
public Key() {
}
public Key (Customer customer) {
this.customerId = customer.getCustomerId();
}
public Key (int customerId) {
this.customerId = customerId;
}
@Override
public Object getAssociatedKey() {
return Integer.valueOf(customerId);
}
// hashCode() , equals().. etc..
}
}
public class Order implements Serializable {
private static final long serialVersionUID = -1307260114314671367L;
public static final String CACHENAME = "Order";
private int customerId;
private int orderId;
private float orderTotal;
private long orderDate;
// getters/setters etc
public Key getKey() {
return new Key(this);
}
// Key class
public static class Key implements Serializable, KeyAssociation {
private int orderId;
private int customerId;
@Override
public Object getAssociatedKey() {
return Integer.valueOf(customerId);
}
public Key() {
}
public Key(Order order) {
this.orderId = order.getOrderId();
this.customerId = order.getCustomerId();
}
public Key(int orderId, int customerId) {
this.orderId = orderId;
this.customerId = customerId;
}
// hashCode() , equals().. etc..
}
}
Next we need our Entry Processor which, in my example, runs against the order cache to create a new order and updates the order total on the customer cache object. My entry processor takes two parameters, the Order to create and an operation. The operation is either:
- null – Normal processing, no exceptions.
- sleep – Sleep for 10 seconds so we can simulate node failure.
- fail – Throw a RuntimeException to simulate processing failure.
The key item to note is the call to getBackingMapEntry() in the process() method. Using this ensures that the modifications to the “Customer” cache are part of the sandbox of modifications.
package com.oracle.demo.partitionleveltxn.entryprocessors;
import com.oracle.demo.partitionleveltxn.model.Customer;
import com.oracle.demo.partitionleveltxn.model.Order;
import com.tangosol.net.BackingMapManagerContext;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
import java.io.Serializable;
/**
* This entry processor will add a new order to a customer. The total orders for a customer
* will also be updated in the KeyAssociated Customer Cache using Partition Level Transactions
* feature introduced in 3.7.
*
* This entry process is run against a "soon to be created" Order.
*
* @author tam 2012.07.05
*
*/
public class AddOrderEntryProcessor
extends AbstractProcessor
implements Serializable
{
/**
* Used to add a new {@link Order} to a {@link Customer}.
*
* @param newOrder the order to add
* @param operation string value of:
* "fail" - for throw new exception
* "sleep" - sleep for 10 seconds so we can kill the cache server
* null - nothing, just let it complete
*/
public AddOrderEntryProcessor(Order newOrder, String operation)
{
super();
this.newOrder = newOrder;
this.operation = operation;
}
/**
* Add a new {@link Order} to a {@link Customer}. This method is called against the
* {@link Order} object and the {@link Customer} object will be implicitly locked.
*
* @param entry the (not yet existing) order entry to run against
*
* @return null
*/
@Override
public Object process(Entry entry)
{
// entry should not be present, so let's set the value, i.e. create it
entry.setValue(newOrder);
System.out.println("In EP: " + newOrder);
BinaryEntry orderEntry = (BinaryEntry) entry;
BackingMapManagerContext ctx = orderEntry.getContext();
// get the customer entry as a binary - this will be in the same partition
// by using getBackingMapEntry you are adding the changes to the sandbox
BinaryEntry customerEntry =
(BinaryEntry) ctx.getBackingMapContext(Customer.CACHENAME)
.getBackingMapEntry(ctx.getKeyToInternalConverter()
.convert(new Customer.Key(newOrder.getCustomerId())));
// get normal Object Value
Customer thisCustomer = ((Customer) customerEntry.getValue());
// simulated failure of entry processor, atomic operation should not be completed
if ("fail".equals(operation))
{
throw new RuntimeException("Artificial fail");
}
else if ("sleep".equals(operation))
{
System.out.println("******* In EP: Sleeping 10 sec, kill this cache server now.");
try
{
// not a good thing to do in an entry processor, but just for example!
Thread.sleep(10000L);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// update the customer balance
thisCustomer.setBalance(thisCustomer.getBalance() + newOrder.getOrderTotal());
customerEntry.setValue(thisCustomer);
return null;
}
/**
*
*/
private static final long serialVersionUID = -7813558017954740822L;
// ---- data members ------------------------------------------------------
private Order newOrder;
private String operation;
}
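The all-or-nothing behaviour of the sandbox can be sketched in plain Java without a cluster. This is a simplified model under stated assumptions, not how Coherence implements it: changes to two maps are staged first and only copied into the "caches" if the whole operation completes. The class `SandboxDemo` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified single-JVM model of a "sandbox" of changes; NOT Coherence code.
public class SandboxDemo {

    private final Map<Integer, Float> orders = new HashMap<>();   // orderId -> order total
    private final Map<Integer, Float> balances = new HashMap<>(); // customerId -> balance

    /** Stages the order insert and the balance update, then commits both or neither. */
    public void addOrder(int orderId, int customerId, float total, boolean fail) {
        Map<Integer, Float> stagedOrders = new HashMap<>();
        Map<Integer, Float> stagedBalances = new HashMap<>();

        // stage the new order (nothing is visible in 'orders' yet)
        stagedOrders.put(orderId, total);

        // simulated processing failure: the staged changes are simply discarded
        if (fail) {
            throw new RuntimeException("Artificial fail");
        }

        // stage the updated customer balance
        stagedBalances.put(customerId, balances.getOrDefault(customerId, 0f) + total);

        // commit: both changes become visible together
        orders.putAll(stagedOrders);
        balances.putAll(stagedBalances);
    }

    public float balance(int customerId) {
        return balances.getOrDefault(customerId, 0f);
    }

    public int orderCount() {
        return orders.size();
    }

    public static void main(String[] args) {
        SandboxDemo demo = new SandboxDemo();
        demo.addOrder(1, 1, 100f, false);
        demo.addOrder(2, 1, 150f, false);
        try {
            demo.addOrder(3, 1, 120f, true);
        } catch (RuntimeException e) {
            System.out.println("Failed as expected: " + e.getMessage());
        }
        System.out.println("Orders=" + demo.orderCount() + ", balance=" + demo.balance(1));
    }
}
```

After the failed call, the model still holds two orders and a balance of 250.0, mirroring what the entry processor above is expected to guarantee when the "fail" operation is used.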
Next, we have our JUnit test which does the following:
- Five customers are inserted with zero balances.
- Two orders, one of $100 and one of $150 are added to customer 1. Total balance should now be $250.
- Runs the EP to add a new order of $120 to customer 1, but forces it to fail. The atomic operation of inserting the order and updating the customer balance should fail together, i.e. no order is inserted and the balance is still $250.
- Runs the EP to add a new order of $120 to customer 1, but instruct the EP to sleep for 10 seconds after insert of order but before update of customer balance (not a good idea in real code but just for the chance to kill the cache server!). When sleeping, kill the cache to simulate a node failure.
- EP continues running on surviving node and then will complete the atomic operation, inserting the new order and updating the totals to $370.
import com.oracle.demo.partitionleveltxn.entryprocessors.AddOrderEntryProcessor;
import com.oracle.demo.partitionleveltxn.model.Customer;
import com.oracle.demo.partitionleveltxn.model.Order;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.Service;
import com.tangosol.util.filter.EqualsFilter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* JUnit test that exercises {@link AddOrderEntryProcessor} and validates that
* the order insert and the customer balance update succeed or fail together.
*
* @version 12/07/05
*/
public class RunTest
{
/**
* Join the cluster as a storage-disabled member and clear both caches.
*
* @throws Exception if the cluster cannot be joined
*/
@BeforeClass
public static void setUpBeforeClass()
throws Exception
{
RunDefaultCacheServer.setSystemProperties(false);
CacheFactory.ensureCluster();
ncCustomer = CacheFactory.getCache(Customer.CACHENAME);
ncOrder = CacheFactory.getCache(Order.CACHENAME);
ncCustomer.clear();
ncOrder.clear();
}
/**
* Shut down the Coherence services used by the test.
*
* @throws Exception if shutdown fails
*/
@AfterClass
public static void tearDownAfterClass()
throws Exception
{
CacheFactory.shutdown();
}
/**
* Test normal execution, failure of the EP and failure of a node during EP execution.
*/
@Test
public void testPartitionLevelTransaction()
{
System.out.println("\n**** Starting Test");
// populate MAX customers with zero balance
populateData();
// add customer orders
addCustomerOrder(1, 100, null);
addCustomerOrder(1, 150, null);
System.out.println("\n**** Validate totals");
// ensure that the header value total for customer 1 matches the sum of the orders
float orderTotalFromOrders = getTotallOrderCount(1);
float orderTotalFromCustomer = getTotalFromCustomer(1);
assertTrue("Total order count on Customer is not 250.00. Value is " + orderTotalFromCustomer,
orderTotalFromCustomer == orderTotalFromOrders);
System.out.println("Customer 1, orderTotalFromOrders=" + orderTotalFromOrders + ", orderTotalFromCustomer="
+ orderTotalFromCustomer);
// now run the entry processor to add a new order for customer 1 for $120.00.
// make sure it fails (artificially) and then ensure both the order is not updated
// and the total is not updated.
//
// In the scenario that the cache server dies before the EP finishes,
// the EP will be re-run on the recovered primary and the atomic operation of
// creating the customer order and updating the balance will be intact.
float newOrderValue = 120.0f;
try
{
System.out.println("\n**** Testing failure of a EP due to error.");
System.out.println("Customer 1 BEFORE failed update: " + (Customer) ncCustomer.get(new Customer.Key(1)));
addCustomerOrder(1, newOrderValue, "fail");
}
catch (Exception e)
{
System.out.println("We should have an exception now " + e.getMessage());
orderTotalFromOrders = getTotallOrderCount(1);
orderTotalFromCustomer = getTotalFromCustomer(1);
// assert that the order total was not updated
assertTrue("Order value from Customer 1 should be 250. Value is " + orderTotalFromCustomer,
orderTotalFromCustomer == 250);
// assert that the order was not created
assertTrue("Order should not have been created", ncOrder.get(new Order.Key(orderNumber.get(), 1)) == null);
Customer customer = (Customer) ncCustomer.get(new Customer.Key(1));
System.out.println("Customer 1 AFTER failed update: " + customer);
System.out.println("Total order count based upon Orders for customer 1 is " + orderTotalFromOrders
+ " total on Customer object is " + orderTotalFromCustomer
+ "\nFailure in EP correctly did not partially update/insert order.");
}
System.out
.println("\n**** Testing failure of a node during EP execution. Kill cache server which displays the message.");
addCustomerOrder(1, newOrderValue, "sleep");
orderTotalFromOrders = getTotallOrderCount(1);
orderTotalFromCustomer = getTotalFromCustomer(1);
assertTrue("Order value from Customer 1 should be 370. Value is " + orderTotalFromCustomer,
orderTotalFromCustomer == 370);
System.out.println("Last order is " + orderNumber.get() + " = " + ncOrder.get(new Order.Key(orderNumber.get(), 1)));
// assert that the order was created
assertTrue("Order should have been created", ncOrder.get(new Order.Key(orderNumber.get(), 1)) != null);
Customer customer = (Customer) ncCustomer.get(new Customer.Key(1));
System.out.println("Customer 1 AFTER failover EP execution: " + customer);
System.out.println("Total order count based upon Orders for customer 1 is " + orderTotalFromOrders
+ " total on Customer object is " + orderTotalFromCustomer
+ "\nAtomic operation was successful");
}
/**
* Populate the cache with {@link Customer}s.
*/
private void populateData()
{
NamedCache ncCustomer = CacheFactory.getCache(Customer.CACHENAME);
// create MAX customers with starting balance of zero
for (int i = 1; i <= MAX; i++)
{
Customer c = new Customer(i, "Customer " + i, "Address " + i, 0f);
ncCustomer.put(c.getKey(), c);
}
System.out.println("Number of customers is " + ncCustomer.size());
assertTrue("Cache size should be " + MAX, ncCustomer.size() == MAX);
}
/**
* Add a customer order.
*
* @param customerId customer to add order to
* @param orderValue the value of the order
* @param operation instructs EP to either:
* null - execute normally
* "fail" - throw runtime exception
* "sleep" - sleep for 10 seconds so we can kill cache server running EP
*/
private void addCustomerOrder(int customerId, float orderValue, String operation)
{
Order newOrder = new Order(customerId, orderNumber.incrementAndGet(), orderValue, System.currentTimeMillis());
System.out.println("Adding new order " + newOrder + " to customer " + customerId + ", operation=" + operation);
ncOrder.invoke(newOrder.getKey(), new AddOrderEntryProcessor(newOrder, operation));
}
/**
* Loop through all the {@link Order}s for each customer and get the totals.
* This is used to validate that the header total matches the number of orders.
*
* @param customerId customer to sum for
* @return the total
*/
private float getTotallOrderCount(int customerId)
{
float balance = 0f;
Iterator it = ncOrder.entrySet(new EqualsFilter("getCustomerId", customerId)).iterator();
while (it.hasNext())
{
Map.Entry<Order.Key, Order> result = (Map.Entry<Order.Key, Order>) it.next();
balance += result.getValue().getOrderTotal();
}
return balance;
}
/**
* Return the customer balance from the {@link Customer} object.
*
* @param customerId customer to return balance for
* @return the balance
*/
public float getTotalFromCustomer(int customerId)
{
return ((Customer) ncCustomer.get(new Customer.Key(customerId))).getBalance();
}
// ----- static ---------------------------------------------------------
private static final int MAX = 5;
private static NamedCache ncCustomer = null;
private static NamedCache ncOrder = null;
private static AtomicInteger orderNumber = new AtomicInteger(0);
}
Lastly we run two cache servers, using the following code, then run our above JUnit test.
import com.oracle.demo.partitionleveltxn.model.Customer;
import com.oracle.demo.partitionleveltxn.model.Order;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
/**
* Start a "cache server".
*
* @author tam 2012.07.05
*/
public class RunDefaultCacheServer
{
/**
* @param args
*/
public static void main(String[] args)
{
setSystemProperties(true);
CacheFactory.ensureCluster();
NamedCache ncCustomer = CacheFactory.getCache(Customer.CACHENAME);
NamedCache ncOrder = CacheFactory.getCache(Order.CACHENAME);
System.out.println("Cache server started.");
try
{
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void setSystemProperties(boolean storageEnabled)
{
System.setProperty("tangosol.coherence.distributed.localstorage", storageEnabled ? "true" : "false");
System.setProperty("tangosol.coherence.log.level", "3");
System.setProperty("tangosol.coherence.distributed.threads", "5");
System.setProperty("tangosol.coherence.localhost", "localhost");
System.setProperty("tangosol.coherence.ttl", "0");
System.setProperty("tangosol.coherence.wka", "localhost");
System.setProperty("tangosol.coherence.wka.port", "8088");
}
}
The output below shows the successful running of the JUnit test and Partition Level Transactions. Enjoy!
**** Starting Test
Number of customers is 5
Adding new order Order [customerId=1, orderId=1, orderTotal=100.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 1, operation=null
Adding new order Order [customerId=1, orderId=2, orderTotal=150.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 1, operation=null
Adding new order Order [customerId=2, orderId=3, orderTotal=50.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 2, operation=null
Adding new order Order [customerId=2, orderId=4, orderTotal=70.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 2, operation=null
**** Validate totals
Customer 1, orderTotalFromOrders=250.0, orderTotalFromCustomer=250.0
**** Testing failure of a EP due to error.
Customer 1 BEFORE failed update: Customer [customerId=1, customerName=Customer 1, address=Address 1, balance=250.0]
Adding new order Order [customerId=1, orderId=5, orderTotal=120.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 1, operation=fail
We should have an exception now (Wrapped: Failed request execution for DistributedCache service on Member ....
Customer 1 AFTER failed update: Customer [customerId=1, customerName=Customer 1, address=Address 1, balance=250.0]
Total order count based upon Orders for customer 1 is 250.0 total on Customer object is 250.0
Failure in EP correctly did not partially update/insert order.
**** Testing failure of a node during EP execution. Kill cache server which displays the message.
Adding new order Order [customerId=1, orderId=6, orderTotal=120.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 1, operation=sleep
[CACHE SERVER KILLED HERE - SWITCH TO RUNNING CACHE SERVER THAT SHOWS ******* In EP: Sleeping 10 sec, kill this cache server now.]
Last order is 6 = Order [customerId=1, orderId=6, orderTotal=120.0, orderDate=Sat Jul 07 12:37:26 WST 2012]
Customer 1 AFTER failover EP execution: Customer [customerId=1, customerName=Customer 1, address=Address 1, balance=370.0]
Total order count based upon Orders for customer 1 is 370.0 total on Customer object is 370.0
Atomic operation was successful
Customer [customerId=1, customerName=Customer 1, address=Address 1, balance=370.0]
Total from orders is 370.0
Nice post Tim. At last someone had done a nice simple example of this great feature.
Thanks for the article Tim, very useful. I have a question: in a slightly different scenario, when on a change in the ‘master’ entity we have to add a number of ‘detail’ entities in a separate cache (with key affinity between the two caches) – is it possible to create several entries and perform setValue on them in the same atomic way?
Thanks, Denis.
Hi Denis,
Thanks for your question. Yes, you can do that as long as the objects are in the same partition, e.g. via key association. For example, you could have an EntryProcessor that is called against an Order Key, which creates 5 OrderLines for the Order in the “sandbox” and has them all saved as part of the same “atomic” operation.
Regards
Tim
Good one.
This feature is generally good – except for the “real-world” use-case. i.e. when you have Write-through enabled.
When you do have Write-through enabled (which is far more common in real-world use-cases), what will usually fail is the Write-through layer (because of DB issues etc.). Your Entry Processor is going to end up with fairly noddy application logic.
In this case (write-through failure), the roll-back does not work. Your pair of caches end up inconsistent.
What's more, because write-through happens per entry, your database is also inconsistent at this point.
This feature is therefore only marginally useful in the real world (imnsho).
Hi,
I use this feature in “real-world” app and have no issues with it. When something goes wrong on CacheStore layer (I use write-behind strategy) I don’t need to rollback anything, my caches are still in consistent state as a failure on CacheStore layer has no impact on the cache entries at all. Coherence will try to store updated entries via CacheStore as long as you wish, so it is not a problem to repair DB connection and eventually store entries to DB.
If you use the write-through strategy you can roll back changes in case of a CacheStore failure if you set the rollback-cachestore-failures param properly.
If you need your DB consistent with the cache all the time and store master-detail data in several tables transactionally – then I’d suggest performing DB updates from one (master) CacheStore only.
Thanks, Denis.
Hi Denis
I don’t think you understand the issue. This is an open bug with Coherence and they admit that the situation I’ve described is unsupported by the current version of Coherence (I’m on 3.7.1p6).
The issue arises because:
1. you have an entry processor mutating state in 2 caches.
2. you are doing write-through to the database
3. write-through happens outside the context of the entry-processor execution
4. errors in the database layer are transactional per entry (remember we have 2 caches) but not transactional (from a DB perspective) across both entries you mutate.
The workaround we’re using is to do persistence within the entry-processor. This guarantees correct behavior in the database (both record changes happen in the same transaction) as well as correct behavior in Coherence (an exception in the entry-processor correctly rolls back all changes and keeps your cache and database in a consistent state).
Good discussion. Take a look at this excellent post from Andrew Wilson.
http://wiki.fullten.com.hk/display/AWKL/Coherence+Partition+Level+Transactions+and+Cache+Stores