Compared to the previous generations of Refinitiv APIs, the Enterprise Message API (EMA) greatly improves the ease with which developers write applications for Refinitiv Real Time platforms. Even if you are new to real-time application development, the EMA quick start guides and tutorials will quickly bring you up to speed with a running application that retrieves real-time data. Once you have reached this point, you may wonder what to do next: how do you actually integrate EMA into your own application? Quick start guides and tutorials are excellent for learning how the API works, but they rarely do much more than print out the data they receive. Real applications, on the other hand, need a bit more than that.
Because EMA is a relatively low level API, applications often have to implement an intermediate layer between the API and their business logic. The goal of this layer is usually to provide a level of abstraction that is comfortable enough so that the business logic doesn’t have to get into the nuts and bolts that come with EMA. Obviously, different applications have different needs in this domain, but there are commonalities and questions developers often ask when they implement this kind of layer.
The purpose of this article is to explain some of the key elements that will help you to integrate EMA in your application for market price subscription. The article is in two parts:
- In the first part I ask some of the common questions developers ask and suggest possible answers and solutions.
- In the second part I present an implementation of the proposed solutions.
Note: This implementation is part of a set of value add objects built on top of EMA. These objects are available in the ValueAddObjectsForEMA GitHub project (read the Simplifying Content Access in EMA article to learn more about this project).
There’s no rocket science here. What I’m about to explain is probably well known to every experienced real-time application developer, but non-experts may find this information useful. If you fall into this second category, make sure you are familiar with the concepts explained in 10 important things you need to know before you write a Refinitiv Real Time application before you read this article.
If you followed the EMA tutorials or read the documentation, you learnt that just after it has subscribed to a MarketPrice data item, your application receives a stream of events and messages containing the requested data. That’s all fine, but these messages arrive asynchronously, after the subscription method (registerClient) has returned control to your application thread. Because of that, your application must wait for the requested data to come in before it can move forward and actually do something with the subscribed item. But how do you know that the item is complete, and what exactly does "complete" mean?
By complete I mean that either your application received the requested data so that it can actually use it or that it received an error status indicating a problem with the item.
Because a MarketPrice image always comes via a single Refresh message, deciding whether a MarketPrice item is complete is quite easy.
Note: Deciding on the completion of other item types like order books, chains or pages is a bit more complex because their first image may come in several Refresh Messages. Read my other articles to find out how to proceed with these other item types.
A MarketPrice is considered complete as soon as your application has received either:
- a Refresh message or
- a Status message indicating a closed stream (CLOSED, CLOSED_RECOVER or CLOSED_REDIRECTED).
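The completion rule above can be sketched in a few lines of plain Java. This is only an illustration that does not depend on the EMA library: the MsgKind and StreamState enums and the CompletionTracker class are hypothetical names introduced here, with the stream states named after the ones listed above.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical message kinds, one per type of message discussed in this article. */
enum MsgKind { REFRESH, UPDATE, STATUS }

/** Hypothetical stream states, named after the states listed above. */
enum StreamState { OPEN, NON_STREAMING, CLOSED, CLOSED_RECOVER, CLOSED_REDIRECTED }

/** Tracks whether a MarketPrice item is logically complete. */
class CompletionTracker {
    private static final Set<StreamState> CLOSED_STATES =
        EnumSet.of(StreamState.CLOSED, StreamState.CLOSED_RECOVER, StreamState.CLOSED_REDIRECTED);

    private boolean complete = false;

    /** Call once per incoming message; returns the completion state afterwards. */
    boolean onMessage(MsgKind kind, StreamState state) {
        if (kind == MsgKind.REFRESH) {
            complete = true; // the image has been received
        } else if (kind == MsgKind.STATUS && CLOSED_STATES.contains(state)) {
            complete = true; // the stream is closed, nothing more will come
        }
        // Updates and non-closing statuses never change the completion state.
        return complete;
    }

    boolean isComplete() {
        return complete;
    }
}
```

Once complete, the item stays complete whatever comes next, which is why the flag is never reset.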
The following diagrams illustrate three different cases of MarketPrice data item completion.
Note: Two of these use cases rely on the interestAfterRefresh EMA request parameter, which indicates whether the subscription request is made in streaming mode (interestAfterRefresh=true; with updates) or in snapshot mode (interestAfterRefresh=false; no updates).
Subscription for a valid MarketPrice item (interestAfterRefresh set to true):
The MarketPrice is considered complete as soon as the first Refresh message is received. Whatever comes next (updates, statuses or other refresh messages), the data item keeps this logical completion state, meaning that it can be used by the business logic of the application.
Subscription for a valid MarketPrice item (interestAfterRefresh set to false):
The MarketPrice is considered complete as soon as the first Refresh message is received. As interestAfterRefresh has been set to false, no subsequent message will be received.
Subscription for an invalid MarketPrice item:
If the MarketPrice was not yet complete, it is considered complete as soon as your application receives a Status message with a closed stream state (CLOSED, CLOSED_RECOVER or CLOSED_REDIRECTED). As the item stream is closed, no subsequent message will be received.
In response to the item subscriptions your application has made, EMA sends messages that contain transient information about the subscribed item. If your application subscribes to a significant number of very active items, EMA may send a large number of messages. As a performance API, EMA has been optimized for this kind of scenario. For example, it recycles messages and reuses them to avoid the overhead caused by frequent object creation and deletion.
Because they are extensively reused by EMA, your application must not preserve any reference to these messages. The same rule applies to the underlying data, like fields and their values. Actually, these objects can only be used in the context of the EMA callbacks implemented by your application (onRefreshMsg, onUpdateMsg…). As soon as your application returns from one of these callback methods, it must release any reference to the received message and data.
If your application needs to use the received data outside of the callback methods or from another thread – which is very likely – it must cache it in its own memory space (or in any other storage facility) and use this preserved data instead. This caching mechanism is not provided by EMA and must be implemented by your application.
So, let’s see what it takes to cache a Market Price data item. Basically it means that your application must take care of four things:
- Duplicating the field values received for the item. This requires decoding the fields received via EMA messages and preserving their values in the data model your application uses to represent them.
- Building the image of the item and storing it somewhere (e.g. in memory). This requires your application to build its own representation of a MarketPrice image (field list + status + other details like the item name) and store it somewhere.
- Keeping this image updated. This requires your application to keep the internal image updated when it receives messages from EMA.
- Providing access to the cached data. This may require the related application source code to be thread safe if EMA messages are dispatched from a thread other than the one your application uses to read the data.
The following diagrams illustrate this mechanism for the 3 different message types (refresh, update and status) EMA can possibly send for the subscribed data items:
On incoming refresh messages
Notes: In case of subsequent refresh messages, the internal cache entry representing the item must be cleared out before it is rebuilt with the new fields. The cached item image does not have to keep the same field order as the refresh message.
On incoming update messages
On incoming status messages
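The handling of the three message types can be sketched as a small thread-safe cache. This is a minimal illustration rather than the implementation presented in the second part: the CachedMarketPrice class and its method names are hypothetical, field values are simplified to strings that have already been decoded from the EMA message, and it assumes that a status message only changes the item status and leaves the cached fields untouched.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical application-side image of one MarketPrice item. */
class CachedMarketPrice {
    private final String name;
    private final Map<String, String> fields = new LinkedHashMap<>();
    private String status = "";

    CachedMarketPrice(String name) {
        this.name = name;
    }

    /** Refresh: clear the previous image, then rebuild it from the new fields. */
    synchronized void onRefresh(Map<String, String> decodedFields, String statusText) {
        fields.clear();
        fields.putAll(decodedFields); // copies the entries, keeps no reference to the source map
        status = statusText;
    }

    /** Update: overwrite only the fields carried by the message. */
    synchronized void onUpdate(Map<String, String> decodedFields) {
        fields.putAll(decodedFields);
    }

    /** Status: only the item status changes (assumption of this sketch). */
    synchronized void onStatus(String statusText) {
        status = statusText;
    }

    /** Returns the cached value, or null if the field is not part of the image. */
    synchronized String getField(String fieldName) {
        return fields.get(fieldName);
    }

    synchronized String getStatus() {
        return status;
    }

    String getName() {
        return name;
    }
}
```

Every method that touches the image is synchronized so that a reader thread never sees a half-rebuilt image while a refresh is being applied from the dispatching thread. A real application would probably use a richer field type than String.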
Dispatching events is a mandatory task: without it, EMA cannot distribute events and messages to your application. Dispatching can be done either by one of your application’s threads (the main thread or a background thread; this is the USER_DISPATCH operation model) or by one of the EMA library’s threads (the API_DISPATCH operation model). In the first case, your application must implement a dispatching loop. In the second case, the loop is not required, but your application will probably have to be thread safe to protect itself against concurrent calls from the EMA thread and its own thread(s). In both cases, EMA events and messages reach your application asynchronously via callback methods. As explained earlier, the item data (field values) must then be extracted from these messages and optionally preserved for later use.
The complexity that comes with this asynchronism is the price to pay for real-time notifications of item updates. A synchronous API, on the other hand, would not allow such real-time notifications, but it would be much easier to use. For example, wouldn’t it be great to be able to retrieve market prices like this?
MarketPrice euro = new MarketPrice("EUR=");
euro.open(); // subscribes and returns once the item is complete
double bid = euro.getField("BID").value();
double ask = euro.getField("ASK").value();
Getting MarketPrice data via the return values of simple method calls is achievable. It does, however, require an additional layer implemented on top of EMA. This layer would provide objects and mechanisms that re-synchronize EMA asynchronous calls with methods called from your application thread. For example, in the above code snippet we can imagine that the open() method subscribes to the EUR= MarketPrice item and dispatches events (or sleeps, depending on the operation model) until the item is complete. On item completion, the open() method returns control to the application thread. The application then calls the getField(String) method to access the field values.
This mechanism is illustrated by the following diagram:
Note: It is important to note that the dispatching loop represented above does not only dispatch the events related to this item subscription but also the events of the other subscriptions opened with the same OmmConsumer.
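The re-synchronization described above can be sketched with a simulated event queue. Nothing here uses the EMA library: FakeConsumer is a hypothetical stand-in for an OmmConsumer running in the USER_DISPATCH model, SyncItem is a hypothetical item class, and simulateImage() plays the role of the network delivering an image for an item.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical stand-in for an OmmConsumer in the USER_DISPATCH model. */
class FakeConsumer {
    private final Queue<String> pendingImages = new ArrayDeque<>(); // item names, in arrival order
    private final Map<String, SyncItem> subscriptions = new HashMap<>();

    void subscribe(SyncItem item) {
        subscriptions.put(item.name, item);
    }

    /** Simulates the network delivering the image of an item. */
    void simulateImage(String itemName) {
        pendingImages.add(itemName);
    }

    /** Delivers one pending event to its item, whichever subscription it belongs to. */
    boolean dispatch() {
        String itemName = pendingImages.poll();
        if (itemName == null) {
            return false; // nothing left to dispatch
        }
        SyncItem item = subscriptions.get(itemName);
        if (item != null) {
            item.complete = true;
        }
        return true;
    }
}

/** Hypothetical item whose open() method blocks until the item is complete. */
class SyncItem {
    final String name;
    boolean complete = false;

    SyncItem(String name) {
        this.name = name;
    }

    /** Subscribes, then dispatches until this item is complete or no event is left. */
    boolean open(FakeConsumer consumer) {
        consumer.subscribe(this);
        while (!complete && consumer.dispatch()) {
            // keep dispatching; events of other subscriptions are delivered too
        }
        return complete;
    }
}
```

In this sketch the loop gives up when the queue is empty so that the example always terminates. A real dispatching loop would instead wait for further events, ideally with a timeout.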
Being able to retrieve MarketPrice data via synchronous method calls definitely simplifies application programming. It makes the code more intuitive, and the application flow becomes much easier to understand. However, this simplicity comes at a cost that must be taken into account and that makes it inappropriate for some use cases.
With synchronous calls, the application must wait for the item image to be received before it can do anything else. This means that if your application needs to subscribe to a large number of items, it will open them synchronously one by one, waiting for the image of the previous item to arrive before it can open the next. This induces a cumulative delay that becomes more noticeable as you increase the number of items. With the asynchronous model, on the other hand, the delay is not cumulative because all items are opened in parallel. In other words, with the synchronous model the total time for opening several items is the sum of the times needed to get the first image of each item, whereas with the asynchronous model the total time for opening the same number of items is the time needed to get the first image of the slowest item. In terms of waiting time, the synchronous model grows as O(N) with the number of items while the asynchronous model stays at O(1), which is obviously much more efficient. The following diagrams illustrate this difference:
Synchronous model complexity O(N):
Asynchronous model complexity O(1):
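The difference between the two models can be put into numbers with a back-of-the-envelope calculation. The OpenTimeModel class below is a hypothetical helper, not part of EMA, and it only models the waiting time for first images: it ignores bandwidth and processing costs, which also matter when many items are opened at once.

```java
import java.util.Arrays;

/** Simplified model of the time needed to open a set of items. */
class OpenTimeModel {

    /** Synchronous model: items are opened one by one, so the delays add up. */
    static long synchronousTotal(long[] firstImageDelaysMs) {
        return Arrays.stream(firstImageDelaysMs).sum();
    }

    /** Asynchronous model: items are opened in parallel, so the slowest one dominates. */
    static long asynchronousTotal(long[] firstImageDelaysMs) {
        return Arrays.stream(firstImageDelaysMs).max().orElse(0L);
    }
}
```

For example, with first-image delays of 20, 35, 50 and 25 milliseconds, synchronousTotal returns 130 while asynchronousTotal returns 50.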
Ideally, an additional layer implemented on top of EMA should provide, while remaining simple to use, both the synchronous and the asynchronous models so that it fits into a wide range of use cases.
I’m not interested in all this data. What can I do to decrease the number of messages if I just need BID and ASK?
Most of the MarketPrice items available on the Real-Time datafeed are made of several hundred fields. All those fields describe the different aspects of the item. For example, if you subscribe to the quotation of the Apple stock traded on the Nasdaq (AAPL.O), you will receive more than 270 fields. These fields tell you about the Apple stock price (latest bid price, latest ask price, latest trade price…) but also give you more general information like the dividend payment date or the currency used to trade this stock.
Depending on the use case, your application may not be interested in all this information. In that case it has the option to request only a subset of the available fields. Doing that, your application will receive refresh and update messages that only contain the requested fields. This will result in fewer and smaller messages. Your application will then consume less memory and require less processing time.
In the Refinitiv Real Time literature, subscribing to a subset of fields is referred to as “Views”. You can specify the fields you want by adding a view to the request message used to subscribe to the item. You have the option to indicate the fields you want using either their names or their IDs. The following code snippet illustrates how to subscribe to the BID and ASK fields of the EasyJet stock (EZJ.L) using Views and field IDs.
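// View requesting only BID (field ID 22) and ASK (field ID 25)
ElementList view = EmaFactory.createElementList();
OmmArray array = EmaFactory.createOmmArray();
array.add(EmaFactory.createOmmArrayEntry().intValue(22)); // BID
array.add(EmaFactory.createOmmArrayEntry().intValue(25)); // ASK
view.add(EmaFactory.createElementEntry().uintValue(EmaRdm.ENAME_VIEW_TYPE, 1)); // 1 = view defined by field IDs
view.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_VIEW_DATA, array));
ReqMsg request = EmaFactory.createReqMsg().name("EZJ.L").payload(view); // then pass it to registerClient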
Please refer to the EMA documentation and examples for more about Views.
In the second part of this article I present a simple-to-use MarketPrice class that illustrates the concepts presented above. The usage of this class is demonstrated in an example application and in a command line tool anyone can use to retrieve market prices in a displayable format or in JSON format. The related source code is based on the Java edition of the Enterprise Message API and is available on GitHub. It should give you a good starting point for implementing the same kind of features in your own applications.