Queues are FIFO data structures that hold elements in the order they are received, so they can be processed in that order. A queue is typically consumed in one of two ways: polling or blocking. With polling, the consumer checks the queue repeatedly until a message is found; with blocking, a consumer thread blocks on the queue until a message becomes available. In both approaches, one thread is tied up polling or waiting on the queue.
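The two consumption styles can be illustrated with java.util.concurrent's BlockingQueue (a generic illustration, not SMART platform code):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueStyles {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Polling: poll() returns immediately, null if the queue is empty,
        // so the consumer has to keep checking until a message shows up.
        System.out.println(queue.poll()); // prints null: nothing there yet

        queue.add("hello");

        // Blocking: take() parks the calling thread until a message is
        // available; here one is already queued, so it returns at once.
        System.out.println(queue.take()); // prints hello
    }
}
```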
In the SMART platform, we associate a queue with each object. Events meant for an object are posted to its queue for processing. In this scenario, tying one thread to poll or wait on each queue is not feasible, because the number of objects created can be huge. We tried several options. The first was to have a single thread poll all the object queues and hand off processing to a thread pool. We eliminated this immediately, since the delay in processing individual messages became too high. The second option was a set of polling threads, each assigned a subset of the queues: with 500 objects, I have five threads, each scanning 100 queues for messages. This reduces the delay but does not eliminate it, and as the number of objects grows, the number of waiting threads grows with it. What we want is a way for queue processing to scale without increasing the resources used, and without introducing a delay in the processing of these messages.
JIT queues (just-in-time processing queues) are the answer we came up with for this problem. A JIT queue has no threads waiting for messages. Instead, when a message is posted to the queue, the posting thread checks whether a thread already exists that is processing this queue's messages. If not, it kicks off a thread to process them. A thread to process the queue's messages is thus created only when required and does not exist before that, hence "just in time" processing. A few characteristics of JIT queues are:
- As the number of queues increases, the thread count does not increase unless all queues are continuously busy.
- A further optimization is to process queue messages with threads from a thread pool. This limits the number of queues being processed concurrently and thus helps control resource utilization.
- Delays in message processing are introduced only when the thread pool has insufficient threads, and the pool size can be tuned appropriately for good performance.
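To make the idea concrete, here is a minimal sketch of a JIT queue in Java (all names are illustrative, not the SMART platform's actual classes): the posting thread claims an "in processing" flag and schedules a drain task on a shared pool only when no processor is currently active for that queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class JitQueue<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean processing = new AtomicBoolean(false);
    private final ExecutorService pool;   // shared across all JIT queues
    private final Consumer<T> handler;    // per-object message handler

    JitQueue(ExecutorService pool, Consumer<T> handler) {
        this.pool = pool;
        this.handler = handler;
    }

    public boolean add(T data) {
        boolean ret = queue.add(data);
        // Just-in-time: schedule a processor only if none is active.
        if (processing.compareAndSet(false, true)) {
            pool.execute(this::drain);
        }
        return ret;
    }

    private void drain() {
        do {
            T msg;
            while ((msg = queue.poll()) != null) {
                handler.accept(msg);
            }
            processing.set(false);
            // Re-check: a message may have been added after the last poll
            // but before the flag was cleared; reclaim and drain again.
        } while (!queue.isEmpty() && processing.compareAndSet(false, true));
    }
}
```

The re-check at the end of drain() closes the race where a poster sees the flag still set and skips scheduling just as the drainer finishes; without it a message could sit in the queue with no processor assigned.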
Implementing a JITQ
The implementation of JITQs is very simple (an implementation can be found here). We need a queue object that overrides the "add" method to invoke the processing code, as below:

    public boolean add(Object data) {
        boolean ret = _queue.add(data);
        processJITQueue(); // start processing this queue if not already running
        return ret;
    }
In the above code I have modified the add function to also call processJITQueue. So what does processJITQueue do? If you look at this function in JITQueueService.java (here), it just calls jitProcess from the JITProcessor class. This function does the following:
    JITQueueRunnable qrun = new JITQueueRunnable(jitq);
    _threadPool.execute(qrun); // schedule the runnable on the thread pool
    retrycnt = MAXRETRYCNT; //break the loop
The code above checks whether the queue is already being processed; if not, it creates a runnable and schedules it on the thread pool, which starts up the processing for the queue.
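Putting the pieces together, the scheduling step could be sketched as follows. Only JITQueueRunnable, jitq, retrycnt, and MAXRETRYCNT come from the fragment above; the in-process flag, the pool field, and the retry shape are assumptions made to keep the sketch self-contained and runnable.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for the platform's queue: messages plus an "in process" flag.
class JITQueue {
    final Queue<Runnable> messages = new ConcurrentLinkedQueue<>();
    final AtomicBoolean inProcess = new AtomicBoolean(false);
}

// Drains one queue's messages, then releases the flag.
class JITQueueRunnable implements Runnable {
    private final JITQueue jitq;
    JITQueueRunnable(JITQueue jitq) { this.jitq = jitq; }
    public void run() {
        Runnable msg;
        while ((msg = jitq.messages.poll()) != null) msg.run();
        jitq.inProcess.set(false);
    }
}

class JITProcessor {
    static final int MAXRETRYCNT = 3;
    private final ExecutorService pool;
    JITProcessor(ExecutorService pool) { this.pool = pool; }

    void jitProcess(JITQueue jitq) {
        int retrycnt = 0;
        while (retrycnt < MAXRETRYCNT) {
            // Claim the queue atomically; only one processor at a time.
            if (jitq.inProcess.compareAndSet(false, true)) {
                JITQueueRunnable qrun = new JITQueueRunnable(jitq);
                pool.execute(qrun);
                retrycnt = MAXRETRYCNT; // break the loop
            } else {
                retrycnt++; // an active processor will pick the message up
            }
        }
    }
}
```

The compareAndSet claim is what prevents two processors from being scheduled for the same queue when two threads post messages at the same time.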