Home > Computers and Internet > An Asynchronous Resource Pool

An Asynchronous Resource Pool

Last week I was thinking about how to create an efficient, asynchronously driven generic resource pool. My first solution was a bit more complicated than this one: originally I tied the IAsyncResult from the creation method to the IAsyncResult that I returned to the caller, and that got too complicated. After a good night's rest I was able to see what I was really trying to do, and here’s the result.

 

using System;
using System.Collections.Generic;
using System.Threading;

namespace ResourcePool
{
    /// <summary>
    /// A thread-safe pool of reusable items whose creation is driven through an
    /// APM (Begin/End) method pair.
    /// </summary>
    /// <remarks>
    /// Items are taken with <c>TryGet</c> or the <c>BeginGet</c>/<c>EndGet</c> pair and handed
    /// back with <c>Put</c>. A timer retires surplus idle items every <c>DrainFrequency</c>.
    /// </remarks>
    /// <typeparam name="T">A type that is expensive in use or in creation.</typeparam>
    /// <author>jader3rd</author>
    public class ResourcePool<T> : IDisposable where T : class
    {
        /// <summary>
        /// Lock ordering: when both locks are needed, take resultsLock before poolLock.
        /// poolLock guards <c>free</c>; resultsLock guards <c>outstandingResults</c>.
        /// </summary>
        private ReaderWriterLockSlim poolLock, resultsLock;
        private Queue<T> free;
        private Func<AsyncCallback, Object, IAsyncResult> beginCreateResource;
        private Func<IAsyncResult, T> endCreateResource;
        private Queue<PoolResult> outstandingResults;
        private Timer drainTimer;
        // Read by timer/thread-pool callbacks without a lock, hence volatile.
        private volatile bool disposing;
        // 0 until Dispose has been entered once; makes Dispose idempotent.
        private int disposeStarted;
        private TimeSpan drainFreq;

        /// <summary>
        /// Interval between drain passes. Setting it re-arms the drain timer with the
        /// new value as both due time and period.
        /// </summary>
        public TimeSpan DrainFrequency {
            get
            {
                return drainFreq;
            }
            set
            {
                drainFreq = value;
                if (null != drainTimer)
                {
                    drainTimer.Change(value, value);
                }
            }
        }

        /// <summary>
        /// Creates a pool with no objects and a DrainFrequency of thirty seconds.
        /// </summary>
        /// <param name="creationAction">The action to call to create an item. Place any needed item creation error checking in the action.</param>
        public ResourcePool(Func<T> creationAction) : this (new Func<AsyncCallback, Object, IAsyncResult>(creationAction.BeginInvoke), new Func<IAsyncResult, T>(creationAction.EndInvoke))
        {
        }

        /// <summary>
        /// Creates a pool with no objects and a DrainFrequency of thirty seconds.
        /// </summary>
        /// <param name="beginCreateFunc">The APM begin method for creating the resource.</param>
        /// <param name="endCreateFunc">The APM end method for creating the resource.</param>
        public ResourcePool(Func<AsyncCallback, Object, IAsyncResult> beginCreateFunc, Func<IAsyncResult, T> endCreateFunc)
        {
            beginCreateResource = beginCreateFunc;
            endCreateResource = endCreateFunc;
            poolLock = new ReaderWriterLockSlim();
            free = new Queue<T>();
            DrainFrequency = TimeSpan.FromSeconds(30);
            outstandingResults = new Queue<PoolResult>();
            resultsLock = new ReaderWriterLockSlim();
            drainTimer = new Timer(checkFreePool, null, DrainFrequency, DrainFrequency);
            disposing = false;
        }

        /// <summary>
        /// If there is a free object, grab it. Does not add to the pool.
        /// </summary>
        /// <param name="item">The item, or null when nothing was free.</param>
        /// <returns>True if an object is grabbed.</returns>
        /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
        public bool TryGet(out T item)
        {
            throwIfDisposed();
            poolLock.EnterWriteLock();
            try
            {
                // Re-check under the lock: Dispose may have started while we waited.
                throwIfDisposed();
                if (0 < free.Count)
                {
                    item = free.Dequeue();
                    return true;
                }
            }
            finally
            {
                poolLock.ExitWriteLock();
            }
            item = null;
            return false;
        }

        /// <summary>
        /// Get an object from the pool. When nothing is free the request is queued and
        /// creation of a new item is started.
        /// </summary>
        /// <param name="callback">The method to be called when an object is free from the pool. If there is an object free right away the callback gets called synchronously.</param>
        /// <param name="state">The object to get passed to the callback method in the async state.</param>
        /// <returns>An IAsyncResult that references the get.</returns>
        /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
        public IAsyncResult BeginGet(AsyncCallback callback, Object state)
        {
            T item;
            if (TryGet(out item))
            {
                PoolResult ready = new PoolResult(callback, state, item);
                if (null != callback)
                {
                    callback.Invoke(ready);
                }
                return ready;
            }

            PoolResult pending = new PoolResult(callback, state, null);
            resultsLock.EnterWriteLock();
            try
            {
                outstandingResults.Enqueue(pending);
            }
            finally
            {
                resultsLock.ExitWriteLock();
            }
            // The created item is Put into the pool; whichever queued request is at the
            // head of outstandingResults is then served with it.
            beginCreateResource.Invoke(endCreate, null);
            return pending;
        }

        /// <summary>
        /// Actually get an object from the pool, blocking until one has been assigned.
        /// </summary>
        /// <param name="result">The result object returned by BeginGet.</param>
        /// <returns>An item from the pool.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="result"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The pool was disposed while waiting.</exception>
        public T EndGet(IAsyncResult result)
        {
            if (null == result)
            {
                throw new ArgumentNullException("result");
            }
            PoolResult res = (PoolResult)result;
            if (null == res.Picked)
            {
                // Signaled either when an item is assigned or when the pool is disposed.
                res.AsyncWaitHandle.WaitOne();
                if (disposing)
                {
                    throw new ObjectDisposedException(GetType().Name);
                }
            }
            return res.Picked;
        }

        /// <summary>
        /// Put an item no longer in use back in the pool.
        /// </summary>
        /// <param name="item">The item to place in the pool.</param>
        /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
        public void Put(T item)
        {
            throwIfDisposed();
            poolLock.EnterWriteLock();
            try
            {
                throwIfDisposed();
                free.Enqueue(item);
            }
            finally
            {
                poolLock.ExitWriteLock();
            }
            // Match the new free item with a waiting request off the caller's thread.
            ThreadPool.QueueUserWorkItem(checkOutstandingResults);
        }

        /// <summary>
        /// The callback for the create method. Put the item in the free pool, or dispose
        /// it when the pool has been disposed in the meantime.
        /// </summary>
        /// <param name="result">The result to pass to the end-create method.</param>
        private void endCreate(IAsyncResult result)
        {
            T item = endCreateResource.Invoke(result);
            if (!disposing)
            {
                try
                {
                    Put(item);
                    return;
                }
                catch (ObjectDisposedException)
                {
                    // Dispose started between the check above and Put; fall through.
                }
            }
            // Nobody will ever receive this item, so release it instead of leaking it.
            disposeItem(item);
        }

        /// <summary>
        /// Dispose of resources, drain the pool, and if the pool items are IDisposable, dispose them.
        /// Wait handles of still-outstanding requests are signaled so that blocked EndGet
        /// callers wake up and observe the disposal. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (0 != Interlocked.Exchange(ref disposeStarted, 1))
            {
                return;
            }
            disposing = true;
            // Stop the drain timer first so no new tick starts against locks being torn down.
            drainTimer.Dispose();

            resultsLock.EnterWriteLock();
            try
            {
                while (0 < outstandingResults.Count)
                {
                    PoolResult result = outstandingResults.Dequeue();
                    ((EventWaitHandle)result.AsyncWaitHandle).Set();
                }
            }
            finally
            {
                resultsLock.ExitWriteLock();
            }

            poolLock.EnterWriteLock();
            try
            {
                while (0 < free.Count)
                {
                    disposeItem(free.Dequeue());
                }
            }
            finally
            {
                poolLock.ExitWriteLock();
            }

            disposeLock(poolLock);
            disposeLock(resultsLock);
        }

        /// <summary>
        /// Called by the pool draining timer. When no request is outstanding and more than
        /// one item is idle, removes one idle item and disposes it if it is IDisposable.
        /// </summary>
        /// <param name="state">Not used.</param>
        private void checkFreePool(Object state)
        {
            if (disposing)
            {
                return;
            }
            T item = null;
            try
            {
                resultsLock.EnterReadLock();
                try
                {
                    // only drain if there are no outstanding results
                    if (0 == outstandingResults.Count)
                    {
                        poolLock.EnterWriteLock();
                        try
                        {
                            if (!disposing && 1 < free.Count)
                            {
                                item = free.Dequeue();
                            }
                        }
                        finally
                        {
                            poolLock.ExitWriteLock();
                        }
                    }
                }
                finally
                {
                    resultsLock.ExitReadLock();
                }
            }
            catch (ObjectDisposedException)
            {
                // The pool was disposed while this tick was in flight; nothing left to drain.
            }
            // Always release what was dequeued, even if disposal started meanwhile.
            disposeItem(item);
        }

        /// <summary>
        /// Analyze the outstanding results queue and if there's a free item in the pool serve up the result.
        /// Serves at most one request per call.
        /// </summary>
        /// <param name="state">Not used, just there to satisfy the callback</param>
        private void checkOutstandingResults(object state)
        {
            if (disposing)
            {
                return;
            }
            PoolResult result = null;
            T item = null;
            try
            {
                resultsLock.EnterWriteLock();
                try
                {
                    if (0 < outstandingResults.Count)
                    {
                        poolLock.EnterWriteLock();
                        try
                        {
                            if (0 < free.Count)
                            {
                                result = outstandingResults.Dequeue();
                                item = free.Dequeue();
                            }
                        }
                        finally
                        {
                            poolLock.ExitWriteLock();
                        }
                    }
                }
                finally
                {
                    resultsLock.ExitWriteLock();
                }
            }
            catch (ObjectDisposedException)
            {
                // The pool was disposed while this work item was queued or running.
                // A pair dequeued before the failure (if any) is still delivered below.
            }

            if (null != result)
            {
                // Complete outside the locks so the caller's callback cannot deadlock the pool.
                result.Picked = item;
                if (null != result.Callback)
                {
                    result.Callback.Invoke(result);
                }
            }
        }

        /// <summary>
        /// Throws ObjectDisposedException once Dispose has started.
        /// </summary>
        private void throwIfDisposed()
        {
            if (disposing)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
        }

        /// <summary>
        /// Disposes the item when it is non-null and implements IDisposable.
        /// </summary>
        private static void disposeItem(T item)
        {
            IDisposable disposable = item as IDisposable;
            if (null != disposable)
            {
                disposable.Dispose();
            }
        }

        /// <summary>
        /// Disposes a lock without letting Dispose throw when another thread is still
        /// waiting on it.
        /// </summary>
        private static void disposeLock(ReaderWriterLockSlim rwLock)
        {
            try
            {
                rwLock.Dispose();
            }
            catch (SynchronizationLockException)
            {
                // A racing caller is still queued on the lock; it will fail its own
                // disposing check. Leaving the lock undisposed is preferable to throwing here.
            }
        }

        /// <summary>
        /// The IAsyncResult that tracks a single get request against the pool.
        /// </summary>
        private class PoolResult : IAsyncResult
        {
            // Guards lazy creation and signaling of myHandle.
            private readonly object handleGate = new object();
            private readonly AsyncCallback callerCallback;
            private readonly Object callerState;
            private T picked;
            private ManualResetEvent myHandle;
            private volatile bool compSync, isComp;

            /// <summary>
            /// Creates a result. A non-null <paramref name="chosen"/> marks the request as
            /// already (synchronously) completed.
            /// </summary>
            public PoolResult(AsyncCallback callback, Object state, T chosen)
            {
                callerCallback = callback;
                callerState = state;
                picked = chosen;
                bool alreadyDone = null != chosen;
                isComp = alreadyDone;
                compSync = alreadyDone;
            }

            /// <summary>
            /// The item assigned to this request. Setting it completes the request
            /// asynchronously and signals the wait handle if one has been handed out.
            /// </summary>
            public T Picked
            {
                get { return picked; }
                internal set
                {
                    picked = value;
                    compSync = false;
                    isComp = true;
                    lock (handleGate)
                    {
                        // If no handle exists yet, the getter will create it already signaled.
                        if (null != myHandle)
                        {
                            myHandle.Set();
                        }
                    }
                }
            }

            public object AsyncState
            {
                get { return callerState; }
            }

            /// <summary>
            /// Lazily created event; it is created in the signaled state when the request
            /// has already completed.
            /// </summary>
            public WaitHandle AsyncWaitHandle
            {
                get
                {
                    lock (handleGate)
                    {
                        if (null == myHandle)
                        {
                            myHandle = new ManualResetEvent(isComp);
                        }
                        return myHandle;
                    }
                }
            }

            public bool CompletedSynchronously
            {
                get
                {
                    return compSync;
                }
            }

            public bool IsCompleted
            {
                get
                {
                    return isComp;
                }
            }

            internal AsyncCallback Callback
            {
                get
                {
                    return callerCallback;
                }
            }
        }
    }
}
  1. No comments yet.
  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: