Monday, 22 December 2014

JEE Concurrent with Camel

The official specs for JEE servers say that you're not meant to start your own threads, implying impending doom if you do so. In actuality you lose some functionality in terms of what the app server provides/allows, and the app server isn't easily able to kill off rogue threads when you try to undeploy. Not the worst things in the world, but if we could do something about it we would.

The first 'standard' support for spawning threads in a container was the commonj Work Manager and Timer. It was a bit clunky to use, hard to integrate and not supported across the board.

Thankfully they made the smart decision to just extend Doug Lea's java.util.concurrent framework in Concurrency Utilities for Java EE (JSR 236). Integrating this into Camel means substituting the ThreadFactory that Camel uses with the container-provided ManagedThreadFactory. The one downside of doing this is that we don't create the threads ourselves, so we may not be able to set their names. Having said that, using a nifty trick I was able to set them on WebSphere Liberty Profile. Here is the basic solution:

<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:jee="http://www.springframework.org/schema/jee"
 xmlns:camel="http://camel.apache.org/schema/spring"
 xsi:schemaLocation="http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.1.xsd
  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

 <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
  <contextScan />
 </camelContext>
 
 <jee:jndi-lookup jndi-name="java:comp/env/concurrent/webAppThreadFactory"
  id="threadFactory" proxy-interface="javax.enterprise.concurrent.ManagedThreadFactory"
  cache="true" lookup-on-startup="false" />

 <jee:jndi-lookup jndi-name="java:comp/env/concurrent/executor" id="executorService" />
  
 <bean class="net.devgrok.camel.JEEThreadPoolFactory" id="threadPoolFactory">
  <constructor-arg ref="threadFactory" />
 </bean>
  
</beans>
I highly recommend including the "java:comp/env/" prefix: I was caught out several times by the name resolving against the global JNDI namespace rather than the local one (it tries both, and if either succeeds it doesn't error). JEEThreadPoolFactory is a copy and paste of Camel's DefaultThreadPoolFactory, so I've cut it down for brevity's sake.
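import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import javax.enterprise.concurrent.ManagedThreadFactory;

import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.util.concurrent.CamelThreadFactory;
import org.apache.camel.util.concurrent.ThreadHelper;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

public class JEEThreadPoolFactory implements ThreadPoolFactory {
 private static final Logger log = LoggerFactory.getLogger(JEEThreadPoolFactory.class);
 private final ManagedThreadFactory managedThreadFactory;

 public JEEThreadPoolFactory(ManagedThreadFactory threadFactory) {
  this.managedThreadFactory = threadFactory;
 }

 @Override
 public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  // every ThreadFactory Camel passes in is replaced by substituteThreadFactory(threadFactory)
  return Executors.newCachedThreadPool(substituteThreadFactory(threadFactory));
 }

 // newThreadPool(...) and newScheduledThreadPool(...) are omitted for brevity:
 // they are copied from DefaultThreadPoolFactory with the same substitution applied.

 /**
  * Reads the naming off the passed-in factory and creates a wrapper around
  * the managed one to rename the thread. If permission to rename is denied,
  * the logging MDC could be used instead.
  *
  * @param passedInFactory the factory Camel would normally use
  * @return a ThreadFactory backed by the container's ManagedThreadFactory
  */
 protected ThreadFactory substituteThreadFactory(ThreadFactory passedInFactory) {
  try {
   String pattern = ThreadHelper.DEFAULT_PATTERN;
   String name = "";
   if (passedInFactory instanceof CamelThreadFactory) {
    // CamelThreadFactory keeps these private, hence the reflection
    pattern = (String) FieldUtils.readField(passedInFactory, "pattern", true);
    name = (String) FieldUtils.readField(passedInFactory, "name", true);
    // the daemon flag is not carried over
   } else if (passedInFactory instanceof CustomizableThreadFactory) {
    pattern = "#name#-#counter#";
    name = ((CustomizableThreadFactory) passedInFactory).getThreadNamePrefix();
   }
   log.info("Creating naming factory pattern {} name {}", pattern, name);
   return new NamingManagedThreadFactory(managedThreadFactory, pattern, name);
  } catch (IllegalAccessException e) {
   log.warn("Could not determine factory name {}", e.toString());
   return managedThreadFactory;
  }
 }
}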
import javax.enterprise.concurrent.ManagedThreadFactory;

import org.apache.camel.util.concurrent.ThreadHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Similar to CamelThreadFactory, but wraps a ManagedThreadFactory:
 * the container still creates the thread, we only rename it.
 */
public class NamingManagedThreadFactory implements ManagedThreadFactory {
 private static final Logger LOG = LoggerFactory.getLogger(NamingManagedThreadFactory.class);
 private final ManagedThreadFactory delegate;
 private final String name;
 private final String pattern;

 public NamingManagedThreadFactory(ManagedThreadFactory managedThreadFactory, String pattern, String name) {
  this.delegate = managedThreadFactory;
  this.pattern = pattern;
  this.name = name;
 }

 @Override
 public Thread newThread(Runnable runnable) {
  String threadName = ThreadHelper.resolveThreadName(pattern, name);
  Thread answer = delegate.newThread(runnable);
  try {
   answer.setName(threadName);
  } catch (Exception e) {
   // e.g. a SecurityException if the container forbids renaming its threads
   LOG.warn("Error setting thread name {}", e.toString());
  }

  LOG.trace("Created thread[{}] -> {}", threadName, answer);
  return answer;
 }
}
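If you want to try the renaming trick without a container or Camel on the classpath, here is a stdlib-only sketch of the same wrapper idea. It is a stand-in under stated assumptions, not the Liberty setup: Executors.defaultThreadFactory() plays the part of the ManagedThreadFactory, and the hypothetical resolveName() only understands #name# and #counter#, whereas Camel's ThreadHelper.resolveThreadName supports more placeholders.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class RenamingThreadFactoryDemo {

    /** Decorates any ThreadFactory, renaming each thread the delegate creates. */
    static final class RenamingThreadFactory implements ThreadFactory {
        private final ThreadFactory delegate;
        private final String pattern;
        private final String name;
        private final AtomicLong counter = new AtomicLong();

        RenamingThreadFactory(ThreadFactory delegate, String pattern, String name) {
            this.delegate = delegate;
            this.pattern = pattern;
            this.name = name;
        }

        /** Hypothetical, simplified stand-in for ThreadHelper.resolveThreadName. */
        String resolveName() {
            return pattern.replace("#name#", name)
                    .replace("#counter#", Long.toString(counter.incrementAndGet()));
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = delegate.newThread(r); // the "container" still creates the thread
            String threadName = resolveName();
            try {
                t.setName(threadName);
            } catch (SecurityException e) {
                // renaming forbidden: keep the delegate's name
            }
            return t;
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadFactory factory = new RenamingThreadFactory(
                Executors.defaultThreadFactory(), "#name#-#counter#", "jpaConsumer");
        ExecutorService pool = Executors.newCachedThreadPool(factory);
        // prints jpaConsumer-1
        pool.submit(() -> System.out.println(Thread.currentThread().getName())).get();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```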
On Liberty Profile I hit a nasty bug looking up the ThreadFactory, failing with the exception below. The workaround was to stop Camel from starting along with the Spring context by calling SpringCamelContext.setNoStart(true), load the Spring context, look up the executorService and then start Camel asynchronously on it.
Caused by: java.lang.UnsupportedOperationException: SRVE8011E: This operation cannot be executed from a programmatically added listener.
 at com.ibm.ws.webcontainer.webapp.WebApp.commonAddListener(WebApp.java:5871)
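The workaround boils down to "don't start Camel on the caller's thread". Here is a minimal stdlib sketch of that ordering, assuming a hypothetical Startable interface standing in for CamelContext#start() and a plain single-thread executor standing in for the executorService looked up from JNDI. It only illustrates the hand-off, not the Liberty-specific fix itself.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DeferredStartDemo {

    /** Hypothetical stand-in for CamelContext#start(); not a Camel type. */
    interface Startable {
        void start() throws Exception;
    }

    /**
     * Hands start() to the executor so it runs after the caller (e.g. the
     * listener that loaded the Spring context) has returned.
     */
    static Future<Void> startAsync(ExecutorService executor, Startable context) {
        Callable<Void> task = () -> {
            context.start();
            return null;
        };
        return executor.submit(task);
    }

    public static void main(String[] args) throws Exception {
        // In the container this would be the executor looked up from JNDI.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Void> started = startAsync(executor,
                () -> System.out.println("starting on " + Thread.currentThread().getName()));
        started.get(); // rethrows (wrapped) anything start() threw
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```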

Saturday, 20 December 2014

Suppressing class path jaxb-api.jar can not be found

When loading an application that references jaxb-impl in an OSGi-like container, you might see the following warnings:
[WARNING ] SRVE9967W: The manifest class path jaxb-api.jar can not be found in jar file file:/D:/Java/maven/repo/com/sun/xml/bind/jaxb-impl/2.2.7/jaxb-impl-2.2.7.jar or its parent.
[WARNING ] SRVE9967W: The manifest class path jaxb-core.jar can not be found in jar file file:/D:/Java/maven/repo/com/sun/xml/bind/jaxb-impl/2.2.7/jaxb-core.jar.jar or its parent.

This one is caused by the specific reference to jaxb-api.jar and jaxb-core.jar in jaxb-impl-2.2.7.jar/META-INF/MANIFEST.MF:
Class-Path: jaxb-api.jar jaxb-core.jar
The artifacts that Maven pulls in have different filenames (with a version number) that don't match the names in the manifest, e.g. jaxb-core-2.2.7.jar and jaxb-api-2.2.7.jar.
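To check which Class-Path entries a jar expects and which are missing beside it, a small stdlib helper like the one below can be used. It is a sketch: missingClassPathEntries is a hypothetical name, and it only resolves plain relative file names, not the full relative-URL syntax the manifest allows.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class ManifestClassPathCheck {

    /** Returns the manifest Class-Path entries that don't exist next to the jar. */
    static List<String> missingClassPathEntries(File jar) throws IOException {
        List<String> missing = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar)) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return missing;
            }
            String classPath = manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
            if (classPath == null) {
                return missing;
            }
            // entries are space separated, resolved relative to the jar's directory
            for (String entry : classPath.trim().split("\\s+")) {
                if (!new File(jar.getParentFile(), entry).exists()) {
                    missing.add(entry);
                }
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // e.g. java ManifestClassPathCheck path/to/jaxb-impl-2.2.7.jar
        for (String entry : missingClassPathEntries(new File(args[0]))) {
            System.out.println("missing: " + entry);
        }
    }
}
```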

The solution:
Create 2 empty zip files and name them jaxb-core.jar and jaxb-api.jar
Place them into proj/src/main/webapp/WEB-INF/lib
Done
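If you'd rather generate the placeholders than hand-craft empty zips, here is a sketch using java.util.jar. It writes a jar containing only a minimal manifest rather than a truly empty zip; both are valid archives. The output directory assumes the standard Maven war layout (src/main/webapp), so point it at wherever your WEB-INF/lib actually lives.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class PlaceholderJars {

    /** Writes a jar holding nothing but a minimal META-INF/MANIFEST.MF. */
    static File writePlaceholder(File dir, String name) throws IOException {
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        File jar = new File(dir, name);
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar), manifest)) {
            // no entries besides the manifest written by the constructor
        }
        return jar;
    }

    public static void main(String[] args) throws IOException {
        // Assumed standard Maven war layout; adjust to your project.
        File lib = new File("src/main/webapp/WEB-INF/lib");
        if (!lib.isDirectory() && !lib.mkdirs()) {
            throw new IOException("cannot create " + lib);
        }
        for (String name : new String[] {"jaxb-api.jar", "jaxb-core.jar"}) {
            System.out.println("wrote " + writePlaceholder(lib, name));
        }
    }
}
```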

Tuesday, 16 December 2014

Using SQL Server as an imitation Queue with Camel

SQL Server's lock hints allow you to use a table as a database queue. The general process is discussed on Stack Overflow.

Here I wanted to summarise the different ways it can be configured with the camel-jpa component. Testing was done against SQL Server 2008 R2 Developer Edition. The list of locks was obtained using this useful script from Microsoft.

Assume we have a generic entity:
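import java.util.Date;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Version;

@Entity
public class JpaMessage {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    /** an optional hint as to the type of message */
    private String msgType;
    /** a reference to another table containing the blob, for performance reasons */
    private Long msgBlobId;
    /** to allow for PESSIMISTIC locking */
    @Version
    private Date lastUpdated;
}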

/** basic route */
public class TestRouteBuilder extends BaseRouteBuilder {
    @Value("${endpoint.uri}")
    private String endpointUri;

    @Override
    public void configure() throws Exception {
        from(endpointUri)
            .log("Received message ${body}");
    }
}


SkipLockedEntity

The first is the most basic form and the most portable. It reads in the ResultSet and, one by one, tries to lock each entity for update with javax.persistence.lock.timeout=0. In SQL Server this generates the NOWAIT hint.
endpointUri=jpa:JpaMessage?consumer.query=select m from JpaMessage m&consumer.skipLockedEntity=true
This is the SQL Hibernate generates:
select jpamessage0_.id as id1_1_, jpamessage0_.last_updated as last_upd2_1_, jpamessage0_.msg_blob_id as msg_blob3_1_, jpamessage0_.msg_type as msg_type4_1_ from JPA_MESSAGE jpamessage0_
select id from JPA_MESSAGE with (updlock, rowlock, nowait ) where id =? and last_updated =?
And these are the locks it obtains in SQL Server:
ResType   ResDescr           ReqMode  ReqType  ReqStatus  ObjectName   IndexName
KEY       (010008207756)     U        LOCK     GRANT      jpa_message  PK_jpa_message
METADATA  principal_id = 1   Sch-S    LOCK     GRANT      NULL         NULL
METADATA  data_space_id = 2  Sch-S    LOCK     GRANT      NULL         NULL
OBJECT                       IX       LOCK     GRANT      jpa_message  NULL
PAGE      3:110              IU       LOCK     GRANT      jpa_message  PK_jpa_message
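The skip-instead-of-block behaviour is easier to see without a database. Below is an in-memory analogy, not how camel-jpa is implemented: each hypothetical Row carries a one-permit Semaphore as its update lock, and Semaphore.tryAcquire() returns immediately, much like a lock request with NOWAIT. Rows already held by another consumer are skipped rather than waited on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Semaphore;

public class SkipLockedDemo {

    /** Hypothetical in-memory stand-in for a JPA_MESSAGE row and its update lock. */
    static final class Row {
        final long id;
        final Semaphore lock = new Semaphore(1); // one permit = the row's update lock

        Row(long id) {
            this.id = id;
        }
    }

    /** Walks the result set, claiming each row it can lock without waiting. */
    static List<Row> claim(List<Row> resultSet) {
        List<Row> claimed = new ArrayList<>();
        for (Row row : resultSet) {
            if (row.lock.tryAcquire()) { // returns at once, like NOWAIT
                claimed.add(row);
            }
            // else: held by another consumer, so skip it
        }
        return claimed;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(new Row(1), new Row(2), new Row(3));
        rows.get(0).lock.acquireUninterruptibly(); // pretend another consumer holds row 1
        for (Row row : claim(rows)) {
            System.out.println("claimed row " + row.id);
        }
    }
}
```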


nativeQuery

This acquires an update lock on the first row not locked by another transaction (READPAST skips locked rows):
endpointUri=jpa:native1?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&consumer.resultClass=net.devgrok.model.support.JpaMessage
Being a native query, it runs the statement as written, followed by the entity lock, which is performed by default:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);
select id from JPA_MESSAGE with (updlock, rowlock ) where id =? and last_updated =?;
Locks obtained are the same as above.


nativeQuery with no consumeLockEntity

This acquires an update lock on the first row not locked by another transaction, and disables the programmatic locking of the already-locked row:
endpointUri=jpa:native2?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&consumeLockEntity=false&consumer.resultClass=net.devgrok.model.support.JpaMessage
With consumeLockEntity=false only the native statement is executed; there is no separate entity lock:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);
The locks are only slightly different (no METADATA locks):
ResType   ResDescr           ReqMode  ReqType  ReqStatus  ObjectName   IndexName
KEY       (010008207756)     U        LOCK     GRANT      jpa_message  PK_jpa_message
OBJECT                       IX       LOCK     GRANT      jpa_message  NULL
PAGE      3:110              IU       LOCK     GRANT      jpa_message  PK_jpa_message


nativeQuery with delete

By default the JPA component deletes the entity after it has been processed (consumeDelete=true). Deleting the row in the native query instead, with consumeDelete=false, technically saves another DB call, and if you were to select from the table with READUNCOMMITTED you wouldn't see the row, which you would in the other cases. However, the more exclusive locks held for the duration of the processing make it less desirable.
endpointUri=jpa:nativeDelete?consumer.nativeQuery=delete top (1) from jpa_message  WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*&consumeDelete=false&consumeLockEntity=false&consumer.resultClass=net.devgrok.model.support.JpaMessage
As in the previous case, only the native statement is executed:
delete top (1) from jpa_message  WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*;
And the more aggressive locks:
ResType   ResDescr           ReqMode  ReqType  ReqStatus  ObjectName   IndexName
KEY       (010008207756)     X        LOCK     GRANT      jpa_message  PK_jpa_message
METADATA  principal_id = 1   Sch-S    LOCK     GRANT      NULL         NULL
OBJECT                       IX       LOCK     GRANT      jpa_message  NULL
PAGE      3:110              IX       LOCK     GRANT      jpa_message  PK_jpa_message


Caveats

If you're using the JpaTransactionManager, the message routing is done inside the same transaction as the consume. So a database error during routing forces the whole transaction to roll back, and if you wanted the error handler to consume the message, that wouldn't be possible.
The simple way around this is to do the routing in a new transaction, i.e.:
public void configure() throws Exception {
   from(endpointUri)
      .errorHandler(deadLetterChannel("log:error"))
      // assumes a SpringTransactionPolicy bean with this id
      .transacted("PROPAGATION_REQUIRES_NEW")
      .log("Received message ${body}")
      .bean(someBeanWithRollback);
}
However, if you want shorter transactions, you either have to do every action in another REQUIRES_NEW transaction or on another thread, such as by using seda. The best solution is to use PROPAGATION_NOT_SUPPORTED, which suspends the current transaction so that transactions are only created when needed, reducing the time locks are held and the chance of deadlocks.


My complete solution

jpa.delay=1000
endpointUri=jpa:namedQuery?consumer.delay=${jpa.delay}&consumer.greedy=true&consumer.namedQuery=JpaMessage.byType&consumer.parameters=#params&consumeLockEntity=false
consumer.greedy=true makes it poll the DB again immediately if the previous poll returned a record, meaning that if the queue is full it will read them off as fast as possible
consumer.namedQuery is the JPA named query defined below
consumer.parameters references a map with the parameters below, which allows different endpoints to use the same query with different parameters
sharedEntityManager - CAMEL-8054 - when using a JpaTransactionManager, stops an additional EntityManager and DB connection from being created.
joinTransaction - set to false as sharedEntityManager is set to true
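To make the greedy option concrete, here is an in-memory sketch, assuming each poll runs the top 1 query and so returns at most one row. The Deque stands in for the jpa_message table and pollRun is a hypothetical helper, not Camel's ScheduledPollConsumer: with greedy=false a run handles one row and then waits consumer.delay, while with greedy=true it polls again straight away until a poll comes back empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

public class GreedyPollDemo {

    /** One scheduled run of a "select top 1" consumer. */
    static <T> List<T> pollRun(Supplier<T> selectTop1, boolean greedy) {
        List<T> processed = new ArrayList<>();
        T row;
        while ((row = selectTop1.get()) != null) {
            processed.add(row); // route the message
            if (!greedy) {
                break; // next poll only after consumer.delay
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<String> table = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        System.out.println(pollRun(table::poll, false)); // [m1]
        System.out.println(pollRun(table::poll, true));  // [m2, m3]
    }
}
```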
@Entity
@NamedNativeQuery(name = "JpaMessage.byType", query = "select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK) where msg_type=:msgType", resultClass = JpaMessage.class)
public class JpaMessage { /* fields as above */ }
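<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:util="http://www.springframework.org/schema/util"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">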
 <util:map id="params" key-type="java.lang.String">
  <entry key="msgType" value="filterMsgType1"/>
 </util:map>
 <bean id="PROPAGATION_NOT_SUPPORTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
   <property name="transactionManager" ref="transactionManager"/>
   <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/>
 </bean>
 <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
   <property name="entityManagerFactory" ref="entityManagerFactory" />
 </bean>
 <bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
  <property name="entityManagerFactory" ref="entityManagerFactory" />
  <property name="transactionManager" ref="transactionManager" />
  <property name="sharedEntityManager" value="true"/>
  <property name="joinTransaction" value="false"/>
 </bean>
</beans>
public void configure() throws Exception {
   from(endpointUri)
      .errorHandler(deadLetterChannel("log:error"))
      // suspends the consumer's transaction; transactions are only created where needed
      .transacted("PROPAGATION_NOT_SUPPORTED")
      .log("Received message ${body}");
}