Monday, 22 December 2014

JEE Concurrent with Camel

The official specs for JEE servers, say that your not meant to start your own threads, implying an impending doom if you do so. In actually you loose some functionality in terms of what the app server provides/allows and the app server isn't easily able to kill off rogue threads when you try and undeploy. Not the worst things in the world but if we could do something about it we would.

The first 'standard' support for spawning threads in a container was the commonj Work Manager and Timer. It was a bit clunky to use, hard to integrate and not supported across the board.

Thankfully they made the smart decision to just extend Doug Lea's framework in concurrency-ee. Integrating this into camel but substituting the used ThreadFactory with the container provided ManagedThreadFactory. The one downside of doing this is that we loose don't create the threads so we may not be able to set the name. Having said that, using a nifty trick I was able to set it on Websphere Liberty Profile. Here is the basic solution:

<beans xmlns="" xmlns:camel=""

 <camelContext id="camelContext" xmlns="">
  <contextScan />
 <jee:jndi-lookup jndi-name="java:comp/env/concurrent/webAppThreadFactory"
  id="threadFactory" proxy-interface="javax.enterprise.concurrent.ManagedThreadFactory"
  cache="true" lookup-on-startup="false" />

 <jee:jndi-lookup jndi-name="java:comp/env/concurrent/executor" id="executorService" />
 <bean class="net.devgrok.camel.JEEThreadPoolFactory" id="threadPoolFactory">
  <constructor-arg ref="threadFactory" />
I highly recommend including the "java:comp/env/" prefix, as I was caught out several times when it resolving against the global JNDI not the local (as it tries both and if one passes then it doesn't error). JEEThreadPoolFactory is a copy and paste from DefaultThreadPoolFactory, so cutting it down for brevity's sake.
public class JEEThreadPoolFactory implements ThreadPoolFactory {
 private static final Logger log = LoggerFactory.getLogger(JEEThreadPoolFactory.class);
 private final ManagedThreadFactory managedThreadFactory;

 public JEEThreadPoolFactory(ManagedThreadFactory threadFactory) {
  this.managedThreadFactory = threadFactory;

 public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  //every threadFactory is replaced by subsituteThreadFactory(threadFactory)
  return Executors.newCachedThreadPool(subsituteThreadFactory(threadFactory));

  * Gets the naming off the passed in factory and creates a wrapper around
  * the managed one to rename the thread. If permission is denied logging MDC could be using instead.
  * @param passedInFactory
  * @return
 protected ThreadFactory subsituteThreadFactory(ThreadFactory passedInFactory) {
  try {
   String pattern = ThreadHelper.DEFAULT_PATTERN;
   String name = "";
   if (passedInFactory instanceof CamelThreadFactory) {
    pattern = (String) FieldUtils.readField(passedInFactory, "pattern", true);
    name = (String) FieldUtils.readField(passedInFactory, "name", true);
    // boolean daemon;
   } else if (passedInFactory instanceof CustomizableThreadFactory) {
    pattern = "#name#-#counter#";
    name = (String) FieldUtils.readField(passedInFactory, "threadNamePrefix", true);
   }"Creating naming factory pattern {} name {}", pattern, name);
   return new NamingManagedThreadFactory(managedThreadFactory, pattern, name);
  } catch (IllegalAccessException e) {
   log.warn("Could not determine fatory name {}", e.toString());
   return managedThreadFactory;
 * Similar to CamelThreadFactory but wraps a ManagedThreadFactory
public class NamingManagedThreadFactory implements ManagedThreadFactory {
 private static final Logger LOG = LoggerFactory.getLogger(NamingManagedThreadFactory.class);
 private final ManagedThreadFactory delegate;
 private String name;
 private String pattern;

 public NamingManagedThreadFactory(ManagedThreadFactory managedThreadFactory, String pattern, String name) {
  this.delegate = managedThreadFactory;
  this.pattern = pattern; = name;

 public Thread newThread(Runnable runnable) {
  String threadName = ThreadHelper.resolveThreadName(pattern, name);
  Thread answer = delegate.newThread(runnable);
  try {
  } catch (Exception e) {
   LOG.warn("Error seting thread name {}", e.toString());

  LOG.trace("Created thread[{}] -> {}", threadName, answer);
  return answer;
In liberty profile I hit a nasty bug looking up the ThreadFactory, but the solution was to stop camel loading on startup by setting SpringCamelContext.setNoStart(false), loading the Spring context, looking up the executorService and asynchronously starting camel.
Caused by: java.lang.UnsupportedOperationException: SRVE8011E: This operation cannot be executed from a programmatically added listener.

Saturday, 20 December 2014

Suppressing class path jaxb-api.jar can not be found

When loading an application in an osgi-like container with an app that references jabx-impl, you might see the following warning:
[WARNING ] SRVE9967W: The manifest class path jaxb-api.jar can not be found in jar file file:/D:/Java/maven/repo/com/sun/xml/bind/jaxb-impl/2.2.7/jaxb-impl-2.2.7.jar or its parent.
[WARNING ] SRVE9967W: The manifest class path jaxb-core.jar can not be found in jar file file:/D:/Java/maven/repo/com/sun/xml/bind/jaxb-impl/2.2.7/jaxb-core.jar.jar or its parent.

This one is caused by the specific reference to jaxb-api.jar and jaxb-core.jar in jaxb-impl-2.2.7.jar/META-INF/MANIFEST.MF:
Class-Path: jaxb-api.jar jaxb-core.jar
The artifacts that maven pull in have a different filename (with a version number) that doesn't match the expected build version. e.g. jaxb-core-2.2.7.jar and jaxb-api-2.2.7.jar

The solution:
Create 2 empty zip files and name them jaxb-core.jar and jaxb-api.jar
Place them into proj/src/main/WEB-INF/lib

Tuesday, 16 December 2014

Using SQL Server as an imitation Queue with Camel

SQL Server's lock hints allow to you to use it as a database queue. The general process is discussed on stackoverflow.

Here I wanted to summarise the different ways it can be configured with the camel-jpa component. Testing was done against SQL Server 2008 R2 Developer Edition. The list of locks was obtained using this useful script from Microsoft.

Assume we have a generic entity:
public class JpaMessage {
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    /** an optional hint as to type of message */
    private String msgType;
    /** a refrence to another table containing the blob for performance reasons */
    private Long msgBlobId;
    /** to allow for PESSIMISTIC locking */
    private Date lastUpdated;

/** basic route */
public class TestRoutbuiler extends BaseRouteBuilder {
    private String endpointUri;
    public void configure() throws Exception {
            .to("Log received message ${body}");


First is the most basic of forms and the one with the highest portability. It reads in the ResultSet and one by one, tries to lock the entity for update with javax.persistence.lock.timeout=0. In sql server this generates the NOWAIT hint.
endpointUri=jpa:JpaMessage?consumer.query=select m from JpaMessage m&consumer.skipLockedEntity=true
This is the hibernate generate sql:
select as id1_1_, jpamessage0_.last_updated as last_upd2_1_, jpamessage0_.msg_blob_id as msg_blob3_1_, jpamessage0_.msg_type as msg_type4_1_ from JPA_MESSAGE jpamessage0_
select id from JPA_MESSAGE with (updlock, rowlock, nowait ) where id =? and last_updated =?
And this is the locks it obtains in SQL Server:
KEY(010008207756) ULOCKGRANTjpa_messagePK_jpa_message


This is aquires an update lock on the first non-blocked row
endpointUri=jpa:native1?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&
Being native query it generates a statement similar to above plus the entity lock which is performed by default:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);
select id from JPA_MESSAGE with (updlock, rowlock ) where id =? and last_updated =?;
Locks obtained are the same as above.

nativeQuery with no consumeLockEntity

This is aquires an update lock on the first non-blocked row, disabling the programatic locking of the already locked row
endpointUri=jpa:native2?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&consumeLockEntity=false&
Being native query it generates a statement similar to above plus the entity lock which is performed by default:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);
Locks are only slightly different:
KEY(010008207756) ULOCKGRANTjpa_messagePK_jpa_message

nativeQuery with delete

The jpa component deletes the entity after it processed it, so technically this saves another db call and if you were to do a select from the table with READUNCOMMITTED then you wouldn't see the row which you would for the other cases. However the more exclusive database locks obtained for the duration of the processing make it less desirable.
endpointUri=jpa:nativeDelete?consumer.nativeQuery=delete top (1) from jpa_message  WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*&consumeDelete=false&consumeLockEntity=false&
Being native query it generates a statement similar to above plus the entity lock which is performed by default:
delete top (1) from jpa_message  WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*;
And the more agressive lock:


If your using the JpaTransactionManager, then the message routing is done inside the same transaction. So if you had a database error that would force the transaction to rollback. If you had wanted the error handler to consume the message this wouldn't be possible.
The simple way would be to do in a new trans. i.e.:
public void configure() throws Exception {
      .to("Log received message ${body}")
However if you wanted shorter transactions, you either have to do every action in another REQUIRES_NEW transaction or on another thread - such as using seda. The best solution is to use PROPAGATION_NOT_SUPPORTED, which suspends the current transaction meaning transactions will be created only when needed - reducing the times locks are held and less chance of deadlocks.

My complete solution

consumer.greedy=true results in it immediately polling the db for another record if it has one, meaning if the queue is full it will read them off as fast as possible
consumer.namedQuery is a jpa query defined below
consumer.parameters references a map with the parameters below, allows different endpoints to use the same query with different parameters
sharedEntityManager - CAMEL-8054 -  when using a JpaTransactionManager, stops an additional EntityManager and DB connection from being created.
joinTransaction - set to false as sharedEntityManager set to true
@NamedNativeQuery(name = "JpaMessage.byType", query = "select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK) where msg_type=:msgType", resultClass = JpaMessage.class) })
public class JpaMessage {}
 <util:map id="params" key-type="java.lang.String">
  <entry key="msgType" value="filterMsgType1"/>
 <bean id="PROPAGATION_NOT_SUPPORTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
   <property name="transactionManager" ref="transactionManager"/>
   <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/>
 <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
   <property name="entityManagerFactory" ref="entityManagerFactory" />
 <bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
  <property name="entityManagerFactory" ref="entityManagerFactory" />
  <property name="transactionManager" ref="transactionManager" />
  <property name="sharedEntityManager" value="true"/>
  <property name="joinTransaction" value="false"/>
public void configure() throws Exception {
      .to("Log received message ${body}")