вторник, 12 июня 2012 г.

Создание своего ThreadPool'а на C# с фиксированным количеством потоков

Недавно, читая Хабр, я наткнулся на пост. В котором давался пример создания FixedThreadPool, с данными требованиями. Посмотрев код, я увидел, что при запуске каждой задачи создается новый поток (в данный момент автор поста поправил код). С примерными принципами работы пула потоков я знаком. Получается, что это никакой не пул, а просто некий планировщик задач по приоритетам.

Ну и тут мне захотелось самому попытаться создать свой способ решения поставленной задачи. Я не строго следовал требованиям, описанным в посте. Но и от основной задачи не отклонялся. С многопоточностью я знаком на базовом уровне так, что, возможно, я создал не самый оптимальный вариант. В итоге за несколько часов коддинга класс был готов.

Интерфейс класса FixedThreadPool я немного изменил. Теперь метод Execute принимает только один параметр Task, а приоритет был перенесен в класс Task. Такой вариант показался мне удобнее.

Source: github.com

TaskPriority:

    // Приоритет выполнения задачи.
    public enum TaskPriority
    {

        // Низкий приоритет.
        Low,
        // Средний приоритет.
        Normal,
        // Высокий приоритет.
        High

    }


Task:

    // Класс представляет задачу для выполнения в FixedThreadPool
    public class Task
    {

        private TaskPriority priority;
        private Action work;
        private bool isRunned;

        // Создает задачу с указанным приоритетом.
        public Task(Action work) : this(work, TaskPriority.Normal) { }

        // Создает задачу с указанным приоритетом.
        public Task(Action work, TaskPriority priority)
        {
            this.priority = priority;
            this.work = work;
        }

        // Запускает задачу.
        public void Execute()
        {
            lock (this)
            {
                isRunned = true;
            }
            work();
        }

        // Приоритет задачи. Устанавливается только при создании задачи.
        public TaskPriority Priority
        {
            get
            {
                return priority;
            }
        }

        // Запущена ли задача. (True - запущена, False - стоит в очереди на выполнение)
        public bool IsRunned
        {
            get
            {
                return isRunned;
            }
        }

    }

FixedThreadPool:

    // Пул потоков. В нем задача запускаются по приоритетам. Если количество потоков больше или равно 4, то на каждые 3 задачи с высоким приоритетом - будет запущена задача с нормальным приоритетом. Если количество потоков меньше 4, тогда задачи выполняются прямо следуя приоритетам. Задачи с низким приоритетом выполняются в последнюю очередь.
    public class FixedThreadPool : IDisposable
    {

        private int numberThreads;

        private ManualResetEvent stopEvent;
        private bool isStoping;
        private object stopLock;

        private Dictionary<int, ManualResetEvent> threadsEvent;
        private Thread[] threads;
        private List<task> tasks;

        private ManualResetEvent scheduleEvent;
        private Thread scheduleThread;

        private bool isDisposed;

        // Создает пул потоков с количеством потоков равным количеству ядер процессора.
        public FixedThreadPool() : this(Environment.ProcessorCount) { }

        // Создает пул потоков с указанным количеством потоков.
        public FixedThreadPool(int numberThreads)
        {
            if (numberThreads <= 0)
                throw new ArgumentException("numberThreads", "Количество потоков должно быть больше нуля.");

            this.numberThreads = numberThreads;

            this.stopLock = new object();
            this.stopEvent = new ManualResetEvent(false);

            this.scheduleEvent = new ManualResetEvent(false);
            this.scheduleThread = new Thread(SelectAndStartFreeThread) { Name = "Schedule Thread", IsBackground = true };
            scheduleThread.Start();

            this.threads = new Thread[numberThreads];
            this.threadsEvent = new Dictionary<int, ManualResetEvent>(numberThreads);

            for (int i = 0; i < numberThreads; i++)
            {
                threads[i] = new Thread(ThreadWork) { Name = "Pool Thread", IsBackground = true };
                threadsEvent.Add(threads[i].ManagedThreadId, new ManualResetEvent(false));

                threads[i].Start();
            }

            this.tasks = new List<task>();
        }

        // Прерывает выполнение всех потоков, не дожидаясь их завершения и уничтожает за собой все ресурсы.
        ~FixedThreadPool()
        {
            Dispose(false);
        }

        // Высвобождает ресурсы, которые используются пулом потоков.
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        // Высвобождает ресурсы, которые используются пулом потоков.
        protected virtual void Dispose(bool disposing)
        {
            if (!isDisposed)
            {
                if (disposing)
                {
                    scheduleThread.Abort();
                    scheduleEvent.Dispose();

                    for (int i = 0; i < numberThreads; i++)
                    {
                        threads[i].Abort();
                        threadsEvent[threads[i].ManagedThreadId].Dispose();
                    }
                }

                isDisposed = true;
            }
        }

        private Task SelectTask()
        {
            lock (tasks)
            {
                if (tasks.Count == 0)
                    throw new ArgumentException();

                var waitingTasks = tasks.Where(t => !t.IsRunned);
                var highTasks = waitingTasks.Where(t => t.Priority == TaskPriority.High);
                var normalTasks = waitingTasks.Where(t => t.Priority == TaskPriority.Normal);

                if (highTasks.Count() > 0)
                {
                    if (numberThreads >= 4)
                    {
                        var runnedHighTasks = tasks.Where(t => t.IsRunned && t.Priority == TaskPriority.High);
                        var runnedNormalTasks = tasks.Where(t => t.IsRunned && t.Priority == TaskPriority.Normal);

                        if (runnedHighTasks.Count() / (runnedNormalTasks.Count() + 1) < 3)
                        {
                            return highTasks.First();
                        }
                        else
                        {
                            return normalTasks.First();
                        }
                    }
                    else
                    {
                        return highTasks.First();
                    }
                }
                else
                {
                    if (normalTasks.Count() > 0)
                    {
                        return normalTasks.First();
                    }
                    else
                    {
                        var lowTasks = tasks.Where(t => t.Priority == TaskPriority.Low).ToArray();
                        return lowTasks.FirstOrDefault();
                    }
                }
            }
        }

        private void ThreadWork()
        {
            while (true)
            {
                threadsEvent[Thread.CurrentThread.ManagedThreadId].WaitOne();

                Task task = SelectTask();
                if (task != null)
                {
                    try
                    {
                        task.Execute();
                    }
                    finally
                    {
                        RemoveTask(task);
                        if (isStoping)
                            stopEvent.Set();
                        threadsEvent[Thread.CurrentThread.ManagedThreadId].Reset();
                    }
                }
            }
        }

        private void SelectAndStartFreeThread()
        {
            while (true)
            {
                scheduleEvent.WaitOne();
                lock (threads)
                {
                    foreach (var thread in threads)
                    {
                        if (threadsEvent[thread.ManagedThreadId].WaitOne(0) == false)
                        {
                            threadsEvent[thread.ManagedThreadId].Set();
                            break;
                        }
                    }
                }

                scheduleEvent.Reset();
            }
        }

        private void AddTask(Task task)
        {
            lock (tasks)
            {
                tasks.Add(task);
            }

            scheduleEvent.Set();
        }

        private void RemoveTask(Task task)
        {
            lock (tasks)
            {
                tasks.Remove(task);
            }

            if (tasks.Count > 0 && tasks.Where(t => !t.IsRunned).Count() > 0)
            {
                scheduleEvent.Set();
            }
        }

        // Ставит задачу в очередь.
        public bool Execute(Task task)
        {
            if (task == null)
                throw new ArgumentNullException("task", "The Task can't be null.");

            lock (stopLock)
            {
                if (isStoping)
                {
                    return false;
                }

                AddTask(task);
                return true;
            }
        }

        // Ставить несколько задачь в очередь.
        public bool ExecuteRange(IEnumerable<task> tasks)
        {
            bool result = true;
            foreach (var task in tasks)
            {
                if (!Execute(task))
                    result = false;
            }

            return result;
        }

        // Останавливает работу пула потоков. Ожидает завершения всех задач (запущенных и стоящих в очереди) и уничтожает все ресурсы.
        public void Stop()
        {
            lock (stopLock)
            {
                isStoping = true;
            }

            while (tasks.Count > 0)
            {
                stopEvent.WaitOne();
                stopEvent.Reset();
            }

            Dispose(true);
        }

    }

1 комментарий: