Marten Integration Guide

Overview

The Book Store API uses Marten, a .NET library that turns PostgreSQL into a document database and event store. Marten provides the foundation for our event sourcing architecture.

Why Marten?

Benefits

Event Store on PostgreSQL: No additional infrastructure needed
Document Database: Store and query JSON documents
Event Sourcing: Built-in support for event streams
Async Projections: Automatically build read models from events
Strong .NET Integration: LINQ support, type-safe queries
Metadata Tracking: Correlation/causation IDs for distributed tracing

What Marten Provides

  1. Event Store: Append-only event log with stream management
  2. Document Store: Store and query JSON documents
  3. Projections: Transform events into read models
  4. Session Management: Unit of work pattern
  5. Schema Management: Automatic database schema creation

Event Sourcing Basics

Note

For a comprehensive introduction to event sourcing concepts and patterns, see the Event Sourcing Guide. This section focuses on Marten-specific implementation.

What is Event Sourcing?

Instead of storing current state, we store all changes as immutable events.

Traditional Approach (CRUD):

// Update book title
book.Title = "New Title";
await db.SaveChangesAsync();
// Old title is lost forever

Event Sourcing Approach:

// Append event to stream
var @event = new BookUpdated(bookId, "New Title", ...);
session.Events.Append(bookId, @event);
await session.SaveChangesAsync();
// All history preserved
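
To make the contrast concrete, the sketch below rebuilds current state by folding over a list of events, with no database involved. The two-field `BookAdded` and `BookUpdated` records and the `BookState` type are simplified, hypothetical stand-ins for illustration, not the project's real types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// History of one stream, oldest first (simplified stand-in events).
var history = new List<object>
{
    new BookAdded(Guid.Empty, "Old Title"),
    new BookUpdated(Guid.Empty, "New Title"),
};

// Current state is a left fold over the history.
var current = history.Aggregate(
    BookState.Empty,
    (state, e) => BookState.Apply(state, e));
Console.WriteLine($"{current.Title} (version {current.Version})");

// The old title was never overwritten; it is still in the first event.
var firstTitle = ((BookAdded)history[0]).Title;
Console.WriteLine($"Original title: {firstTitle}");

public record BookAdded(Guid Id, string Title);
public record BookUpdated(Guid Id, string Title);

public record BookState(Guid Id, string Title, int Version)
{
    public static readonly BookState Empty = new(Guid.Empty, string.Empty, 0);

    // Pure function: previous state + event -> next state.
    public static BookState Apply(BookState state, object @event) => @event switch
    {
        BookAdded e   => new BookState(e.Id, e.Title, state.Version + 1),
        BookUpdated e => state with { Title = e.Title, Version = state.Version + 1 },
        _             => state, // unknown events leave state unchanged
    };
}
```

Marten performs the equivalent fold when it calls an aggregate's `Apply` methods; the difference is that the events come from `mt_events` rather than an in-memory list.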

Event Store Structure

Marten creates these PostgreSQL tables:

mt_events        → All events (append-only)
mt_streams       → Stream metadata (version, type, etc.)
mt_doc_*         → Document tables (projections)

Example Event Record:

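SELECT seq_id, stream_id, type, data, version, timestamp
FROM mt_events
WHERE stream_id = '018d5e4a-7b2c-7000-8000-123456789abc';

seq_id  stream_id       type          data   version  timestamp
1       018d5e4a-7b2c…  book_added    {...}  1        2025-01-01
2       018d5e4a-7b2c…  book_updated  {...}  2        2025-01-02

(seq_id is the global sequence number; the id column holds each event's own UUID. By default Marten stores the event type as a snake_case alias.)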

Event Sourcing for Analytics

One of the most powerful benefits of event sourcing is the ability to perform real-time and offline data analysis on the complete event history.

Real-Time Analytics

Stream events to analytics systems as they happen:

// Example: Real-time book sales tracking
public class BookSalesAnalytics
{
    public async Task ProcessEvent(IEvent @event)
    {
        switch (@event.Data)
        {
            case BookAdded added:
                await UpdateCatalogMetrics(added);
                break;
            case BookUpdated updated:
                await TrackPriceChanges(updated);
                break;
            case BookSoftDeleted deleted:
                await RecordDeletion(deleted);
                break;
        }
    }
}

Use Cases:

  • Live Dashboards: Real-time inventory, sales, and user activity
  • Alerting: Trigger notifications on specific events (low stock, price changes)
  • Streaming Analytics: Process events with tools like Apache Kafka, Azure Event Hubs
  • Monitoring: Track system health and business metrics

Offline Analytics

Query the complete event history for deep analysis:

-- Example: Analyze book pricing trends over time
-- (assumes BookUpdated events carry a price field, which the events shown in this guide do not)
SELECT 
    DATE_TRUNC('month', timestamp) as month,
    data->>'title' as book_title,
    AVG((data->>'price')::numeric) as avg_price,
    COUNT(*) as price_changes
FROM mt_events
WHERE type = 'book_updated'  -- Marten stores the event type as a snake_case alias by default
    AND data->>'price' IS NOT NULL
    AND timestamp > NOW() - INTERVAL '1 year'
GROUP BY month, book_title
ORDER BY month DESC;

-- Example: Find books with most frequent updates
SELECT 
    stream_id,
    COUNT(*) as total_events,
    MAX(timestamp) as last_updated
FROM mt_events
WHERE type IN ('book_updated', 'book_added')  -- snake_case aliases (Marten's default naming)
GROUP BY stream_id
ORDER BY total_events DESC
LIMIT 10;

Use Cases:

  • Business Intelligence: Historical trends, patterns, and insights
  • Data Warehousing: Export events to data lakes (Snowflake, BigQuery, Databricks)
  • Machine Learning: Train models on historical behavior patterns
  • Compliance & Auditing: Complete audit trail for regulatory requirements
  • A/B Testing: Analyze impact of changes over time

Event Replay for New Projections

Create new analytics projections from existing events:

// Example: Build a new "Popular Books" projection from historical data
public class PopularBooksProjection : MultiStreamProjection<PopularBook, Guid>
{
    public PopularBooksProjection()
    {
        Identity<BookAdded>(x => x.Id);
        Identity<BookViewed>(x => x.BookId);  // New event we're tracking
    }
    
    public PopularBook Create(BookAdded @event)
    {
        return new PopularBook
        {
            Id = @event.Id,
            Title = @event.Title,
            ViewCount = 0
        };
    }
    
    public void Apply(BookViewed @event, PopularBook projection)
    {
        projection.ViewCount++;
        projection.LastViewed = @event.Timestamp;
    }
}

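// Rebuild projection from all historical events.
// `store` is assumed to be the application's IDocumentStore.
using var daemon = await store.BuildProjectionDaemonAsync();
await daemon.RebuildProjectionAsync<PopularBooksProjection>(CancellationToken.None);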

Benefits:

  • No Data Loss: All historical data is preserved
  • Retroactive Analysis: Answer questions you didn't think to ask before
  • Schema Evolution: Add new projections without migrating old data
  • Time Travel: Reconstruct system state at any point in history
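
The "Time Travel" benefit can be sketched without a database: replay only the events recorded at or before a cut-off and ignore the rest. `StoredEvent` and `TitleAsOf` are hypothetical names used only for this illustration. Marten's `AggregateStreamAsync` is understood to accept optional version and timestamp arguments for the same purpose against a real stream; confirm the exact signature in the Marten documentation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-ins: each stored event carries its stream version and timestamp.
var stream = new List<StoredEvent>
{
    new(1, new DateTimeOffset(2025, 1, 1, 0, 0, 0, TimeSpan.Zero), "BookAdded",   "Draft Title"),
    new(2, new DateTimeOffset(2025, 1, 2, 0, 0, 0, TimeSpan.Zero), "BookUpdated", "Better Title"),
    new(3, new DateTimeOffset(2025, 1, 3, 0, 0, 0, TimeSpan.Zero), "BookUpdated", "Final Title"),
};

// Replay only the events recorded at or before the cut-off, in version order.
// Returns an empty string when the stream did not exist yet.
static string TitleAsOf(IEnumerable<StoredEvent> events, DateTimeOffset asOf) =>
    events.Where(e => e.Timestamp <= asOf)
          .OrderBy(e => e.Version)
          .Aggregate(string.Empty, (previous, e) => e.Title);

var middayJan2 = new DateTimeOffset(2025, 1, 2, 12, 0, 0, TimeSpan.Zero);
Console.WriteLine(TitleAsOf(stream, middayJan2));              // state as of 2 January
Console.WriteLine(TitleAsOf(stream, DateTimeOffset.MaxValue)); // latest state

public record StoredEvent(int Version, DateTimeOffset Timestamp, string Type, string Title);
```

Because nothing is ever overwritten, the same history answers both questions; only the cut-off changes.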

Integration with Analytics Tools

Export to Data Warehouse:

// Example: Stream events to Snowflake/BigQuery
public class EventExporter
{
    public async Task ExportEvents(DateTime since)
    {
        var events = await session.Events
            .QueryAllRawEvents()
            .Where(e => e.Timestamp > since)
            .ToListAsync();
        
        foreach (var evt in events)
        {
            await dataWarehouse.InsertAsync(new
            {
                EventId = evt.Id,
                EventType = evt.EventTypeName,  // string alias; evt.EventType is the CLR Type
                StreamId = evt.StreamId,
                Data = evt.Data,
                Timestamp = evt.Timestamp,
                CorrelationId = evt.CorrelationId
            });
        }
    }
}

Real-Time Streaming:

// Example: Publish events to Kafka for real-time processing
public class KafkaEventPublisher
{
    public async Task PublishEvent(IEvent @event)
    {
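        // Assumes kafkaProducer is a Confluent.Kafka IProducer<string, string>
        await kafkaProducer.ProduceAsync("book-events", new Message<string, string>
        {
            Key = @event.StreamId.ToString(),
            Value = JsonSerializer.Serialize(@event.Data),
            Headers = new Headers
            {
                // EventTypeName is the string alias; EventType is the CLR Type
                { "event-type", Encoding.UTF8.GetBytes(@event.EventTypeName) },
                // CorrelationId can be null when no correlation was set
                { "correlation-id", Encoding.UTF8.GetBytes(@event.CorrelationId ?? string.Empty) }
            }
        });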
    }
}

Analytics Queries:

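// Example: Complex analytics query
// Fetch the raw events first, then compute derived values in memory;
// date arithmetic inside Select() may not translate to SQL.
var lifecycleEvents = await session.Events
    .QueryAllRawEvents()
    .Where(e => e.StreamId == bookId)
    .OrderBy(e => e.Version)
    .ToListAsync();

var bookLifecycleStats = lifecycleEvents
    .Select(e => new
    {
        EventType = e.EventTypeName,
        Timestamp = e.Timestamp,
        DaysSinceCreation = (e.Timestamp - streamCreated).TotalDays
    })
    .ToList();

// Analyze: How long until first update? How many updates per month?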

Best Practices for Analytics

  1. Use Projections for Performance: Don't query raw events for real-time dashboards
  2. Partition by Time: Use PostgreSQL table partitioning for large event stores
  3. Index Strategically: Add indexes on timestamp, event type, correlation ID
  4. Archive Old Events: Move historical events to cold storage (S3, Azure Blob)
  5. Stream to Analytics Platform: Use CDC (Change Data Capture) for real-time sync

Configuration

Program.cs Setup

using Marten;
using Marten.Events.Daemon;
using Marten.Events.Projections;
using Weasel.Core;   // EnumStorage
using Wolverine.Marten;

builder.Services.AddMarten(sp =>
{
    var connectionString = builder.Configuration.GetConnectionString("bookstore")!;
    
    var options = new StoreOptions();
    options.Connection(connectionString);
    
    // Enable metadata tracking
    options.Events.MetadataConfig.CorrelationIdEnabled = true;
    options.Events.MetadataConfig.CausationIdEnabled = true;
    options.Events.MetadataConfig.HeadersEnabled = true;
    
    // Configure JSON serialization
    options.UseSystemTextJsonForSerialization(
        EnumStorage.AsString, 
        Casing.CamelCase);
    
    // Enable accent-insensitive NGram search
    options.Advanced.UseNGramSearchWithUnaccent = true;
    
    // Register event types
    options.Events.AddEventType<BookAdded>();
    options.Events.AddEventType<BookUpdated>();
    
    // Configure projections
    options.Projections.Add<BookSearchProjectionBuilder>(
        ProjectionLifecycle.Async);
    
    // Configure indexes
    options.Schema.For<BookSearchProjection>()
        .Index(x => x.Title)
        .NgramIndex(x => x.Title);
    
    return options;
})
.UseLightweightSessions()      // Faster sessions (no identity map)
.IntegrateWithWolverine();     // Wolverine integration

Key Configuration Options

Option                                 Purpose
───────────────────────────────────    ─────────────────────────────────
MetadataConfig.CorrelationIdEnabled    Track request workflows
MetadataConfig.CausationIdEnabled      Track event chains
UseSystemTextJsonForSerialization      JSON settings for events
UseNGramSearchWithUnaccent             Accent-insensitive NGram search
UseLightweightSessions()               Better performance
IntegrateWithWolverine()               Auto-commit transactions

Events

Creating Events

Events are immutable records that represent facts:

namespace BookStore.ApiService.Events;

/// <summary>
/// Event: A book was added to the catalog
/// </summary>
public record BookAdded(
    Guid Id,
    string Title,
    string? Isbn,
    string? Description,
    DateOnly? PublicationDate,
    Guid? PublisherId,
    List<Guid> AuthorIds,
    List<Guid> CategoryIds);

/// <summary>
/// Event: A book's information was updated
/// </summary>
public record BookUpdated(
    Guid Id,
    string Title,
    string? Isbn,
    string? Description,
    DateOnly? PublicationDate,
    Guid? PublisherId,
    List<Guid> AuthorIds,
    List<Guid> CategoryIds);

/// <summary>
/// Event: A book was soft-deleted
/// </summary>
public record BookSoftDeleted(Guid Id);

/// <summary>
/// Event: A deleted book was restored
/// </summary>
public record BookRestored(Guid Id);

Event Best Practices

  1. Use record types: Immutable by default
  2. Past tense names: BookAdded, not AddBook
  3. Include all data: Events should be self-contained
  4. Never modify events: Events are immutable facts
  5. Add XML documentation: Explain business meaning

Registering Events

options.Events.AddEventType<BookAdded>();
options.Events.AddEventType<BookUpdated>();
options.Events.AddEventType<BookSoftDeleted>();
options.Events.AddEventType<BookRestored>();

Aggregates

What is an Aggregate?

An aggregate is a domain object that:

  • Enforces business rules
  • Builds state from events
  • Generates new events

Creating an Aggregate

using Marten;

namespace BookStore.ApiService.Aggregates;

public class BookAggregate
{
    // Current state
    public Guid Id { get; set; }
    public string Title { get; set; } = string.Empty;
    public string? Isbn { get; set; }
    public bool IsDeleted { get; set; }
    
    // Apply methods: Marten calls these to rebuild state
    void Apply(BookAdded @event)
    {
        Id = @event.Id;
        Title = @event.Title;
        Isbn = @event.Isbn;
        IsDeleted = false;
    }
    
    void Apply(BookUpdated @event)
    {
        Title = @event.Title;
        Isbn = @event.Isbn;
    }
    
    void Apply(BookSoftDeleted @event)
    {
        IsDeleted = true;
    }
    
    void Apply(BookRestored @event)
    {
        IsDeleted = false;
    }
    
    // Command methods: Generate events
    public static BookAdded Create(
        Guid id,
        string title,
        string? isbn,
        ...)
    {
        // Validate business rules
        ValidateTitle(title);
        ValidateIsbn(isbn);
        
        // Return event (not saved yet)
        return new BookAdded(id, title, isbn, ...);
    }
    
    public BookUpdated Update(string title, string? isbn, ...)
    {
        // Business rule: cannot update deleted book
        if (IsDeleted)
            throw new InvalidOperationException("Cannot update a deleted book");
        
        ValidateTitle(title);
        ValidateIsbn(isbn);
        
        return new BookUpdated(Id, title, isbn, ...);
    }
    
    public BookSoftDeleted SoftDelete()
    {
        if (IsDeleted)
            throw new InvalidOperationException("Book is already deleted");
        
        return new BookSoftDeleted(Id);
    }
    
    // Validation helpers
    static void ValidateTitle(string title)
    {
        if (string.IsNullOrWhiteSpace(title))
            throw new ArgumentException("Title is required", nameof(title));
        
        if (title.Length > 500)
            throw new ArgumentException("Title cannot exceed 500 characters", nameof(title));
    }
    
    static void ValidateIsbn(string? isbn)
    {
        if (string.IsNullOrWhiteSpace(isbn))
            return; // ISBN is optional
        
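        // Drop hyphens and spaces; keep digits and the ISBN-10 check character 'X'
        var cleanIsbn = new string(isbn.Where(c => char.IsAsciiDigit(c) || c is 'X' or 'x').ToArray());
        
        if (cleanIsbn.Length != 10 && cleanIsbn.Length != 13)
            throw new ArgumentException("ISBN must be 10 or 13 characters", nameof(isbn));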
    }
}

Aggregate Patterns

Pattern 1: Static Factory Method (for creation):

public static BookAdded Create(Guid id, string title, ...)
{
    // Validation
    // Return event
}

Pattern 2: Instance Method (for updates):

public BookUpdated Update(string title, ...)
{
    // Check current state
    if (IsDeleted) throw ...
    
    // Validation
    // Return event
}

Pattern 3: Apply Methods (for state):

void Apply(BookAdded @event)
{
    // Update state from event
    Id = @event.Id;
    Title = @event.Title;
}
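
The three patterns combine into a decide-then-apply loop that can be exercised without an event store, which is also how aggregate rules can be unit tested. `MiniBook` below is a hypothetical, trimmed-down aggregate with simplified events; its `Apply` methods are public only so the sketch can call them directly.

```csharp
using System;

var book = new MiniBook();

// Pattern 1: static factory validates and returns the first event.
var added = MiniBook.Create(Guid.NewGuid(), "Domain-Driven Design");
book.Apply(added);

// Pattern 2: instance method checks current state, then returns an event.
var deleted = book.SoftDelete();
book.Apply(deleted);

// The business rule now blocks further updates.
var rejected = false;
try
{
    book.Update("New title");
}
catch (InvalidOperationException ex)
{
    rejected = true;
    Console.WriteLine($"Rejected: {ex.Message}");
}

public record BookAdded(Guid Id, string Title);
public record BookUpdated(Guid Id, string Title);
public record BookSoftDeleted(Guid Id);

public class MiniBook
{
    public Guid Id { get; private set; }
    public string Title { get; private set; } = string.Empty;
    public bool IsDeleted { get; private set; }

    public static BookAdded Create(Guid id, string title)
    {
        if (string.IsNullOrWhiteSpace(title))
            throw new ArgumentException("Title is required", nameof(title));
        return new BookAdded(id, title);
    }

    public BookUpdated Update(string title)
    {
        if (IsDeleted)
            throw new InvalidOperationException("Cannot update a deleted book");
        return new BookUpdated(Id, title);
    }

    public BookSoftDeleted SoftDelete()
    {
        if (IsDeleted)
            throw new InvalidOperationException("Book is already deleted");
        return new BookSoftDeleted(Id);
    }

    // Pattern 3: Apply methods only copy data from the event into state.
    public void Apply(BookAdded e) { Id = e.Id; Title = e.Title; IsDeleted = false; }
    public void Apply(BookUpdated e) => Title = e.Title;
    public void Apply(BookSoftDeleted e) => IsDeleted = true;
}
```

Command methods never change state themselves; state changes only when an event is applied, so replaying the same events always yields the same aggregate.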

Working with Streams

Note

For command/handler patterns with automatic transaction management, see the Wolverine Guide.

Starting a New Stream

Create a new event stream for an aggregate:

public static IResult Handle(CreateBook command, IDocumentSession session)
{
    // Generate event from aggregate
    var @event = BookAggregate.Create(
        command.Id,
        command.Title,
        command.Isbn,
        command.Description,
        command.PublicationDate,
        command.PublisherId,
        command.AuthorIds,
        command.CategoryIds);
    
    // Start new stream with initial event
    session.Events.StartStream<BookAggregate>(command.Id, @event);
    
    // With Wolverine: auto-commits
    // Without Wolverine: await session.SaveChangesAsync();
    
    return Results.Created(
        $"/api/admin/books/{command.Id}",
        new { id = command.Id, correlationId = session.CorrelationId });
}

Key Points:

  • Stream ID is typically the aggregate ID
  • Use Guid.CreateVersion7() for IDs (see performance section below)
  • First event initializes the stream
  • Stream type (<BookAggregate>) is metadata for Marten

Appending to Existing Stream

Add events to an existing stream:

public static async Task<IResult> Handle(
    UpdateBook command,
    IDocumentSession session)
{
    // Load current aggregate state from all events
    var aggregate = await session.Events
        .AggregateStreamAsync<BookAggregate>(command.Id);
    
    if (aggregate == null)
        return Results.NotFound();
    
    // Generate event (aggregate validates business rules)
    var @event = aggregate.Update(
        command.Title,
        command.Isbn,
        command.Description,
        command.PublicationDate,
        command.PublisherId,
        command.AuthorIds,
        command.CategoryIds);
    
    // Append event to stream
    session.Events.Append(command.Id, @event);
    
    // With Wolverine: auto-commits
    // Without Wolverine: await session.SaveChangesAsync();
    
    return Results.NoContent();
}

Key Points:

  • AggregateStreamAsync replays all events to build current state
  • Aggregate validates business rules before returning event
  • Append adds the event to the stream; the stream version increments automatically when the session is saved

Appending Multiple Events

Append multiple events in a single transaction:

public static async Task<IResult> Handle(
    PublishBook command,
    IDocumentSession session)
{
    var aggregate = await session.Events
        .AggregateStreamAsync<BookAggregate>(command.Id);
    
    if (aggregate == null)
        return Results.NotFound();
    
    // Generate multiple events (Publish() is not part of the BookAggregate shown above)
    var updateEvent = aggregate.Update(...);
    var publishEvent = aggregate.Publish();
    
    // Append all events atomically
    session.Events.Append(command.Id, updateEvent, publishEvent);
    
    return Results.NoContent();
}

Loading Stream State

Get stream metadata without loading the aggregate:

// Get stream metadata (lightweight)
var streamState = await session.Events
    .FetchStreamStateAsync(bookId);

if (streamState is not null)
{
    Console.WriteLine($"Stream ID: {streamState.Id}");
    Console.WriteLine($"Stream version: {streamState.Version}");
    Console.WriteLine($"Aggregate type: {streamState.AggregateType}");
    Console.WriteLine($"Created: {streamState.Created}");
    Console.WriteLine($"Last updated: {streamState.LastTimestamp}");
    Console.WriteLine($"Is archived: {streamState.IsArchived}");
}
else
{
    Console.WriteLine("Stream not found");
}

Use Cases:

  • Check if stream exists
  • Get current version for ETags
  • Validate stream before operations
  • Audit trail queries

Querying Events

// Get all events for a stream (as domain events)
var events = await session.Events
    .FetchStreamAsync(bookId);

foreach (var evt in events)
{
    Console.WriteLine($"Version {evt.Version}: {evt.EventType}");
    
    // Access event data
    if (evt.Data is BookAdded added)
    {
        Console.WriteLine($"Book added: {added.Title}");
    }
}

// Get events with full metadata
var eventsWithMetadata = await session.Events
    .QueryAllRawEvents()
    .Where(e => e.StreamId == bookId)
    .OrderBy(e => e.Version)
    .ToListAsync();

foreach (var evt in eventsWithMetadata)
{
    Console.WriteLine($"Event: {evt.EventType}");
    Console.WriteLine($"Version: {evt.Version}");
    Console.WriteLine($"Timestamp: {evt.Timestamp}");
    Console.WriteLine($"Correlation ID: {evt.CorrelationId}");
    Console.WriteLine($"Causation ID: {evt.CausationId}");
}

// Query events across all streams
var recentEvents = await session.Events
    .QueryAllRawEvents()
    .Where(e => e.Timestamp > DateTimeOffset.UtcNow.AddHours(-1))
    .OrderByDescending(e => e.Timestamp)
    .Take(100)
    .ToListAsync();

Stream Versioning for Optimistic Concurrency

Use stream versions to prevent lost updates:

public static async Task<IResult> Handle(
    UpdateBook command,
    IDocumentSession session,
    HttpContext context)
{
    // Get current stream version
    var streamState = await session.Events.FetchStreamStateAsync(command.Id);
    if (streamState == null)
        return Results.NotFound();
    
    // Check expected version (from ETag)
    var currentETag = $"\"{streamState.Version}\"";
    var ifMatch = context.Request.Headers["If-Match"].FirstOrDefault();
    
    if (!string.IsNullOrEmpty(ifMatch) && ifMatch != currentETag)
    {
        return Results.Problem(
            statusCode: 412,
            title: "Precondition Failed",
            detail: "The resource has been modified since you last retrieved it.");
    }
    
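    // Update aggregate
    var aggregate = await session.Events.AggregateStreamAsync<BookAggregate>(command.Id);
    if (aggregate == null)
        return Results.NotFound();
    var @event = aggregate.Update(...);
    
    // Pass the expected post-append version so Marten rejects a concurrent writer
    var newVersion = streamState.Version + 1;
    session.Events.Append(command.Id, newVersion, @event);
    
    // Return new ETag. The append is not committed until the session saves,
    // so re-fetching the stream state here would still return the old version.
    context.Response.Headers["ETag"] = $"\"{newVersion}\"";
    
    return Results.NoContent();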
}

See the ETag Guide for more details on optimistic concurrency.

Performance: Guid.CreateVersion7()

This project uses Version 7 GUIDs (time-ordered) for all aggregate IDs to automatically improve database performance.

Sequential vs Random GUIDs

Random GUIDs (Guid.NewGuid() - Version 4):

f47ac10b-58cc-4372-a567-0e02b2c3d479  ← Created first
a1b2c3d4-e5f6-4789-a012-3456789abcde  ← Created second
2c9de4f1-8a3b-4c5d-9e7f-1a2b3c4d5e6f  ← Created third
  • Completely random order
  • Causes database page splits
  • Poor cache locality
  • Scattered across B-tree index

Time-ordered GUIDs (Guid.CreateVersion7() - Version 7):

018d5e4a-7b2c-7000-8000-123456789abc  ← Created first
018d5e4a-7b2d-7000-8000-234567890def  ← Created second  
018d5e4a-7b2e-7000-8000-345678901234  ← Created third
  • Naturally sequential (first 48 bits = Unix timestamp)
  • Minimal page splits
  • Better cache locality
  • Appended to end of B-tree index
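
The timestamp prefix can be checked directly. This sketch assumes .NET 9 or later (for `Guid.CreateVersion7`); `TimestampOf` is a hypothetical helper that reads the first 48 bits of the big-endian byte layout back as Unix milliseconds.

```csharp
using System;

// Requires .NET 9 or later for Guid.CreateVersion7.
var createdAt = new DateTimeOffset(2025, 1, 1, 12, 0, 0, TimeSpan.Zero);
var id = Guid.CreateVersion7(createdAt);

// Big-endian layout: bytes 0-5 hold the Unix timestamp in milliseconds.
static DateTimeOffset TimestampOf(Guid v7)
{
    byte[] bytes = v7.ToByteArray(bigEndian: true);
    long unixMs = 0;
    for (var i = 0; i < 6; i++)
        unixMs = (unixMs << 8) | bytes[i];
    return DateTimeOffset.FromUnixTimeMilliseconds(unixMs);
}

Console.WriteLine(id);
Console.WriteLine(TimestampOf(id) == createdAt); // timestamp round-trips at millisecond precision

// An ID created one second later sorts after the earlier one.
var later = Guid.CreateVersion7(createdAt.AddSeconds(1));
Console.WriteLine(id.CompareTo(later) < 0);
```

Only the timestamp prefix is ordered. IDs generated within the same millisecond differ in their random bits, so their relative order is not guaranteed.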

Automatic Performance Benefits

No configuration required! PostgreSQL automatically benefits because:

  1. Better Insert Performance

    • Sequential inserts at end of B-tree
    • Fewer page splits (10-30% faster on large tables)
    • Less write amplification
  2. Smaller Index Size

    • Better packing efficiency
    • Less fragmentation
    • Reduced storage costs
  3. Improved Query Performance

    • Related records stored together
    • Better cache hit rates
    • Faster range scans
  4. Temporal Queries

    • GUID embeds timestamp
    • Can filter by ID for time ranges
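    // Get books created in the last hour.
    // The boundary is approximate (the non-timestamp bits are random), and this
    // assumes the LINQ provider can translate Guid comparison to SQL.
    var oneHourAgo = Guid.CreateVersion7(DateTimeOffset.UtcNow.AddHours(-1));
    var recentBooks = await session.Query<BookSearchProjection>()
        .Where(b => b.Id.CompareTo(oneHourAgo) > 0)
        .ToListAsync();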
    

Real-World Impact

Inserting 1 million books:

With Random GUIDs:

  • Records scattered across 1000s of database pages
  • Constant page splits and reorganization
  • Poor sequential read performance
  • Higher I/O overhead

With Time-ordered GUIDs:

  • Records appended sequentially
  • Minimal page splits
  • Excellent sequential read performance
  • Lower I/O overhead

Standard Compliance

  • Follows RFC 9562 (UUID Version 7)
  • Native support in .NET 9+
  • Backward compatible with standard GUIDs
  • No special database configuration needed
  • Works with any PostgreSQL version

Usage in Commands

public record CreateBook(
    string Title,
    string? Isbn,
    ...)
{
    // Auto-generate time-ordered ID
    public Guid Id { get; init; } = Guid.CreateVersion7();
}

See the Wolverine Guide for more on command patterns.

Projections

What are Projections?

Projections transform events into read models optimized for queries.

Events (Write Model)          Projections (Read Model)
─────────────────────         ────────────────────────
BookAdded                  →  BookSearchProjection
BookUpdated                →  - Title
AuthorUpdated              →  - AuthorNames (denormalized)
PublisherUpdated           →  - PublisherName (denormalized)
                              - SearchText (computed)

Creating a Simple Projection

using Marten.Events.Aggregation;
using Marten.Events.Projections;

namespace BookStore.ApiService.Projections;

// Read model
public class AuthorProjection
{
    public Guid Id { get; set; }
    public string Name { get; set; } = string.Empty;
    public string? Biography { get; set; }
    public DateOnly? BirthDate { get; set; }
}

// Projection builder
public class AuthorProjectionBuilder : SingleStreamProjection<AuthorProjection>
{
    // Create projection from first event
    public AuthorProjection Create(AuthorAdded @event)
    {
        return new AuthorProjection
        {
            Id = @event.Id,
            Name = @event.Name,
            Biography = @event.Biography,
            BirthDate = @event.BirthDate
        };
    }
    
    // Update projection from subsequent events
    public void Apply(AuthorUpdated @event, AuthorProjection projection)
    {
        projection.Name = @event.Name;
        projection.Biography = @event.Biography;
        projection.BirthDate = @event.BirthDate;
    }
    
    // Delete the projection document when the author is soft-deleted.
    // An empty Apply method would leave the document in place.
    public AuthorProjectionBuilder()
    {
        DeleteEvent<AuthorSoftDeleted>();
    }
}

Multi-Stream Projections

Listen to events from multiple streams:

public class BookSearchProjectionBuilder : MultiStreamProjection<BookSearchProjection, Guid>
{
    public BookSearchProjectionBuilder()
    {
        // Listen to book events
        Identity<BookAdded>(x => x.Id);
        Identity<BookUpdated>(x => x.Id);
        DeleteEvent<BookSoftDeleted>();
        
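        // Listen to publisher events (to update denormalized data).
        // Caution: Identity<T> routes an event to the single document whose Id equals
        // the returned value, so x.Id here targets a document keyed by the publisher ID,
        // not the books that reference it. Fanning out to every affected book needs a
        // custom grouper (or Identities<T> when the event carries the book IDs).
        Identity<PublisherUpdated>(x => x.Id);
        
        // Listen to author events (same caveat applies)
        Identity<AuthorUpdated>(x => x.Id);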
    }
    
    public async Task<BookSearchProjection> Create(
        BookAdded @event,
        IQuerySession session)
    {
        var projection = new BookSearchProjection
        {
            Id = @event.Id,
            Title = @event.Title,
            PublisherId = @event.PublisherId,
            AuthorIds = @event.AuthorIds
        };
        
        // Denormalize publisher name
        if (projection.PublisherId.HasValue)
        {
            var publisher = await session
                .LoadAsync<PublisherProjection>(projection.PublisherId.Value);
            projection.PublisherName = publisher?.Name;
        }
        
        // Denormalize author names
        if (projection.AuthorIds.Any())
        {
            var authors = await session.Query<AuthorProjection>()
                .Where(a => projection.AuthorIds.Contains(a.Id))
                .ToListAsync();
            projection.AuthorNames = string.Join(", ", authors.Select(a => a.Name));
        }
        
        // Compute search text
        projection.SearchText = $"{projection.Title} {projection.AuthorNames} {projection.PublisherName}";
        
        return projection;
    }
    
    public async Task Apply(
        BookUpdated @event,
        BookSearchProjection projection,
        IQuerySession session)
    {
        projection.Title = @event.Title;
        projection.PublisherId = @event.PublisherId;
        projection.AuthorIds = @event.AuthorIds;
        
        // Re-denormalize (IDs may have changed)
        // ... same logic as Create
    }
    
    // Update when publisher name changes
    void Apply(PublisherUpdated @event, BookSearchProjection projection)
    {
        if (projection.PublisherId == @event.Id)
        {
            projection.PublisherName = @event.Name;
            projection.SearchText = $"{projection.Title} {projection.AuthorNames} {projection.PublisherName}";
        }
    }
}
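
Denormalizing publisher data means one `PublisherUpdated` event has to reach every book document that references that publisher. The in-memory sketch below shows that grouping step in isolation; `BookRow` and `BookIdsFor` are hypothetical names, and the dictionary stands in for the stored `BookSearchProjection` documents. It illustrates the routing idea, not Marten's grouper API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var publisherId = Guid.NewGuid();

// In-memory stand-in for the projection documents, keyed by book ID.
var readModels = new Dictionary<Guid, BookRow>
{
    [Guid.NewGuid()] = new BookRow { Title = "Book A", PublisherId = publisherId, PublisherName = "Old Name" },
    [Guid.NewGuid()] = new BookRow { Title = "Book B", PublisherId = publisherId, PublisherName = "Old Name" },
    [Guid.NewGuid()] = new BookRow { Title = "Book C", PublisherId = Guid.NewGuid(), PublisherName = "Other" },
};

// Grouping step: one publisher event maps to many book documents.
static IEnumerable<Guid> BookIdsFor(PublisherUpdated e, IReadOnlyDictionary<Guid, BookRow> docs) =>
    docs.Where(kv => kv.Value.PublisherId == e.Id).Select(kv => kv.Key);

var renamed = new PublisherUpdated(publisherId, "New Name");
foreach (var bookId in BookIdsFor(renamed, readModels).ToList())
{
    var row = readModels[bookId];
    row.PublisherName = renamed.Name;
    row.SearchText = $"{row.Title} {row.PublisherName}";
}

var updatedCount = readModels.Values.Count(r => r.PublisherName == "New Name");
Console.WriteLine(updatedCount);

public record PublisherUpdated(Guid Id, string Name);

public class BookRow
{
    public string Title { get; set; } = string.Empty;
    public Guid? PublisherId { get; set; }
    public string PublisherName { get; set; } = string.Empty;
    public string SearchText { get; set; } = string.Empty;
}
```

The lookup from publisher ID to book IDs is the part a projection has to supply; the per-document update afterwards is the same `Apply` logic shown above.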

Projection Lifecycle

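Lifecycle    When to Use
─────────    ────────────────────────────────────────────────────────────
Inline       Updated in the same transaction as the events (slower writes, immediately consistent)
Async        Updated in the background by the async daemon (faster writes, eventual consistency)
Live         Computed on demand by replaying events at read time; nothing is stored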

Our Choice: Async for all projections

options.Projections.Add<BookSearchProjectionBuilder>(ProjectionLifecycle.Async);

Querying Projections

// Simple query
var books = await session.Query<BookSearchProjection>()
    .Where(b => !b.IsDeleted)
    .OrderBy(b => b.Title)
    .ToListAsync();

// NGram search (matches partial words; uses the NGram index on Title)
var results = await session.Query<BookSearchProjection>()
    .Where(b => b.Title.NgramSearch("clean code"))
    .ToListAsync();

// Pagination
var page = await session.Query<BookSearchProjection>()
    .Where(b => !b.IsDeleted)
    .OrderBy(b => b.Title)
    .Skip((pageNumber - 1) * pageSize)
    .Take(pageSize)
    .ToListAsync();

Metadata Tracking

Correlation and Causation IDs

Track request workflows and event chains:

// Enable in configuration
options.Events.MetadataConfig.CorrelationIdEnabled = true;
options.Events.MetadataConfig.CausationIdEnabled = true;

Correlation ID: Tracks entire workflow

User Request → CreateBook → BookAdded → UpdateSearchProjection
     ↓              ↓            ↓                ↓
  corr-123      corr-123     corr-123         corr-123

Causation ID: Tracks event chains

BookAdded (event-1)
    ↓ causes
UpdateSearchProjection (event-2, causation: event-1)
    ↓ causes
SendNotification (event-3, causation: event-2)
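
Given events that each record the ID of whatever caused them, the chain above can be rebuilt by walking causation links back to the root. This sketch uses plain tuples in place of Marten's event metadata, and `ChainTo` is a hypothetical helper; in a real system the causation ID is whatever value the application assigns to the session.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// (Id, CausationId) pairs mirroring the chain above; null marks the root.
var events = new List<(string Id, string? CausationId)>
{
    ("event-1", null),
    ("event-2", "event-1"),
    ("event-3", "event-2"),
};

var byId = events.ToDictionary(e => e.Id);

// Walk from an event back to its root cause, then return the chain root-first.
static List<string> ChainTo(
    string id,
    IReadOnlyDictionary<string, (string Id, string? CausationId)> index)
{
    var chain = new List<string>();
    string? current = id;

    // The length guard stops the walk if the data ever contains a cycle.
    while (current is not null
           && chain.Count < index.Count
           && index.TryGetValue(current, out var evt))
    {
        chain.Add(evt.Id);
        current = evt.CausationId;
    }

    chain.Reverse();
    return chain;
}

var path = string.Join(" -> ", ChainTo("event-3", byId));
Console.WriteLine(path); // prints "event-1 -> event-2 -> event-3"
```

The correlation ID answers "which request did this belong to?", while the walk above answers "what led to this specific event?".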

Setting Metadata

Use middleware to set correlation ID:

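// Factory-based middleware: the InvokeAsync(HttpContext, RequestDelegate) signature
// belongs to IMiddleware, and the class must be registered in DI (e.g. AddScoped).
public class MartenMetadataMiddleware : IMiddleware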
{
    public async Task InvokeAsync(HttpContext context, RequestDelegate next)
    {
        var session = context.RequestServices.GetService<IDocumentSession>();
        if (session is not null)
        {
            // Set correlation ID from header or generate new one
            var correlationId = context.Request.Headers["X-Correlation-ID"].FirstOrDefault()
                ?? Guid.NewGuid().ToString();
            
            session.CorrelationId = correlationId;
            
            // Add to response headers
            context.Response.Headers["X-Correlation-ID"] = correlationId;
        }
        
        await next(context);
    }
}

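// Register middleware. UseMartenMetadata() is a project extension method (not shown);
// it is assumed to wrap app.UseMiddleware<MartenMetadataMiddleware>().
app.UseMartenMetadata();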

Querying Metadata

var events = await session.Events
    .QueryAllRawEvents()
    .Where(e => e.CorrelationId == "corr-123")
    .ToListAsync();

foreach (var evt in events)
{
    Console.WriteLine($"Event: {evt.EventType}");
    Console.WriteLine($"Correlation: {evt.CorrelationId}");
    Console.WriteLine($"Causation: {evt.CausationId}");
    Console.WriteLine($"Timestamp: {evt.Timestamp}");
}

Optimistic Concurrency

Using Stream Versions as ETags

public static async Task<IResult> Handle(
    UpdateBook command,
    IDocumentSession session,
    HttpContext context)
{
    // Get current stream version
    var streamState = await session.Events.FetchStreamStateAsync(command.Id);
    if (streamState == null)
        return Results.NotFound();
    
    // Generate ETag from version
    var currentETag = ETagHelper.GenerateETag(streamState.Version);
    
    // Check If-Match header
    if (!string.IsNullOrEmpty(command.ETag) && 
        !ETagHelper.CheckIfMatch(context, currentETag))
    {
        return ETagHelper.PreconditionFailed();
    }
    
    // Update aggregate
    var aggregate = await session.Events.AggregateStreamAsync<BookAggregate>(command.Id);
    if (aggregate == null)
        return Results.NotFound();
    var @event = aggregate.Update(...);
    
    // Pass the expected post-append version so Marten rejects a concurrent writer
    var newVersion = streamState.Version + 1;
    session.Events.Append(command.Id, newVersion, @event);
    
    // Return new ETag. The append is not committed until the session saves,
    // so re-fetching the stream state here would still return the old version.
    ETagHelper.AddETagHeader(context, ETagHelper.GenerateETag(newVersion));
    
    return Results.NoContent();
}

ETag Helper

public static class ETagHelper
{
    // StreamState.Version is a long
    public static string GenerateETag(long version)
        => $"\"{version}\"";
    
    public static bool CheckIfMatch(HttpContext context, string currentETag)
    {
        var ifMatch = context.Request.Headers["If-Match"].FirstOrDefault();
        return ifMatch == currentETag;
    }
    
    public static void AddETagHeader(HttpContext context, string etag)
    {
        context.Response.Headers["ETag"] = etag;
    }
    
    public static IResult PreconditionFailed()
        => Results.Problem(
            statusCode: 412,
            title: "Precondition Failed",
            detail: "The resource has been modified since you last retrieved it.");
}
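
Going the other way, from an `If-Match` value back to a stream version, is useful when the expected version is passed to Marten instead of compared as a string. `TryParseVersion` below is a hypothetical companion to the helper above, not part of the project. It rejects weak validators because `If-Match` uses strong comparison.

```csharp
using System;
using System.Globalization;

// Turn an If-Match value such as "\"5\"" back into a stream version.
static bool TryParseVersion(string? etag, out long version)
{
    version = 0;
    if (string.IsNullOrWhiteSpace(etag)) return false;

    var value = etag.Trim();

    // Weak validators (W/"...") never satisfy If-Match.
    if (value.StartsWith("W/", StringComparison.Ordinal)) return false;

    // A strong ETag is wrapped in double quotes.
    if (value.Length < 2 || value[0] != '"' || value[^1] != '"') return false;

    // Digits only: no sign, whitespace, or thousands separators.
    return long.TryParse(value[1..^1], NumberStyles.None, CultureInfo.InvariantCulture, out version);
}

Console.WriteLine(TryParseVersion("\"5\"", out var parsed) ? $"version {parsed}" : "invalid");
Console.WriteLine(TryParseVersion("W/\"5\"", out _) ? "accepted" : "rejected");
Console.WriteLine(TryParseVersion("5", out _) ? "accepted" : "rejected");
```

A handler could then compare the parsed value with `streamState.Version` numerically, or derive the expected version for `Append` from it.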

NGram Search Configuration

// Enable NGram search with unaccent (multilingual)
options.Advanced.UseNGramSearchWithUnaccent = true;

// Add NGram indexes
options.Schema.For<BookSearchProjection>()
    .NgramIndex(x => x.Title)
    .NgramIndex(x => x.AuthorNames);

Searching

// NGram search (uses the NGram index on Title)
var results = await session.Query<BookSearchProjection>()
    .Where(b => b.Title.NgramSearch("clean code"))
    .ToListAsync();

// Search with ordering. NgramSearch is a match/no-match filter and does not
// expose a relevance score, so this sketch sorts matches by title instead.
var orderedResults = await session.Query<BookSearchProjection>()
    .Where(b => b.Title.NgramSearch("clean code"))
    .OrderBy(b => b.Title)
    .ToListAsync();

// Accent-insensitive search (automatic with UseNGramSearchWithUnaccent)
var accentResults = await session.Query<BookSearchProjection>()
    .Where(b => b.Title.NgramSearch("café"))  // Matches "cafe", "café", "Café"
    .ToListAsync();
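
To see why "cafe" can match "Café", it helps to look at what unaccenting and n-gram splitting do to a string. The sketch below is a plain C# illustration of the idea only; `Unaccent` and `Trigrams` are hypothetical helpers, and the PostgreSQL functions Marten relies on may tokenize differently (for example, by gram length).

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;

// Strip diacritics: decompose, then drop the combining marks.
static string Unaccent(string text)
{
    var decomposed = text.Normalize(NormalizationForm.FormD);
    var kept = decomposed.Where(c =>
        CharUnicodeInfo.GetUnicodeCategory(c) != UnicodeCategory.NonSpacingMark);
    return new string(kept.ToArray()).Normalize(NormalizationForm.FormC);
}

// Break a lower-cased, unaccented string into overlapping 3-character grams.
static HashSet<string> Trigrams(string text)
{
    var s = Unaccent(text).ToLowerInvariant();
    var grams = new HashSet<string>();
    for (var i = 0; i + 3 <= s.Length; i++)
        grams.Add(s.Substring(i, 3));
    return grams;
}

var indexed = Trigrams("Café");
var query = Trigrams("cafe");

// A query matches when all of its grams appear among the indexed grams.
var matches = query.IsSubsetOf(indexed);
Console.WriteLine(string.Join(",", indexed.OrderBy(g => g, StringComparer.Ordinal)));
Console.WriteLine(matches);
```

Because matching works on fragments, a partial term such as "caf" would also be found, which is what distinguishes NGram search from whole-word full-text search.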

Advanced Features

Document Store

Marten can also store documents (not just events):

// Store document
var book = new Book { Id = Guid.CreateVersion7(), Title = "Clean Code" };
session.Store(book);
await session.SaveChangesAsync();

// Load document
var loaded = await session.LoadAsync<Book>(book.Id);

// Query documents
var books = await session.Query<Book>()
    .Where(b => b.Title.Contains("Code"))
    .ToListAsync();

Soft Deletes

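// Configure soft deletes
options.Schema.For<Book>()
    .SoftDeleted();

// Soft delete
session.Delete(book);  // Marks the document as deleted instead of removing the row
await session.SaveChangesAsync();

// Query non-deleted: soft-deleted documents are filtered out by default
var active = await session.Query<Book>()
    .ToListAsync();

// Include soft-deleted documents as well (extension from Marten.Linq.SoftDeletes)
var all = await session.Query<Book>()
    .Where(b => b.MaybeDeleted())
    .ToListAsync();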

Indexes

// B-tree index (exact matches, sorting)
options.Schema.For<BookSearchProjection>()
    .Index(x => x.Title);

// GIN index (JSON fields)
options.Schema.For<BookSearchProjection>()
    .GinIndexJsonData();

// NGram index (full-text search)
options.Schema.For<BookSearchProjection>()
    .NgramIndex(x => x.Title);

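// Composite index (members are passed as a collection; needs System.Linq.Expressions)
options.Schema.For<BookSearchProjection>()
    .Index(new Expression<Func<BookSearchProjection, object>>[]
    {
        x => x.PublisherId,
        x => x.Title
    });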

Best Practices

1. Event Design

  • ✅ Use past tense names (BookAdded, not AddBook)
  • ✅ Include all necessary data in events
  • ✅ Never modify events (immutable)
  • ✅ Version events when schema changes needed

2. Aggregate Design

  • ✅ Keep aggregates focused (single responsibility)
  • ✅ Validate in command methods
  • ✅ Apply methods should only update state
  • ✅ Use static factory methods for creation

3. Projection Design

  • ✅ Denormalize for query performance
  • ✅ Use async projections for scalability
  • ✅ Keep projections simple (no business logic)
  • ✅ Create separate projections for different queries

4. Session Management

  • ✅ Use UseLightweightSessions() for better performance
  • ✅ Let Wolverine manage transactions
  • ✅ Don't manually call SaveChangesAsync() with Wolverine
  • ✅ Use IQuerySession for read-only operations

5. Performance

  • ✅ Use indexes on frequently queried fields
  • ✅ Use NGram indexes for full-text search
  • ✅ Denormalize data in projections
  • ✅ Use async projections to avoid blocking writes

Troubleshooting

Projection Not Updating

Problem: Projection not reflecting latest events

Solution:

  1. Check projection daemon status:
    GET /api/admin/projections/status
    
  2. Rebuild projection:
    GET /api/admin/projections/rebuild/BookSearchProjection
    
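  3. Check for exceptions in logs
  4. Confirm the async daemon is registered (AddAsyncDaemon(...) on the Marten configuration); async projections do not run without it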

Events Not Saving

Problem: Events not appearing in database

Solution:

  • Verify IntegrateWithWolverine() is called, or that SaveChangesAsync() is awaited in code paths Wolverine does not manage
  • Check for exceptions in handler
  • Ensure event types are registered
  • Verify connection string

Slow Queries

Problem: Projection queries are slow

Solution:

  • Add indexes on queried fields
  • Use NGram indexes for text search
  • Denormalize more data
  • Check query execution plan

Summary

Marten provides:

  • ✅ Event store on PostgreSQL
  • ✅ Document database capabilities
  • ✅ Async projections for read models
  • ✅ Full-text search with NGram
  • ✅ Metadata tracking (correlation/causation)
  • ✅ Optimistic concurrency with stream versions

Key Concepts:

  1. Events = Immutable facts
  2. Aggregates = Domain logic + state
  3. Streams = Event sequences
  4. Projections = Read models
  5. Sessions = Unit of work

Next Steps