Notifying ExecutorService (2)

This is the second post about the NotifyingExecutorService. Don’t miss the first half!

The ThreadPoolExecutor (TPE) in the java.util.concurrent package has several methods for submitting tasks. Each submit() method wraps the incoming Runnable (or Callable) in a FutureTask, which holds the task and its future result, by calling the protected method newTaskFor(). The FutureTask itself also has some useful methods for observing the state of the task (isDone() and isCancelled(), for instance). In addition, its done() method gets called once the task has been processed by the TPE, regardless of whether the computation completed, failed or was cancelled. By default this method does nothing, so it is intended to be overridden. That makes it a good entry point for the listener model described earlier:

1.    Create a subclass of FutureTask, override done() and add a method to register a listener on the task. Notify the listener (if one is registered) when done() gets called.
2.    Create a subclass of ThreadPoolExecutor and return the subclassed FutureTask whenever newTaskFor() gets called.
3.    Return the subclassed FutureTask from every submit() method instead of a plain Future, so that callers can register listeners on a task.
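
The three steps can be sketched with nothing but the JDK. This is a minimal sketch, not the final classes: ListenableTask, ListenablePool and demo() are names made up for illustration, the listener is a plain Runnable, and only submit(Callable) is narrowed to keep it short.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ListenerSketch {

    // Step 1: a FutureTask that tells a listener when it is done (hypothetical name).
    public static class ListenableTask<V> extends FutureTask<V> {
        private volatile Runnable listener;
        private final AtomicBoolean notified = new AtomicBoolean();

        ListenableTask(Callable<V> callable) { super(callable); }
        ListenableTask(Runnable runnable, V result) { super(runnable, result); }

        public void setListener(Runnable listener) {
            this.listener = listener;
            if (isDone()) notifyOnce();   // the task finished before we registered
        }

        @Override
        protected void done() { notifyOnce(); }

        private void notifyOnce() {
            Runnable l = listener;
            if (l != null && notified.compareAndSet(false, true)) l.run();
        }
    }

    // Steps 2 and 3: a pool that creates and returns ListenableTasks.
    public static class ListenablePool extends ThreadPoolExecutor {
        public ListenablePool(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ListenableTask<T>(callable);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ListenableTask<T>(runnable, value);
        }

        @Override
        public <T> ListenableTask<T> submit(Callable<T> task) {
            // super.submit() returns whatever newTaskFor() created
            return (ListenableTask<T>) super.submit(task);
        }
    }

    // Submits one task, registers a listener and waits for the notification.
    public static String demo() {
        ListenablePool pool = new ListenablePool(1);
        try {
            Callable<Integer> work = () -> 6 * 7;
            ListenableTask<Integer> task = pool.submit(work);
            CountDownLatch notified = new CountDownLatch(1);
            task.setListener(notified::countDown);
            if (!notified.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener was not notified");
            }
            return "notified, result=" + task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "notified, result=42"
    }
}
```

Note that the listener runs on whichever thread wins: the pool thread inside done(), or the registering thread if the task was already finished.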

That’s almost all. Everything is cool, isn’t it? Unfortunately not, because there are two major problems to be aware of:
1.    We have to ensure that the listener gets notified even if the task has already been executed. Remember that submitting a task to the TPE is an asynchronous operation: the caller thread adds the FutureTask to a queue and returns immediately, while the pool threads work through the queue. There is no guarantee about what happens first: the caller thread adding a listener to the task, or a pool thread computing the result. This leads us directly to the second problem:
2.    The thread that notifies the listener is not the caller thread; it is one of the pool threads. That means every computation made by the listener is done by a pool thread. This is bad, because a TPE is normally introduced to achieve decoupling. We would break that design if the listener’s callback method triggered long-running work. Remember the game example from earlier:

•    A new game object is created by the game logic. (Thread A)
•    A “persist task” is added to our database TPE, which persists the object into the database (and assigns a new id to the object). (Pool Thread B)
•    We register a callback listener on the FutureTask returned by the TPE. When notified, the listener sends the new game object to the connected clients. (Thread A)
•    The database TPE executes the task and notifies the listener. The messages are sent to the clients. (Pool Thread B)

The last operation should not be done by a pool thread, because that pool should only execute database tasks. The solution is to name an executor that runs the callback once the FutureTask is done, simply by adding an ExecutorService parameter to the method that registers the listener (setListener() in the classes below). Sadly this solution has a small disadvantage: handing a task over to another executor costs some performance. When the listener callbacks aren’t expensive, you might consider executing the callback in the pool thread after all. Google’s Guava library helps us in this case: MoreExecutors.sameThreadExecutor() returns an ExecutorService that runs every submitted task in the thread that submits it, which is normally the pool thread that just completed the task. (Guava 18 deprecated sameThreadExecutor() in favour of MoreExecutors.newDirectExecutorService(), and later releases removed it.)
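
The difference between the two choices shows up in the name of the thread that runs the callback. Here is a small JDK-only sketch of it. HandOffTask, the pool names and callbackThread() are made up for illustration, the callback is fixed in the constructor to keep things short, and `Runnable::run` stands in for Guava’s same-thread executor.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CallbackThreadSketch {

    // A FutureTask that hands its completion callback to a given Executor (hypothetical).
    static class HandOffTask<V> extends FutureTask<V> {
        private final Runnable callback;
        private final Executor callbackExecutor;

        HandOffTask(Callable<V> work, Runnable callback, Executor callbackExecutor) {
            super(work);
            this.callback = callback;
            this.callbackExecutor = callbackExecutor;
        }

        @Override
        protected void done() {
            // done() itself always runs on the thread that completed the task.
            callbackExecutor.execute(callback);
        }
    }

    static ExecutorService singleThread(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Runs one "persist" task and returns the name of the thread that ran the callback.
    public static String callbackThread(boolean separateExecutor) {
        ExecutorService dbPool = singleThread("db-pool");
        ExecutorService callbackPool = singleThread("callback-pool");
        try {
            BlockingQueue<String> seen = new LinkedBlockingQueue<String>();
            Runnable callback = () -> seen.add(Thread.currentThread().getName());
            Executor direct = Runnable::run; // runs the callback in the calling thread
            Executor target = separateExecutor ? callbackPool : direct;
            Callable<String> persist = () -> "persisted";
            dbPool.execute(new HandOffTask<String>(persist, callback, target));
            String name = seen.poll(5, TimeUnit.SECONDS);
            if (name == null) {
                throw new IllegalStateException("callback did not run");
            }
            return name;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            dbPool.shutdown();
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct executor:   " + callbackThread(false)); // db-pool
        System.out.println("separate executor: " + callbackThread(true));  // callback-pool
    }
}
```

With the direct executor the database pool pays for the callback; with the separate executor it only pays for the hand-off.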

Finally, here are the classes:

public interface INotifyingExecutorService extends ExecutorService{

    @Override
    public <T> INotifyingFuture<T> submit(Callable<T> task);
    
    @Override
    public INotifyingFuture<Void> submit(Runnable task);
    
    @Override
    public <T> INotifyingFuture<T> submit(Runnable task, T result);
    
}

As you can see, this interface just extends the ExecutorService to enforce that an INotifyingFuture has to be returned by any INotifyingExecutorService.

public class NotifyingExecutorService extends ForwardingExecutorService implements INotifyingExecutorService{

    private final ExecutorService delegate;
    
    public NotifyingExecutorService(ExecutorService delegate) {
		super();
		this.delegate = delegate;
	}

	@Override
    public <T> INotifyingFuture<T> submit(Callable<T> task) {
    	if (task == null) throw new NullPointerException();
    	INotifyingFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
	@Override
    public INotifyingFuture<Void> submit(Runnable task) {
    	if (task == null) throw new NullPointerException();
        INotifyingFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
	@Override
    public <T> INotifyingFuture<T> submit(Runnable task, T result) {
    	if (task == null) throw new NullPointerException();
    	INotifyingFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    
	@Override
	protected ExecutorService delegate() {
		return this.delegate;
	}
    
	protected <T> INotifyingFuture<T> newTaskFor(Callable<T> callable) {
		return new NotifyingFuture<T>(callable);
	}
	
	protected <T extends Object> INotifyingFuture<T> newTaskFor(Runnable runnable, T value) {
		return new NotifyingFuture<T>(runnable, value);
	}
}

This class implements the new INotifyingExecutorService and extends Guava’s ForwardingExecutorService. ForwardingExecutorService forwards every call to the ExecutorService returned by delegate(), which here is the instance passed into the constructor.

public interface INotifyingFuture<V> extends RunnableFuture<V>{

    /**
     * Sets this listener to a {@link INotifyingFuture}. When the future is done
     * or canceled the listener gets notified.<br> 
     * @param listener the listener to notify
     * @param executor the executor that runs the listener's callbacks
     */
    public void setListener(IFutureListener<V> listener, ExecutorService executor);
    
    /**
     * Sets this listener to a {@link INotifyingFuture}. When the future is done
     * or canceled the listener gets notified.<br>
     * <b>Attention</b>: Be aware of the fact that everything that is done in that
     * listener is executed in same thread as the original task that this listener listens
     * to. Only use this method when you are sure that no long running task is performed
     * by the listener. When you want the listener's tasks to be performed asynchronous
     * use {@link #setListener(IFutureListener, ExecutorService)} instead.
     * @param listener
     */
    public void setListener(IFutureListener<V> listener);
}

public interface IFutureListener<V> {
	
	/**
	 * The task was computed successfully.
	 * @param result
	 */
	public void onSuccess(V result);
	
	/**
	 * Called when the future has been cancelled.
	 */
	public void onCancel(RunnableFuture<V> cancelledFuture);
	
	/**
	 * Called when there was an error while executing 
	 * this future.
	 * @param e
	 * @param future the future that fails
	 */
	public void onError(Throwable e, Future<V> future);
}

public class NotifyingFuture<V> extends FutureTask<V> implements INotifyingFuture<V>{

    private static final ExecutorService DEFAULT_EXECUTOR = MoreExecutors.sameThreadExecutor();
    
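	// volatile: written by the registering thread, read by the pool thread in done()
	private volatile IFutureListener<V> listener = null;
	private volatile ExecutorService executor = null;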
	private final AtomicBoolean executed = new AtomicBoolean();
	
	public NotifyingFuture(Callable<V> callable) {
		super(callable);
		setExecutor(DEFAULT_EXECUTOR);
	}

	public NotifyingFuture(Runnable runnable, V result) {
        super(runnable,result);
        setExecutor(DEFAULT_EXECUTOR);
    }

	
	@Override
	protected void done() {
	    if(listener==null){
	        return;
	    }
	    notifyListenerOnce();
	}

	/**
	 * Notifies the listener at most once, even when done() and setListener() race.
	 */
	protected void notifyListenerOnce(){
		if(!this.executed.getAndSet(true)){
			notifyListener();
		}
	}
	
	protected void notifyListener(){
		this.executor.submit(new TaskCompletionRunner<V>(delegateFuture(),this.listener));
	}
	
	/**
	 * @return the future that was processed.
	 */
	protected RunnableFuture<V> delegateFuture(){
		return this;
	}
	
	@Override
	public void setListener(IFutureListener<V> listener, ExecutorService executor) {
		setExecutor(executor);
		setListener(listener);
	}
	
	public void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public void setListener(IFutureListener<V> listener) {
        this.listener = listener;
        /*
         * The task may already have finished. If so, done() ran before the
         * listener was registered, so notify the listener now.
         */
        runWhenDone();
    }
    
    protected void runWhenDone(){
    	if(isDone()){
        	notifyListenerOnce();
        }
    }
    
    private static class TaskCompletionRunner<V> implements Runnable{

        private final IFutureListener<V> listener;
        private final RunnableFuture<V> future;
        
        public TaskCompletionRunner(RunnableFuture<V> future, IFutureListener<V> listener) {
            this.future = future;
            this.listener = listener;
        }
        
        @Override
        public void run() {
            if(this.future.isCancelled()){
                this.listener.onCancel(this.future);
            }else{
                try {
                    this.listener.onSuccess(this.future.get());
                } catch (InterruptedException e) {
                    this.listener.onError(e, this.future);
                } catch (ExecutionException e) {
                    this.listener.onError(e, this.future);
                }
            }
        }
        
    }
}
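
To convince ourselves that the once-only guard covers the registration race, we can replay the race many times and count the notifications. The sketch below is a stripped-down, JDK-only stand-in for NotifyingFuture: OnceTask and notificationsAfter() are hypothetical names, the listener is a plain Runnable, and the listener field is assumed to be volatile so that both threads see it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class NotifyOnceSketch {

    // Same guard as NotifyingFuture, reduced to the parts that matter for the race.
    static class OnceTask extends FutureTask<String> {
        private volatile Runnable listener;
        private final AtomicBoolean executed = new AtomicBoolean();

        OnceTask() { super(() -> "done"); }

        void setListener(Runnable listener) {
            this.listener = listener;
            if (isDone()) notifyListenerOnce(); // task won the race
        }

        @Override
        protected void done() {
            if (listener != null) notifyListenerOnce(); // registration won the race
        }

        private void notifyListenerOnce() {
            if (!executed.getAndSet(true)) listener.run();
        }
    }

    // Races task completion against setListener() and returns how many listeners fired.
    public static int notificationsAfter(int rounds) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger fired = new AtomicInteger();
        try {
            for (int i = 0; i < rounds; i++) {
                OnceTask task = new OnceTask();
                pool.execute(task);                       // completes on the pool thread
                task.setListener(fired::incrementAndGet); // registers on the caller thread
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
            return fired.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(notificationsAfter(10000)); // prints 10000
    }
}
```

Every task fires its listener exactly once, no matter which thread gets there first, so the count always equals the number of rounds.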

Happy coding!
