Marten Integration Guide
Overview
The Book Store API uses Marten - a .NET library that turns PostgreSQL into a powerful document database and event store. Marten provides the foundation for our event sourcing architecture.
Why Marten?
Benefits
✅ Event Store on PostgreSQL: No additional infrastructure needed
✅ Document Database: Store and query JSON documents
✅ Event Sourcing: Built-in support for event streams
✅ Async Projections: Automatically build read models from events
✅ Strong .NET Integration: LINQ support, type-safe queries
✅ Metadata Tracking: Correlation/causation IDs for distributed tracing
What Marten Provides
- Event Store: Append-only event log with stream management
- Document Store: Store and query JSON documents
- Projections: Transform events into read models
- Session Management: Unit of work pattern
- Schema Management: Automatic database schema creation
Event Sourcing Basics
Note
For a comprehensive introduction to event sourcing concepts and patterns, see the Event Sourcing Guide. This section focuses on Marten-specific implementation.
What is Event Sourcing?
Instead of storing current state, we store all changes as immutable events.
Traditional Approach (CRUD):
// Update book title
book.Title = "New Title";
await db.SaveChangesAsync();
// Old title is lost forever
Event Sourcing Approach:
// Append event to stream
var @event = new BookUpdated(bookId, "New Title", ...);
session.Events.Append(bookId, @event);
await session.SaveChangesAsync();
// All history preserved
Event Store Structure
Marten creates these PostgreSQL tables:
mt_events → All events (append-only)
mt_streams → Stream metadata (version, type, etc.)
mt_doc_* → Document tables (projections)
Example Event Record:
SELECT seq_id, stream_id, type, data, version, timestamp
FROM mt_events
WHERE stream_id = '018d5e4a-7b2c-7000-8000-123456789abc';
| seq_id | stream_id | type | data | version | timestamp |
|---|---|---|---|---|---|
| 1 | 018d5e4a-7b2c-7000-8000-123456789abc | book_added | {...} | 1 | 2025-01-01 |
| 2 | 018d5e4a-7b2c-7000-8000-123456789abc | book_updated | {...} | 2 | 2025-01-02 |
Event Sourcing for Analytics
One of the most powerful benefits of event sourcing is the ability to perform real-time and offline data analysis on the complete event history.
Real-Time Analytics
Stream events to analytics systems as they happen:
// Example: Real-time book sales tracking
public class BookSalesAnalytics
{
public async Task ProcessEvent(IEvent @event)
{
switch (@event.Data)
{
case BookAdded added:
await UpdateCatalogMetrics(added);
break;
case BookUpdated updated:
await TrackPriceChanges(updated);
break;
case BookSoftDeleted deleted:
await RecordDeletion(deleted);
break;
}
}
}
Use Cases:
- Live Dashboards: Real-time inventory, sales, and user activity
- Alerting: Trigger notifications on specific events (low stock, price changes)
- Streaming Analytics: Process events with tools like Apache Kafka, Azure Event Hubs
- Monitoring: Track system health and business metrics
Offline Analytics
Query the complete event history for deep analysis:
-- Example: Analyze book pricing trends over time
-- (assumes the event payload carries a price field)
SELECT
DATE_TRUNC('month', timestamp) as month,
data->>'title' as book_title,
AVG((data->>'price')::numeric) as avg_price,
COUNT(*) as price_changes
FROM mt_events
WHERE type = 'book_updated' -- Marten stores event type names in snake_case by default
AND data->>'price' IS NOT NULL
AND timestamp > NOW() - INTERVAL '1 year'
GROUP BY month, book_title
ORDER BY month DESC;
-- Example: Find books with most frequent updates
SELECT
stream_id,
COUNT(*) as total_events,
MAX(timestamp) as last_updated
FROM mt_events
WHERE type IN ('book_updated', 'book_added') -- snake_case event type names (Marten default)
GROUP BY stream_id
ORDER BY total_events DESC
LIMIT 10;
Use Cases:
- Business Intelligence: Historical trends, patterns, and insights
- Data Warehousing: Export events to data lakes (Snowflake, BigQuery, Databricks)
- Machine Learning: Train models on historical behavior patterns
- Compliance & Auditing: Complete audit trail for regulatory requirements
- A/B Testing: Analyze impact of changes over time
Event Replay for New Projections
Create new analytics projections from existing events:
// Example: Build a new "Popular Books" projection from historical data
public class PopularBooksProjection : MultiStreamProjection<PopularBook, Guid>
{
public PopularBooksProjection()
{
Identity<BookAdded>(x => x.Id);
Identity<BookViewed>(x => x.BookId); // New event we're tracking
}
public PopularBook Create(BookAdded @event)
{
return new PopularBook
{
Id = @event.Id,
Title = @event.Title,
ViewCount = 0
};
}
public void Apply(BookViewed @event, PopularBook projection)
{
projection.ViewCount++;
projection.LastViewed = @event.Timestamp;
}
}
// Rebuild projection from all historical events
await daemon.RebuildProjectionAsync<PopularBooksProjection>(CancellationToken.None);
Benefits:
- No Data Loss: All historical data is preserved
- Retroactive Analysis: Answer questions you didn't think to ask before
- Schema Evolution: Add new projections without migrating old data
- Time Travel: Reconstruct system state at any point in history
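To make the "time travel" benefit concrete, here is a minimal in-memory sketch of replaying a stream up to a chosen version. The `BookEvent`, `TitleSet`, `Removed`, `BookState`, and `TimeTravel` names are hypothetical stand-ins for illustration, not types from this project. With Marten itself, `AggregateStreamAsync` accepts optional `version` and `timestamp` arguments that serve the same purpose.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical events, used only for this illustration.
public abstract record BookEvent(int Version);
public sealed record TitleSet(int Version, string Title) : BookEvent(Version);
public sealed record Removed(int Version) : BookEvent(Version);

// State that is rebuilt purely from events.
public sealed record BookState(string Title, bool IsDeleted)
{
    public static readonly BookState Empty = new(string.Empty, false);
}

public static class TimeTravel
{
    // Folds every event with Version <= upToVersion into a state snapshot.
    public static BookState ReplayTo(IEnumerable<BookEvent> events, int upToVersion) =>
        events
            .Where(e => e.Version <= upToVersion)
            .OrderBy(e => e.Version)
            .Aggregate(BookState.Empty, Apply);

    // Pure function: previous state + event = next state.
    private static BookState Apply(BookState state, BookEvent e) => e switch
    {
        TitleSet t => state with { Title = t.Title },
        Removed => state with { IsDeleted = true },
        _ => state
    };
}
```

Calling `TimeTravel.ReplayTo(events, 1)` yields the state as it was after the first event, while a higher version includes later changes. Because the fold is a pure function of the events, any past state can be recomputed without storing snapshots.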
Integration with Analytics Tools
Export to Data Warehouse:
// Example: Stream events to Snowflake/BigQuery
public class EventExporter
{
public async Task ExportEvents(DateTime since)
{
var events = await session.Events
.QueryAllRawEvents()
.Where(e => e.Timestamp > since)
.ToListAsync();
foreach (var evt in events)
{
await dataWarehouse.InsertAsync(new
{
EventId = evt.Id,
EventType = evt.EventTypeName, // string alias; EventType is the CLR Type
StreamId = evt.StreamId,
Data = evt.Data,
Timestamp = evt.Timestamp,
CorrelationId = evt.CorrelationId
});
}
}
}
Real-Time Streaming:
// Example: Publish events to Kafka for real-time processing
public class KafkaEventPublisher
{
public async Task PublishEvent(IEvent @event)
{
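// Assumes kafkaProducer is a Confluent.Kafka IProducer<string, string>
await kafkaProducer.ProduceAsync("book-events", new Message<string, string>
{
Key = @event.StreamId.ToString(),
Value = JsonSerializer.Serialize(@event.Data),
Headers = new Headers
{
// EventTypeName is the string alias; EventType is a System.Type
{ "event-type", Encoding.UTF8.GetBytes(@event.EventTypeName) },
// CorrelationId can be null when no correlation ID was set
{ "correlation-id", Encoding.UTF8.GetBytes(@event.CorrelationId ?? string.Empty) }
}
});
}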
}
Analytics Queries:
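// Example: Per-stream lifecycle statistics
// The date arithmetic below is unlikely to translate to SQL through Marten's
// LINQ provider, so load the stream first and compute in memory.
var streamEvents = await session.Events.FetchStreamAsync(bookId);
var streamCreated = streamEvents[0].Timestamp; // assumes the stream has at least one event
var bookLifecycleStats = streamEvents
.Select(e => new
{
EventType = e.EventTypeName,
Timestamp = e.Timestamp,
DaysSinceCreation = (e.Timestamp - streamCreated).TotalDays
})
.ToList();
// Analyze: How long until first update? How many updates per month?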
Best Practices for Analytics
- Use Projections for Performance: Don't query raw events for real-time dashboards
- Partition by Time: Use PostgreSQL table partitioning for large event stores
- Index Strategically: Add indexes on timestamp, event type, correlation ID
- Archive Old Events: Move historical events to cold storage (S3, Azure Blob)
- Stream to Analytics Platform: Use CDC (Change Data Capture) for real-time sync
Configuration
Program.cs Setup
using Marten;
using Marten.Events.Daemon;
using Marten.Events.Projections;
using Weasel.Core; // EnumStorage
using Wolverine.Marten;
builder.Services.AddMarten(sp =>
{
var connectionString = builder.Configuration.GetConnectionString("bookstore")!;
var options = new StoreOptions();
options.Connection(connectionString);
// Enable metadata tracking
options.Events.MetadataConfig.CorrelationIdEnabled = true;
options.Events.MetadataConfig.CausationIdEnabled = true;
options.Events.MetadataConfig.HeadersEnabled = true;
// Configure JSON serialization
options.UseSystemTextJsonForSerialization(
EnumStorage.AsString,
Casing.CamelCase);
// Enable accent-insensitive NGram search
options.Advanced.UseNGramSearchWithUnaccent = true;
// Register event types
options.Events.AddEventType<BookAdded>();
options.Events.AddEventType<BookUpdated>();
// Configure projections
options.Projections.Add<BookSearchProjectionBuilder>(
ProjectionLifecycle.Async);
// Configure indexes
options.Schema.For<BookSearchProjection>()
.Index(x => x.Title)
.NgramIndex(x => x.Title);
return options;
})
.UseLightweightSessions() // Faster sessions (no identity map)
.IntegrateWithWolverine(); // Wolverine integration
Key Configuration Options
| Option | Purpose |
|---|---|
| MetadataConfig.CorrelationIdEnabled | Track request workflows |
| MetadataConfig.CausationIdEnabled | Track event chains |
| UseSystemTextJsonForSerialization | JSON settings for events |
| UseNGramSearchWithUnaccent | Multilingual full-text search |
| UseLightweightSessions() | Better performance |
| IntegrateWithWolverine() | Auto-commit transactions |
Events
Creating Events
Events are immutable records that represent facts:
namespace BookStore.ApiService.Events;
/// <summary>
/// Event: A book was added to the catalog
/// </summary>
public record BookAdded(
Guid Id,
string Title,
string? Isbn,
string? Description,
DateOnly? PublicationDate,
Guid? PublisherId,
List<Guid> AuthorIds,
List<Guid> CategoryIds);
/// <summary>
/// Event: A book's information was updated
/// </summary>
public record BookUpdated(
Guid Id,
string Title,
string? Isbn,
string? Description,
DateOnly? PublicationDate,
Guid? PublisherId,
List<Guid> AuthorIds,
List<Guid> CategoryIds);
/// <summary>
/// Event: A book was soft-deleted
/// </summary>
public record BookSoftDeleted(Guid Id);
/// <summary>
/// Event: A deleted book was restored
/// </summary>
public record BookRestored(Guid Id);
Event Best Practices
- Use record types: Immutable by default
- Past tense names: BookAdded, not AddBook
- Include all data: Events should be self-contained
- Never modify events: Events are immutable facts
- Add XML documentation: Explain business meaning
Registering Events
options.Events.AddEventType<BookAdded>();
options.Events.AddEventType<BookUpdated>();
options.Events.AddEventType<BookSoftDeleted>();
options.Events.AddEventType<BookRestored>();
Aggregates
What is an Aggregate?
An aggregate is a domain object that:
- Enforces business rules
- Builds state from events
- Generates new events
Creating an Aggregate
using Marten;
namespace BookStore.ApiService.Aggregates;
public class BookAggregate
{
// Current state
public Guid Id { get; set; }
public string Title { get; set; } = string.Empty;
public string? Isbn { get; set; }
public bool IsDeleted { get; set; }
// Apply methods: Marten calls these to rebuild state
void Apply(BookAdded @event)
{
Id = @event.Id;
Title = @event.Title;
Isbn = @event.Isbn;
IsDeleted = false;
}
void Apply(BookUpdated @event)
{
Title = @event.Title;
Isbn = @event.Isbn;
}
void Apply(BookSoftDeleted @event)
{
IsDeleted = true;
}
void Apply(BookRestored @event)
{
IsDeleted = false;
}
// Command methods: Generate events
public static BookAdded Create(
Guid id,
string title,
string? isbn,
...)
{
// Validate business rules
ValidateTitle(title);
ValidateIsbn(isbn);
// Return event (not saved yet)
return new BookAdded(id, title, isbn, ...);
}
public BookUpdated Update(string title, string? isbn, ...)
{
// Business rule: cannot update deleted book
if (IsDeleted)
throw new InvalidOperationException("Cannot update a deleted book");
ValidateTitle(title);
ValidateIsbn(isbn);
return new BookUpdated(Id, title, isbn, ...);
}
public BookSoftDeleted SoftDelete()
{
if (IsDeleted)
throw new InvalidOperationException("Book is already deleted");
return new BookSoftDeleted(Id);
}
// Validation helpers
static void ValidateTitle(string title)
{
if (string.IsNullOrWhiteSpace(title))
throw new ArgumentException("Title is required", nameof(title));
if (title.Length > 500)
throw new ArgumentException("Title cannot exceed 500 characters", nameof(title));
}
static void ValidateIsbn(string? isbn)
{
if (string.IsNullOrWhiteSpace(isbn))
return; // ISBN is optional
var cleanIsbn = new string(isbn.Where(char.IsDigit).ToArray());
if (cleanIsbn.Length != 10 && cleanIsbn.Length != 13)
throw new ArgumentException("ISBN must be 10 or 13 digits", nameof(isbn));
}
}
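The ValidateIsbn helper above keeps only digits, so an ISBN-10 whose check character is X (for example 0-8044-2957-X) is reduced to nine characters and rejected. The following is a minimal sketch of a stricter check that accepts a trailing X and verifies the ISBN-10 and ISBN-13 checksums. The `IsbnChecks` class is a hypothetical helper for illustration, not part of this project, and it assumes .NET 7 or later for `char.IsAsciiDigit`.

```csharp
using System;
using System.Linq;

public static class IsbnChecks
{
    // Keeps ASCII digits plus 'X' (the ISBN-10 check character for the value 10).
    public static string Normalize(string isbn) =>
        new string(isbn.Where(c => char.IsAsciiDigit(c) || c is 'X' or 'x').ToArray())
            .ToUpperInvariant();

    public static bool IsValid(string isbn)
    {
        var s = Normalize(isbn);
        return s.Length switch
        {
            10 => IsValidIsbn10(s),
            13 => IsValidIsbn13(s),
            _ => false
        };
    }

    // ISBN-10: weights 10..1, weighted sum must be divisible by 11.
    private static bool IsValidIsbn10(string s)
    {
        var sum = 0;
        for (var i = 0; i < 10; i++)
        {
            int value;
            if (s[i] == 'X')
            {
                if (i != 9) return false; // 'X' is only legal as the final character
                value = 10;
            }
            else
            {
                value = s[i] - '0';
            }
            sum += value * (10 - i);
        }
        return sum % 11 == 0;
    }

    // ISBN-13: alternating weights 1 and 3, weighted sum must be divisible by 10.
    private static bool IsValidIsbn13(string s)
    {
        if (s.Contains('X')) return false;
        var sum = 0;
        for (var i = 0; i < 13; i++)
            sum += (s[i] - '0') * (i % 2 == 0 ? 1 : 3);
        return sum % 10 == 0;
    }
}
```

Whether to enforce checksums in the aggregate is a design choice: it rejects typos early, but it also rejects legacy catalog data that carries malformed ISBNs.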
Aggregate Patterns
Pattern 1: Static Factory Method (for creation):
public static BookAdded Create(Guid id, string title, ...)
{
// Validation
// Return event
}
Pattern 2: Instance Method (for updates):
public BookUpdated Update(string title, ...)
{
// Check current state
if (IsDeleted) throw ...
// Validation
// Return event
}
Pattern 3: Apply Methods (for state):
void Apply(BookAdded @event)
{
// Update state from event
Id = @event.Id;
Title = @event.Title;
}
Working with Streams
Note
For command/handler patterns with automatic transaction management, see the Wolverine Guide.
Starting a New Stream
Create a new event stream for an aggregate:
public static IResult Handle(CreateBook command, IDocumentSession session)
{
// Generate event from aggregate
var @event = BookAggregate.Create(
command.Id,
command.Title,
command.Isbn,
command.Description,
command.PublicationDate,
command.PublisherId,
command.AuthorIds,
command.CategoryIds);
// Start new stream with initial event
session.Events.StartStream<BookAggregate>(command.Id, @event);
// With Wolverine: auto-commits
// Without Wolverine: await session.SaveChangesAsync();
return Results.Created(
$"/api/admin/books/{command.Id}",
new { id = command.Id, correlationId = session.CorrelationId });
}
Key Points:
- Stream ID is typically the aggregate ID
- Use Guid.CreateVersion7() for IDs (see performance section below)
- First event initializes the stream
- Stream type (<BookAggregate>) is metadata for Marten
Appending to Existing Stream
Add events to an existing stream:
public static async Task<IResult> Handle(
UpdateBook command,
IDocumentSession session)
{
// Load current aggregate state from all events
var aggregate = await session.Events
.AggregateStreamAsync<BookAggregate>(command.Id);
if (aggregate == null)
return Results.NotFound();
// Generate event (aggregate validates business rules)
var @event = aggregate.Update(
command.Title,
command.Isbn,
command.Description,
command.PublicationDate,
command.PublisherId,
command.AuthorIds,
command.CategoryIds);
// Append event to stream
session.Events.Append(command.Id, @event);
// With Wolverine: auto-commits
// Without Wolverine: await session.SaveChangesAsync();
return Results.NoContent();
}
Key Points:
- AggregateStreamAsync replays all events to build current state
- Aggregate validates business rules before returning event
- Append adds event to stream (increments version)
- Stream version automatically increments
Appending Multiple Events
Append multiple events in a single transaction:
public static async Task<IResult> Handle(
PublishBook command,
IDocumentSession session)
{
var aggregate = await session.Events
.AggregateStreamAsync<BookAggregate>(command.Id);
if (aggregate == null)
return Results.NotFound();
// Generate multiple events
var updateEvent = aggregate.Update(...);
var publishEvent = aggregate.Publish();
// Append all events atomically
session.Events.Append(command.Id, updateEvent, publishEvent);
return Results.NoContent();
}
Loading Stream State
Get stream metadata without loading the aggregate:
// Get stream metadata (lightweight)
var streamState = await session.Events
.FetchStreamStateAsync(bookId);
if (streamState is not null)
{
Console.WriteLine($"Stream ID: {streamState.Id}");
Console.WriteLine($"Stream version: {streamState.Version}");
Console.WriteLine($"Aggregate type: {streamState.AggregateType}");
Console.WriteLine($"Created: {streamState.Created}");
Console.WriteLine($"Last updated: {streamState.LastTimestamp}");
Console.WriteLine($"Is archived: {streamState.IsArchived}");
}
else
{
Console.WriteLine("Stream not found");
}
Use Cases:
- Check if stream exists
- Get current version for ETags
- Validate stream before operations
- Audit trail queries
Querying Events
// Get all events for a stream (each IEvent wraps the domain event in Data)
var events = await session.Events
.FetchStreamAsync(bookId);
foreach (var evt in events)
{
Console.WriteLine($"Version {evt.Version}: {evt.EventType}");
// Access event data
if (evt.Data is BookAdded added)
{
Console.WriteLine($"Book added: {added.Title}");
}
}
// Get events with full metadata
var eventsWithMetadata = await session.Events
.QueryAllRawEvents()
.Where(e => e.StreamId == bookId)
.OrderBy(e => e.Version)
.ToListAsync();
foreach (var evt in eventsWithMetadata)
{
Console.WriteLine($"Event: {evt.EventType}");
Console.WriteLine($"Version: {evt.Version}");
Console.WriteLine($"Timestamp: {evt.Timestamp}");
Console.WriteLine($"Correlation ID: {evt.CorrelationId}");
Console.WriteLine($"Causation ID: {evt.CausationId}");
}
// Query events across all streams
var recentEvents = await session.Events
.QueryAllRawEvents()
.Where(e => e.Timestamp > DateTime.UtcNow.AddHours(-1))
.OrderByDescending(e => e.Timestamp)
.Take(100)
.ToListAsync();
Stream Versioning for Optimistic Concurrency
Use stream versions to prevent lost updates:
public static async Task<IResult> Handle(
UpdateBook command,
IDocumentSession session,
HttpContext context)
{
// Get current stream version
var streamState = await session.Events.FetchStreamStateAsync(command.Id);
if (streamState == null)
return Results.NotFound();
// Check expected version (from ETag)
var currentETag = $"\"{streamState.Version}\"";
var ifMatch = context.Request.Headers["If-Match"].FirstOrDefault();
if (!string.IsNullOrEmpty(ifMatch) && ifMatch != currentETag)
{
return Results.Problem(
statusCode: 412,
title: "Precondition Failed",
detail: "The resource has been modified since you last retrieved it.");
}
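// Update aggregate
var aggregate = await session.Events.AggregateStreamAsync<BookAggregate>(command.Id);
if (aggregate == null)
return Results.NotFound();
var @event = aggregate.Update(...);
// Pass the expected post-append version so Marten rejects the commit
// if another writer appended to the stream in the meantime
var newVersion = streamState.Version + 1;
session.Events.Append(command.Id, newVersion, @event);
// Return new ETag (the append is not committed yet, so re-fetching
// the stream state here would still report the old version)
context.Response.Headers["ETag"] = $"\"{newVersion}\"";
return Results.NoContent();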
}
See the ETag Guide for more details on optimistic concurrency.
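The idea behind version-based concurrency can be shown without a database. Below is a minimal in-memory sketch, assuming a hypothetical `InMemoryStream` class that is not part of Marten or this project. Here `expectedVersion` means the version the caller last read, and the append is refused when another writer has moved the stream on since then.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stream, used only to illustrate the version check.
public sealed class InMemoryStream
{
    private readonly List<object> _events = new();
    private readonly object _gate = new();

    // The stream version equals the number of events appended so far.
    public long Version
    {
        get { lock (_gate) return _events.Count; }
    }

    // Appends only when the caller's expected version still matches.
    // Returns the new version, or throws if another writer got there first.
    public long Append(long expectedVersion, object @event)
    {
        lock (_gate)
        {
            if (_events.Count != expectedVersion)
                throw new InvalidOperationException(
                    $"Expected version {expectedVersion} but stream is at {_events.Count}.");

            _events.Add(@event);
            return _events.Count;
        }
    }
}
```

Two clients that both read version 1 and then try to append will see one success and one exception, which is the "lost update" being prevented. The check and the write happen under the same lock, which mirrors why the version check should be part of the commit rather than a separate read beforehand.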
Performance: Guid.CreateVersion7()
This project uses Version 7 GUIDs (time-ordered) for all aggregate IDs to automatically improve database performance.
Sequential vs Random GUIDs
Random GUIDs (Guid.NewGuid() - Version 4):
f47ac10b-58cc-4372-a567-0e02b2c3d479 ← Created first
a1b2c3d4-e5f6-4789-a012-3456789abcde ← Created second
2c9de4f1-8a3b-4c5d-9e7f-1a2b3c4d5e6f ← Created third
- Completely random order
- Causes database page splits
- Poor cache locality
- Scattered across B-tree index
Time-ordered GUIDs (Guid.CreateVersion7() - Version 7):
018d5e4a-7b2c-7000-8000-123456789abc ← Created first
018d5e4a-7b2d-7000-8000-234567890def ← Created second
018d5e4a-7b2e-7000-8000-345678901234 ← Created third
- Naturally sequential (first 48 bits = Unix timestamp)
- Minimal page splits
- Better cache locality
- Appended to end of B-tree index
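As a small illustration of the embedded timestamp, the sketch below reads the first 48 bits of a Version 7 GUID back into a `DateTimeOffset`. The `Uuid7` helper class is hypothetical and not part of this project. It assumes .NET 9 or later for `Guid.CreateVersion7` and .NET 8 or later for the `bigEndian` overload of `ToByteArray`.

```csharp
using System;

public static class Uuid7
{
    // Reads the 48-bit Unix-millisecond timestamp from the first six bytes
    // of the big-endian (RFC 9562) byte layout.
    public static DateTimeOffset GetTimestamp(Guid id)
    {
        byte[] bytes = id.ToByteArray(bigEndian: true);

        long milliseconds = 0;
        for (var i = 0; i < 6; i++)
            milliseconds = (milliseconds << 8) | bytes[i];

        return DateTimeOffset.FromUnixTimeMilliseconds(milliseconds);
    }
}
```

For example, `Uuid7.GetTimestamp(Guid.CreateVersion7(someTime))` returns `someTime` truncated to millisecond precision. The remaining bits are random, so two IDs created within the same millisecond are not guaranteed to sort in creation order.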
Automatic Performance Benefits
No configuration required! PostgreSQL automatically benefits because:
Better Insert Performance
- Sequential inserts at end of B-tree
- Fewer page splits, which typically speeds up inserts on large tables (the gain depends on table size and workload)
- Less write amplification
Smaller Index Size
- Better packing efficiency
- Less fragmentation
- Reduced storage costs
Improved Query Performance
- Related records stored together
- Better cache hit rates
- Faster range scans
Temporal Queries
- GUID embeds timestamp
- Can filter by ID for time ranges
// Get books created in the last hour
var oneHourAgo = Guid.CreateVersion7(DateTime.UtcNow.AddHours(-1));
var recentBooks = await session.Query<BookSearchProjection>()
.Where(b => b.Id.CompareTo(oneHourAgo) > 0)
.ToListAsync();
Real-World Impact
Inserting 1 million books:
With Random GUIDs:
- Records scattered across 1000s of database pages
- Constant page splits and reorganization
- Poor sequential read performance
- Higher I/O overhead
With Time-ordered GUIDs:
- Records appended sequentially
- Minimal page splits
- Excellent sequential read performance
- Lower I/O overhead
Standard Compliance
- Follows RFC 9562 (UUID Version 7)
- Native support in .NET 9+
- Backward compatible with standard GUIDs
- No special database configuration needed
- Works with any PostgreSQL version
Usage in Commands
public record CreateBook(
string Title,
string? Isbn,
...)
{
// Auto-generate time-ordered ID
public Guid Id { get; init; } = Guid.CreateVersion7();
}
See the Wolverine Guide for more on command patterns.
Projections
What are Projections?
Projections transform events into read models optimized for queries.
Events (Write Model)        Projections (Read Model)
─────────────────────       ────────────────────────
BookAdded             →     BookSearchProjection
BookUpdated           →     - Title
AuthorUpdated         →     - AuthorNames (denormalized)
PublisherUpdated      →     - PublisherName (denormalized)
                            - SearchText (computed)
Creating a Simple Projection
using Marten.Events.Aggregation;
using Marten.Events.Projections;
namespace BookStore.ApiService.Projections;
// Read model
public class AuthorProjection
{
public Guid Id { get; set; }
public string Name { get; set; } = string.Empty;
public string? Biography { get; set; }
public DateOnly? BirthDate { get; set; }
}
// Projection builder
public class AuthorProjectionBuilder : SingleStreamProjection<AuthorProjection>
{
// Create projection from first event
public AuthorProjection Create(AuthorAdded @event)
{
return new AuthorProjection
{
Id = @event.Id,
Name = @event.Name,
Biography = @event.Biography,
BirthDate = @event.BirthDate
};
}
// Update projection from subsequent events
public void Apply(AuthorUpdated @event, AuthorProjection projection)
{
projection.Name = @event.Name;
projection.Biography = @event.Biography;
projection.BirthDate = @event.BirthDate;
}
// Delete the projection document when the author is soft-deleted
// (an empty Apply method would leave the document unchanged)
public bool ShouldDelete(AuthorSoftDeleted @event) => true;
}
Multi-Stream Projections
Listen to events from multiple streams:
public class BookSearchProjectionBuilder : MultiStreamProjection<BookSearchProjection, Guid>
{
public BookSearchProjectionBuilder()
{
// Listen to book events
Identity<BookAdded>(x => x.Id);
Identity<BookUpdated>(x => x.Id);
DeleteEvent<BookSoftDeleted>();
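// Listen to publisher events (to update denormalized data)
// Note: Identity<T> routes an event to the projection document whose Id equals
// the selected value. Fanning a publisher or author change out to every affected
// book typically needs a custom grouper (IAggregateGrouper<Guid>) instead.
Identity<PublisherUpdated>(x => x.Id);
// Listen to author events
Identity<AuthorUpdated>(x => x.Id);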
}
public async Task<BookSearchProjection> Create(
BookAdded @event,
IQuerySession session)
{
var projection = new BookSearchProjection
{
Id = @event.Id,
Title = @event.Title,
PublisherId = @event.PublisherId,
AuthorIds = @event.AuthorIds
};
// Denormalize publisher name
if (projection.PublisherId.HasValue)
{
var publisher = await session
.LoadAsync<PublisherProjection>(projection.PublisherId.Value);
projection.PublisherName = publisher?.Name;
}
// Denormalize author names
if (projection.AuthorIds.Any())
{
var authors = await session.Query<AuthorProjection>()
.Where(a => projection.AuthorIds.Contains(a.Id))
.ToListAsync();
projection.AuthorNames = string.Join(", ", authors.Select(a => a.Name));
}
// Compute search text
projection.SearchText = $"{projection.Title} {projection.AuthorNames} {projection.PublisherName}";
return projection;
}
public async Task Apply(
BookUpdated @event,
BookSearchProjection projection,
IQuerySession session)
{
projection.Title = @event.Title;
projection.PublisherId = @event.PublisherId;
projection.AuthorIds = @event.AuthorIds;
// Re-denormalize (IDs may have changed)
// ... same logic as Create
}
// Update when publisher name changes
void Apply(PublisherUpdated @event, BookSearchProjection projection)
{
if (projection.PublisherId == @event.Id)
{
projection.PublisherName = @event.Name;
projection.SearchText = $"{projection.Title} {projection.AuthorNames} {projection.PublisherName}";
}
}
}
Projection Lifecycle
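| Lifecycle | When to Use |
|---|---|
| Inline | Update in the same transaction as the events (slower writes, immediately consistent) |
| Async | Update asynchronously in the background (faster writes, eventual consistency) |
| Live | Build on demand from the events at read time (nothing is persisted) |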
Our Choice: Async for all projections
options.Projections.Add<BookSearchProjectionBuilder>(ProjectionLifecycle.Async);
Querying Projections
// Simple query
var books = await session.Query<BookSearchProjection>()
.Where(b => !b.IsDeleted)
.OrderBy(b => b.Title)
.ToListAsync();
// NGram search on an indexed field
var results = await session.Query<BookSearchProjection>()
.Where(b => b.Title.NgramSearch("clean code"))
.ToListAsync();
// Pagination
var page = await session.Query<BookSearchProjection>()
.Where(b => !b.IsDeleted)
.OrderBy(b => b.Title)
.Skip((pageNumber - 1) * pageSize)
.Take(pageSize)
.ToListAsync();
Metadata Tracking
Correlation and Causation IDs
Track request workflows and event chains:
// Enable in configuration
options.Events.MetadataConfig.CorrelationIdEnabled = true;
options.Events.MetadataConfig.CausationIdEnabled = true;
Correlation ID: Tracks entire workflow
User Request → CreateBook → BookAdded → UpdateSearchProjection
↓ ↓ ↓ ↓
corr-123 corr-123 corr-123 corr-123
Causation ID: Tracks event chains
BookAdded (event-1)
↓ causes
UpdateSearchProjection (event-2, causation: event-1)
↓ causes
SendNotification (event-3, causation: event-2)
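The two diagrams can be summarized in a few lines of code. The sketch below uses a hypothetical `Envelope` record, which is not a Marten or Wolverine type, to show how a correlation ID is copied unchanged through a workflow while each causation ID points at the immediate predecessor.

```csharp
using System;

// Hypothetical message envelope, used only for this illustration.
public sealed record Envelope(Guid Id, string CorrelationId, string? CausationId)
{
    // Starts a new workflow: the correlation ID is given, and nothing caused it.
    public static Envelope Start(string correlationId) =>
        new(Guid.NewGuid(), correlationId, null);

    // Creates a follow-up message: same correlation ID, caused by this message.
    public Envelope Cause() =>
        new(Guid.NewGuid(), CorrelationId, Id.ToString());
}
```

Starting with `Envelope.Start("corr-123")` and calling `Cause()` twice produces three messages that share `corr-123`, with each causation ID naming the message directly before it. Filtering by correlation ID therefore returns the whole workflow, and following causation IDs reconstructs the order of cause and effect.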
Setting Metadata
Use middleware to set correlation ID:
// Implements IMiddleware so that InvokeAsync(HttpContext, RequestDelegate) is the expected signature
public class MartenMetadataMiddleware : IMiddleware
{
public async Task InvokeAsync(HttpContext context, RequestDelegate next)
{
var session = context.RequestServices.GetService<IDocumentSession>();
if (session is not null)
{
// Set correlation ID from header or generate new one
var correlationId = context.Request.Headers["X-Correlation-ID"].FirstOrDefault()
?? Guid.NewGuid().ToString();
session.CorrelationId = correlationId;
// Add to response headers
context.Response.Headers["X-Correlation-ID"] = correlationId;
}
await next(context);
}
}
// Register middleware (IMiddleware implementations are resolved from DI)
builder.Services.AddScoped<MartenMetadataMiddleware>();
app.UseMiddleware<MartenMetadataMiddleware>();
Querying Metadata
var events = await session.Events
.QueryAllRawEvents()
.Where(e => e.CorrelationId == "corr-123")
.ToListAsync();
foreach (var evt in events)
{
Console.WriteLine($"Event: {evt.EventType}");
Console.WriteLine($"Correlation: {evt.CorrelationId}");
Console.WriteLine($"Causation: {evt.CausationId}");
Console.WriteLine($"Timestamp: {evt.Timestamp}");
}
Optimistic Concurrency
Using Stream Versions as ETags
public static async Task<IResult> Handle(
UpdateBook command,
IDocumentSession session,
HttpContext context)
{
// Get current stream version
var streamState = await session.Events.FetchStreamStateAsync(command.Id);
if (streamState == null)
return Results.NotFound();
// Generate ETag from version
var currentETag = ETagHelper.GenerateETag(streamState.Version);
// Check If-Match header
if (!string.IsNullOrEmpty(command.ETag) &&
!ETagHelper.CheckIfMatch(context, currentETag))
{
return ETagHelper.PreconditionFailed();
}
// Update aggregate
var aggregate = await session.Events.AggregateStreamAsync<BookAggregate>(command.Id);
if (aggregate == null)
return Results.NotFound();
var @event = aggregate.Update(...);
// Pass the expected post-append version so a concurrent write fails the commit
var newVersion = streamState.Version + 1;
session.Events.Append(command.Id, newVersion, @event);
// Return new ETag (the append is not committed yet, so re-reading the stream state would be stale)
ETagHelper.AddETagHeader(context, ETagHelper.GenerateETag(newVersion));
return Results.NoContent();
}
ETag Helper
public static class ETagHelper
{
public static string GenerateETag(long version) // StreamState.Version is a long
=> $"\"{version}\"";
public static bool CheckIfMatch(HttpContext context, string currentETag)
{
var ifMatch = context.Request.Headers["If-Match"].FirstOrDefault();
return ifMatch == currentETag;
}
public static void AddETagHeader(HttpContext context, string etag)
{
context.Response.Headers["ETag"] = etag;
}
public static IResult PreconditionFailed()
=> Results.Problem(
statusCode: 412,
title: "Precondition Failed",
detail: "The resource has been modified since you last retrieved it.");
}
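The helper above compares the raw If-Match header to a single ETag string. As a hedged alternative, the sketch below shows a more tolerant check that also handles `*` and comma-separated lists. The `IfMatch` class is hypothetical and not part of this project. It treats weak validators (`W/"..."`) as non-matching, since If-Match uses strong comparison under RFC 9110, and it assumes the resource exists when `*` is sent.

```csharp
using System;
using System.Globalization;

public static class IfMatch
{
    // Returns true when the header allows the write to proceed
    // for a stream that is currently at currentVersion.
    public static bool Allows(string? headerValue, long currentVersion)
    {
        // No precondition sent: the request is unconditional.
        if (string.IsNullOrWhiteSpace(headerValue)) return true;

        foreach (var raw in headerValue.Split(','))
        {
            var tag = raw.Trim();

            if (tag == "*") return true; // matches any existing representation
            if (tag.StartsWith("W/", StringComparison.Ordinal)) continue; // weak tags never match

            var quoted = tag.Length >= 2 && tag[0] == '"' && tag[^1] == '"';
            if (quoted &&
                long.TryParse(tag.AsSpan(1, tag.Length - 2), NumberStyles.None,
                    CultureInfo.InvariantCulture, out var version) &&
                version == currentVersion)
            {
                return true;
            }
        }

        return false;
    }
}
```

A handler could call `IfMatch.Allows(context.Request.Headers["If-Match"].ToString(), streamState.Version)` and return 412 when it yields false. Parsing the version as a number rather than comparing strings avoids false mismatches from whitespace or list syntax.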
Full-Text Search
NGram Search Configuration
// Enable NGram search with unaccent (multilingual)
options.Advanced.UseNGramSearchWithUnaccent = true;
// Add NGram indexes
options.Schema.For<BookSearchProjection>()
.NgramIndex(x => x.Title)
.NgramIndex(x => x.AuthorNames);
Searching
// NGram search (uses the NGram index on Title)
var results = await session.Query<BookSearchProjection>()
.Where(b => b.Title.NgramSearch("clean code"))
.ToListAsync();
// NGram search filters matches rather than ranking them, so order explicitly
var orderedResults = await session.Query<BookSearchProjection>()
.Where(b => b.Title.NgramSearch("clean code"))
.OrderBy(b => b.Title)
.ToListAsync();
// Accent-insensitive search (automatic with UseNGramSearchWithUnaccent)
var accentResults = await session.Query<BookSearchProjection>()
.Where(b => b.Title.NgramSearch("café")) // Matches "cafe", "café", "Café"
.ToListAsync();
Advanced Features
Document Store
Marten can also store documents (not just events):
// Store document
var book = new Book { Id = Guid.NewGuid(), Title = "Clean Code" };
session.Store(book);
await session.SaveChangesAsync();
// Load document
var loaded = await session.LoadAsync<Book>(book.Id);
// Query documents
var books = await session.Query<Book>()
.Where(b => b.Title.Contains("Code"))
.ToListAsync();
Soft Deletes
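// Configure soft deletes
options.Schema.For<Book>()
.SoftDeleted();
// Soft delete
session.Delete(book); // Marks the document as deleted instead of removing the row
await session.SaveChangesAsync();
// Queries exclude soft-deleted documents by default
var active = await session.Query<Book>()
.ToListAsync();
// Include soft-deleted documents explicitly (requires using Marten.Linq.SoftDeletes)
var all = await session.Query<Book>()
.Where(b => b.MaybeDeleted())
.ToListAsync();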
Indexes
// B-tree index (exact matches, sorting)
options.Schema.For<BookSearchProjection>()
.Index(x => x.Title);
// GIN index (JSON fields)
options.Schema.For<BookSearchProjection>()
.GinIndexJsonData();
// NGram index (full-text search)
options.Schema.For<BookSearchProjection>()
.NgramIndex(x => x.Title);
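// Composite index (pass the member expressions as a collection;
// requires using System.Linq.Expressions)
options.Schema.For<BookSearchProjection>()
.Index(new Expression<Func<BookSearchProjection, object>>[] { x => x.PublisherId, x => x.Title });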
Best Practices
1. Event Design
- ✅ Use past tense names (BookAdded, not AddBook)
- ✅ Include all necessary data in events
- ✅ Never modify events (immutable)
- ✅ Version events when schema changes needed
2. Aggregate Design
- ✅ Keep aggregates focused (single responsibility)
- ✅ Validate in command methods
- ✅ Apply methods should only update state
- ✅ Use static factory methods for creation
3. Projection Design
- ✅ Denormalize for query performance
- ✅ Use async projections for scalability
- ✅ Keep projections simple (no business logic)
- ✅ Create separate projections for different queries
4. Session Management
- ✅ Use UseLightweightSessions() for better performance
- ✅ Let Wolverine manage transactions
- ✅ Don't manually call SaveChangesAsync() with Wolverine
- ✅ Use IQuerySession for read-only operations
5. Performance
- ✅ Use indexes on frequently queried fields
- ✅ Use NGram indexes for full-text search
- ✅ Denormalize data in projections
- ✅ Use async projections to avoid blocking writes
Troubleshooting
Projection Not Updating
Problem: Projection not reflecting latest events
Solution:
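- Check projection daemon status: GET /api/admin/projections/status
- Confirm the async daemon is enabled (for example via AddAsyncDaemon); async projections are only processed while it runs
- Rebuild projection: GET /api/admin/projections/rebuild/BookSearchProjection
- Check for exceptions in logs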
Events Not Saving
Problem: Events not appearing in database
Solution:
- Verify IntegrateWithWolverine() is called
- Without Wolverine, confirm the handler calls SaveChangesAsync()
- Check for exceptions in handler
- Ensure event types are registered
- Verify connection string
Slow Queries
Problem: Projection queries are slow
Solution:
- Add indexes on queried fields
- Use NGram indexes for text search
- Denormalize more data
- Check query execution plan
Summary
Marten provides:
- ✅ Event store on PostgreSQL
- ✅ Document database capabilities
- ✅ Async projections for read models
- ✅ Full-text search with NGram
- ✅ Metadata tracking (correlation/causation)
- ✅ Optimistic concurrency with stream versions
Key Concepts:
- Events = Immutable facts
- Aggregates = Domain logic + state
- Streams = Event sequences
- Projections = Read models
- Sessions = Unit of work
Next Steps
- Event Sourcing Guide - Event sourcing concepts and patterns
- Wolverine Guide - Command/handler pattern with automatic transaction management
- Architecture Overview - System design and patterns
- ETag Guide - Optimistic concurrency with stream versions
- Correlation/Causation Guide - Distributed tracing with metadata