Недавно, читая Хабр, я наткнулся на пост. В котором давался пример создания FixedThreadPool, с данными требованиями. Посмотрев код, я увидел, что при запуске каждой задачи создается новый поток (в данный момент автор поста поправил код).
С примерными принципами работы пула потоков я знаком.
Получается, что это никакой не пул, а просто некий планировщик задач по приоритетам.
Ну и тут мне захотелось самому попытаться создать свой способ решения поставленной задачи. Я не строго следовал требованиям, описанным в посте. Но и от основной задачи не отклонялся. С многопоточностью я знаком на базовом уровне так, что, возможно, я создал не самый оптимальный вариант. В итоге за несколько часов коддинга класс был готов.
Интерфейс класса FixedThreadPool я немного изменил. Теперь метод Execute принимает только один параметр Task, а приоритет был перенесен в класс Task. Такой вариант показался мне удобнее.
Source: github.com
TaskPriority:
Task:
FixedThreadPool:
Ну и тут мне захотелось самому попытаться создать свой способ решения поставленной задачи. Я не строго следовал требованиям, описанным в посте. Но и от основной задачи не отклонялся. С многопоточностью я знаком на базовом уровне так, что, возможно, я создал не самый оптимальный вариант. В итоге за несколько часов коддинга класс был готов.
Интерфейс класса FixedThreadPool я немного изменил. Теперь метод Execute принимает только один параметр Task, а приоритет был перенесен в класс Task. Такой вариант показался мне удобнее.
Source: github.com
TaskPriority:
// Приоритет выполнения задачи. public enum TaskPriority { // Низкий приоритет. Low, // Средний приоритет. Normal, // Высокий приоритет. High }
Task:
// Класс представляет задачу для выполнения в FixedThreadPool public class Task { private TaskPriority priority; private Action work; private bool isRunned; // Создает задачу с указанным приоритетом. public Task(Action work) : this(work, TaskPriority.Normal) { } // Создает задачу с указанным приоритетом. public Task(Action work, TaskPriority priority) { this.priority = priority; this.work = work; } // Запускает задачу. public void Execute() { lock (this) { isRunned = true; } work(); } // Приоритет задачи. Устанавливается только при создании задачи. public TaskPriority Priority { get { return priority; } } // Запущена ли задача. (True - запущена, False - стоит в очереди на выполнение) public bool IsRunned { get { return isRunned; } } }
FixedThreadPool:
// Пул потоков. В нем задача запускаются по приоритетам. Если количество потоков больше или равно 4, то на каждые 3 задачи с высоким приоритетом - будет запущена задача с нормальным приоритетом. Если количество потоков меньше 4, тогда задачи выполняются прямо следуя приоритетам. Задачи с низким приоритетом выполняются в последнюю очередь. public class FixedThreadPool : IDisposable { private int numberThreads; private ManualResetEvent stopEvent; private bool isStoping; private object stopLock; private Dictionary<int, ManualResetEvent> threadsEvent; private Thread[] threads; private List<task> tasks; private ManualResetEvent scheduleEvent; private Thread scheduleThread; private bool isDisposed; // Создает пул потоков с количеством потоков равным количеству ядер процессора. public FixedThreadPool() : this(Environment.ProcessorCount) { } // Создает пул потоков с указанным количеством потоков. public FixedThreadPool(int numberThreads) { if (numberThreads <= 0) throw new ArgumentException("numberThreads", "Количество потоков должно быть больше нуля."); this.numberThreads = numberThreads; this.stopLock = new object(); this.stopEvent = new ManualResetEvent(false); this.scheduleEvent = new ManualResetEvent(false); this.scheduleThread = new Thread(SelectAndStartFreeThread) { Name = "Schedule Thread", IsBackground = true }; scheduleThread.Start(); this.threads = new Thread[numberThreads]; this.threadsEvent = new Dictionary<int, ManualResetEvent>(numberThreads); for (int i = 0; i < numberThreads; i++) { threads[i] = new Thread(ThreadWork) { Name = "Pool Thread", IsBackground = true }; threadsEvent.Add(threads[i].ManagedThreadId, new ManualResetEvent(false)); threads[i].Start(); } this.tasks = new List<task>(); } // Прерывает выполнение всех потоков, не дожидаясь их завершения и уничтожает за собой все ресурсы. ~FixedThreadPool() { Dispose(false); } // Высвобождает ресурсы, которые используются пулом потоков. public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } // Высвобождает ресурсы, которые используются пулом потоков. protected virtual void Dispose(bool disposing) { if (!isDisposed) { if (disposing) { scheduleThread.Abort(); scheduleEvent.Dispose(); for (int i = 0; i < numberThreads; i++) { threads[i].Abort(); threadsEvent[threads[i].ManagedThreadId].Dispose(); } } isDisposed = true; } } private Task SelectTask() { lock (tasks) { if (tasks.Count == 0) throw new ArgumentException(); var waitingTasks = tasks.Where(t => !t.IsRunned); var highTasks = waitingTasks.Where(t => t.Priority == TaskPriority.High); var normalTasks = waitingTasks.Where(t => t.Priority == TaskPriority.Normal); if (highTasks.Count() > 0) { if (numberThreads >= 4) { var runnedHighTasks = tasks.Where(t => t.IsRunned && t.Priority == TaskPriority.High); var runnedNormalTasks = tasks.Where(t => t.IsRunned && t.Priority == TaskPriority.Normal); if (runnedHighTasks.Count() / (runnedNormalTasks.Count() + 1) < 3) { return highTasks.First(); } else { return normalTasks.First(); } } else { return highTasks.First(); } } else { if (normalTasks.Count() > 0) { return normalTasks.First(); } else { var lowTasks = tasks.Where(t => t.Priority == TaskPriority.Low).ToArray(); return lowTasks.FirstOrDefault(); } } } } private void ThreadWork() { while (true) { threadsEvent[Thread.CurrentThread.ManagedThreadId].WaitOne(); Task task = SelectTask(); if (task != null) { try { task.Execute(); } finally { RemoveTask(task); if (isStoping) stopEvent.Set(); threadsEvent[Thread.CurrentThread.ManagedThreadId].Reset(); } } } } private void SelectAndStartFreeThread() { while (true) { scheduleEvent.WaitOne(); lock (threads) { foreach (var thread in threads) { if (threadsEvent[thread.ManagedThreadId].WaitOne(0) == false) { threadsEvent[thread.ManagedThreadId].Set(); break; } } } scheduleEvent.Reset(); } } private void AddTask(Task task) { lock (tasks) { tasks.Add(task); } scheduleEvent.Set(); } private void RemoveTask(Task task) { lock (tasks) { tasks.Remove(task); } if (tasks.Count > 0 && tasks.Where(t => !t.IsRunned).Count() > 0) { scheduleEvent.Set(); } } // Ставит задачу в очередь. public bool Execute(Task task) { if (task == null) throw new ArgumentNullException("task", "The Task can't be null."); lock (stopLock) { if (isStoping) { return false; } AddTask(task); return true; } } // Ставить несколько задачь в очередь. public bool ExecuteRange(IEnumerable<task> tasks) { bool result = true; foreach (var task in tasks) { if (!Execute(task)) result = false; } return result; } // Останавливает работу пула потоков. Ожидает завершения всех задач (запущенных и стоящих в очереди) и уничтожает все ресурсы. public void Stop() { lock (stopLock) { isStoping = true; } while (tasks.Count > 0) { stopEvent.WaitOne(); stopEvent.Reset(); } Dispose(true); } }
Можно проще https://github.com/AlexanderSysoev/FixedThreadPool
ОтветитьУдалить