martinothamar / Mediator
- Thursday, November 10, 2022, 00:36:36
A high performance implementation of the Mediator pattern in .NET using source generators.
This is a high performance .NET implementation of the Mediator pattern using the source generators feature introduced in .NET 5. The API and usage are mostly based on the great MediatR library, with some deviations to allow for better performance. Packages are .NET Standard 2.1 compatible.
The mediator pattern is great for implementing cross-cutting concerns (logging, metrics, etc.) and avoiding "fat" constructors caused by lots of injected services.
Goals for this library
In particular, source generators in this library are used to generate the IMediator implementation:
- Send methods are monomorphized (1 method per T); the generic ISender.Send methods rely on these
- IMediator and Mediator; the latter allows for better performance
Note
I am currently working on version 2.0 of Mediator, and I recommend using the preview releases at this point. This version is currently in preview and includes a lot of improvements:
- AddMediator call (assembly attribute still works) (#21, #24)
- ArgumentNullException for null messages (#22)
- Publish<INotification> calls to Publish(object) (#22)
This benchmark exposes the perf overhead of the libraries. Mediator (this library) and MediatR methods show the overhead of the respective mediator implementations. I've also included the MessagePipe library as it also has great performance.
- <SendRequest | Stream>_Baseline: simple method call into the handler class
- <SendRequest | Stream>_Mediator: the concrete Mediator class generated by this library
- <SendRequest | Stream>_MessagePipe: the MessagePipe library
- <SendRequest | Stream>_IMediator: call through the IMediator interface in this library
- <SendRequest | Stream>_MediatR: the MediatR library
See benchmarks code for more details on the measurement.
There are two NuGet packages needed to use this library:
- Mediator.SourceGenerator: generates the IMediator implementation and dependency injection setup.
- Mediator.Abstractions: message types (IRequest<,>, INotification), handler types (IRequestHandler<,>, INotificationHandler<>), pipeline types (IPipelineBehavior)
You install the source generator package into your edge/outermost project (e.g. ASP.NET Core application, background worker project), and then use the Mediator.Abstractions package wherever you define message types and handlers.
Standard message handlers are automatically picked up and added to the DI container in the generated AddMediator method.
Pipeline behaviors need to be added manually.
For example implementations, see the /samples folder. See the ASP.NET sample for a more real world setup.
- IMessage - marker interface
- IStreamMessage - marker interface
- IBaseRequest - marker interface for requests
- IRequest - a request message, no return value (ValueTask<Unit>)
- IRequest<out TResponse> - a request message with a response (ValueTask<TResponse>)
- IStreamRequest<out TResponse> - a request message with a streaming response (IAsyncEnumerable<TResponse>)
- IBaseCommand - marker interface for commands
- ICommand - a command message, no return value (ValueTask<Unit>)
- ICommand<out TResponse> - a command message with a response (ValueTask<TResponse>)
- IStreamCommand<out TResponse> - a command message with a streaming response (IAsyncEnumerable<TResponse>)
- IBaseQuery - marker interface for queries
- IQuery<out TResponse> - a query message with a response (ValueTask<TResponse>)
- IStreamQuery<out TResponse> - a query message with a streaming response (IAsyncEnumerable<TResponse>)
- INotification - a notification message, no return value (ValueTask)
As you can see, you can achieve the exact same thing with requests, commands and queries. But I find the distinction in naming useful if, for example, you use the CQRS pattern or simply have a naming preference in your application.
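To make the naming distinction concrete, here is a minimal sketch of one command and one query side by side. It assumes the Mediator.Abstractions package is referenced and that command and query handlers expose the same Handle shape as request handlers. CreateUser, GetUserName and both handler classes are hypothetical names invented for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Mediator;

// Hypothetical command: changes state and returns the id of the new user.
public sealed record CreateUser(string Name) : ICommand<Guid>;

// Hypothetical query: reads state and returns a display name.
public sealed record GetUserName(Guid Id) : IQuery<string>;

public sealed class CreateUserHandler : ICommandHandler<CreateUser, Guid>
{
    public ValueTask<Guid> Handle(CreateUser command, CancellationToken cancellationToken)
    {
        // A real handler would persist the user; this sketch only creates an id.
        return new ValueTask<Guid>(Guid.NewGuid());
    }
}

public sealed class GetUserNameHandler : IQueryHandler<GetUserName, string>
{
    public ValueTask<string> Handle(GetUserName query, CancellationToken cancellationToken)
    {
        // A real handler would look the user up; this sketch returns a fixed value.
        return new ValueTask<string>("example");
    }
}
```

Apart from the interface names, the two handlers are structurally identical, which is the point made above: the command/query split is a naming convention rather than a difference in capability.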
IRequestHandler<in TRequest>
IRequestHandler<in TRequest, TResponse>
IStreamRequestHandler<in TRequest, out TResponse>
ICommandHandler<in TCommand>
ICommandHandler<in TCommand, TResponse>
IStreamCommandHandler<in TCommand, out TResponse>
IQueryHandler<in TQuery, TResponse>
IStreamQueryHandler<in TQuery, out TResponse>
INotificationHandler<in TNotification>
These types are used together with the message types above.
- IPipelineBehavior<TMessage, TResponse>
- IStreamPipelineBehavior<TMessage, TResponse>

public sealed class GenericHandler<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : IMessage
{
    public ValueTask<TResponse> Handle(TMessage message, CancellationToken cancellationToken, MessageHandlerDelegate<TMessage, TResponse> next)
    {
        // ...
        return next(message, cancellationToken);
    }
}

public sealed class GenericStreamHandler<TMessage, TResponse> : IStreamPipelineBehavior<TMessage, TResponse>
    where TMessage : IStreamMessage
{
    public IAsyncEnumerable<TResponse> Handle(TMessage message, CancellationToken cancellationToken, StreamHandlerDelegate<TMessage, TResponse> next)
    {
        // ...
        return next(message, cancellationToken);
    }
}
There are two ways to configure Mediator. Configuration values are needed at compile time since this is a source generator:
- Assembly-level attribute: MediatorOptionsAttribute
- Options delegate passed to the AddMediator function.

services.AddMediator(options =>
{
    options.Namespace = "SimpleConsole.Mediator";
    options.DefaultServiceLifetime = ServiceLifetime.Transient;
});
// or
[assembly: MediatorOptions(Namespace = "SimpleConsole.Mediator", DefaultServiceLifetime = ServiceLifetime.Transient)]

- Namespace - where the IMediator implementation is generated
- DefaultServiceLifetime - the DI service lifetime
  - Singleton - (default value) everything registered as singletons, minimal allocations
  - Transient - handlers registered as transient, IMediator/Mediator/ISender/IPublisher still singleton
  - Scoped - mediator and handlers registered as scoped

In this section we will get started with Mediator and go through a sample illustrating the various ways the Mediator pattern can be used in an application.
See the full runnable sample code in the SimpleEndToEnd sample.
dotnet add package Mediator.SourceGenerator --version 1.0.*
dotnet add package Mediator.Abstractions --version 1.0.*
or
<PackageReference Include="Mediator.SourceGenerator" Version="1.0.*">
  <PrivateAssets>all</PrivateAssets>
  <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="Mediator.Abstractions" Version="1.0.*" />
In ConfigureServices or equivalent, call AddMediator (unless MediatorOptions is configured, the default namespace is Mediator).
This registers your handler below.
using Mediator;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Diagnostics; // needed for Debug.Assert below

var services = new ServiceCollection(); // Most likely IServiceCollection comes from IHostBuilder/Generic host abstraction in Microsoft.Extensions.Hosting
services.AddMediator();
var serviceProvider = services.BuildServiceProvider();

IRequest<> type

var mediator = serviceProvider.GetRequiredService<IMediator>();
var ping = new Ping(Guid.NewGuid());
var pong = await mediator.Send(ping);
Debug.Assert(ping.Id == pong.Id);
// ...
public sealed record Ping(Guid Id) : IRequest<Pong>;
public sealed record Pong(Guid Id);
public sealed class PingHandler : IRequestHandler<Ping, Pong>
{
    public ValueTask<Pong> Handle(Ping request, CancellationToken cancellationToken)
    {
        return new ValueTask<Pong>(new Pong(request.Id));
    }
}
As soon as you code up message types, the source generator will add DI registrations automatically (inside AddMediator).
P.S. You can inspect the code yourself: open Mediator.g.cs in VS from Project -> Dependencies -> Analyzers -> Mediator.SourceGenerator -> Mediator.SourceGenerator.MediatorGenerator, or just F12 through the code.
The pipeline behavior below validates all incoming Ping messages.
Pipeline behaviors currently must be added manually.

services.AddMediator();
services.AddSingleton<IPipelineBehavior<Ping, Pong>, PingValidator>();

public sealed class PingValidator : IPipelineBehavior<Ping, Pong>
{
    public ValueTask<Pong> Handle(Ping request, CancellationToken cancellationToken, MessageHandlerDelegate<Ping, Pong> next)
    {
        if (request is null || request.Id == default)
            throw new ArgumentException("Invalid input");
        return next(request, cancellationToken);
    }
}
IPipelineBehavior<,> message with open generics

Add an open generic handler to process all or a subset of messages passing through Mediator.
This handler will log any error that is thrown from message handlers (IRequest, ICommand, IQuery).
It also publishes a notification allowing notification handlers to react to errors.

services.AddMediator();
services.AddSingleton(typeof(IPipelineBehavior<,>), typeof(ErrorLoggerHandler<,>));

public sealed record ErrorMessage(Exception Exception) : INotification;
public sealed record SuccessfulMessage() : INotification;

public sealed class ErrorLoggerHandler<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : IMessage // Constrained to IMessage, or constrain to IBaseCommand or any custom interface you've implemented
{
    private readonly ILogger<ErrorLoggerHandler<TMessage, TResponse>> _logger;
    private readonly IMediator _mediator;

    public ErrorLoggerHandler(ILogger<ErrorLoggerHandler<TMessage, TResponse>> logger, IMediator mediator)
    {
        _logger = logger;
        _mediator = mediator;
    }

    public async ValueTask<TResponse> Handle(TMessage message, CancellationToken cancellationToken, MessageHandlerDelegate<TMessage, TResponse> next)
    {
        try
        {
            var response = await next(message, cancellationToken);
            return response;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error handling message");
            await _mediator.Publish(new ErrorMessage(ex));
            throw;
        }
    }
}
We can define a notification handler to catch errors from the above pipeline behavior.
// Notification handlers are automatically added to DI container
public sealed class ErrorNotificationHandler : INotificationHandler<ErrorMessage>
{
    public ValueTask Handle(ErrorMessage error, CancellationToken cancellationToken)
    {
        // Could log to application insights or something...
        return default;
    }
}
We can also define a notification handler that receives all notifications.
public sealed class StatsNotificationHandler : INotificationHandler<INotification> // or any other interface deriving from INotification
{
    private long _messageCount;
    private long _messageErrorCount;

    public (long MessageCount, long MessageErrorCount) Stats => (_messageCount, _messageErrorCount);

    public ValueTask Handle(INotification notification, CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref _messageCount);
        if (notification is ErrorMessage)
            Interlocked.Increment(ref _messageErrorCount);
        return default;
    }
}

public sealed class GenericNotificationHandler<TNotification> : INotificationHandler<TNotification>
    where TNotification : INotification // Generic notification handlers will be registered as open constrained types automatically
{
    public ValueTask Handle(TNotification notification, CancellationToken cancellationToken)
    {
        return default;
    }
}
Since version 1.* of this library there is support for streaming using IAsyncEnumerable.

var mediator = serviceProvider.GetRequiredService<IMediator>();
var ping = new StreamPing(Guid.NewGuid());
await foreach (var pong in mediator.CreateStream(ping))
{
    Debug.Assert(ping.Id == pong.Id);
    Console.WriteLine("Received pong!"); // Should log 5 times
}
// ...
public sealed record StreamPing(Guid Id) : IStreamRequest<Pong>;
public sealed record Pong(Guid Id);
public sealed class PingHandler : IStreamRequestHandler<StreamPing, Pong>
{
    public async IAsyncEnumerable<Pong> Handle(StreamPing request, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        for (int i = 0; i < 5; i++)
        {
            await Task.Delay(1000, cancellationToken);
            yield return new Pong(request.Id);
        }
    }
}
Since this is a source generator, diagnostics are also included.
This is a work-in-progress list of the differences between this library and MediatR.
- RequestHandlerDelegate<TResponse>() -> MessageHandlerDelegate<TMessage, TResponse>(TMessage message, CancellationToken cancellationToken)
- ServiceFactory: this library relies on Microsoft.Extensions.DependencyInjection instead, so it only works with DI containers that integrate with those abstractions.
- MediatR.Extensions.Microsoft.DependencyInjection does transient service registration by default, which leads to a lot of allocations. Even if it is configured for singleton lifetime, IMediator and ServiceFactory services are registered as transient (not configurable).