Using SQL Server as an imitation Queue with Camel
SQL Server's lock hints allow to you to use it as a database queue. The general process is discussed on stackoverflow.
Here I wanted to summarise the different ways it can be configured with the camel-jpa component. Testing was done against SQL Server 2008 R2 Developer Edition. The list of locks was obtained using this useful script from Microsoft.
Assume we have a generic entity:
The simple way would be to do in a new trans. i.e.:
consumer.namedQuery is a jpa query defined below
consumer.parameters references a map with the parameters below, allows different endpoints to use the same query with different parameters
sharedEntityManager - CAMEL-8054 - when using a JpaTransactionManager, stops an additional EntityManager and DB connection from being created.
joinTransaction - set to false as sharedEntityManager set to true
Here I wanted to summarise the different ways it can be configured with the camel-jpa component. Testing was done against SQL Server 2008 R2 Developer Edition. The list of locks was obtained using this useful script from Microsoft.
Assume we have a generic entity:
@Entity public class JpaMessage { @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; /** an optional hint as to type of message */ private String msgType; /** a refrence to another table containing the blob for performance reasons */ private Long msgBlobId; /** to allow for PESSIMISTIC locking */ @Version private Date lastUpdated; } /** basic route */ public class TestRoutbuiler extends BaseRouteBuilder { @Value("${endpoint.uri}") private String endpointUri; @Override public void configure() throws Exception { from(endpointUri) .to("Log received message ${body}"); } }
SkipLockedEntity
First is the most basic of forms and the one with the highest portability. It reads in the ResultSet and one by one, tries to lock the entity for update with javax.persistence.lock.timeout=0. In sql server this generates the NOWAIT hint.endpointUri=jpa:JpaMessage?consumer.query=select m from JpaMessage m&consumer.skipLockedEntity=trueThis is the hibernate generate sql:
select jpamessage0_.id as id1_1_, jpamessage0_.last_updated as last_upd2_1_, jpamessage0_.msg_blob_id as msg_blob3_1_, jpamessage0_.msg_type as msg_type4_1_ from JPA_MESSAGE jpamessage0_ select id from JPA_MESSAGE with (updlock, rowlock, nowait ) where id =? and last_updated =?And this is the locks it obtains in SQL Server:
ResType | ResDescr | ReqMode | ReqType | ReqStatus | ObjectName | IndexName |
---|---|---|---|---|---|---|
KEY | (010008207756) | U | LOCK | GRANT | jpa_message | PK_jpa_message |
METADATA | principal_id = 1 | Sch-S | LOCK | GRANT | NULL | NULL |
METADATA | data_space_id = 2 | Sch-S | LOCK | GRANT | NULL | NULL |
OBJECT | IX | LOCK | GRANT | jpa_message | NULL | |
PAGE | 3:110 | IU | LOCK | GRANT | jpa_message | PK_jpa_message |
nativeQuery
This is aquires an update lock on the first non-blocked rowendpointUri=jpa:native1?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&consumer.resultClass=net.devgrok.model.support.JpaMessageBeing native query it generates a statement similar to above plus the entity lock which is performed by default:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK); select id from JPA_MESSAGE with (updlock, rowlock ) where id =? and last_updated =?;Locks obtained are the same as above.
nativeQuery with no consumeLockEntity
This is aquires an update lock on the first non-blocked row, disabling the programatic locking of the already locked rowendpointUri=jpa:native2?consumer.nativeQuery=select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK)&consumeLockEntity=false&consumer.resultClass=net.devgrok.model.support.JpaMessageBeing native query it generates a statement similar to above plus the entity lock which is performed by default:
select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK);Locks are only slightly different:
ResType | ResDescr | ReqMode | ReqType | ReqStatus | ObjectName | IndexName |
---|---|---|---|---|---|---|
KEY | (010008207756) | U | LOCK | GRANT | jpa_message | PK_jpa_message |
OBJECT | IX | LOCK | GRANT | jpa_message | NULL | |
PAGE | 3:110 | IU | LOCK | GRANT | jpa_message | PK_jpa_message |
nativeQuery with delete
The jpa component deletes the entity after it processed it, so technically this saves another db call and if you were to do a select from the table with READUNCOMMITTED then you wouldn't see the row which you would for the other cases. However the more exclusive database locks obtained for the duration of the processing make it less desirable.endpointUri=jpa:nativeDelete?consumer.nativeQuery=delete top (1) from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*&consumeDelete=false&consumeLockEntity=false&consumer.resultClass=net.devgrok.model.support.JpaMessageBeing native query it generates a statement similar to above plus the entity lock which is performed by default:
delete top (1) from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK) output deleted.*;And the more agressive lock:
ResType | ResDescr | ReqMode | ReqType | ReqStatus | ObjectName | IndexName |
---|---|---|---|---|---|---|
KEY | (010008207756) | X | LOCK | GRANT | jpa_message | PK_jpa_message |
METADATA | principal_id = 1 | Sch-S | LOCK | GRANT | NULL | NULL |
OBJECT | IX | LOCK | GRANT | jpa_message | NULL | |
PAGE | 0.201388889 | IX | LOCK | GRANT | jpa_message | PK_jpa_message |
Cavets
If your using the JpaTransactionManager, then the message routing is done inside the same transaction. So if you had a database error that would force the transaction to rollback. If you had wanted the error handler to consume the message this wouldn't be possible.The simple way would be to do in a new trans. i.e.:
public void configure() throws Exception { from(endpointUri) .errorHandler(deadLetterChannel("log:error")) .transacted("PROPAGATION_REQUIRES_NEW") .to("Log received message ${body}") .bean(someBeanWithRollback); }However if you wanted shorter transactions, you either have to do every action in another REQUIRES_NEW transaction or on another thread - such as using seda. The best solution is to use PROPAGATION_NOT_SUPPORTED, which suspends the current transaction meaning transactions will be created only when needed - reducing the times locks are held and less chance of deadlocks.
My complete solution
jpa.delay=1000 endpointUri=jpa:namedQuery?consumer.delay=${jpa.delay}&consumer.greedy=true&consumer.namedQuery=JpaMessage.byType&consumer.parameters=#params&consumeLockEntity=falseconsumer.greedy=true results in it immediately polling the db for another record if it has one, meaning if the queue is full it will read them off as fast as possible
consumer.namedQuery is a jpa query defined below
consumer.parameters references a map with the parameters below, allows different endpoints to use the same query with different parameters
sharedEntityManager - CAMEL-8054 - when using a JpaTransactionManager, stops an additional EntityManager and DB connection from being created.
joinTransaction - set to false as sharedEntityManager set to true
@Entity @NamedNativeQuery(name = "JpaMessage.byType", query = "select top 1 * from jpa_message WITH (UPDLOCK, READPAST, ROWLOCK) where msg_type=:msgType", resultClass = JpaMessage.class) }) public class JpaMessage {}
<beans> <util:map id="params" key-type="java.lang.String"> <entry key="msgType" value="filterMsgType1"/> </util:map> <bean id="PROPAGATION_NOT_SUPPORTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <property name="transactionManager" ref="transactionManager"/> <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/> </bean> <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager"> <property name="entityManagerFactory" ref="entityManagerFactory" /> </bean> <bean id="jpa" class="org.apache.camel.component.jpa.JpaComponent"> <property name="entityManagerFactory" ref="entityManagerFactory" /> <property name="transactionManager" ref="transactionManager" /> <property name="sharedEntityManager" value="true"/> <property name="joinTransaction" value="false"/> </bean> </beans>
public void configure() throws Exception { from(endpointUri) .errorHandler(deadLetterChannel("log:error")) .transacted("PROPAGATION_NOT_SUPPORTED") .to("Log received message ${body}") ; }
Comments
Post a Comment