Partition Level Transactions in Coherence 3.7

Introduction
Entry Processors (EPs) allow you to create, update, or delete a single cache entry at a time in a guaranteed atomic operation. What if you want to write an application that mutates multiple cache entries atomically via an EP, and additionally you want to do this across multiple caches?

This is now possible in Coherence 3.7, which introduced the concept of ‘Partition Level Transactions’. This allows you to insert, update, or delete multiple cache entries in the same partition, via an Entry Processor (EP), in a single atomic operation.

When you run a “normal” Entry Processor (EP) against an entry you get a couple of things for “free” (see the sketch after this list):

  • Guaranteed once-and-only-once execution of the EP against the data. If a node fails during execution and the EP did not complete, it will be re-executed against the primary copy of the data when the data is recovered.
  • Implicit concurrency control while the EP executes. This ensures that only one EP is ever executing against a particular key at a time: lock-free processing for your application code, which is very powerful.
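
A minimal sketch of such a conventional single-entry processor, assuming the Customer class defined later in this post (the IncrementBalanceProcessor name is hypothetical):

import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;

import java.io.Serializable;

public class IncrementBalanceProcessor extends AbstractProcessor implements Serializable {

	private float amount;

	public IncrementBalanceProcessor(float amount) {
		this.amount = amount;
	}

	@Override
	public Object process(Entry entry) {
		// Coherence runs this once and only once against the primary copy
		// of the entry, with the key implicitly locked for the duration
		Customer customer = (Customer) entry.getValue();
		customer.setBalance(customer.getBalance() + amount);
		entry.setValue(customer);
		return null;
	}
}

Invoked from a client with, for example, ncCustomer.invoke(new Customer.Key(1), new IncrementBalanceProcessor(10.0f)), the locking and the once-only guarantee come for free.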

Using Partition Level Transactions
In 3.7 you can now use BackingMapContext.getBackingMapEntry() to access another cache, and any mutations will be added to the EP’s ‘sandbox’ of changes, which is guaranteed to be saved as a single atomic operation. The prerequisites for this are:

  1. The caches must belong to the same cache service.
  2. The entries must exist in the same partition. This is achieved by using data affinity (key association); see the Coherence documentation on data affinity for more information.
  3. You must ensure you do not enlist keys out of order and cause a deadlock, although the sandbox will detect this and throw an exception. A sketch of ordered enlistment follows this list.
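
Here is that ordered-enlistment sketch. It is illustrative only: EnlistHelper is a hypothetical name, and it assumes the Order.Key class defined later in this post exposes a getOrderId() accessor:

import com.tangosol.net.BackingMapContext;

import com.tangosol.util.BinaryEntry;
import com.tangosol.util.Converter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class EnlistHelper
    {
    /**
     * Enlist a set of Order entries into the calling EP's sandbox in a
     * deterministic order so two concurrent EPs cannot deadlock by
     * locking the same keys in opposite order.
     *
     * @param binEntry     the BinaryEntry the EP was invoked against
     * @param keysToEnlist the sibling keys to enlist
     */
    public static List<BinaryEntry> enlistInOrder(BinaryEntry binEntry, List<Order.Key> keysToEnlist)
        {
        BackingMapContext bmCtx        = binEntry.getContext().getBackingMapContext(Order.CACHENAME);
        Converter         keyConverter = binEntry.getContext().getKeyToInternalConverter();

        // sort the keys into a stable order before enlisting them
        List<Order.Key> keys = new ArrayList<Order.Key>(keysToEnlist);

        Collections.sort(keys, new Comparator<Order.Key>()
            {
            public int compare(Order.Key k1, Order.Key k2)
                {
                return k1.getOrderId() - k2.getOrderId();
                }
            });

        List<BinaryEntry> entries = new ArrayList<BinaryEntry>();

        for (Order.Key key : keys)
            {
            // getBackingMapEntry() adds each entry to the EP's sandbox
            entries.add((BinaryEntry) bmCtx.getBackingMapEntry(keyConverter.convert(key)));
            }

        return entries;
        }
    }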

It was possible to update multiple entries prior to 3.7 by accessing the backing map directly, but the atomicity only covered the key the EP was invoked against, not the direct accesses to a different backing map.
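
For contrast, the pre-3.7 style looked roughly like the fragment below (a sketch from inside a hypothetical process() method; someCustomer is a placeholder value). The put goes straight to the other cache's backing map and sits outside the EP's atomicity guarantee:

// Pre-3.7 sketch: writing directly to another cache's backing map.
// entry is the entry the EP was invoked against; someCustomer is a placeholder.
BinaryEntry              binEntry = (BinaryEntry) entry;
BackingMapManagerContext ctx      = binEntry.getContext();

Map customerBackingMap = ctx.getBackingMap(Customer.CACHENAME);

Object binKey   = ctx.getKeyToInternalConverter().convert(new Customer.Key(1));
Object binValue = ctx.getValueToInternalConverter().convert(someCustomer);

// this write is NOT part of the EP's atomic operation
customerBackingMap.put(binKey, binValue);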

See this video, which has a good explanation of this feature.

The Example
Consider the use case where we have a Customer object with an order total attribute (the balance field in the code below) and an Order object that belongs to a customer and has an order total. What we want to be able to do is insert the new order and at the same time ensure that the order total on the Customer object is updated to reflect the new total.

Firstly we need our Order and Customer objects. As specified above we need to use data affinity (KeyAssociation) to ensure that all orders for a given customer exist in the same partition.


import com.tangosol.net.cache.KeyAssociation;

import java.io.Serializable;

public class Customer implements Serializable {

	public static final String CACHENAME = "Customer";

	private int    customerId;
	private String customerName;
	private String address;
	private float  balance;

	// constructor, getters/setters etc.

	public Key getKey() {
		return new Key(this);
	}

	// Key class - implements KeyAssociation so that all of a customer's
	// data is owned by the same partition
	public static class Key implements Serializable, KeyAssociation {

		private int customerId;

		public Key() {
		}

		public Key(Customer customer) {
			this.customerId = customer.getCustomerId();
		}

		public Key(int customerId) {
			this.customerId = customerId;
		}

		@Override
		public Object getAssociatedKey() {
			return Integer.valueOf(customerId);
		}

		// hashCode(), equals() etc.
	}
}

import com.tangosol.net.cache.KeyAssociation;

import java.io.Serializable;

public class Order implements Serializable {

	private static final long serialVersionUID = -1307260114314671367L;

	public static final String CACHENAME = "Order";

	private int   customerId;
	private int   orderId;
	private float orderTotal;
	private long  orderDate;

	// constructor, getters/setters etc.

	public Key getKey() {
		return new Key(this);
	}

	// Key class - the associated key is the owning customer's id, so an
	// order is always co-located with its Customer entry
	public static class Key implements Serializable, KeyAssociation {

		private int orderId;
		private int customerId;

		public Key() {
		}

		public Key(Order order) {
			this.orderId    = order.getOrderId();
			this.customerId = order.getCustomerId();
		}

		public Key(int orderId, int customerId) {
			this.orderId    = orderId;
			this.customerId = customerId;
		}

		@Override
		public Object getAssociatedKey() {
			return Integer.valueOf(customerId);
		}

		// hashCode(), equals() etc.
	}
}
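
Before going further it can be worth sanity-checking that key association is actually working. One quick way (a sketch, assuming the classes above and a running cluster with both caches on the same partitioned service) is to ask the service's KeyPartitioningStrategy which partition each key resolves to:

import com.tangosol.net.CacheFactory;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.partition.KeyPartitioningStrategy;

public class AffinityCheck
    {
    public static void main(String[] args)
        {
        // both caches run on the same partitioned service (prerequisite 1)
        PartitionedService service = (PartitionedService)
            CacheFactory.getCache(Customer.CACHENAME).getCacheService();

        KeyPartitioningStrategy strategy = service.getKeyPartitioningStrategy();

        // a customer key and an order key for the same customer should
        // resolve to the same partition if KeyAssociation is wired up
        int customerPartition = strategy.getKeyPartition(new Customer.Key(1));
        int orderPartition    = strategy.getKeyPartition(new Order.Key(100, 1));

        System.out.println("customer partition=" + customerPartition
                           + ", order partition=" + orderPartition);

        CacheFactory.shutdown();
        }
    }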

Next we need our Entry Processor, which in this example runs against the Order cache to create a new order and update the order total on the Customer object. The entry processor takes two parameters: the Order to create and an operation. The operation is either:

  • null – Normal processing, no exceptions.
  • sleep – Sleep for 10 seconds so we can simulate node failure.
  • fail – Throw a RuntimeException to simulate processing failure.

The key item to note is the call to getBackingMapEntry() in the process() method below. Using this ensures that the modifications to the “Customer” cache are part of the sandbox of modifications.

package com.oracle.demo.partitionleveltxn.entryprocessors;

import com.oracle.demo.partitionleveltxn.model.Customer;
import com.oracle.demo.partitionleveltxn.model.Order;

import com.tangosol.net.BackingMapManagerContext;

import com.tangosol.util.BinaryEntry;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;

import java.io.Serializable;

/**
 * This entry processor will add a new order to a customer.  The total orders for a customer
 * will also be updated in the KeyAssociated Customer Cache using Partition Level Transactions
 * feature introduced in 3.7.
 *
 * This entry process is run against a "soon to be created" Order.
 *
 * @author tam  2012.07.05
 *
 */
public class AddOrderEntryProcessor
        extends AbstractProcessor
        implements Serializable
    {
    /**
     * Used to add a new {@link Order} to a {@link Customer}.
     *
     * @param newOrder the order to add
     * @param operation string value of:
     *    "fail"  - throw a RuntimeException
     *    "sleep" - sleep for 10 seconds so we can kill a cache server
     *    null    - nothing, just let it complete
     */
    public AddOrderEntryProcessor(Order newOrder, String operation)
        {
        super();
        this.newOrder  = newOrder;
        this.operation = operation;
        }

    /**
     * Add a new {@link Order} to a {@link Customer}. This method is called against the
     * {@link Order} object and the {@link Customer} object will be implicitly locked.
     *
     * @param entry The customer entry to run against
     *
     * @return null
     */
    @Override
    public Object process(Entry entry)
        {
        // the entry should not be present yet, so setting the value creates it
        entry.setValue(newOrder);

        System.out.println("In EP: " + newOrder);

        BinaryEntry              orderEntry = (BinaryEntry) entry;
        BackingMapManagerContext ctx        = orderEntry.getContext();

        // get the customer entry as a binary - this will be in the same partition.
        // by using getBackingMapEntry() the changes are added to the sandbox
        BinaryEntry customerEntry =
            (BinaryEntry) ctx.getBackingMapContext(Customer.CACHENAME)
                .getBackingMapEntry(ctx.getKeyToInternalConverter()
                    .convert(new Customer.Key(newOrder.getCustomerId())));
        
        // get normal Object Value
        Customer thisCustomer = ((Customer) customerEntry.getValue());

        // simulated failure of entry processor, atomic operation should not be completed
        if ("fail".equals(operation))
            {
            throw new RuntimeException("Artificial fail");
            }
        else if ("sleep".equals(operation))
            {
            System.out.println("******* In EP: Sleeping 10 sec, kill this cache server now.");

            try
                {
                // not a good thing to do in an entry processor, but just for example!
                Thread.sleep(10000L);
                }
            catch (InterruptedException e)
                {
                e.printStackTrace();
                }

            }

        // update the customer balance
        thisCustomer.setBalance(thisCustomer.getBalance() + newOrder.getOrderTotal());
        customerEntry.setValue(thisCustomer);

        return null;
        }

    private static final long serialVersionUID = -7813558017954740822L;

    // ---- data members ------------------------------------------------------

    private Order  newOrder;
    private String operation;
    }
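
Invoking the processor from a client is then a single invoke() against the new order's key. If the EP throws (as with the "fail" operation), the whole sandbox is discarded and the exception surfaces on the caller, as the JUnit test below demonstrates. A brief usage sketch:

NamedCache ncOrder  = CacheFactory.getCache(Order.CACHENAME);
Order      newOrder = new Order(1, 42, 120.0f, System.currentTimeMillis());

try
    {
    ncOrder.invoke(newOrder.getKey(), new AddOrderEntryProcessor(newOrder, null));
    }
catch (RuntimeException e)
    {
    // with operation="fail" neither the Order insert nor the
    // Customer balance update would have been applied
    }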

Next, we have our JUnit test, which does the following:

  1. Five customers are inserted with zero balances.
  2. Two orders, one of $100 and one of $150, are added to customer 1. The total balance should now be $250.
  3. Runs the EP to add a new order of $120 to customer 1, but forces it to fail. The atomic operation of inserting the order and updating the customer balance should fail together, i.e. no order is inserted and the balance is still $250.
  4. Runs the EP to add a new order of $120 to customer 1, but instructs the EP to sleep for 10 seconds after the insert of the order but before the update of the customer balance (not a good idea in real code, but it gives us the chance to kill the cache server!). While it is sleeping, kill the cache server to simulate a node failure.
  5. The EP is re-executed on the surviving node (the new primary) and completes the atomic operation, inserting the new order and updating the total to $370.

import com.oracle.demo.partitionleveltxn.entryprocessors.AddOrderEntryProcessor;
import com.oracle.demo.partitionleveltxn.model.Customer;
import com.oracle.demo.partitionleveltxn.model.Order;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.Service;

import com.tangosol.util.filter.EqualsFilter;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Iterator;
import java.util.Map;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * JUnit test that exercises Partition Level Transactions via the
 * {@link AddOrderEntryProcessor}.
 *
 * @author tam  2012.07.05
 */
public class RunTest
    {
    /**
     * Start the cluster and clear the test caches.
     *
     * @throws Exception if initialisation fails
     */
    @BeforeClass
    public static void setUpBeforeClass()
            throws Exception
        {
        RunDefaultCacheServer.setSystemProperties(false);
        CacheFactory.ensureCluster();

        ncCustomer = CacheFactory.getCache(Customer.CACHENAME);
        ncOrder    = CacheFactory.getCache(Order.CACHENAME);

        ncCustomer.clear();
        ncOrder.clear();
        }

    /**
     * Shut down the cluster.
     *
     * @throws Exception if shutdown fails
     */
    @AfterClass
    public static void tearDownAfterClass()
            throws Exception
        {
        CacheFactory.shutdown();
        }

    /**
     * Run the Partition Level Transaction scenarios described above.
     */
    @Test
    public void testPartitionLevelTransaction()
        {
        System.out.println("\n**** Starting Test");
        
        // populate MAX customers with zero balance
        populateData();

        // add customer orders
        addCustomerOrder(1, 100, null);
        addCustomerOrder(1, 150, null);
               
        System.out.println("\n**** Validate totals");

        // ensure that the total held on customer 1 matches the sum of the orders
        float orderTotalFromOrders   = getTotalFromOrders(1);
        float orderTotalFromCustomer = getTotalFromCustomer(1);

        assertTrue("Total on Customer does not match sum of orders. Value is " + orderTotalFromCustomer,
                   orderTotalFromCustomer == orderTotalFromOrders);
        System.out.println("Customer 1, orderTotalFromOrders=" + orderTotalFromOrders + ", orderTotalFromCustomer="
                           + orderTotalFromCustomer);

        // now run the entry processor to add a new order for customer 1 for $120.00.
        // make sure it fails (artificially) and then ensure both the order is not updated
        // and the total is not updated.
        //
        // In the scenario where the cache server dies before the EP finishes,
        // the EP will be re-run on the recovered primary and the atomic operation of
        // creating the customer order and updating the balance will be intact.

        float newOrderValue = 120.0f;

        try
            {
            System.out.println("\n**** Testing failure of a EP due to error.");
            System.out.println("Customer 1 BEFORE failed update: " + (Customer) ncCustomer.get(new Customer.Key(1)));

            addCustomerOrder(1, newOrderValue, "fail");

            fail("The EP should have thrown an exception");
            }
        catch (Exception e)
            {
            System.out.println("We should have an exception now " + e.getMessage());
            orderTotalFromOrders   = getTotalFromOrders(1);
            orderTotalFromCustomer = getTotalFromCustomer(1);

            // assert that the order total was not updated
            assertTrue("Order value from Customer 1 should be 250. Value is " + orderTotalFromCustomer,
                       orderTotalFromCustomer == 250);

            // assert that the order was not created
            assertTrue("Order should not have been crated", ncOrder.get(new Order.Key(orderNumber.get(), 1)) == null);

            Customer customer = (Customer) ncCustomer.get(new Customer.Key(1));

            System.out.println("Customer 1 AFTER failed update: " + customer);

            System.out.println("Total order count based upon Orders for customer 1 is " + orderTotalFromOrders
                               + " total on Customer object is " + orderTotalFromCustomer
                               + "\nFailure in EP correctly did not partially update/insert order.");

            }

        System.out
            .println("\n**** Testing failure of a node during EP execution. Kill cache server which displays the message.");
        
        addCustomerOrder(1, newOrderValue, "sleep");

        orderTotalFromOrders   = getTotalFromOrders(1);
        orderTotalFromCustomer = getTotalFromCustomer(1);
        
        assertTrue("Order value from Customer 1 should be 370. Value is " + orderTotalFromCustomer,
                   orderTotalFromCustomer == 370);

        System.out.println("Last order is " + orderNumber.get() + " = " + ncOrder.get(new Order.Key(orderNumber.get(), 1)));
        // assert that the order was created
        assertTrue("Order should have been crated", ncOrder.get(new Order.Key(orderNumber.get(), 1)) != null);
        
        Customer customer = (Customer) ncCustomer.get(new Customer.Key(1));

        System.out.println("Customer 1 AFTER failover EP execution: " + customer);
        System.out.println("Total order count based upon Orders for customer 1 is " + orderTotalFromOrders
                           + " total on Customer object is " + orderTotalFromCustomer
                           + "\nAtomic operation was successful");
        }
   
    /**
     * Populate the cache with {@link Customer}s.
     */
    private void populateData()
        {

        // create MAX customers with starting balance of zero
        for (int i = 1; i <= MAX; i++)
            {
            Customer c = new Customer(i, "Customer " + i, "Address " + i, 0f);

            ncCustomer.put(c.getKey(), c);
            }

        System.out.println("Number of customers is " + ncCustomer.size());
        assertTrue("Cache size should be " + MAX, ncCustomer.size() == MAX);

        }

    /**
     * Add a customer order.
     * 
     * @param customerId customer to add order to
     * @param orderValue the value of the order
     * @param operation instructs EP to either:
     *        null - execute normally
     *        "fail" - throw runtime exception
     *        "sleep" - sleep for 10 seconds so we can kill cache server running EP
     */
    private void addCustomerOrder(int customerId, float orderValue, String operation)
        {
        Order newOrder = new Order(customerId, orderNumber.incrementAndGet(), orderValue, System.currentTimeMillis());

        System.out.println("Adding new order " + newOrder + " to customer " + customerId + ", operation=" + operation);

        ncOrder.invoke(newOrder.getKey(), new AddOrderEntryProcessor(newOrder, operation));
        }

    /**
     * Loop through all the {@link Order}s for a customer and sum their totals.
     * This is used to validate that the total held on the Customer matches
     * the sum of its orders.
     *
     * @param customerId customer to sum for
     * @return the total
     */
    private float getTotalFromOrders(int customerId)
        {
        float balance = 0f;

        Iterator it = ncOrder.entrySet(new EqualsFilter("getCustomerId", customerId)).iterator();

        while (it.hasNext())
            {
            Map.Entry<Order.Key, Order> result = (Map.Entry<Order.Key, Order>) it.next();

            balance += result.getValue().getOrderTotal();
            }

        return balance;
        }

    /**
     * Return the customer balance from the {@link Customer} object.
     *
     * @param customerId customer to return balance for
     * @return the balance
     */
    public float getTotalFromCustomer(int customerId)
        {
        return ((Customer) ncCustomer.get(new Customer.Key(customerId))).getBalance();
        }

    // ----- static ---------------------------------------------------------

    private static final int     MAX             = 5;
    private static NamedCache    ncCustomer      = null;
    private static NamedCache    ncOrder         = null;
    private static AtomicInteger orderNumber     = new AtomicInteger(0);
    }

Lastly, we run two cache servers using the following code, then run the JUnit test above.

import com.oracle.demo.partitionleveltxn.model.Customer;
import com.oracle.demo.partitionleveltxn.model.Order;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

/**
 * Start a "cache server".
 *
 * @author tam  2012.07.05
 */
public class RunDefaultCacheServer
    {
    /**
     * @param args
     */
    public static void main(String[] args)
        {
        setSystemProperties(true);

        CacheFactory.ensureCluster();

        // touch the caches so they are created on this storage-enabled node
        NamedCache ncCustomer = CacheFactory.getCache(Customer.CACHENAME);
        NamedCache ncOrder    = CacheFactory.getCache(Order.CACHENAME);

        System.out.println("Cache server started.");

        try
            {
            Thread.sleep(Long.MAX_VALUE);
            }
        catch (InterruptedException e)
            {
            e.printStackTrace();
            }
        }

    public static void setSystemProperties(boolean storageEnabled)
        {
        System.setProperty("tangosol.coherence.distributed.localstorage", storageEnabled ? "true" : "false");
        System.setProperty("tangosol.coherence.log.level", "3");
        System.setProperty("tangosol.coherence.distributed.threads", "5");

        System.setProperty("tangosol.coherence.localhost", "localhost");
        System.setProperty("tangosol.coherence.ttl", "0");
        System.setProperty("tangosol.coherence.wka", "localhost");
        System.setProperty("tangosol.coherence.wka.port", "8088");
        }
    }

The output below shows the successful running of the JUnit test and Partition Level Transactions. Enjoy!


**** Starting Test
Number of customers is 5
Adding new order Order [customerId=1, orderId=1, orderTotal=100.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 1, operation=null
Adding new order Order [customerId=1, orderId=2, orderTotal=150.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 1, operation=null
Adding new order Order [customerId=2, orderId=3, orderTotal=50.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 2, operation=null
Adding new order Order [customerId=2, orderId=4, orderTotal=70.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 2, operation=null

**** Validate totals
Customer 1, orderTotalFromOrders=250.0, orderTotalFromCustomer=250.0

**** Testing failure of a EP due to error.
Customer 1 BEFORE failed update: Customer [customerId=1, customerName=Customer 1, address=Address 1, balance=250.0]
Adding new order Order [customerId=1, orderId=5, orderTotal=120.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 1, operation=fail
We should have an exception now (Wrapped: Failed request execution for DistributedCache service on Member ....

Customer 1 AFTER failed update: Customer [customerId=1, customerName=Customer 1, address=Address 1, balance=250.0]
Total order count based upon Orders for customer 1 is 250.0 total on Customer object is 250.0
Failure in EP correctly did not partially update/insert order.

**** Testing failure of a node during EP execution. Kill cache server which displays the message.
Adding new order Order [customerId=1, orderId=6, orderTotal=120.0, orderDate=Sat Jul 07 12:37:26 WST 2012] to customer 1, operation=sleep

[CACHE SERVER KILLED HERE - SWITCH TO RUNNING CACHE SERVER THAT SHOWS ******* In EP: Sleeping 10 sec, kill this cache server now.]

Last order is 6 = Order [customerId=1, orderId=6, orderTotal=120.0, orderDate=Sat Jul 07 12:37:26 WST 2012]
Customer 1 AFTER failover EP execution: Customer [customerId=1, customerName=Customer 1, address=Address 1, balance=370.0]
Total order count based upon Orders for customer 1 is 370.0 total on Customer object is 370.0
Atomic operation was successful
Customer [customerId=1, customerName=Customer 1, address=Address 1, balance=370.0]
Total from orders is 370.0


9 Responses to Partition Level Transactions in Coherence 3.7

  1. Dave says:

    Nice post Tim. At last someone has done a nice simple example of this great feature.

  2. Denis Sukhoroslov says:

    Thanks for the article Tim, very useful. I have a question: in a slightly different scenario, when, on a change to a ‘master’ entity, we have to add a number of ‘detail’ entities in a separate cache (with key affinity between the two caches) – is it possible to create several entries and perform setValue on them in the same atomic way?

    Thanks, Denis.

    • Hi Denis,

      Thanks for your question. Yes, you can do that as long as the objects are in the same partition, e.g. via key association. For example, you could have an EntryProcessor that is called against an Order key, which creates 5 OrderLines for the Order in the “sandbox”
      and has them all saved as part of the same “atomic” operation.

      Regards
      Tim

  3. G Fernandes says:

    Good one.
    This feature is generally good – except for the “real-world” use-case, i.e. when you have Write-through enabled.

    When you do have Write-through enabled (which is far more common in real-world use-cases), what will usually fail is the Write-through layer (because of DB issues etc.). Your Entry Processor is going to end up with fairly noddy application logic.

    In this case (write-through failure), the roll-back does not work. Your pair of caches end up inconsistent.

    What’s more, because write-through happens per entry, your database is also inconsistent at this point.

    This feature is therefore only marginally useful in the real world (imnsho).

    • Denis Sukhoroslov says:

      Hi,
      I use this feature in a “real-world” app and have no issues with it. When something goes wrong on the CacheStore layer (I use the write-behind strategy) I don’t need to roll back anything; my caches are still in a consistent state, as a failure on the CacheStore layer has no impact on the cache entries at all. Coherence will try to store updated entries via the CacheStore for as long as you wish, so it is not a problem to repair the DB connection and eventually store the entries to the DB.
      If you use the write-through strategy you can roll back changes in case of a CacheStore failure if you set the rollback-cachestore-failures param properly.
      If you need your DB consistent with the cache at all times and store master-detail data in several tables transactionally – then I’d suggest performing DB updates from one (master) CacheStore only.

      Thanks, Denis.

      • G Fernandes says:

        Hi Denis
        I don’t think you understand the issue. This is an open bug with Coherence, and they admit that the situation I’ve described is unsupported by the current version of Coherence (I’m on 3.7.1p6).

        The issue arises because:
        1. you have an entry processor mutating state in 2 caches.
        2. you are doing write-through to the database
        3. write-through happens outside the context of the entry-processor execution
        4. errors in the database layer are transactional per entry (remember we have 2 caches) but not transactional (from a DB perspective) across both entries you mutate.

        The workaround we’re using is to do persistence within the entry-processor. This guarantees correct behavior in the database (both record changes happen in the same transaction) as well as correct behavior in Coherence (an exception in the entry-processor correctly rolls back all changes and keeps your cache and database in a consistent state).

  4. Hi,
    Can you please confirm whether accessing another cache within an EntryProcessor is also thread safe?
    Thanks
    Deepak

    • Yes, it is thread safe, but be aware of the comment above: “You must ensure you do not enlist keys out of order and cause a deadlock, although the sandbox will detect this and throw an exception.”
