Недавно, читая Хабр, я наткнулся на
пост. В котором давался пример создания FixedThreadPool, с данными требованиями. Посмотрев код, я увидел, что при запуске каждой задачи создается новый поток (в данный момент автор поста поправил код).
С примерными принципами работы пула потоков я знаком.
Получается, что это никакой не пул, а просто некий планировщик задач по приоритетам.
Ну и тут мне захотелось самому попытаться создать свой способ решения поставленной задачи. Я не строго следовал требованиям, описанным в посте. Но и от основной задачи не отклонялся. С многопоточностью я знаком на базовом уровне так, что, возможно, я создал не самый оптимальный вариант. В итоге за несколько часов коддинга класс был готов.
Интерфейс класса FixedThreadPool я немного изменил. Теперь метод
Execute принимает только один параметр Task, а приоритет был перенесен в класс Task. Такой вариант показался мне удобнее.
Source:
github.com
TaskPriority:
// Приоритет выполнения задачи.
public enum TaskPriority
{
// Низкий приоритет.
Low,
// Средний приоритет.
Normal,
// Высокий приоритет.
High
}
Task:
// Класс представляет задачу для выполнения в FixedThreadPool
public class Task
{
private TaskPriority priority;
private Action work;
private bool isRunned;
// Создает задачу с указанным приоритетом.
public Task(Action work) : this(work, TaskPriority.Normal) { }
// Создает задачу с указанным приоритетом.
public Task(Action work, TaskPriority priority)
{
this.priority = priority;
this.work = work;
}
// Запускает задачу.
public void Execute()
{
lock (this)
{
isRunned = true;
}
work();
}
// Приоритет задачи. Устанавливается только при создании задачи.
public TaskPriority Priority
{
get
{
return priority;
}
}
// Запущена ли задача. (True - запущена, False - стоит в очереди на выполнение)
public bool IsRunned
{
get
{
return isRunned;
}
}
}
FixedThreadPool:
// Пул потоков. В нем задача запускаются по приоритетам. Если количество потоков больше или равно 4, то на каждые 3 задачи с высоким приоритетом - будет запущена задача с нормальным приоритетом. Если количество потоков меньше 4, тогда задачи выполняются прямо следуя приоритетам. Задачи с низким приоритетом выполняются в последнюю очередь.
public class FixedThreadPool : IDisposable
{
private int numberThreads;
private ManualResetEvent stopEvent;
private bool isStoping;
private object stopLock;
private Dictionary<int, ManualResetEvent> threadsEvent;
private Thread[] threads;
private List<task> tasks;
private ManualResetEvent scheduleEvent;
private Thread scheduleThread;
private bool isDisposed;
// Создает пул потоков с количеством потоков равным количеству ядер процессора.
public FixedThreadPool() : this(Environment.ProcessorCount) { }
// Создает пул потоков с указанным количеством потоков.
public FixedThreadPool(int numberThreads)
{
if (numberThreads <= 0)
throw new ArgumentException("numberThreads", "Количество потоков должно быть больше нуля.");
this.numberThreads = numberThreads;
this.stopLock = new object();
this.stopEvent = new ManualResetEvent(false);
this.scheduleEvent = new ManualResetEvent(false);
this.scheduleThread = new Thread(SelectAndStartFreeThread) { Name = "Schedule Thread", IsBackground = true };
scheduleThread.Start();
this.threads = new Thread[numberThreads];
this.threadsEvent = new Dictionary<int, ManualResetEvent>(numberThreads);
for (int i = 0; i < numberThreads; i++)
{
threads[i] = new Thread(ThreadWork) { Name = "Pool Thread", IsBackground = true };
threadsEvent.Add(threads[i].ManagedThreadId, new ManualResetEvent(false));
threads[i].Start();
}
this.tasks = new List<task>();
}
// Прерывает выполнение всех потоков, не дожидаясь их завершения и уничтожает за собой все ресурсы.
~FixedThreadPool()
{
Dispose(false);
}
// Высвобождает ресурсы, которые используются пулом потоков.
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
// Высвобождает ресурсы, которые используются пулом потоков.
protected virtual void Dispose(bool disposing)
{
if (!isDisposed)
{
if (disposing)
{
scheduleThread.Abort();
scheduleEvent.Dispose();
for (int i = 0; i < numberThreads; i++)
{
threads[i].Abort();
threadsEvent[threads[i].ManagedThreadId].Dispose();
}
}
isDisposed = true;
}
}
private Task SelectTask()
{
lock (tasks)
{
if (tasks.Count == 0)
throw new ArgumentException();
var waitingTasks = tasks.Where(t => !t.IsRunned);
var highTasks = waitingTasks.Where(t => t.Priority == TaskPriority.High);
var normalTasks = waitingTasks.Where(t => t.Priority == TaskPriority.Normal);
if (highTasks.Count() > 0)
{
if (numberThreads >= 4)
{
var runnedHighTasks = tasks.Where(t => t.IsRunned && t.Priority == TaskPriority.High);
var runnedNormalTasks = tasks.Where(t => t.IsRunned && t.Priority == TaskPriority.Normal);
if (runnedHighTasks.Count() / (runnedNormalTasks.Count() + 1) < 3)
{
return highTasks.First();
}
else
{
return normalTasks.First();
}
}
else
{
return highTasks.First();
}
}
else
{
if (normalTasks.Count() > 0)
{
return normalTasks.First();
}
else
{
var lowTasks = tasks.Where(t => t.Priority == TaskPriority.Low).ToArray();
return lowTasks.FirstOrDefault();
}
}
}
}
private void ThreadWork()
{
while (true)
{
threadsEvent[Thread.CurrentThread.ManagedThreadId].WaitOne();
Task task = SelectTask();
if (task != null)
{
try
{
task.Execute();
}
finally
{
RemoveTask(task);
if (isStoping)
stopEvent.Set();
threadsEvent[Thread.CurrentThread.ManagedThreadId].Reset();
}
}
}
}
private void SelectAndStartFreeThread()
{
while (true)
{
scheduleEvent.WaitOne();
lock (threads)
{
foreach (var thread in threads)
{
if (threadsEvent[thread.ManagedThreadId].WaitOne(0) == false)
{
threadsEvent[thread.ManagedThreadId].Set();
break;
}
}
}
scheduleEvent.Reset();
}
}
private void AddTask(Task task)
{
lock (tasks)
{
tasks.Add(task);
}
scheduleEvent.Set();
}
private void RemoveTask(Task task)
{
lock (tasks)
{
tasks.Remove(task);
}
if (tasks.Count > 0 && tasks.Where(t => !t.IsRunned).Count() > 0)
{
scheduleEvent.Set();
}
}
// Ставит задачу в очередь.
public bool Execute(Task task)
{
if (task == null)
throw new ArgumentNullException("task", "The Task can't be null.");
lock (stopLock)
{
if (isStoping)
{
return false;
}
AddTask(task);
return true;
}
}
// Ставить несколько задачь в очередь.
public bool ExecuteRange(IEnumerable<task> tasks)
{
bool result = true;
foreach (var task in tasks)
{
if (!Execute(task))
result = false;
}
return result;
}
// Останавливает работу пула потоков. Ожидает завершения всех задач (запущенных и стоящих в очереди) и уничтожает все ресурсы.
public void Stop()
{
lock (stopLock)
{
isStoping = true;
}
while (tasks.Count > 0)
{
stopEvent.WaitOne();
stopEvent.Reset();
}
Dispose(true);
}
}