Monday, 22 December 2014

JEE Concurrent with Camel

The official specs for JEE servers say that you're not meant to start your own threads, implying impending doom if you do. In actuality you lose some of the functionality the app server provides, and the server can't easily kill off rogue threads when you try to undeploy. Not the worst thing in the world, but if we could do something about it we would.

The first 'standard' support for spawning threads in a container was the commonj Work Manager and Timer. It was a bit clunky to use, hard to integrate and not supported across the board.

Thankfully they made the smart decision to simply extend Doug Lea's java.util.concurrent framework in concurrency-ee. This can be integrated into Camel by substituting the ThreadFactory it uses with the container-provided ManagedThreadFactory. The one downside of doing this is that we no longer create the threads ourselves, so we may not be able to set their names. Having said that, using a nifty trick I was able to set it on WebSphere Liberty Profile. Here is the basic solution:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jee="http://www.springframework.org/schema/jee">

 <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
  <contextScan />
 </camelContext>

 <jee:jndi-lookup jndi-name="java:comp/env/concurrent/webAppThreadFactory"
  id="threadFactory" proxy-interface="javax.enterprise.concurrent.ManagedThreadFactory"
  cache="true" lookup-on-startup="false" />

 <jee:jndi-lookup jndi-name="java:comp/env/concurrent/executor" id="executorService" />

 <bean class="net.devgrok.camel.JEEThreadPoolFactory" id="threadPoolFactory">
  <constructor-arg ref="threadFactory" />
 </bean>
</beans>
I highly recommend including the "java:comp/env/" prefix, as I was caught out several times when the lookup resolved against the global JNDI context instead of the local one (it tries both, and if either succeeds it doesn't error). JEEThreadPoolFactory is a copy-and-paste of Camel's DefaultThreadPoolFactory, cut down here for brevity's sake.
public class JEEThreadPoolFactory implements ThreadPoolFactory {
 private static final Logger log = LoggerFactory.getLogger(JEEThreadPoolFactory.class);
 private final ManagedThreadFactory managedThreadFactory;

 public JEEThreadPoolFactory(ManagedThreadFactory threadFactory) {
  this.managedThreadFactory = threadFactory;
 }

 public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  // every threadFactory is replaced by substituteThreadFactory(threadFactory)
  return Executors.newCachedThreadPool(substituteThreadFactory(threadFactory));
 }

 /**
  * Gets the naming off the passed-in factory and creates a wrapper around the
  * managed one to rename the thread. If permission is denied, the logging MDC
  * could be used instead.
  */
 protected ThreadFactory substituteThreadFactory(ThreadFactory passedInFactory) {
  try {
   String pattern = ThreadHelper.DEFAULT_PATTERN;
   String name = "";
   if (passedInFactory instanceof CamelThreadFactory) {
    pattern = (String) FieldUtils.readField(passedInFactory, "pattern", true);
    name = (String) FieldUtils.readField(passedInFactory, "name", true);
   } else if (passedInFactory instanceof CustomizableThreadFactory) {
    pattern = "#name#-#counter#";
    name = (String) FieldUtils.readField(passedInFactory, "threadNamePrefix", true);
   }
   log.debug("Creating naming factory pattern {} name {}", pattern, name);
   return new NamingManagedThreadFactory(managedThreadFactory, pattern, name);
  } catch (IllegalAccessException e) {
   log.warn("Could not determine factory name {}", e.toString());
   return managedThreadFactory;
  }
 }
}
/**
 * Similar to CamelThreadFactory but wraps a ManagedThreadFactory.
 */
public class NamingManagedThreadFactory implements ManagedThreadFactory {
 private static final Logger LOG = LoggerFactory.getLogger(NamingManagedThreadFactory.class);
 private final ManagedThreadFactory delegate;
 private String name;
 private String pattern;

 public NamingManagedThreadFactory(ManagedThreadFactory managedThreadFactory, String pattern, String name) {
  this.delegate = managedThreadFactory;
  this.pattern = pattern;
  this.name = name;
 }

 public Thread newThread(Runnable runnable) {
  String threadName = ThreadHelper.resolveThreadName(pattern, name);
  Thread answer = delegate.newThread(runnable);
  try {
   answer.setName(threadName);
  } catch (Exception e) {
   LOG.warn("Error setting thread name {}", e.toString());
  }

  LOG.trace("Created thread[{}] -> {}", threadName, answer);
  return answer;
 }
}
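The renaming trick can be tried in isolation with plain java.util.concurrent; the lambda factory below is a stand-in for the container-provided ManagedThreadFactory, and the names are illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class RenamingFactoryDemo {
    // Wrap whichever factory the container hands us and rename the thread it
    // returns - the same idea as NamingManagedThreadFactory above.
    static ThreadFactory renaming(ThreadFactory delegate, String prefix) {
        return runnable -> {
            Thread t = delegate.newThread(runnable);
            try {
                t.setName(prefix + "-" + t.getId());
            } catch (SecurityException e) {
                // a container may deny the rename; keep the container's name
            }
            return t;
        };
    }

    public static void main(String[] args) {
        // stand-in for the JNDI-looked-up ManagedThreadFactory
        ThreadFactory managed = Executors.defaultThreadFactory();
        Thread t = renaming(managed, "Camel").newThread(() -> { });
        System.out.println(t.getName());
    }
}
```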
In Liberty Profile I hit a nasty bug looking up the ThreadFactory. The workaround was to stop Camel starting with the Spring context by calling SpringCamelContext.setNoStart(true), loading the Spring context, looking up the executorService and then starting Camel asynchronously.
Caused by: java.lang.UnsupportedOperationException: SRVE8011E: This operation cannot be executed from a programmatically added listener.

Saturday, 20 December 2014

Suppressing class path jaxb-api.jar can not be found

When loading an application that references jaxb-impl in an OSGi-like container, you might see the following warning:
[WARNING ] SRVE9967W: The manifest class path jaxb-api.jar can not be found in jar file file:/D:/Java/maven/repo/com/sun/xml/bind/jaxb-impl/2.2.7/jaxb-impl-2.2.7.jar or its parent.
[WARNING ] SRVE9967W: The manifest class path jaxb-core.jar can not be found in jar file file:/D:/Java/maven/repo/com/sun/xml/bind/jaxb-impl/2.2.7/jaxb-core.jar.jar or its parent.

This one is caused by the specific reference to jaxb-api.jar and jaxb-core.jar in jaxb-impl-2.2.7.jar/META-INF/MANIFEST.MF:
Class-Path: jaxb-api.jar jaxb-core.jar
The artifacts that Maven pulls in have different filenames (with a version number), e.g. jaxb-core-2.2.7.jar and jaxb-api-2.2.7.jar, which don't match the entries in the manifest.

The solution:
  • Create 2 empty zip files and name them jaxb-core.jar and jaxb-api.jar
  • Place them into proj/src/main/WEB-INF/lib
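The stub jars can be generated with a few lines of the JDK's zip support. One wrinkle: ZipOutputStream refuses to write a zip with zero entries, so a single empty directory entry is added (the output directory follows the post):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class StubJars {
    // Write a minimal valid jar so the manifest Class-Path entry resolves
    static void writeStub(Path jar) throws IOException {
        try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(jar))) {
            // closing with zero entries throws ZipException,
            // so add one empty directory entry
            zos.putNextEntry(new ZipEntry("META-INF/"));
            zos.closeEntry();
        }
    }

    public static void main(String[] args) throws IOException {
        Path lib = Paths.get("proj", "src", "main", "WEB-INF", "lib");
        Files.createDirectories(lib);
        writeStub(lib.resolve("jaxb-api.jar"));
        writeStub(lib.resolve("jaxb-core.jar"));
    }
}
```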

Tuesday, 16 December 2014

Using SQL Server as an imitation Queue with Camel

SQL Server's lock hints allow you to use it as a database queue. The general process is discussed on stackoverflow.

Here I wanted to summarise the different ways it can be configured with the camel-jpa component. Testing was done against SQL Server 2008 R2 Developer Edition. The list of locks was obtained using this useful script from Microsoft.

Assume we have a generic entity:
@Entity
public class JpaMessage {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    /** an optional hint as to the type of message */
    private String msgType;
    /** a reference to another table containing the blob, for performance reasons */
    private Long msgBlobId;
    /** to allow for PESSIMISTIC locking */
    private Date lastUpdated;
}

/** basic route */
public class TestRouteBuilder extends BaseRouteBuilder {
    private String endpointUri;

    public void configure() throws Exception {
        from(endpointUri)
            .log("Received message ${body}");
    }
}


The first is the most basic form and the one with the highest portability. It reads in the ResultSet and, one by one, tries to lock each entity for update with javax.persistence.lock.timeout=0. In SQL Server this generates the NOWAIT hint.
endpointUri=jpa:JpaMessage?consumer.query=select m from JpaMessage m&consumer.skipLockedEntity=true
This is the Hibernate-generated SQL:
select jpamessage0_.id as id1_1_, jpamessage0_.last_updated as last_upd2_1_, jpamessage0_.msg_blob_id as msg_blob3_1_, jpamessage0_.msg_type as msg_type4_1_ from JPA_MESSAGE jpamessage0_
select id from JPA_MESSAGE with (updlock, rowlock, nowait ) where id =? and last_updated =?
And these are the locks it obtains in SQL Server:
KEY (010008207756)   U   LOCK   GRANT   jpa_message   PK_jpa_message


nativeQuery

This acquires an update lock on the first non-blocked row:
endpointUri=jpa:native1?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&
Being a native query, it issues the statement as written, plus the entity lock which is performed by default:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);
select id from JPA_MESSAGE with (updlock, rowlock ) where id =? and last_updated =?;
Locks obtained are the same as above.

nativeQuery with no consumeLockEntity

This acquires an update lock on the first non-blocked row, disabling the programmatic locking of the already-locked row:
endpointUri=jpa:native2?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&consumeLockEntity=false&
With consumeLockEntity=false the default programmatic entity lock is skipped, so only the native statement is issued:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);
Locks are only slightly different:
KEY (010008207756)   U   LOCK   GRANT   jpa_message   PK_jpa_message

nativeQuery with delete

The jpa component deletes the entity after it has processed it, so technically this saves another db call. Also, if you were to select from the table with READUNCOMMITTED, you wouldn't see the in-flight row, which you would for the other cases. However, the more exclusive database locks held for the duration of the processing make it less desirable.
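The visibility difference is easy to observe from a second session (a sketch; table name as above):

```sql
-- run in a second session while a message is mid-processing:
select * from jpa_message WITH (READUNCOMMITTED);
-- with the delete variant the in-flight row is already gone;
-- with the select variants it remains visible until the consumer deletes it
```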
endpointUri=jpa:nativeDelete?consumer.nativeQuery=delete top (1) from jpa_message  WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*&consumeDelete=false&consumeLockEntity=false&
Being a native query with the entity lock disabled, only the delete statement is issued:
delete top (1) from jpa_message  WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*;
And the more aggressive lock:


If you're using the JpaTransactionManager, the message routing is done inside the same transaction, so a database error would force the whole transaction to roll back. If you had wanted the error handler to consume the message, this wouldn't be possible.
The simple way would be to do the routing in a new transaction, i.e.:
public void configure() throws Exception {
    from(endpointUri)
        .transacted("PROPAGATION_REQUIRES_NEW") // policy bean defined in the Spring config
        .log("Received message ${body}");
}
However, if you wanted shorter transactions, you either have to do every action in another REQUIRES_NEW transaction or on another thread, such as using seda. The best solution is to use PROPAGATION_NOT_SUPPORTED, which suspends the current transaction, meaning transactions are created only when needed - reducing the time locks are held and the chance of deadlocks.
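In the Spring DSL, wiring the suspension into a route might look like this (a sketch; the endpoints are illustrative, and PROPAGATION_NOT_SUPPORTED refers to the SpringTransactionPolicy bean from the configuration):

```xml
<route>
  <from uri="jpa:JpaMessage?consumer.namedQuery=JpaMessage.byType"/>
  <!-- suspends the polling transaction; new ones are created only when needed -->
  <transacted ref="PROPAGATION_NOT_SUPPORTED"/>
  <to uri="bean:messageProcessor"/>
</route>
```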

My complete solution

  • consumer.greedy=true - immediately polls the db for another record after a successful poll, so a backlog is read off as fast as possible
  • consumer.namedQuery - a JPA named query, defined below
  • consumer.parameters - references a map with the parameters below; allows different endpoints to use the same query with different parameters
  • sharedEntityManager (CAMEL-8054) - when using a JpaTransactionManager, stops an additional EntityManager and DB connection from being created
  • joinTransaction - set to false because sharedEntityManager is set to true
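Put together, the consumer endpoint URI would look something like this (a sketch; the entity and bean names match the snippets below):

```
jpa:JpaMessage?consumer.namedQuery=JpaMessage.byType&consumer.parameters=#params&consumer.greedy=true&sharedEntityManager=true&joinTransaction=false
```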
@Entity
@NamedNativeQueries({
@NamedNativeQuery(name = "JpaMessage.byType", query = "select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK) where msg_type=:msgType", resultClass = JpaMessage.class) })
public class JpaMessage {}
 <util:map id="params" key-type="java.lang.String">
  <entry key="msgType" value="filterMsgType1"/>
 </util:map>

 <bean id="PROPAGATION_NOT_SUPPORTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
   <property name="transactionManager" ref="transactionManager"/>
   <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/>
 </bean>

 <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
   <property name="entityManagerFactory" ref="entityManagerFactory" />
 </bean>

 <bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
  <property name="entityManagerFactory" ref="entityManagerFactory" />
  <property name="transactionManager" ref="transactionManager" />
  <property name="sharedEntityManager" value="true"/>
  <property name="joinTransaction" value="false"/>
 </bean>
public void configure() throws Exception {
    from(endpointUri)
        .transacted("PROPAGATION_NOT_SUPPORTED")
        .log("Received message ${body}");
}

Saturday, 1 November 2014

Setting ApplIdentityData on a Liberty Profile defined queue

Setting the necessary properties on the Java client for the MQ libraries in order to pass the ApplIdentityData is quite simple, as mentioned in this stackoverflow post:
// queue obtained beforehand, e.g. from JNDI or session.createQueue(...)
MQDestination queue;
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_WRITE_ENABLED, true);
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, true);
// then on each message to be sent:
javax.jms.Message message;
message.setStringProperty(JmsConstants.JMS_IBM_MQMD_APPLIDENTITYDATA, "....");

The meaning of the properties is listed in the documentation: Destination object properties.

If using WebSphere Application Server, you can define a JMS queue as a resource, which can in turn be looked up via JNDI. IBM has a pretty good summary of the different methods.
How to read and write fields from the MQMD via WebSphere MQ Classes for JMS Version 7: direct, using JNDI and using WebSphere Application Server.

To set it up in WAS7 for instance:
  • From the admin console
  • Resources/JMS/Queues
  • Select the scope you want to use from the drop down (i.e. Cluster=MYCLUSTER)
  • New
  • "WebSphere MQ messaging provider"
  • Name: any name you want
    JNDI Name: the jndi path you'll look it up via, i.e. jms/SendQ
    Queue name: the name of the queue on the queue manager
  • Apply
  • Advanced properties
    Uncheck "Append RFH version 2 headers to messages sent to this destination" to send raw MQ messages instead of JMS messages (with a JMS header)
  • Custom properties
    • Add name=MDR value=YES Description=WMQ_MQMD_READ_ENABLED Type=String
    • Add name=MDW value=YES Description=WMQ_MQMD_WRITE_ENABLED Type=String
    • Add name=MDCTX value=SET_IDENTITY_CONTEXT Description=WMQ_MQMD_MESSAGE_CONTEXT Type=String
  • Save
However, in WebSphere Liberty Profile the options are different. Here is a sample server.xml (the feature list was not in the original; wmqJmsClient and jndi are the ones needed here):
<server description="Sample MQ Server">

 <!-- Enable features -->
 <featureManager onError="WARN">
  <feature>wmqJmsClient-1.1</feature>
  <feature>jndi-1.0</feature>
 </featureManager>

 <jmsQueueConnectionFactory jndiName="jms/qcf">
  <properties.wmqJms channel="MYCHANNEL" hostName="THEHOST" queueManager="THEQUEUEMANAGER" transportType="CLIENT"/>
 </jmsQueueConnectionFactory>

 <jmsQueue id="qSend" jndiName="jms/SendQ">
  <properties.wmqJms arbitraryProperties='MDR="YES", MDW="YES", MDCTX="2"' baseQueueName="MESSAGE.OUTBOUND" targetClient="MQ"/>
 </jmsQueue>

 <variable name="wmqJmsClient.rar.location" value="${shared.resource.dir}/wmq/wmq.jmsra.rar"/>

 <webContainer deferServletLoad="false"/>
</server>
If you're using Camel, you would wire it up thus:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd">

    <jee:jndi-lookup id="connectionFactory" jndi-name="jms/qcf"/>

    <bean class="org.apache.camel.component.jms.JmsComponent" id="jms">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationResolver" ref="jmsDestinationResolver"/>
    </bean>

    <bean class="org.springframework.jms.support.destination.JndiDestinationResolver" id="jmsDestinationResolver">
        <property name="jndiTemplate" ref="jndiTemplate"/>
    </bean>

    <bean class="org.springframework.jndi.JndiTemplate" id="jndiTemplate"/>
</beans>

And finally the route:
        from("direct:send") // source endpoint illustrative
            .setHeader("JMS_IBM_MQMD_ApplIdentityData", constant("USERNAMEPASSWORD"))
            .to("jms:jms/SendQ"); // destination resolved via the JndiDestinationResolver

Monday, 1 September 2014

Limitations of compile time woven spring-aspects

The compile-time-woven AspectJ spring-aspects provides some very useful features. The features I was after were:
  • transactional boundaries between methods in the same bean. e.g. nonTransactionMethod() calling requiresNewMethod() without having to use self references or manually handling the transactions.
  • consistent results between testing in Eclipse, testing via maven and running on the server (instead of using the loadtime weaver)
During testing in both Eclipse and Maven I started getting exceptions like "NoTransactionException". An individual test would pass fine, but when running all the tests together they would fail. Debugging through the tests showed that the transaction manager was still being called, but it was the wrong one! Decompiling the AspectJ-woven code shows the reason. My simple class:
package net.devgrok;

import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.aspectj.AbstractTransactionAspect;
import org.springframework.transaction.aspectj.AnnotationTransactionAspect;

public class AspectWoven {
  public void noTransacted() { ... }

  private void doInTransaction() {
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = this;
    AnnotationTransactionAspect.aspectOf()
        .ajc$around$org_springframework_transaction_aspectj_AbstractTransactionAspect$1$2a73e96c(
            this, new AspectWoven.AjcClosure1(arrayOfObject), ajc$tjp_0);
  }

  static {}
}
The spring classes:
public class AnnotationTransactionAspect extends AbstractTransactionAspect {
  public static boolean hasAspect() {
    return ajc$perSingletonInstance != null;
  }

  public static AnnotationTransactionAspect aspectOf() {
    if (ajc$perSingletonInstance == null) {
      throw new NoAspectBoundException("org_springframework_transaction_aspectj_AnnotationTransactionAspect", ajc$initFailureCause);
    }
    return ajc$perSingletonInstance;
  }

  // from the decompiled static initialiser:
  //   catch (Throwable localThrowable) { ajc$initFailureCause = localThrowable; }

  public AnnotationTransactionAspect() {
    super(new AnnotationTransactionAttributeSource(false));
  }
}
public class AspectJTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

 public AnnotationTransactionAspect transactionAspect() {
  AnnotationTransactionAspect txAspect = AnnotationTransactionAspect.aspectOf();
  if (this.txManager != null) {
   txAspect.setTransactionManager(this.txManager);
  }
  return txAspect;
 }
}
Following the flow through this in reverse:
  • On creation of the context #1, Spring gets a reference to the static AnnotationTransactionAspect.aspectOf and injects the transaction manager
  • Calling the transacted method on a bean from context #1, calls the static AnnotationTransactionAspect.aspectOf referring to the transaction manager from context #1
  • On creation of the context #2, Spring gets a reference to the static AnnotationTransactionAspect.aspectOf and injects the transaction manager, overwriting the transaction manager from context #1
  • Calling the transacted method on a bean from context #1, calls the static AnnotationTransactionAspect.aspectOf referring to the transaction manager from context #2
And therein lies the problem: the transaction manager, and the aspect itself, is a static singleton. This is mentioned in the Spring documentation, but not the implications it has for testing. From "Working with multiple application contexts":
The AnnotationBeanConfigurerAspect used to implement the @Configurable support is an AspectJ singleton aspect. The scope of a singleton aspect is the same as the scope of static members, that is to say there is one aspect instance per classloader that defines the type. This means that if you define multiple application contexts within the same classloader hierarchy you need to consider where to define the @EnableSpringConfigured bean and where to place spring-aspects.jar on the classpath.

Consider a typical Spring web-app configuration with a shared parent application context defining common business services and everything needed to support them, and one child application context per servlet containing definitions particular to that servlet. All of these contexts will co-exist within the same classloader hierarchy, and so the AnnotationBeanConfigurerAspect can only hold a reference to one of them. In this case we recommend defining the @EnableSpringConfigured bean in the shared (parent) application context: this defines the services that you are likely to want to inject into domain objects. A consequence is that you cannot configure domain objects with references to beans defined in the child (servlet-specific) contexts using the @Configurable mechanism (probably not something you want to do anyway!).

When deploying multiple web-apps within the same container, ensure that each web-application loads the types in spring-aspects.jar using its own classloader (for example, by placing spring-aspects.jar in 'WEB-INF/lib'). If spring-aspects.jar is only added to the container wide classpath (and hence loaded by the shared parent classloader), all web applications will share the same aspect instance which is probably not what you want.

Which is a massive limitation when trying to do unit tests. It is very rare that your unit tests will only ever have one context (set of configuration files). The only solution would be to write your own implementation of the aspects that remembers which Spring context created each bean.