Using SQL Server as an imitation Queue with Camel

SQL Server's lock hints allow to you to use it as a database queue. The general process is discussed on stackoverflow.

Here I wanted to summarise the different ways it can be configured with the camel-jpa component. Testing was done against SQL Server 2008 R2 Developer Edition. The list of locks was obtained using this useful script from Microsoft.

Assume we have a generic entity:
@Entity
public class JpaMessage {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    /** an optional hint as to type of message */
    private String msgType;
    /** a refrence to another table containing the blob for performance reasons */
    private Long msgBlobId;
    /** to allow for PESSIMISTIC locking */
    @Version
    private Date lastUpdated;
}

/** basic route */
public class TestRoutbuiler extends BaseRouteBuilder {
    @Value("${endpoint.uri}")
    private String endpointUri;
    @Override
    public void configure() throws Exception {
        from(endpointUri)
            .to("Log received message ${body}");
    }
}


SkipLockedEntity

First is the most basic of forms and the one with the highest portability. It reads in the ResultSet and one by one, tries to lock the entity for update with javax.persistence.lock.timeout=0. In sql server this generates the NOWAIT hint.
endpointUri=jpa:JpaMessage?consumer.query=select m from JpaMessage m&consumer.skipLockedEntity=true
This is the hibernate generate sql:
select jpamessage0_.id as id1_1_, jpamessage0_.last_updated as last_upd2_1_, jpamessage0_.msg_blob_id as msg_blob3_1_, jpamessage0_.msg_type as msg_type4_1_ from JPA_MESSAGE jpamessage0_
select id from JPA_MESSAGE with (updlock, rowlock, nowait ) where id =? and last_updated =?
And this is the locks it obtains in SQL Server:
ResTypeResDescrReqModeReqTypeReqStatusObjectNameIndexName
KEY(010008207756) ULOCKGRANTjpa_messagePK_jpa_message
METADATAprincipal_id = 1Sch-SLOCKGRANTNULLNULL
METADATAdata_space_id = 2Sch-SLOCKGRANTNULLNULL
OBJECTIXLOCKGRANTjpa_messageNULL
PAGE3:110IULOCKGRANTjpa_messagePK_jpa_message


nativeQuery

This is aquires an update lock on the first non-blocked row
endpointUri=jpa:native1?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&consumer.resultClass=net.devgrok.model.support.JpaMessage
Being native query it generates a statement similar to above plus the entity lock which is performed by default:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);
select id from JPA_MESSAGE with (updlock, rowlock ) where id =? and last_updated =?;
Locks obtained are the same as above.


nativeQuery with no consumeLockEntity

This is aquires an update lock on the first non-blocked row, disabling the programatic locking of the already locked row
endpointUri=jpa:native2?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&consumeLockEntity=false&consumer.resultClass=net.devgrok.model.support.JpaMessage
Being native query it generates a statement similar to above plus the entity lock which is performed by default:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);
Locks are only slightly different:
ResTypeResDescrReqModeReqTypeReqStatusObjectNameIndexName
KEY(010008207756) ULOCKGRANTjpa_messagePK_jpa_message
OBJECTIXLOCKGRANTjpa_messageNULL
PAGE3:110IULOCKGRANTjpa_messagePK_jpa_message


nativeQuery with delete

The jpa component deletes the entity after it processed it, so technically this saves another db call and if you were to do a select from the table with READUNCOMMITTED then you wouldn't see the row which you would for the other cases. However the more exclusive database locks obtained for the duration of the processing make it less desirable.
endpointUri=jpa:nativeDelete?consumer.nativeQuery=delete top (1) from jpa_message  WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*&consumeDelete=false&consumeLockEntity=false&consumer.resultClass=net.devgrok.model.support.JpaMessage
Being native query it generates a statement similar to above plus the entity lock which is performed by default:
delete top (1) from jpa_message  WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*;
And the more agressive lock:
ResTypeResDescrReqModeReqTypeReqStatusObjectNameIndexName
KEY(010008207756)XLOCKGRANTjpa_messagePK_jpa_message
METADATAprincipal_id = 1Sch-SLOCKGRANTNULLNULL
OBJECTIXLOCKGRANTjpa_messageNULL
PAGE0.201388889IXLOCKGRANTjpa_messagePK_jpa_message


Cavets

If your using the JpaTransactionManager, then the message routing is done inside the same transaction. So if you had a database error that would force the transaction to rollback. If you had wanted the error handler to consume the message this wouldn't be possible.
The simple way would be to do in a new trans. i.e.:
public void configure() throws Exception {
   from(endpointUri)
      .errorHandler(deadLetterChannel("log:error"))
      .transacted("PROPAGATION_REQUIRES_NEW")
      .to("Log received message ${body}")
      .bean(someBeanWithRollback);
}
However if you wanted shorter transactions, you either have to do every action in another REQUIRES_NEW transaction or on another thread - such as using seda. The best solution is to use PROPAGATION_NOT_SUPPORTED, which suspends the current transaction meaning transactions will be created only when needed - reducing the times locks are held and less chance of deadlocks.


My complete solution

jpa.delay=1000
endpointUri=jpa:namedQuery?consumer.delay=${jpa.delay}&consumer.greedy=true&consumer.namedQuery=JpaMessage.byType&consumer.parameters=#params&consumeLockEntity=false
consumer.greedy=true results in it immediately polling the db for another record if it has one, meaning if the queue is full it will read them off as fast as possible
consumer.namedQuery is a jpa query defined below
consumer.parameters references a map with the parameters below, allows different endpoints to use the same query with different parameters
sharedEntityManager - CAMEL-8054 -  when using a JpaTransactionManager, stops an additional EntityManager and DB connection from being created.
joinTransaction - set to false as sharedEntityManager set to true
@Entity
@NamedNativeQuery(name = "JpaMessage.byType", query = "select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK) where msg_type=:msgType", resultClass = JpaMessage.class) })
public class JpaMessage {}
<beans> 
 <util:map id="params" key-type="java.lang.String">
  <entry key="msgType" value="filterMsgType1"/>
 </util:map>
 <bean id="PROPAGATION_NOT_SUPPORTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
   <property name="transactionManager" ref="transactionManager"/>
   <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/>
 </bean>
 <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
   <property name="entityManagerFactory" ref="entityManagerFactory" />
 </bean>
 <bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent">
  <property name="entityManagerFactory" ref="entityManagerFactory" />
  <property name="transactionManager" ref="transactionManager" />
  <property name="sharedEntityManager" value="true"/>
  <property name="joinTransaction" value="false"/>
 </bean>
</beans>
public void configure() throws Exception {
   from(endpointUri)
      .errorHandler(deadLetterChannel("log:error"))
      .transacted("PROPAGATION_NOT_SUPPORTED")
      .to("Log received message ${body}")
      ;
}

Comments