
Reentrant Threadpools

Thread pools let us queue work on a set of threads, each of which picks up a task as soon as it becomes free. This is a common way to process work asynchronously. But what if the task itself has to perform an asynchronous operation, such as calling an external web service? Is it wise to tie up the thread and leave it waiting until that operation completes?

In the blocking approach, the thread running the Runnable is tied up until the external web service returns. (Refer to Fig 1.)

Fig 1
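To make the cost concrete, here is a minimal sketch of the blocking approach. Nothing in it comes from a real web-service client: the response is simulated with a CountDownLatch, the class name BlockingDemo is made up for illustration, and the pool is deliberately limited to one thread (an assumption, so that the effect is easy to observe).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class BlockingDemo {
    // Returns true if the second task could not start until the first one stopped waiting.
    static boolean secondTaskWaitsForFirst() {
        // Assumption: a single-thread pool, so a tied-up thread is immediately visible.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch response = new CountDownLatch(1);     // simulated web-service response
        CountDownLatch firstStarted = new CountDownLatch(1);
        AtomicBoolean secondRan = new AtomicBoolean(false);
        try {
            // First task: "calls the web service" and blocks its thread until the response arrives.
            pool.execute(() -> {
                firstStarted.countDown();
                try {
                    response.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Second task: queued behind the first, because the only thread is tied up.
            pool.execute(() -> secondRan.set(true));

            firstStarted.await();
            boolean starved = !secondRan.get();  // still queued while the first task waits
            response.countDown();                // the simulated response arrives
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return starved && secondRan.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second task had to wait: " + secondTaskWaitsForFirst());
    }
}
```

On a larger pool the same thing happens, only later: once every thread is parked on a slow call, newly queued work has to wait.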
Would it not be better to store the current state of the Runnable and, when the response to the operation arrives, re-schedule the Runnable on the next available thread in the pool? Put simply: reentrant thread pools. (Refer to Fig 2.)

Fig 2
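One way to approximate this with nothing but the JDK is to split the task by hand into a "before the response" part and an "after the response" part, and let a CompletableFuture re-submit the second part to the pool. Note that this is a callback-based sketch rather than a true suspend-and-resume of the Runnable; the class name RescheduleDemo and the simulated response are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class RescheduleDemo {
    // Returns the order in which the pieces of work ran on a single-thread pool.
    static List<String> run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<String> response = new CompletableFuture<>(); // simulated async reply
        try {
            // First task: send the request, register what to do with the reply, return at once.
            pool.execute(() -> {
                log.add("first: request sent");
                // The second half is re-scheduled on the pool when the reply arrives.
                response.thenAcceptAsync(r -> log.add("first: resumed with " + r), pool);
            });
            // Second task: can use the thread while the first task awaits its reply.
            Future<?> second = pool.submit(() -> { log.add("second: ran"); });
            second.get();

            response.complete("reply");  // the reply arrives; the first task is re-queued
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new ArrayList<>(log);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The thread is free between the two halves of the first task, which is exactly the property we want. The drawback is that the task has to be cut into pieces by hand.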

So, how can we create a reentrant thread pool? A search for such functionality led me to a very neat Apache Commons library called javaflow. Details can be found at: Commons-javaflow

What does it do? It lets us suspend and resume execution using something called continuations. (Please do look at the disclaimers below.)
Now let's see how we can use it to achieve what we are trying to do. Say we wrote a ReentrantRunnable as below:
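import org.apache.commons.javaflow.Continuation;

public class ReentrantRunnable implements Runnable
{
    private final Runnable _myRun;
    // True once the wrapped Runnable has been started.
    private boolean _started;
    // Where to resume from; null once the wrapped Runnable has finished.
    private Continuation _continueWith;
    public ReentrantRunnable(Runnable run)
    {
        _myRun = run;
        _started = false;
        _continueWith = null;
    }
    public void runhelper()
        throws Exception
    {
        if (!_started)
        {
            // First run: start the wrapped Runnable and keep the continuation
            // captured at its first suspend (null if it ran to completion).
            _started = true;
            _continueWith = Continuation.startWith(_myRun);
        }
        else if (_continueWith != null)
        {
            // Later runs: resume from the last suspend point.
            _continueWith = Continuation.continueWith(_continueWith);
        }
    }
    public void run()
    {
        try
        {
            runhelper();
        }
        catch (Exception e)
        {
            //TODO: log fatal error
            e.printStackTrace();
        }
    }
}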
What does this do?
  • If this is the first time the runnable is run, it starts the wrapped Runnable by calling its run method.
  • Otherwise, if the runnable has already run, it continues from where it left off.
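The start-or-continue behaviour can be illustrated without javaflow. In the sketch below, a hypothetical StepwiseRunnable keeps an iterator over hand-written steps as a stand-in for the Continuation: each call to run() executes up to the next "suspend point" and then gives the thread back. The class, the steps, and the single-thread executor are all assumptions made for illustration, not part of javaflow.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class StepwiseRunnable implements Runnable {
    // Hypothetical stand-in for a Continuation: the steps that are still to run.
    private final Iterator<Runnable> remaining;

    StepwiseRunnable(List<Runnable> steps) {
        this.remaining = new ArrayList<>(steps).iterator();
    }

    // The first call starts the work, later calls continue it, calls after the end do nothing.
    @Override
    public synchronized void run() {
        if (remaining.hasNext()) {
            remaining.next().run();
        }
    }

    synchronized boolean isFinished() {
        return !remaining.hasNext();
    }

    // Schedules the same task repeatedly and returns what it logged.
    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Runnable> steps = new ArrayList<>();
        steps.add(() -> log.add("started"));
        steps.add(() -> log.add("resumed"));
        steps.add(() -> log.add("finished"));
        StepwiseRunnable task = new StepwiseRunnable(steps);

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            while (!task.isFinished()) {
                pool.submit(task).get();  // each submission runs exactly one step
            }
            pool.submit(task).get();      // already finished: nothing happens
            return new ArrayList<>(log);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The difference with javaflow is that here the suspend points are fixed by how the steps were cut up, whereas a continuation captures the suspend point from ordinary straight-line code.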
Now it is very simple: schedule this runnable on the thread pool instead of the original runnable, like so:
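import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SomeClass
{
    private static ExecutorService service = Executors.newCachedThreadPool();
    ……
    // Wraps the runnable and schedules it for its first run.
    public ReentrantRunnable schedule(Runnable run)
    {
        ReentrantRunnable rrunnable = new ReentrantRunnable(run);
        service.execute(rrunnable);
        return rrunnable;
    }
    // Re-schedules a suspended runnable on the next available thread.
    public ReentrantRunnable resume(ReentrantRunnable run)
    {
        service.execute(run);
        return run;
    }
}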
Now if we have a MyRunnable as below:
import org.apache.commons.javaflow.Continuation;

public class MyRunnable implements Runnable
{
    public MyRunnable()
        throws Exception
    {
    }
    // SomeShared is a helper class that is not shown in this post.
    private void callsomeshared()
        throws Exception
    {
        SomeShared shared = new SomeShared(0);
        while (shared.value() < 10)
        {
            shared.echo();
            // Note this suspend; it can be called in your async functions.
            Continuation.suspend();
        }
    }

    public void run()
    {
        System.out.println("started!");
        try
        {
            callsomeshared();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("Finished!");
    }
}
Calling schedule(new MyRunnable()) and subsequently calling resume() achieves what we set out to do. But wait, there is a catch: javaflow requires that the classes that call suspend and continue be loaded through a class loader that instruments their bytecode. You can either use the ContinuationClassLoader that ships with javaflow, or create your own class loader using AsmClassTransformer. For the latter, override findClass as below:
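import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import org.apache.commons.javaflow.bytecode.transformation.asm.AsmClassTransformer;

public class MyClassLoader extends URLClassLoader
{
    // URLClassLoader has no no-argument constructor, so one must be declared.
    public MyClassLoader(URL[] urls, ClassLoader parent)
    {
        super(urls, parent);
    }

    // Assumed helper (not shown in the original post): opens the class file
    // for the given class name, or returns null if it cannot be found.
    private InputStream getClassStream(String className)
    {
        return getResourceAsStream(className.replace('.', '/') + ".class");
    }

    @Override
    public Class<?> findClass(String className)
        throws ClassNotFoundException
    {
        try (InputStream str = getClassStream(className))
        {
            if (str != null)
            {
                // Instrument the bytecode so that suspend and continue work.
                AsmClassTransformer at = new AsmClassTransformer();
                byte[] bytes = at.transform(str);
                return defineClass(className, bytes, 0, bytes.length);
            }
        }
        catch (Exception e)
        {
            throw new ClassNotFoundException(className, e);
        }
        // findClass must not return null when the class is missing.
        throw new ClassNotFoundException(className);
    }
}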
Now we have all the pieces: load MyRunnable and SomeClass using this MyClassLoader and run.
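One detail worth checking is that the task class really is defined by the custom loader and not by its parent, since a class the parent can already find never reaches findClass. The sketch below shows the mechanics with the JDK only. The names TransformingLoaderDemo, ChildFirstLoader and Hello are hypothetical, the transform step is an identity placeholder where the javaflow instrumentation would go, and it assumes Java 9 or later (for readAllBytes) with class files readable from the class path.

```java
import java.io.IOException;
import java.io.InputStream;

final class TransformingLoaderDemo {
    /** Hypothetical task whose class should be defined by our own loader. */
    public static class Hello implements Runnable {
        @Override
        public void run() {
            System.out.println("running in a class defined by "
                    + getClass().getClassLoader().getClass().getSimpleName());
        }
    }

    /** Defines one named class itself and delegates everything else to the parent. */
    static class ChildFirstLoader extends ClassLoader {
        private final String target;

        ChildFirstLoader(ClassLoader parent, String target) {
            super(parent);
            this.target = target;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                if (!name.equals(target)) {
                    return super.loadClass(name, resolve);  // normal parent-first delegation
                }
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    c = findClass(name);                    // skip the parent for the target class
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                byte[] bytes = transform(in.readAllBytes());
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }

        // Placeholder: a real loader would instrument the bytecode here.
        private byte[] transform(byte[] original) {
            return original;
        }
    }

    // Loads Hello through the custom loader, runs it, and reports who defined the class.
    static boolean loadedByCustomLoader() {
        try {
            String name = Hello.class.getName();
            ClassLoader parent = TransformingLoaderDemo.class.getClassLoader();
            ChildFirstLoader loader = new ChildFirstLoader(parent, name);
            Class<?> c = loader.loadClass(name);
            Runnable task = (Runnable) c.getDeclaredConstructor().newInstance();
            task.run();
            return c.getClassLoader() == loader && c != Hello.class;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("defined by custom loader: " + loadedByCustomLoader());
    }
}
```

The task is created reflectively and used only through the Runnable interface, because the copy of the class defined by the custom loader is a different type from the one the calling code was compiled against.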
Disclaimers
I have tested javaflow and found that it does not save and restore state correctly in a few scenarios:
  • When suspend is called from an inner class, javaflow does not restore the stack correctly.
  • When suspend is called within a method invoked through reflection, javaflow does not store and restore the stack correctly.
  • When suspend is called within a function inside a try/catch block, state is not stored or restored correctly.
  • If any serialization or de-serialization classes appear along the stack trace of the suspend call, there are problems.
  • Constructors are not bytecode-instrumented (BCI'ed) when using the ASM class transformer. Hence, if suspend is called with a constructor present in the stack trace, execution continues and causes problems.
