Skip to main content

..Where Apache Camel meets Spring JMS

This post is relevant for developers who are aware of Apache Camel and use its JMS component to consume from a JMS broker. We are only discussing the message consumption part here. I'm assuming you are conversant with Camel's terminology of routes, endpoints, processors and EIPs. If not, following is a good place to start:

Link --->  Apache Camel Home 

Ever wondered what happens underneath when a Camel consumer route is defined to take messages from a JMS endpoint ? This post is an attempt to explain how Camel integrates with JMS via Spring APIs and how the business logic defined in a Camel route is invoked ?

Following is a simplified sequence diagram showing the set-up process which will eventually lead to message consumption from JMS broker.
.

Creation of JMS endpoints in a camel context is initiated by definition (via Java/XML DSL). A JMS endpoint takes a number of configurable attributes as seen here Camel JMS Component.

As soon as context is bootstrapped, if it finds a JMS endpoint defined in a "from" clause, the Camel machinery will try to create a consumer endpoint (JMSEndpoint and JMSConsumer instance) which internally refers to a DefaultMessageListenerContainer instance.

The container IS-A specialization of Spring's well known DefaultMessageListenerContainer. The initialization process also internally installs a javax.jms.MessageListener impl via EndpointMessageListener which comes from Camel API.


EndpointMessageListener is the entry point from where our business route definition will be invoked later on.


Another important thing which happens is the specification of task executor which is a cached thread pool by default. On this pool all the message consumption and "onMessage" call back will happen.

As soon as the JMSConsumer will start (auto start by deafult), it will post N number of AsyncMessageListenerInvoker instances on the task execution pool. These instances are obviously Runnables.

Also N = no. of concurrent consumers which we had specified in endpoint configuration


From here Spring will take over...



AsynMsgListenerInvoker IS-A Runnable which was posted on the task executor. It keeps polling the JMS broker via the message listener container (because container IS-A polling consumer as well, see the 1st diagram).

As soon as a message is received, the EndpointMessageListener is invoked which has a reference to the first Processor in route definition hence triggering the business logic.

We must appreciate the fact that "onMessage" is invoked in a try-catch block which captures all Throwables. That means Spring always has an opportunity to deal with issues which remained un-handled in our business route.

Should there be any exceptions caught by Spring, it will rollback the JMS transaction if session was transacted. In case of no errors, the transaction will be committed. In case of non-transacted sessions, no rollback/commit takes place.

In any case, if there is an error, the "onMessage" handler of EndpointMessageListener will invoke route specific ErrorHandler (in sync or async manner), and will propagate back to Spring which will catch it and deal with it.


This discussion shows the points in time when a JMS commit or rollback actually takes place and how Spring gracefully deals with errors.




Comments

Popular posts from this blog

C++ logging using Apache Log4cxx on Linux

I'd demonstrate the set up of Apache log4cxx on Ubuntu 12.x in following steps: 1. Download all required packages 2. Build Log4cxx via Apache Maven 3. Use the libraries in a NetBeans C++ project on Ubuntu Linux. Reference URL: http://www.yolinux.com/TUTORIALS/Log4cxx.html This URL is quite detailed and explains things for other *nix operating systems as well. I wanted to start with minimum steps required, hence this post. I have a Windows 7 desktop and have Ubuntu 12.x 64-bit running on it via Oracle Virtualbox. So Ubuntu is running as a guest on my Windows 7 host. [The reference URL mentions different versions of  'libaprXX' libs but we have to use 'libapr1-dev' and 'libaprutil-dev', will see that later.] Prerequisites (install all of the following one by one) autoconf and automake --> sudo apt-get install autoconf automake libxml2 and libxml2-devel ( http://xmlsoft.org/ ) --> sudo apt-get install libxml2 libxml2-dev gmp ( ...

C++11 std::thread Construction and Assignment Tricks

C++11 arrived with std::thread and related classes built within the standard. With the move semantics in place, the construction and assignment practices of 'std threads' deserve a small write up. Let's define a 'Runnable' first: class Runnable {    public:       virtual void operator ()() = 0;       virtual ~Runnable(){} }; class Task : Runnable {      public:          void operator ()()          {               //do sth here..          } };                                           ...

C++ Initialization Subtleties (significance of -Werror compiler option)

Modern C++ continuous integration build pipelines might produce huge logs which may be easily overlooked. Among other errors/warnings, a potential risk caused by invalid narrowing assignments might be lurking in those dark corners... This write up is a little reminder about an essential feature in modern C++ compilers and can help defend against that specific problem. Prior to C++11, the following was a valid assignment and still is even in C++20 or higher ... unsigned int unsignedSinceBirth = 10; unsignedSinceBirth = -10;  // assigning a negative value to an unsigned container printUnsignedVal(unsignedSinceBirth); which when compiled using these options "g++ -std=c++20 <cpp files> -o <executable-name>" does not even emit a warning. And an executable is generated successfully. The output of running that code is an arbitrary unexpected value. Modern C++ (post Cpp11) allow uniform initialization syntax which can help the compiler detect this situation as follows: ...