package org.jboss.cache.buddyreplication;

import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.util.TestingUtil;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.NodeModified;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
import org.jboss.cache.transaction.DummyTransactionManagerLookup;
import org.testng.annotations.Test;

import javax.transaction.RollbackException;
import javax.transaction.TransactionManager;
import java.util.List;

/**
 * Tests that a failed PREPARE on a buddy results in a proper rollback.
 * <p/>
 * The test builds three caches, installs a listener on the third one that throws on the first
 * remotely-originated modification of an Fqn containing the element <tt>"0"</tt>, and then commits a
 * transaction on the first cache. The commit is expected to fail with a {@link RollbackException},
 * after which all three caches must still return the value written before the transaction began.
 *
 * @author Brian Stansberry
 * @since 3.1.12
 */
@Test(groups = "functional", testName = "buddyreplication.Buddy3NodesFailedRemotePrepareTest")
public class Buddy3NodesFailedRemotePrepareTest extends BuddyReplicationTestsBase
{
   /**
    * Passed to <tt>createCaches</tt> when the test caches are built. Left non-private, presumably so
    * that a variant of this test can flip it - confirm before narrowing the visibility.
    */
   boolean optimistic = false;

   /**
    * Cache listener that throws a {@link RuntimeException} the first time it sees a node-modified
    * event that did not originate locally and whose Fqn contains the element <tt>"0"</tt>. Once it has
    * thrown, every later event passes through untouched.
    */
   @CacheListener
   public static class FailureListener
   {
      /**
       * Set once the listener has thrown, so that only a single modification is rejected.
       */
      private boolean rejected;

      /**
       * Rejects the first matching remote modification by throwing.
       *
       * @param event the node-modified notification delivered by the cache
       * @throws RuntimeException with message <tt>"rejected"</tt> on the first matching event
       */
      @NodeModified
      public void nodeModified(NodeModifiedEvent event)
      {
         if (!rejected && !event.isOriginLocal() && event.getFqn().hasElement("0"))
         {
            rejected = true;
            throw new RuntimeException("rejected");
         }
      }
   }

   /**
    * Commits a transactional update on cache 0 while cache 2 rejects the replicated modification, and
    * verifies that the commit rolls back and the previous value survives on every cache.
    *
    * @throws Exception if cache set-up, the transaction manager or a cache operation fails unexpectedly
    */
   public void testFailedPrepare() throws Exception
   {
      // NOTE(review): argument meanings are defined by BuddyReplicationTestsBase.createCaches, which is
      // not visible here; the caches are started explicitly in the loop below.
      List<CacheSPI<Object, Object>> caches = createCaches(2, 3, false, false, optimistic, false);
      cachesTL.set(caches);

      // Configure and start every cache, then seed cache N with /N -> k=v.
      int cacheNumber = 0;
      for (CacheSPI<Object, Object> c : caches)
      {
         c.getConfiguration().setFetchInMemoryState(false);
         c.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
         c.start();
         c.put("/" + cacheNumber++, "k", "v");
      }

      // Wait on every ordered pair of caches before continuing.
      waitForBuddy(caches.get(0), caches.get(1), false);
      waitForBuddy(caches.get(0), caches.get(2), false);
      waitForBuddy(caches.get(1), caches.get(2), false);
      waitForBuddy(caches.get(1), caches.get(0), false);
      waitForBuddy(caches.get(2), caches.get(0), false);
      waitForBuddy(caches.get(2), caches.get(1), false);
      Thread.sleep(2000);//wait for state transfer

      // Write /0 again on cache 0 and check that its backup Fqn now exists on both other caches.
      caches.get(0).put("/0", "k", "v");
      Fqn backup = fqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(), Fqn.fromString("/0"));
      assert (caches.get(1).exists(backup));
      assert (caches.get(2).exists(backup));

      // From here on cache 2 throws on the first non-local modification touching element "0".
      caches.get(2).addCacheListener(new FailureListener());

      TransactionManager tm = caches.get(0).getTransactionManager();
      tm.begin();
      caches.get(0).put("/0", "k", "v1");

      try
      {
         tm.commit();
         // It is the commit, not the put above, that is expected to fail.
         assert false : "Commit should have thrown a RollbackException";
      }
      catch (RollbackException expected)
      {
         // Printed for diagnostics only; reaching this branch is the expected outcome.
         expected.printStackTrace(System.out);
      }

      // The originating cache must still hold the value from before the transaction.
      Object val = caches.get(0).get("/0", "k");
      assert "v".equals(val) : "cache 0 had " + val;

      // Cache 2 is checked before cache 1, each read being made with forced data gravitation.
      assertOriginalValueViaGravitation(caches.get(2), "cache 2");
      assertOriginalValueViaGravitation(caches.get(1), "cache 1");
   }

   /**
    * Reads key <tt>"k"</tt> under <tt>"/0"</tt> from the given cache with the force-data-gravitation
    * option override set on its invocation context, pauses for 100 ms, and then asserts that the value
    * read is <tt>"v"</tt>.
    *
    * @param cache the cache to read from
    * @param label name of the cache used in the assertion failure message
    */
   private void assertOriginalValueViaGravitation(CacheSPI<Object, Object> cache, String label)
   {
      cache.getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
      Object val = cache.get("/0", "k");
      // NOTE(review): the pause presumably lets work triggered by the gravitating read settle before
      // the next cache is queried - confirm.
      TestingUtil.sleepThread(100);
      assert "v".equals(val) : label + " had " + val;
   }
}
