How to process a stream of events sent to a JMS message broker
You can manage a stream of events using message brokers such as Kafka, ActiveMQ, or RabbitMQ. Each broker has its own advantages and disadvantages. The relative strengths of each broker are not the subject of this article.
In this hypothetical use case, we will simulate some physical devices that generate a stream of events. The events will be sent to a JMS broker, in this case ActiveMQ. Each event contains the device ID and a status indicating whether the device is in a healthy state.
A message queue receives the event data stream whilst a Martini service subscribes to the queue, processes the events, and triggers an action depending on the data in the event.
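The flow described above can be sketched in plain Java. This is only an illustration under stated assumptions: a BlockingQueue stands in for the ActiveMQ queue, and the names EventFlowSketch, DeviceEvent, and the "raise alert" action are hypothetical and not part of the demo Package.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for the broker queue and the subscribing service.
class EventFlowSketch {

    // One device event: the device ID plus whether the device reports healthy.
    record DeviceEvent(String deviceId, boolean healthy) {}

    // Drains the queue and returns the action chosen for each event.
    static List<String> consume(BlockingQueue<DeviceEvent> queue) {
        List<String> actions = new ArrayList<>();
        DeviceEvent event;
        while ((event = queue.poll()) != null) {
            // The action depends only on the data carried by the event.
            actions.add(event.healthy()
                    ? event.deviceId() + ": no action"
                    : event.deviceId() + ": raise alert");
        }
        return actions;
    }

    public static void main(String[] args) {
        BlockingQueue<DeviceEvent> queue = new LinkedBlockingQueue<>();
        // The producer side: simulated devices put events on the queue.
        queue.add(new DeviceEvent("device-1", true));
        queue.add(new DeviceEvent("device-2", false));
        // The consumer side: the subscriber reads events in arrival order.
        consume(queue).forEach(System.out::println);
    }
}
```

The point of the queue is that the devices and the processing service never call each other directly, so either side can be replaced or scaled without changing the other.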
Download this demo to Martini and learn how to process events from IoT devices and trigger actions based on the data received
If you haven't already done so, get yourself a Lonti account, and then download Martini.
Within either Martini Desktop or Martini Online, select the "Event Processing" demo from the Welcome Screen and click Install.
The Package demo010-processing-events-iot will be downloaded automatically. Right-click the Package and click Start. A green icon indicates that it has started.
Documentation containing a Test Procedure to run the demo is included in the Package readme file \resources\readme\readme.md. The readme file is automatically opened when the Package is started.
The Test Procedure will guide you through sending events from 3 mock devices to a message queue. Each device sends a status code indicating whether the device is available or in error. The Martini service subscribing to the event queue listens for changes in the status of a device and logs any changes to a database.
Line by Line: How this JMS event processing demo works
Sending Messages to the JMS Endpoint
Open the service SimulateDeviceEvents under the \\code\demo010\services\ folder
On lines 3-5: The Jms.publishString function is called to send messages to our JMS Endpoint
The function takes 3 parameters:
1st parameter: The name of the destination to send the message to
2nd parameter: The message to send
3rd parameter: The properties to send
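To make the shape of that call concrete, here is a minimal Java sketch. It is not Martini's Jms.publishString and does not contact a broker: the PublishSketch class, the destination name, the property name, and the status value are all assumptions made for illustration. It only mirrors the three parameters listed above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in that mirrors the three-parameter shape described above.
// It records calls in memory instead of sending them to a JMS broker.
class PublishSketch {

    // What a single publish call carries.
    record Published(String destination, String message, Map<String, Object> properties) {}

    private final List<Published> sent = new ArrayList<>();

    // destination: where to send; message: the body; properties: extra key/value data.
    void publishString(String destination, String message, Map<String, Object> properties) {
        sent.add(new Published(destination, message, Map.copyOf(properties)));
    }

    // Read-only view of everything published so far.
    List<Published> sent() {
        return List.copyOf(sent);
    }

    public static void main(String[] args) {
        PublishSketch publisher = new PublishSketch();
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("device1", "AVAILABLE"); // assumed property name and status value
        publisher.publishString("queue://demo.events", "device status update", properties); // assumed destination
        System.out.println(publisher.sent());
    }
}
```

Carrying the device status in the properties rather than in the message body is an assumption here, chosen because the processing service described next iterates over the properties it receives.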
Processing Messages
Open the service ProcessEvents under the \\code\demo010\services\ folder
On line 3: An Iterate step is called to process each property received from the service
On lines 4-5: A Fork step is added to check the property name; the case to match is device
On line 6: The Cache.cacheGet function is called to check whether there is an existing value in memory
On line 7: A Fork step is added to check whether a value exists in memory
Case: value does not exist
On lines 9-10: Inserts a record into the database and saves the value in memory
On line 11: Logs the result of the current value
Case: value exists
On line 13: A Fork step is added to compare the current value to the memory value
Case: current value and memory value are the same
On line 15: Logs the result that the current value is still the same
Case: current value and memory value are not the same
On lines 17-18: Inserts the record into the database and updates the memory value
On line 19: Logs the result of the current value
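The branching in the steps above can be sketched as plain Java. This is an illustration under stated assumptions, not the Martini service itself: a HashMap stands in for the cache, a list stands in for the database table, and the StatusChangeSketch class, the log wording, and the status values are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the branching above. The map stands in for the cache
// and the list stands in for the database table; both are assumptions.
class StatusChangeSketch {

    private final Map<String, String> cache = new HashMap<>();
    private final List<String> database = new ArrayList<>();

    // Handles one property (device name and status) and returns the log line.
    String process(String device, String status) {
        String cached = cache.get(device);
        if (cached == null) {
            // No value in memory yet: record it and remember it.
            database.add(device + "=" + status);
            cache.put(device, status);
            return device + " first status: " + status;
        }
        if (cached.equals(status)) {
            // Unchanged: nothing is written to the database.
            return device + " status unchanged: " + status;
        }
        // Changed: record the new status and update the value in memory.
        database.add(device + "=" + status);
        cache.put(device, status);
        return device + " status changed: " + cached + " -> " + status;
    }

    // Read-only view of the rows written so far.
    List<String> records() {
        return List.copyOf(database);
    }

    public static void main(String[] args) {
        StatusChangeSketch sketch = new StatusChangeSketch();
        System.out.println(sketch.process("device1", "AVAILABLE"));
        System.out.println(sketch.process("device1", "AVAILABLE"));
        System.out.println(sketch.process("device1", "ERROR"));
        System.out.println(sketch.records());
    }
}
```

Comparing against the cached value before writing means repeated events with the same status produce a log line but no new database record, so the table holds only the first status and the subsequent changes for each device.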