
Market data subscriber prototype based on Disruptor




Let's talk about a market data subscriber application I recently implemented on top of the LMAX Disruptor.

The source code is hosted in my GitHub repo: https://github.com/NiinS/Java/tree/master/MarketDataSubscriber

Quick Overview:

The app subscribes to FX quotes and trade events from an event source, and a Swing client displays them in a dynamically updating table, as follows:







The Swing interface, however, is only one example of an end user consuming the processed events published by the app. Any number or kind of end-user interfaces can be integrated with the application, and each can process the data in its own way.

The following is a simplified application architecture diagram:





The application is built around three concepts: gateways, an event wheel and business event processors.

Event Gateways: Gateways connect to the actual event sources and publish raw market events to the event wheel.

Event wheel: Imagine an industrial flywheel which keeps rotating to generate rotational energy. The wheel implementation in this case follows the same abstraction. It keeps receiving raw market events from configured gateways and keeps generating processed market events to be consumed by business event processors.

I have implemented an example wheel using a Disruptor based ring buffer.

Event Processors: They deliver processed market events to interested end users. In our case, processors deliver events to a Java Swing client.


Some design choices ...

Disruptor integration

The most important takeaway from the Disruptor architecture is that it provides lock-free access to the underlying ring buffer while guaranteeing that fast publishers never overrun slow consumers, so no event is missed.

Publisher and subscriber configuration in the app can be seen here:

A Disruptor-based ring ensures that a gateway never claims a slot that sits at or below the cursor (the highest slot that is ready to read) and has not been read yet. This gives the event processor the opportunity to consume every event published up to the cursor before its slot is reused.

The wheel implementation uses a multi-producer sequencer, so all publishers are gated on the single sequence that tracks the event processor's progress.

// during the wheel's initialization
disruptor = new Disruptor<MarketEvent.Builder>( new MarketEventBuilderFactory(), maxSlots, pool, ProducerType.MULTI, new YieldingWaitStrategy() );

Gating is applied when the wheel starts rotating:

    public void startRotating()
    {
       if ( disruptor != null )
           disruptor.start();  //internally a gating sequence will be applied on the ring
    }
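
To make the gating rule concrete, here is a minimal single-threaded sketch in plain Java. It is not code from the repo and not the Disruptor's implementation: GatedRing, tryPublish and poll are hypothetical names, and where the real ringBuffer.next() waits for a free slot, this sketch simply returns false. The only point is the wrap-point check that stops a publisher from reusing a slot the consumer has not read yet.

```java
/** Single-threaded illustration of gating; a sketch, not the Disruptor's own code. */
final class GatedRing {
    private final long[] slots;
    private long cursor = -1;           // highest published sequence
    private long consumerSequence = -1; // highest sequence the consumer has read (the gating sequence)

    GatedRing(int size) {
        slots = new long[size];
    }

    /** Publishes a value, or returns false when that would overwrite an unread slot. */
    boolean tryPublish(long value) {
        long next = cursor + 1;
        long wrapPoint = next - slots.length; // sequence that last used the slot we want
        if (wrapPoint > consumerSequence) {
            return false; // gated: the consumer has not read that slot yet
        }
        slots[(int) (next % slots.length)] = value;
        cursor = next;
        return true;
    }

    /** Returns the next unread value, or null when the consumer has caught up. */
    Long poll() {
        if (consumerSequence >= cursor) {
            return null;
        }
        consumerSequence++;
        return slots[(int) (consumerSequence % slots.length)];
    }

    public static void main(String[] args) {
        GatedRing ring = new GatedRing(2);
        System.out.println(ring.tryPublish(10)); // true
        System.out.println(ring.tryPublish(20)); // true
        System.out.println(ring.tryPublish(30)); // false: both slots are still unread
        System.out.println(ring.poll());         // 10
        System.out.println(ring.tryPublish(30)); // true: slot 0 has been read and can be reused
    }
}
```

With several gateways publishing concurrently the claim step also has to be coordinated between threads, which is what the multi-producer sequencer takes care of; the sketch leaves that out on purpose.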



Market data representation

Google protobuf is a good choice of transport format because its serialized form is compact and fast to process, and it stays backward compatible as long as you follow protobuf's rules for field numbers. The only drawback is that once "built", a protobuf object can't be reused; it's effectively an immutable instance.

Our ring buffer, on the other hand, has slots represented by reusable protobuf "Builder" objects. Every time a publisher claims a slot, it gets a MarketEvent.Builder instance, which is then filled and partially built. This way, we don't need to recreate slots for every new raw gateway event.

Raw gateway events are presented to the ring buffer as "byte[]" and merged into the claimed slot, so no new slot object is created per event.

   // how gateways claim and re-publish a slot
   ....

    long claimedSlot = ringBuffer.next(); // claim the next slot, waiting if none is available

    try
    {
        MarketEvent.Builder eventBuilderToFill = ringBuffer.get( claimedSlot );
        eventBuilderToFill.clear(); // discard whatever the previous use of this slot left behind
        eventBuilderToFill.mergeFrom( array, offset, length );
    }
    catch ( InvalidProtocolBufferException e )
    {
        log.error( "Could not build proto event from passed bytes array", e );
    }
    finally
    {
        ringBuffer.publish( claimedSlot ); // always publish, otherwise the claimed slot stalls the ring
    }
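
The slot reuse idea can also be shown without protobuf or the Disruptor on the classpath. The following is only a sketch under stated assumptions: ReusableSlot is a hypothetical stand-in for MarketEvent.Builder, SlotReuseSketch is a hypothetical stand-in for the ring, and gating is ignored entirely. It shows that slots are allocated once up front and then cleared and refilled from the raw bytes.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a reusable slot such as MarketEvent.Builder. */
final class ReusableSlot {
    private final byte[] payload = new byte[64]; // allocated once, when the ring is created
    private int length;

    void clear() {
        length = 0;
    }

    /** Copies the raw bytes into the preallocated buffer instead of allocating a new one. */
    void mergeFrom(byte[] source, int offset, int len) {
        if (len > payload.length) {
            throw new IllegalArgumentException("event too large for slot");
        }
        System.arraycopy(source, offset, payload, 0, len);
        length = len;
    }

    /** For display only; this does allocate a String. */
    String asText() {
        return new String(payload, 0, length, StandardCharsets.US_ASCII);
    }
}

/** Hypothetical ring that hands out the same slot objects over and over. */
final class SlotReuseSketch {
    private final ReusableSlot[] slots;
    private long next = 0;

    SlotReuseSketch(int size) {
        slots = new ReusableSlot[size];
        for (int i = 0; i < size; i++) {
            slots[i] = new ReusableSlot(); // the only slot allocations in the sketch
        }
    }

    /** Clears and refills the next slot in ring order, then returns it. */
    ReusableSlot fill(byte[] raw, int offset, int len) {
        ReusableSlot slot = slots[(int) (next++ % slots.length)];
        slot.clear();
        slot.mergeFrom(raw, offset, len);
        return slot;
    }

    public static void main(String[] args) {
        SlotReuseSketch ring = new SlotReuseSketch(2);
        byte[] a = "EURUSD".getBytes(StandardCharsets.US_ASCII);
        byte[] b = "USDINR".getBytes(StandardCharsets.US_ASCII);
        byte[] c = "GBPUSD".getBytes(StandardCharsets.US_ASCII);
        ReusableSlot first = ring.fill(a, 0, a.length);
        ring.fill(b, 0, b.length);
        ReusableSlot third = ring.fill(c, 0, c.length);
        System.out.println(first == third); // true: the third event reuses the first slot object
        System.out.println(third.asText()); // GBPUSD
    }
}
```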


From here on, published slots will eventually be made available to an event processor which will asynchronously distribute the "partially" built market data events to actual business event processors. 
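
As a rough illustration of that asynchronous hand-off (again a sketch, not the code in the repo), the distribution step could look like the following. BusinessEventProcessor and EventDistributor are hypothetical names, events are plain strings instead of protobuf builders, and a single worker thread stands in for whatever threading the real event processor uses.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical callback for an end user such as the Swing client. */
interface BusinessEventProcessor {
    void onMarketEvent(String event);
}

/** Hands each event to every registered processor on a separate worker thread. */
final class EventDistributor {
    private final List<BusinessEventProcessor> processors = new CopyOnWriteArrayList<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void register(BusinessEventProcessor processor) {
        processors.add(processor);
    }

    /** Returns immediately; delivery happens on the worker thread, in submission order. */
    void distribute(String event) {
        worker.execute(() -> {
            for (BusinessEventProcessor processor : processors) {
                processor.onMarketEvent(event);
            }
        });
    }

    /** Stops accepting events and waits for pending deliveries; returns false on timeout. */
    boolean shutdown(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        EventDistributor distributor = new EventDistributor();
        distributor.register(event -> System.out.println("table received " + event));
        distributor.register(event -> System.out.println("dashboard received " + event));
        distributor.distribute("EUR/USD quote");
        distributor.shutdown(1000);
    }
}
```

A real Swing processor would additionally have to move the update onto the event dispatch thread before touching any component.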

From an implementation perspective, I've written a Netty based example gateway which knows how to decode market events sent as byte[]. The business event processing is handled via a Swing user interface. 

There are many possible improvements and add-on features. For example, the user interface could be enriched with a dashboard that draws conclusions from the received market events, such as:

-- widgets to show all cancelled Limit orders received in last 15 minutes 
-- or all INR/USD trades which were successful in a specific date range and so on...

These sorts of queries are easily achievable with Esper event stream processing. The idea is that the Esper engine is configured with queries (written in EPL, Esper's query language, called EQL in early releases); when market events arrive, the engine evaluates them against those queries and emits the conclusions.
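
To give a feel for what the first widget would compute, here is a plain-Java stand-in for a 15 minute time window. It deliberately does not use the Esper API: CancelledOrderWindow, OrderEvent and the "CANCELLED" status value are assumptions made for the example, and events are assumed to arrive in timestamp order. With Esper, the same intent would be expressed as a query instead of hand-written code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java stand-in for a time-window query; this is not the Esper API. */
final class CancelledOrderWindow {

    /** Hypothetical event shape: an order id, a status and an arrival time in milliseconds. */
    static final class OrderEvent {
        final String orderId;
        final String status;
        final long timestampMillis;

        OrderEvent(String orderId, String status, long timestampMillis) {
            this.orderId = orderId;
            this.status = status;
            this.timestampMillis = timestampMillis;
        }
    }

    private final long windowMillis;
    private final Deque<OrderEvent> cancelled = new ArrayDeque<>();

    CancelledOrderWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Keeps only cancelled orders; events are assumed to arrive in timestamp order. */
    void onEvent(OrderEvent event) {
        if ("CANCELLED".equals(event.status)) {
            cancelled.addLast(event);
        }
    }

    /** Returns the ids of cancelled orders seen within the window ending at nowMillis. */
    List<String> query(long nowMillis) {
        long cutoff = nowMillis - windowMillis;
        while (!cancelled.isEmpty() && cancelled.peekFirst().timestampMillis < cutoff) {
            cancelled.removeFirst(); // too old, it has left the window for good
        }
        List<String> ids = new ArrayList<>();
        for (OrderEvent event : cancelled) {
            ids.add(event.orderId);
        }
        return ids;
    }

    public static void main(String[] args) {
        CancelledOrderWindow window = new CancelledOrderWindow(15 * 60 * 1000L);
        window.onEvent(new OrderEvent("A", "CANCELLED", 0L));
        window.onEvent(new OrderEvent("B", "FILLED", 60_000L));
        window.onEvent(new OrderEvent("C", "CANCELLED", 600_000L));
        System.out.println(window.query(900_000L));   // [A, C]
        System.out.println(window.query(1_000_000L)); // [C]
    }
}
```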

How to run the application?

To run the subscriber:

-- Main class: ns.freetime.MarketSubscriberMain
-- The classpath should contain /resources, /etc and the compiled code

To run the sample data provider:

-- Main class: sample.SampleMarketDataSource
-- Pass the port number "1091" as a program argument to connect to the subscriber. The subscriber's port number can be changed in the /etc/subscriber-settings.properties file.





I'll take a break now. Please feel free to download the source and experiment. Feedback and criticism are always welcome :)


