25.11.18, Qwen, Channels, MediatR, BackGroundService, Projects
https://chat.qwen.ai/s/ad5f23ed-3004-44cd-8466-88bddfaf0b41?fev=0.0.245
https://giga.chat/link/gcsnijWCdI
F:\Projects\VS\QwenMediatrSln\QwenMediatrSln.sln
F:\Projects\VS\QwenMediatrSln\CaQwenMediatr01\CaQwenMediatr01.csproj
Да, вы описываете хорошо известный подход в асинхронном и распределённом программировании.
То, что вы описали, сочетает в себе несколько паттернов:
Асинхронное взаимодействие (Asynchronous Communication): Передатчик не блокируется, ожидая ответа. Он отправляет запрос и продолжает свою работу.
Паттерн "Сообщения запрос-ответ" (Request-Reply Messaging Pattern): В рамках асинхронного взаимодействия передается сообщение-запрос, на которое в будущем приходит сообщение-ответ.
Очередь запросов (Request Queue): Приемник использует очередь для буферизации входящих запросов, позволяя другому компоненту (обработчику) работать с ними асинхронно.
Паттерн "Команда" (Command Pattern): Запрос может быть представлен как объект команды, содержащий всю необходимую информацию для выполнения.
В совокупности это близко к паттернам, используемым в архитектуре, ориентированной на сообщения (Messaging Architectures), особенно в системах с асинхронной обработкой и слабой связанностью (loose coupling).
Как это реализовать на C#?
В .NET есть несколько встроенных и сторонних средств, которые позволяют реализовать подобное взаимодействие без "огородов":
1. System.Threading.Channels (Рекомендуется для межпроцессного/потокового взаимодействия внутри одного приложения)
Это современный, эффективный способ организации потокобезопасной очереди сообщений между асинхронными компонентами. Это именно то, что вы описываете на уровне одного приложения.
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
// Сообщение запроса
record RequestMessage(string Id, string Data, TaskCompletionSource<string> ResponseTcs);
// Класс, который отправляет запрос
class RequestSender
{
private readonly ChannelWriter<RequestMessage> _writer;
public RequestSender(ChannelWriter<RequestMessage> writer)
{
_writer = writer;
}
// Асинхронный вызов без ожидания
public async Task<string> SendRequestAsync(string data)
{
var tcs = new TaskCompletionSource<string>();
var request = new RequestMessage(Guid.NewGuid().ToString(), data, tcs);
await _writer.WriteAsync(request);
// Передатчик НЕ ЖДЕТ здесь
// Ждем ответа асинхронно через TaskCompletionSource
return await tcs.Task;
}
}
// Класс, который обрабатывает запросы
class RequestProcessor
{
private readonly ChannelReader<RequestMessage> _reader;
public RequestProcessor(ChannelReader<RequestMessage> reader)
{
_reader = reader;
}
public async Task ProcessRequestsAsync()
{
await foreach (var request in _reader.ReadAllAsync())
{
// Имитация обработки
await Task.Delay(100); // Симуляция работы
var result = $"Processed: {request.Data}";
// Отправляем ответ обратно через TaskCompletionSource
request.ResponseTcs.SetResult(result);
}
}
}
// Использование
class Program
{
static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<RequestMessage>(10); // Ограниченная очередь
var sender = new RequestSender(channel.Writer);
var processor = new RequestProcessor(channel.Reader);
// Запускаем обработчик в фоне
var processorTask = Task.Run(async () => await processor.ProcessRequestsAsync());
// Отправляем запросы
var result1 = await sender.SendRequestAsync("Data1");
Console.WriteLine($"Response 1: {result1}");
var result2 = await sender.SendRequestAsync("Data2");
Console.WriteLine($"Response 2: {result2}");
// Закрываем канал для завершения
channel.Writer.Complete();
await processorTask; // Ждем завершения обработчика
}
}
Да, вы описываете хорошо известный подход в асинхронном и распределённом программировании.
То, что вы описали, сочетает в себе несколько паттернов:
1. **Асинхронное взаимодействие (Asynchronous Communication):** Передатчик не блокируется, ожидая ответа. Он отправляет запрос и продолжает свою работу.
2. **Паттерн "Сообщения запрос-ответ" (Request-Reply Messaging Pattern):** В рамках асинхронного взаимодействия передается сообщение-запрос, на которое в будущем приходит сообщение-ответ.
3. **Очередь запросов (Request Queue):** Приемник использует очередь для буферизации входящих запросов, позволяя другому компоненту (обработчику) работать с ними асинхронно.
4. **Паттерн "Команда" (Command Pattern):** Запрос может быть представлен как объект команды, содержащий всю необходимую информацию для выполнения.
В совокупности это близко к паттернам, используемым в **архитектуре, ориентированной на сообщения (Messaging Architectures)**, особенно в системах с **асинхронной обработкой** и **слабой связанностью (loose coupling)**.
---
### Как это реализовать на C#?
В .NET есть несколько встроенных и сторонних средств, которые позволяют реализовать подобное взаимодействие без "огородов":
#### 1. **`System.Threading.Channels` (Рекомендуется для межпроцессного/потокового взаимодействия внутри одного приложения)**
Это современный, эффективный способ организации потокобезопасной очереди сообщений между асинхронными компонентами. Это именно то, что вы описываете на уровне одного приложения.
**Пример:**
```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
// Сообщение запроса
record RequestMessage(string Id, string Data, TaskCompletionSource<string> ResponseTcs);
// Класс, который отправляет запрос
class RequestSender
{
private readonly ChannelWriter<RequestMessage> _writer;
public RequestSender(ChannelWriter<RequestMessage> writer)
{
_writer = writer;
}
// Асинхронный вызов без ожидания
public async Task<string> SendRequestAsync(string data)
{
var tcs = new TaskCompletionSource<string>();
var request = new RequestMessage(Guid.NewGuid().ToString(), data, tcs);
await _writer.WriteAsync(request);
// Передатчик НЕ ЖДЕТ здесь
// Ждем ответа асинхронно через TaskCompletionSource
return await tcs.Task;
}
}
// Класс, который обрабатывает запросы
class RequestProcessor
{
private readonly ChannelReader<RequestMessage> _reader;
public RequestProcessor(ChannelReader<RequestMessage> reader)
{
_reader = reader;
}
public async Task ProcessRequestsAsync()
{
await foreach (var request in _reader.ReadAllAsync())
{
// Имитация обработки
await Task.Delay(100); // Симуляция работы
var result = $"Processed: {request.Data}";
// Отправляем ответ обратно через TaskCompletionSource
request.ResponseTcs.SetResult(result);
}
}
}
// Использование
class Program
{
static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<RequestMessage>(10); // Ограниченная очередь
var sender = new RequestSender(channel.Writer);
var processor = new RequestProcessor(channel.Reader);
// Запускаем обработчик в фоне
var processorTask = Task.Run(async () => await processor.ProcessRequestsAsync());
// Отправляем запросы
var result1 = await sender.SendRequestAsync("Data1");
Console.WriteLine($"Response 1: {result1}");
var result2 = await sender.SendRequestAsync("Data2");
Console.WriteLine($"Response 2: {result2}");
// Закрываем канал для завершения
channel.Writer.Complete();
await processorTask; // Ждем завершения обработчика
}
}
```
* `RequestSender` — ваш "Передатчик".
* `RequestProcessor` — ваш "Другой процесс Приемника".
* `Channel` — это "очередь", через которую они общаются.
* `TaskCompletionSource` позволяет асинхронно "вернуть" результат из обработчика в вызывающий код без блокировки.
#### 2. **MediatR (Для слабосвязанной архитектуры внутри приложения)**
Это библиотека, реализующая паттерны CQRS и Mediator. Она позволяет отправлять команды (`IRequest`) и получать ответы, при этом обработчики (`IRequestHandler`) выполняются асинхронно. Это более высокоуровневый подход, чем `Channels`, и удобен для организации бизнес-логики.
* Установка: `dotnet add package MediatR`
* Позволяет легко отправлять `Request` и получать `Response` асинхронно через `IMediator.Send()`.
#### 3. **Внешние очереди сообщений (Для распределенных приложений)**
Если ваши "Приложения/Сервисы" находятся на разных процессах или машинах, то `Channels` уже не подойдут напрямую. Тогда используются внешние брокеры сообщений:
* **RabbitMQ** (с плагином для запрос-ответа, например)
* **Apache Kafka** (чаще для потоков данных, но можно реализовать)
* **Azure Service Bus**
* **Amazon SQS/SNS**
Эти решения обеспечивают надежность, масштабируемость и persistence сообщений, но требуют дополнительной инфраструктуры.
---
### Вывод:
* Для взаимодействия **внутри одного приложения** (между потоками, задачами, сервисами) идеально подходит **`System.Threading.Channels`**.
* Для архитектурного подхода к обработке команд и запросов внутри приложения подходит библиотека **MediatR**.
* Для **распределенных приложений** используются внешние **брокеры сообщений**.
`System.Threading.Channels` — это как раз то, что позволяет "не городить огород" и реализовать именно тот паттерн, который вы описали.
Извините за путаницу. Ваш вопрос был адресован мне (Assistant), а не другому участнику. Я – тот самый "Coder" в данном контексте, и моя предыдущая детальная сводка была ответом на ваш вопрос.
Повторю краткий итог:
* **Название подхода:** Это асинхронное взаимодействие, часто описываемое как паттерн **Request-Reply** в рамках **Message Queue** или **Channel**.
* **Цель:** Обеспечить слабую связанность и асинхронную обработку, когда отправитель не блокируется ожиданием ответа.
* **Реализация на C#:**
* Для **внутрипроцессного** взаимодействия: `System.Threading.Channels`.
* Для **архитектурного** подхода внутри приложения: `MediatR`.
* Для **распределенных** приложений: `RabbitMQ`, `Azure Service Bus` и т.д.
Хорошо, если вы ищете готовые **библиотеки и пакеты**, а не низкоуровневые примитивы, вот список решений, которые реализуют описанный вами паттерн "асинхронный запрос-ответ" с делегированием обработки, не требуя самостоятельной реализации всей инфраструктуры (т.е. "городить огород"):
---
### 1. **MediatR**
* **Описание:** Это, пожалуй, самое популярное и близкое к "готовому решению" для описанного сценария **внутри одного приложения**. Он реализует паттерн Mediator и часто используется вместе с CQRS.
* **Как подходит:** Вы отправляете `IRequest<TResponse>` (запрос с ожидаемым ответом). MediatR находит соответствующий `IRequestHandler<IRequest<TResponse>, TResponse>`, который и является "другим процессом/обработчиком". Отправка запроса асинхронна (`await mediator.Send(request)`), но обработчик может быть реализован асинхронно, не блокируя основной поток.
* **Плюсы:**
* Простота интеграции с ASP.NET Core (DI).
* Поддержка посредников (Behaviors) для логики, транзакций, валидации и т.д.
* Хорошо подходит для слабосвязанной архитектуры.
* **Недостатки:**
* В основном для внутрипроцессного взаимодействия.
* Не предоставляет persistence сообщений "из коробки" (если не настраивать дополнительно).
* **NuGet:** `MediatR`
---
### 2. **Talos (от команды NServiceBus)**
* **Описание:** Это более комплексный фреймворк для построения распределенных систем. Он предоставляет готовые абстракции для команд, событий, запросов и ответов.
* **Как подходит:** Поддерживает асинхронные запросы-ответы с помощью `IHandleMessages<T>` и встроенной поддержки `TaskCompletionSource`-подобного механизма для ответов. Использует брокер сообщений (например, RabbitMQ, Azure Service Bus).
* **Плюсы:**
* Готовое решение для сложных распределенных сценариев.
* Встроенные функции надежности (retry, saga, etc.).
* **Недостатки:**
* Более тяжеловесный и сложный, чем MediatR.
* Требует настройки брокера сообщений.
* **NuGet:** `NServiceBus` (Talos - это часть экосистемы NServiceBus, его часто не выделяют отдельно как пакет, но это подход/архитектура, которую предоставляет NServiceBus).
---
### 3. **Rebus**
* **Описание:** Еще один фреймворк для шины сообщений в .NET. Проще, чем NServiceBus, но мощнее, чем простые каналы.
* **Как подходит:** Поддерживает `Send` (одностороннее сообщение) и `Request/Reply` (запрос-ответ). Вы отправляете сообщение, и оно обрабатывается зарегистрированным обработчиком. Можно настроить асинхронную обработку и отправку ответа.
* **Плюсы:**
* Простота настройки и использования.
* Поддерживает множество брокеров (RabbitMQ, Azure Service Bus, даже SQL Server как очередь).
* Поддерживает Saga (для сложных бизнес-процессов).
* **Недостатки:**
* Требует брокер сообщений для production-сценариев.
* **NuGet:** `Rebus`
---
### 4. **Akka.NET**
* **Описание:** Реализация модели акторов (Actor Model) для .NET. Это мощный фреймворк для создания распределенных, отказоустойчивых и масштабируемых систем.
* **Как подходит:** Акторы общаются через асинхронные сообщения. Вы можете отправить сообщение актору и получить ответ в будущем, используя `Ask` (запрос-ответ) или `Tell` (отправить и забыть). Обработка делегируется актору, который получает сообщение.
* **Плюсы:**
* Мощная модель для сложных сценариев.
* Встроенные механизмы отказоустойчивости.
* **Недостатки:**
* Сложность обучения и внедрения.
* Может быть "из пушки по воробьям" для простых задач.
* **NuGet:** `Akka`
---
### 5. **MassTransit**
* **Описание:** Один из самых популярных фреймворков для шины сообщений в .NET. Абстрагирует работу с брокерами сообщений.
* **Как подходит:** Поддерживает `Publish/Subscribe`, `Send/Reply` и `Request/Response`. Вы отправляете запрос, он попадает в очередь, обрабатывается зарегистрированным потребителем (`IRequestHandler` или `IConsumer`), и ответ возвращается.
* **Плюсы:**
* Богатый функционал.
* Интеграция с ASP.NET Core.
* Поддержка RabbitMQ, Azure Service Bus, Amazon SQS и других.
* **Недостатки:**
* Требует настройки брокера.
* Может быть сложнее MediatR для простых случаев.
* **NuGet:** `MassTransit`
---
### Вывод:
Если вы хотите **готовое решение "из коробки"**:
* Для **внутрипроцессного** взаимодействия с акцентом на архитектуру: **MediatR**.
* Для **распределенного** взаимодействия с брокером сообщений:
* **MassTransit** (широкие возможности, популярный выбор).
* **Rebus** (проще, чем MassTransit).
* **NServiceBus** (мощный, но сложный и дорогой).
* Для **сложных распределенных** систем с акцентом на отказоустойчивость: **Akka.NET**.
`MediatR` и `Rebus` часто рассматриваются как наиболее "легкие" и "готовые" решения для описанных вами сценариев, в зависимости от того, нужна ли вам распределенность.
Для описанного сценария, где у вас есть два BackgroundService (A и B), взаимодействующие внутри одного приложения ASP.NET Core, и где важно, чтобы отправитель не ждал ответа сразу, наилучшим и самым простым выбором будет System.Threading.Channels.
Это встроенный примитив .NET, но он гораздо более структурирован и готов к использованию, чем, например, BlockingCollection, и он абсолютно идеально подходит для описанного вами сценария передачи сообщений между сервисами без блокировки.
Он не требует внешнего брокера, легко настраивается, эффективен и полностью соответствует вашему паттерну: "отправил и забыл" -> "поставлено в очередь" -> "обработано асинхронно" -> "ответ отправлен".
Использование MediatR было бы избыточным, так как он больше подходит для обработки команд и запросов внутри бизнес-логики, а не для прямой коммуникации между BackgroundService.
Реализация на System.Threading.Channels
Мы создадим два канала:
AtoBChannel: Для передачи сообщений от SenderA к ReceiverB.
BtoAChannel: Для передачи ответов от SenderB к ReceiverA
// 1. Определяем класс сообщения
public record MessageData(string Id, string Content);
// 2. Создаем класс для хранения каналов как Singleton
public class MessageChannels
{
public Channel<MessageData> AtoB { get; } = Channel.CreateBounded<MessageData>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.DropOldest // Или Wait, на ваше усмотрение
});
public Channel<MessageData> BtoA { get; } = Channel.CreateBounded<MessageData>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.DropOldest
});
}
// 3. BackgroundService A
public class ServiceA : BackgroundService
{
private readonly ILogger<ServiceA> _logger;
private readonly MessageChannels _channels;
public ServiceA(ILogger<ServiceA> logger, MessageChannels channels)
{
_logger = logger;
_channels = channels;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Запускаем задачи ReceiverA и SenderA параллельно
await Task.WhenAll(
Task.Run(ReceiverA, stoppingToken),
Task.Run(SenderA, stoppingToken)
);
}
private async Task ReceiverA()
{
await foreach (var message in _channels.BtoA.Reader.ReadAllAsync())
{
Console.WriteLine($"[ServiceA - ReceiverA] Получено сообщение от B: Id={message.Id}, Content='{message.Content}'");
// Здесь можно обработать входящее сообщение от B
}
}
private async Task SenderA()
{
int counter = 0;
while (!true) // В реальном коде используйте CancellationToken
{
await Task.Delay(5000); // Отправляем сообщение каждые 5 секунд для демонстрации
counter++;
var message = new MessageData($"A_Msg_{counter}", $"Привет от ServiceA, итерация {counter}!");
Console.WriteLine($"[ServiceA - SenderA] Отправляет сообщение в B: Id={message.Id}, Content='{message.Content}'");
await _channels.AtoB.Writer.WriteAsync(message);
// SenderA выходит из функции и НЕ ЖДЕТ ответа
}
}
}
// 4. BackgroundService B
public class ServiceB : BackgroundService
{
private readonly ILogger<ServiceB> _logger;
private readonly MessageChannels _channels;
public ServiceB(ILogger<ServiceB> logger, MessageChannels channels)
{
_logger = logger;
_channels = channels;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Запускаем задачи ReceiverB и SenderB параллельно
await Task.WhenAll(
Task.Run(ReceiverB, stoppingToken),
Task.Run(SenderB, stoppingToken)
);
}
private async Task ReceiverB()
{
await foreach (var message in _channels.AtoB.Reader.ReadAllAsync())
{
Console.WriteLine($"[ServiceB - ReceiverB] Принял сообщение от A: Id={message.Id}, Content='{message.Content}'. Помещает в очередь (виртуально).");
// ReceiverB получает и "помещает в очередь" (реально, просто передает дальше в логике)
// В реальном коде здесь может быть запись в очередь в памяти (например, еще один Channel),
// или передача в очередь обработки, как вы описывали.
// Для простоты, сразу передаем в SenderB через внутренний вызов или другой канал.
// В данном примере, мы просто передадим сообщение в SenderB напрямую, с небольшим изменением.
// Лучше всего для этого подойдет еще один внутренний Channel, но для демонстрации просто вызовем метод.
await ProcessAndSendReply(message);
}
}
// В реальной жизни, это может быть отдельный BackgroundService или Task,
// который читает из внутренней очереди (например, ConcurrentQueue или другой Channel).
private async Task ProcessAndSendReply(MessageData originalMessage)
{
await Task.Delay(100); // Имитация обработки
var modifiedContent = $"[Обработано B] {originalMessage.Content}";
var replyMessage = new MessageData($"Reply_to_{originalMessage.Id}", modifiedContent);
Console.WriteLine($"[ServiceB - SenderB] Обработал сообщение. Генерирует ответ: Id={replyMessage.Id}, Content='{replyMessage.Content}'. Отправляет в A.");
await _channels.BtoA.Writer.WriteAsync(replyMessage);
}
private async Task SenderB()
{
// Этот метод не делает ничего в данном сценарии, так как отправка
// происходит внутри ProcessAndSendReply после обработки сообщения из очереди.
// В более сложной схеме, SenderB мог бы слушать другую очередь,
// содержащую "готовые к отправке ответы", и отправлять их.
// Но для простоты, мы просто делаем отправку сразу после обработки в ReceiverB.
await Task.CompletedTask; // Заглушка
}
}
// 5. Регистрация в Program.cs (или Startup.cs)
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Регистрируем MessageChannels как Singleton
builder.Services.AddSingleton<MessageChannels>();
// Регистрируем Background Services
builder.Services.AddHostedService<ServiceA>();
builder.Services.AddHostedService<ServiceB>();
var app = builder.Build();
app.Run();
}
}
Объяснение:
MessageChannels: Это Singleton-сервис, который предоставляет два Channel'а. Это центральная точка обмена сообщениями между ServiceA и ServiceB.ServiceA:SenderA: Периодически создает сообщения и отправляет их в AtoBChannel. Он не ждет ответа.ReceiverA: Асинхронно читает из BtoAChannel, получая ответы от ServiceB.
ServiceB:ReceiverB: Асинхронно читает из AtoBChannel. Как только получает сообщение, он выходит из цикла обработки, как вы и сказали. Он "помещает в очередь" (в данном случае, просто передает управление дальше).ProcessAndSendReply: Это "другой процесс Приемника" из вашего описания. Он получает сообщение, обрабатывает его и отправляет ответ в BtoAChannel.SenderB: В данном примере пустой, потому что отправка происходит сразу после обработки в ProcessAndSendReply. В более сложной схеме, SenderB мог бы слушать отдельную очередь готовых ответов.
- Консольный вывод: Каждый компонент (
SenderA, ReceiverA, ReceiverB, SenderB условно, через ProcessAndSendReply) выводит сообщение о своих действиях, как вы просили.
Этот код полностью реализует описанный вами сценарий с использованием System.Threading.Channels, который, несмотря на то, что это "примитив", предоставляет готовую, структурированную и эффективную модель для такого типа взаимодействия внутри одного приложения.
---
Хорошо, давайте рассмотрим, как можно использовать MediatR в сценарии, аналогичном нашему предыдущему примеру с Channels, и посмотрим, какие функции выполняет MediatR и почему он может быть избыточным для прямой коммуникации между BackgroundService.
Важно: MediatR — это Mediator. Его основная цель — уменьшить связанность (decoupling) между компонентами, которые хотят обмениваться сообщениями (командами, запросами, событиями). Он позволяет отправить сообщение, не зная, кто его получит и обработает. Это очень полезно в сложной бизнес-логике, где много разных обработчиков могут реагировать на одно и то же событие или команду.
В нашем случае с BackgroundService, где ServiceA явно хочет отправить что-то в ServiceB, и наоборот, MediatR добавляет уровень абстракции, который может быть избыточным, но при этом может быть полезен, если, например, обработчики не привязаны к конкретному BackgroundService, а просто зарегистрированы в DI-контейнере.
Сценарий для MediatR:
ServiceA (SenderA) хочет запросить у ServiceB какие-то данные (например, статус или вычисленное значение).
ServiceB (ReceiverB) получает этот запрос, обрабатывает его и возвращает результат.
ServiceA (ReceiverA) получает результат.
Это больше похоже на паттерн Request/Reply или CQRS (Command Query Responsibility Segregation), чем на простую передачу сообщений.
Пример: MediatR для Request/Reply между BackgroundService
dotnet add package MediatR
dotnet add package Microsoft.Extensions.DependencyInjection
using MediatR;
// Запрос от A к B
public record GetDataRequest(string RequestId, string Parameter) : IRequest<GetDataResponse>;
// Ответ от B к A
public record GetDataResponse(string RequestId, string ProcessedData, DateTime Timestamp);
using MediatR;
using Microsoft.Extensions.Logging;
// Этот класс будет зарегистрирован в DI и вызван MediatR
public class GetDataRequestHandler : IRequestHandler<GetDataRequest, GetDataResponse>
{
private readonly ILogger<GetDataRequestHandler> _logger;
public GetDataRequestHandler(ILogger<GetDataRequestHandler> logger)
{
_logger = logger;
}
public async Task<GetDataResponse> Handle(GetDataRequest request, CancellationToken cancellationToken)
{
// Имитация обработки
await Task.Delay(1000, cancellationToken); // Симуляция работы
var processedData = $"[Обработано B] Входной параметр: '{request.Parameter}', время обработки: {DateTime.UtcNow:HH:mm:ss}";
var response = new GetDataResponse(request.RequestId, processedData, DateTime.UtcNow);
Console.WriteLine($"[MediatR Handler (в ServiceB)] Обработал запрос {request.RequestId}. Ответ: {response.ProcessedData}");
// Возвращаем ответ
return response;
}
}
4. BackgroundService A (отправляет запрос и получает ответ):
using MediatR;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
public class ServiceA_MediatR : BackgroundService
{
private readonly ILogger<ServiceA_MediatR> _logger;
private readonly IMediator _mediator; // MediatR отвечает за отправку и получение ответа
public ServiceA_MediatR(ILogger<ServiceA_MediatR> logger, IMediator mediator)
{
_logger = logger;
_mediator = mediator; // MediatR внедряется через DI
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
int counter = 0;
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(5000, stoppingToken); // Отправляем запрос каждые 5 секунд
counter++;
var requestId = $"Request_A_{counter}";
var request = new GetDataRequest(requestId, $"Параметр от A, итерация {counter}");
Console.WriteLine($"[ServiceA - SenderA via MediatR] Отправляет запрос {requestId} через MediatR.");
// MediatR: Отправка запроса и ожидание ответа
var response = await _mediator.Send(request, stoppingToken);
Console.WriteLine($"[ServiceA - ReceiverA via MediatR] Получен ответ на {response.RequestId}: {response.ProcessedData}");
}
}
}
5. BackgroundService B (только для демонстрации, что он может быть "ничем не примечательным" с точки зрения MediatR):
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
public class ServiceB_MediatR : BackgroundService
{
private readonly ILogger<ServiceB_MediatR> _logger;
public ServiceB_MediatR(ILogger<ServiceB_MediatR> logger)
{
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// ServiceB не делает ничего особенного в этом примере.
// Его "работа" происходит в.GetDataRequestHandler.
// MediatR вызывает обработчик независимо от того, в каком BGService он зарегистрирован.
Console.WriteLine("[ServiceB_MediatR] Запущен. Обработка запросов происходит через MediatR Handler.");
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken); // Просто живем
}
}
}
6. Регистрация в Program.cs:
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
public class Program
{
public static void Main(string[] args)
{
var builder = Host.CreateApplicationBuilder(args);
// Регистрация MediatR
// AddMediatR автоматически находит и регистрирует все IRequestHandler'ы в сборке
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
// Регистрация Background Services
builder.Services.AddHostedService<ServiceA_MediatR>();
builder.Services.AddHostedService<ServiceB_MediatR>();
var app = builder.Build();
app.Run();
}
}
Функции MediatR в этом примере:
- Децентрализованная отправка:
ServiceA отправляет GetDataRequest, не зная, кто его обработает. Он просто вызывает _mediator.Send(...). - Поиск обработчика:
MediatR (через DI) находит зарегистрированный IRequestHandler<GetDataRequest, GetDataResponse>, которым является GetDataRequestHandler. - Вызов обработчика:
MediatR вызывает метод Handle обработчика. - Возврат результата:
MediatR возвращает результат из обработчика обратно в ServiceA. - Инъекция зависимостей:
MediatR сам получает через DI все зависимости, необходимые обработчику (например, ILogger).
Вывод:
MediatR отвечает за маршрутизацию запроса от отправителя к обработчику и возврат результата.- Он создает слабую связанность между
ServiceA и GetDataRequestHandler (который может быть "в ServiceB", но не обязан). - Он подходит для сценариев, где много разных обработчиков, или где важно избежать прямых зависимостей.
- В нашем простом случае (A <-> B) он добавляет абстракцию, которая может быть избыточной по сравнению с
Channels, где связь между отправителем и получателем более прозрачна и контролируема.
Channels — это более прямой и эффективный способ для точного взаимодействия между известными компонентами, как в вашем исходном сценарии. MediatR — это инструмент для архитектурной гибкости и декомпозиции бизнес-логики.
Хорошо, давайте разберем самый распространенный и "родной" use-case для MediatR и посмотрим, для чего он создан и что делает лучше всего.
Основное назначение MediatR
MediatR реализует паттерн Mediator. Его основная цель — уменьшить связанность (decoupling) между компонентами в приложении. Вместо того чтобы один класс напрямую вызывал методы другого (например, orderService.ProcessOrder(...), notificationService.Send(...)), он отправляет сообщение (например, ProcessOrderCommand), и MediatR находит и вызывает соответствующий обработчик (ProcessOrderCommandHandler).
Это особенно полезно в архитектуре, ориентированной на CQRS (Command Query Responsibility Segregation), где:
- Команды (Commands) — это запросы на изменение состояния системы (например,
CreateUserCommand, UpdateOrderStatusCommand). Они обрабатываются IRequestHandler<,> и не возвращают данные. - Запросы (Queries) — это запросы на получение данных (например,
GetUserByIdQuery, GetOrdersByUserQuery). Они обрабатываются IRequestHandler<, TResponse> и возвращают данные. - События (Events) — это оповещения о том, что что-то произошло (например,
OrderCreatedEvent, PaymentFailedEvent). Они обрабатываются INotificationHandler<> и могут вызываться множеством обработчиков.
Самый распространенный и "родной" Use-case: CQRS с обработкой команд и запросов в слое Application/Domain
Это классический сценарий использования MediatR — обработка бизнес-операций (команд) и запросов данных (queries) в веб-приложении, например, в ASP.NET Core MVC или Razor Pages.
Пример: Создание пользователя
Представим, что у нас есть ASP.NET Core приложение. Пользователь заполняет форму регистрации, и нам нужно:
- Принять данные из формы.
- Создать пользователя в системе.
- Отправить ему приветственное письмо.
- Записать событие в лог аудита.
Без MediatR это может выглядеть так:
// В контроллере или PageModel
public async Task<IActionResult> OnPostAsync(CreateUserModel model)
{
if (!ModelState.IsValid)
return Page();
// 1. Создание пользователя
var user = new User { Name = model.Name, Email = model.Email };
await _userRepository.AddAsync(user);
// 2. Отправка письма
await _emailService.SendWelcomeEmailAsync(user.Email);
// 3. Запись в лог аудита
await _auditLogService.LogAsync($"User {user.Name} created at {DateTime.UtcNow}");
return RedirectToPage("/Success");
}
Код в контроллере сильно связан с UserRepository, EmailService, AuditLogService. Это затрудняет тестирование и усложняет добавление новой логики (например, отправки SMS).
Как это выглядит с MediatR:
1. Определение команды:
using MediatR;
// Команда - запрос на изменение состояния
public record CreateUserCommand(string Name, string Email) : IRequest<Guid>; // Возвращаем ID созданного пользователя
using MediatR;
using Microsoft.Extensions.Logging;
2. Определение обработчика команды:
public class CreateUserCommandHandler : IRequestHandler<CreateUserCommand, Guid>
{
private readonly IUserRepository _userRepository;
private readonly IPublisher _publisher; // MediatR также предоставляет IPublisher для отправки событий
private readonly ILogger<CreateUserCommandHandler> _logger;
public CreateUserCommandHandler(
IUserRepository userRepository,
IPublisher publisher, // Для отправки событий после создания
ILogger<CreateUserCommandHandler> logger)
{
_userRepository = userRepository;
_publisher = publisher;
_logger = logger;
}
public async Task<Guid> Handle(CreateUserCommand request, CancellationToken cancellationToken)
{
// 1. Создание и сохранение пользователя
var user = new User { Id = Guid.NewGuid(), Name = request.Name, Email = request.Email };
await _userRepository.AddAsync(user, cancellationToken);
// 2. Логика завершена, отправляем событие
var @event = new UserCreatedEvent(user.Id, user.Name, user.Email);
await _publisher.Publish(@event, cancellationToken);
_logger.LogInformation("User {UserId} created via command handler.", user.Id);
// 3. Возвращаем ID созданного пользователя
return user.Id;
}
}
3. Определение события:
using MediatR;
// Событие - произошло что-то
public record UserCreatedEvent(Guid UserId, string UserName, string UserEmail) : INotification;
4. Определение обработчиков событий:
using MediatR;
using Microsoft.Extensions.Logging;
public class WelcomeEmailNotificationHandler : INotificationHandler<UserCreatedEvent>
{
private readonly IEmailService _emailService;
private readonly ILogger<WelcomeEmailNotificationHandler> _logger;
public WelcomeEmailNotificationHandler(IEmailService emailService, ILogger<WelcomeEmailNotificationHandler> logger)
{
_emailService = emailService;
_logger = logger;
}
public async Task Handle(UserCreatedEvent notification, CancellationToken cancellationToken)
{
// 1. Отправка письма
await _emailService.SendWelcomeEmailAsync(notification.UserEmail);
_logger.LogInformation("Welcome email sent to {UserEmail} for user {UserId}.", notification.UserEmail, notification.UserId);
}
}
public class AuditLogNotificationHandler : INotificationHandler<UserCreatedEvent>
{
private readonly IAuditLogService _auditLogService;
private readonly ILogger<AuditLogNotificationHandler> _logger;
public AuditLogNotificationHandler(IAuditLogService auditLogService, ILogger<AuditLogNotificationHandler> logger)
{
_auditLogService = auditLogService;
_logger = logger;
}
public async Task Handle(UserCreatedEvent notification, CancellationToken cancellationToken)
{
// 1. Запись в лог аудита
await _auditLogService.LogAsync($"User {notification.UserName} (ID: {notification.UserId}) created at {DateTime.UtcNow}");
_logger.LogInformation("Audit log entry created for user {UserId}.", notification.UserId);
}
}
5. Контроллер / PageModel становится чистым:
// В контроллере или PageModel
public class RegisterModel : PageModel
{
private readonly IMediator _mediator; // MediatR внедряется
public RegisterModel(IMediator mediator)
{
_mediator = mediator; // Получаем MediatR через DI
}
[BindProperty]
public CreateUserModel Input { get; set; } = new();
public async Task<IActionResult> OnPostAsync()
{
if (!ModelState.IsValid)
return Page();
// 1. Отправляем команду через MediatR
var command = new CreateUserCommand(Input.Name, Input.Email);
var userId = await _mediator.Send(command); // MediatR находит и вызывает CreateUserCommandHandler
// 2. Контроллер не знает и не заботится о том, что происходит внутри обработчика или его подписчиках
// Все побочные эффекты (email, audit log) происходят асинхронно через события.
return RedirectToPage("/Success", new { Id = userId });
}
}
6. Регистрация в Program.cs:
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Добавляем MediatR. Он автоматически регистрирует IRequestHandler и INotificationHandler из сборки
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblyContaining<Program>());
// Регистрируем другие зависимости
builder.Services.AddScoped<IUserRepository, UserRepository>();
builder.Services.AddScoped<IEmailService, SmtpEmailService>();
builder.Services.AddScoped<IAuditLogService, DatabaseAuditLogService>();
var app = builder.Build();
app.MapRazorPages();
app.Run();
}
}
--
Что делает MediatR лучше всего:
- Децентрализация и слабая связанность: Контроллер не зависит от
EmailService или AuditLogService. Он просто отправляет команду. Обработчик команды и обработчики событий зависят от конкретных сервисов. - Чистота слоя представления: Контроллер/PageModel сосредоточен на обработке HTTP-запроса и возврате ответа, а не на бизнес-логике.
- Легкость добавления новой логики: Хотите отправлять SMS при создании пользователя? Просто создайте новый
INotificationHandler<UserCreatedEvent> и зарегистрируйте его. Контроллер и основной обработчик команды не изменяются. - Тестируемость: Легко тестировать обработчик команды, подставив моки репозитория и
IPublisher. - Четкое разделение ответственности (CQRS): Команды для изменений, Запросы для чтения. Это улучшает понимание архитектуры.
- Поддержка событийного взаимодействия: Позволяет легко реализовать паттерн "Event Sourcing" или просто реагировать на события.
Именно этот сценарий — обработка команд и запросов в бизнес-логике с использованием событий для побочных эффектов — является "родной" и наиболее распространенной областью применения MediatR.