Monday, 22 December 2014

JEE Concurrent with Camel

The official specs for JEE servers say that you're not meant to start your own threads, implying impending doom if you do. In actuality, you lose some of the functionality the app server provides (threads you start yourself don't carry the container's context), and the app server can't easily kill off rogue threads when you undeploy. Not the worst things in the world, but if we could do something about it we would.

The first 'standard' support for spawning threads in a container was the commonj Work Manager and Timer. It was a bit clunky to use, hard to integrate and not supported across the board.

Thankfully, with concurrency-ee (JSR 236, Concurrency Utilities for Java EE) the spec authors made the smart decision to just extend Doug Lea's java.util.concurrent framework. Integrating this into Camel comes down to substituting the ThreadFactory Camel uses with the container-provided ManagedThreadFactory. The one downside is that we no longer create the threads ourselves, so we may not be able to set their names. Having said that, using a nifty trick I was able to set them on WebSphere Liberty Profile. Here is the basic solution:

<beans xmlns="" xmlns:camel="">

 <camelContext id="camelContext" xmlns="">
  <contextScan />
 </camelContext>

 <jee:jndi-lookup jndi-name="java:comp/env/concurrent/webAppThreadFactory"
  id="threadFactory" proxy-interface="javax.enterprise.concurrent.ManagedThreadFactory"
  cache="true" lookup-on-startup="false" />

 <jee:jndi-lookup jndi-name="java:comp/env/concurrent/executor" id="executorService" />

 <bean class="net.devgrok.camel.JEEThreadPoolFactory" id="threadPoolFactory">
  <constructor-arg ref="threadFactory" />
 </bean>
</beans>

I highly recommend including the "java:comp/env/" prefix, as I was caught out several times when a name resolved against the global JNDI namespace rather than the local one (it tries both, and if either lookup succeeds there is no error). JEEThreadPoolFactory is a copy and paste of DefaultThreadPoolFactory, cut down here for brevity's sake:

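import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.enterprise.concurrent.ManagedThreadFactory;
import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.util.concurrent.CamelThreadFactory;
import org.apache.camel.util.concurrent.ThreadHelper;
import org.apache.commons.lang3.reflect.FieldUtils; // assuming commons-lang3
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

public class JEEThreadPoolFactory implements ThreadPoolFactory {
 private static final Logger log = LoggerFactory.getLogger(JEEThreadPoolFactory.class);
 private final ManagedThreadFactory managedThreadFactory;

 public JEEThreadPoolFactory(ManagedThreadFactory threadFactory) {
  this.managedThreadFactory = threadFactory;
 }

 public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  // every threadFactory is replaced by substituteThreadFactory(threadFactory)
  return Executors.newCachedThreadPool(substituteThreadFactory(threadFactory));
 }

 // the remaining ThreadPoolFactory methods are cut for brevity; they substitute the factory the same way

 /**
  * Gets the naming off the passed in factory and creates a wrapper around
  * the managed one to rename the thread. If permission is denied, logging MDC could be used instead.
  * @param passedInFactory the factory Camel (or Spring) would otherwise have used
  * @return a factory that delegates to the managed one and renames its threads
  */
 protected ThreadFactory substituteThreadFactory(ThreadFactory passedInFactory) {
  try {
   String pattern = ThreadHelper.DEFAULT_PATTERN;
   String name = "";
   if (passedInFactory instanceof CamelThreadFactory) {
    // the fields are private, so read them reflectively
    pattern = (String) FieldUtils.readField(passedInFactory, "pattern", true);
    name = (String) FieldUtils.readField(passedInFactory, "name", true);
    // boolean daemon;
   } else if (passedInFactory instanceof CustomizableThreadFactory) {
    pattern = "#name#-#counter#";
    name = (String) FieldUtils.readField(passedInFactory, "threadNamePrefix", true);
   }
   log.debug("Creating naming factory pattern {} name {}", pattern, name);
   return new NamingManagedThreadFactory(managedThreadFactory, pattern, name);
  } catch (IllegalAccessException e) {
   log.warn("Could not determine factory name {}", e.toString());
   // fall back to the container's factory and its default thread names
   return managedThreadFactory;
  }
 }
}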

import javax.enterprise.concurrent.ManagedThreadFactory;
import org.apache.camel.util.concurrent.ThreadHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Similar to CamelThreadFactory but wraps a ManagedThreadFactory
 */
public class NamingManagedThreadFactory implements ManagedThreadFactory {
 private static final Logger LOG = LoggerFactory.getLogger(NamingManagedThreadFactory.class);
 private final ManagedThreadFactory delegate;
 private final String name;
 private final String pattern;

 public NamingManagedThreadFactory(ManagedThreadFactory managedThreadFactory, String pattern, String name) {
  this.delegate = managedThreadFactory;
  this.pattern = pattern;
  this.name = name;
 }

 @Override
 public Thread newThread(Runnable runnable) {
  String threadName = ThreadHelper.resolveThreadName(pattern, name);
  Thread answer = delegate.newThread(runnable);
  try {
   // the trick: rename the thread the container hands back
   answer.setName(threadName);
  } catch (Exception e) {
   LOG.warn("Error setting thread name {}", e.toString());
  }

  LOG.trace("Created thread[{}] -> {}", threadName, answer);
  return answer;
 }
}

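To see the renaming trick in isolation, here is a minimal sketch that needs only the JDK. RenamingThreadFactory and its resolveName method are hypothetical names made up for illustration, a plain ThreadFactory stands in for the container's ManagedThreadFactory, and the token substitution is a simplification rather than Camel's ThreadHelper.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical JDK-only stand-in for NamingManagedThreadFactory.
class RenamingThreadFactory implements ThreadFactory {
    // In a container this would be the ManagedThreadFactory looked up from JNDI.
    private final ThreadFactory delegate;
    private final String pattern;
    private final String name;
    private final AtomicLong counter = new AtomicLong();

    RenamingThreadFactory(ThreadFactory delegate, String pattern, String name) {
        this.delegate = delegate;
        this.pattern = pattern;
        this.name = name;
    }

    // Simplified substitution of the #name# and #counter# tokens (assumption: not Camel's own logic).
    String resolveName() {
        return pattern
                .replace("#name#", name)
                .replace("#counter#", Long.toString(counter.incrementAndGet()));
    }

    @Override
    public Thread newThread(Runnable runnable) {
        // The delegate creates the thread; we only rename what it hands back.
        Thread thread = delegate.newThread(runnable);
        try {
            thread.setName(resolveName());
        } catch (SecurityException e) {
            // Renaming was refused, so the thread keeps the name the delegate gave it.
        }
        return thread;
    }
}
```

Wrapping Executors.defaultThreadFactory() with the pattern "#name#-#counter#" and the name "camel" gives threads named camel-1, camel-2 and so on.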
In Liberty Profile I hit a nasty bug when looking up the ThreadFactory during startup:
Caused by: java.lang.UnsupportedOperationException: SRVE8011E: This operation cannot be executed from a programmatically added listener.
The solution was to stop Camel starting along with the Spring context by calling SpringCamelContext.setNoStart(true), then load the Spring context, look up the executorService, and start Camel asynchronously.
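The deferred start itself is just a task handed to the container's executor. A rough sketch of that step, assuming a hypothetical Startable interface in place of the Camel context and a hypothetical DeferredStarter helper (neither is part of Camel or Spring):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-in for the one CamelContext method this step needs.
interface Startable {
    void start() throws Exception;
}

// Hypothetical helper: runs the start call on an executor thread
// instead of the thread that is loading the application.
class DeferredStarter {
    static Future<Boolean> startAsync(ExecutorService executor, Startable context) {
        return executor.submit(() -> {
            context.start();
            return Boolean.TRUE;
        });
    }
}
```

In the setup described here, the executor would be the executorService looked up from JNDI and the Startable would wrap the Camel context; any failure during start surfaces when the returned Future is queried.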

1 comment:

  1. Thanks Chris for your post. It was useful.