Event Horizon

Get started with the Event Horizon

Welcome to the tutorial for Event Horizon, where you learn how to write a microservice that produces public events of dishes prepared by chefs, and another microservice that consumes those events.

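After this tutorial you will have:

  • a running environment with a MongoDB container and two Runtimes
  • a producer microservice that commits a public event and filters it into a public stream
  • a consumer microservice that subscribes to the producer’s public stream and handles those events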

Each code example is given in both C# and TypeScript. The full tutorial code is available on GitHub for C# and TypeScript.

Prerequisites

This tutorial builds directly upon the getting started guide and the files from it.

Setup

This tutorial will have a setup with two microservices; one that produces public events, and a consumer that subscribes to those public events. Let’s make a folder structure that resembles that:

└── event-horizon-tutorial/
    ├── consumer/
    ├── producer/
    └── environment/
        └── docker-compose.yml

Go into both the consumer and the producer folders and initiate the project as we’ve gone through in our getting started guide, i.e. copy over all the code from the getting started tutorial to the consumer and producer folders. You can choose different languages for the microservices if you want to.

We’ll come back to the docker-compose.yml file later in this tutorial.

Producer

Create a Public Filter

A public filter filters all public events that pass the filter into a public stream, which is a special stream that another microservice can subscribe to.

A public filter is defined as a method that returns a partitioned filter result, which is an object with two properties:

  • a boolean that says whether the event should be included in the public stream
  • a partition id which is the partition that the event should belong to in the public stream.

Only public events get filtered through the public filters.

// Program.cs
using System;
using System.Threading.Tasks;
using Dolittle.SDK;
using Dolittle.SDK.Tenancy;
using Dolittle.SDK.Events;
using Dolittle.SDK.Events.Filters;

namespace Kitchen
{
    class Program
    {
        public static void Main()
        {
            var client = Client
                .ForMicroservice("f39b1f61-d360-4675-b859-53c05c87c0e6")
                .WithEventTypes(eventTypes =>
                    eventTypes.Register<DishPrepared>())
                .WithEventHandlers(builder =>
                    builder.RegisterEventHandler<DishHandler>())
                .WithFilters(filtersBuilder =>
                    filtersBuilder
                        .CreatePublicFilter("2c087657-b318-40b1-ae92-a400de44e507", filterBuilder =>
                            filterBuilder.Handle((@event, eventContext) =>
                            {
                                Console.WriteLine($"Filtering event {@event} to public stream");
                                return Task.FromResult(new PartitionedFilterResult(true, PartitionId.Unspecified));
                            })))
                .Build();
            // Rest of your code here...
        }
    }
}

// index.ts
import { Client } from '@dolittle/sdk';
import { EventContext, PartitionId } from '@dolittle/sdk.events';
import { PartitionedFilterResult } from '@dolittle/sdk.events.filtering';
import { TenantId } from '@dolittle/sdk.execution';
import { DishPrepared } from './DishPrepared';
import { DishHandler } from './DishHandler';

const client = Client
    .forMicroservice('f39b1f61-d360-4675-b859-53c05c87c0e6')
    .withEventTypes(eventTypes =>
        eventTypes.register(DishPrepared))
    .withEventHandlers(builder =>
        builder.register(DishHandler))
    .withFilters(filterBuilder =>
        filterBuilder
            .createPublicFilter('2c087657-b318-40b1-ae92-a400de44e507', fb =>
                fb.handle((event: any, context: EventContext) => {
                    console.log(`Filtering event ${JSON.stringify(event)} to public stream`);
                    return new PartitionedFilterResult(true, PartitionId.unspecified);
                })
            ))
    .build();
    // Rest of your code here...

Notice that the returned PartitionedFilterResult has true and an unspecified PartitionId (which is the same as an empty GUID). This means that this filter creates a public stream that includes all public events, and that they are put into the unspecified partition of that stream.
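To make the two parts of the filter result more concrete, here is a minimal, self-contained sketch of how a public filter decides what ends up in the public stream. It does not use the Dolittle SDK: the types `CommittedDish`, `FilterResult` and `PublicFilter`, the `isPublic` flag and the `onlyTacos` filter are assumptions made up for illustration, and the real Runtime does considerably more than this.

```typescript
// Local stand-ins for illustration only; these are NOT the Dolittle SDK types.
const UNSPECIFIED_PARTITION = '00000000-0000-0000-0000-000000000000';

interface CommittedDish {
    dish: string;
    chef: string;
    isPublic: boolean; // whether the event was committed as a public event
}

interface FilterResult {
    shouldInclude: boolean; // include the event in the public stream?
    partitionId: string;    // partition of the public stream to put it in
}

type PublicFilter = (committed: CommittedDish) => FilterResult;

// Same behaviour as the tutorial's filter: include everything, one partition.
const includeAll: PublicFilter = () => ({
    shouldInclude: true,
    partitionId: UNSPECIFIED_PARTITION,
});

// A more selective (hypothetical) filter: only dishes with "Taco" in the name.
const onlyTacos: PublicFilter = committed => ({
    shouldInclude: committed.dish.indexOf('Taco') !== -1,
    partitionId: UNSPECIFIED_PARTITION,
});

// Builds the public stream. Non-public events never reach the filter.
function buildPublicStream(log: CommittedDish[], filter: PublicFilter): CommittedDish[] {
    return log.filter(committed => committed.isPublic && filter(committed).shouldInclude);
}

const eventLog: CommittedDish[] = [
    { dish: 'Bean Blaster Taco', chef: 'Mr. Taco', isPublic: true },
    { dish: 'Secret Sauce', chef: 'Mr. Taco', isPublic: false },
    { dish: 'Pizza', chef: 'Ms. Pizza', isPublic: true },
];

console.log(buildPublicStream(eventLog, includeAll).map(c => c.dish));
console.log(buildPublicStream(eventLog, onlyTacos).map(c => c.dish));
```

With `includeAll`, both public events pass and the non-public one is skipped. With `onlyTacos`, only the taco remains.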

Commit the public event

Now that we have a public stream we can commit public events to start filtering them. Let’s commit a DishPrepared event as a public event from the producer microservice:

// Program.cs
using Dolittle.SDK;
using Dolittle.SDK.Tenancy;

namespace Kitchen
{
    class Program
    {
        public static void Main()
        {
            // Where you build the client...

            var preparedTaco = new DishPrepared("Bean Blaster Taco", "Mr. Taco");

            client.EventStore
                .ForTenant(TenantId.Development)
                .CommitPublicEvent(preparedTaco, "bfe6f6e4-ada2-4344-8a3b-65a3e1fe16e9")
                .GetAwaiter().GetResult();

            // Blocks until the EventHandlers are finished, i.e. forever
            client.Start().Wait();
        }
    }
}

// index.ts
import { Client } from '@dolittle/sdk';
import { TenantId } from '@dolittle/sdk.execution';
import { DishPrepared } from './DishPrepared';
import { DishHandler } from './DishHandler';

// Where you build the client...

const preparedTaco = new DishPrepared('Bean Blaster Taco', 'Mr. Taco');

client.eventStore
    .forTenant(TenantId.development)
    .commitPublic(preparedTaco, 'bfe6f6e4-ada2-4344-8a3b-65a3e1fe16e9');

Now we have a producer microservice with a public stream of DishPrepared events.

Consumer

Subscribe to the public stream of events

Let’s create another microservice that subscribes to the producer’s public stream.

// Program.cs
using System;
using Dolittle.SDK;
using Dolittle.SDK.Tenancy;
using Dolittle.SDK.Events;

namespace Kitchen
{
    class Program
    {
        public static void Main()
        {
            var client = Client.ForMicroservice("a14bb24e-51f3-4d83-9eba-44c4cffe6bb9")
                .WithRuntimeOn("localhost", 50055)
                .WithEventTypes(eventTypes =>
                    eventTypes.Register<DishPrepared>())
                .WithEventHorizons(eventHorizons =>
                    eventHorizons.ForTenant(TenantId.Development, subscriptions =>
                        subscriptions
                            .FromProducerMicroservice("f39b1f61-d360-4675-b859-53c05c87c0e6")
                            .FromProducerTenant(TenantId.Development)
                            .FromProducerStream("2c087657-b318-40b1-ae92-a400de44e507")
                            .FromProducerPartition(PartitionId.Unspecified)
                            .ToScope("808ddde4-c937-4f5c-9dc2-140580f6919e")))
                .WithEventHandlers(_ =>
                    _.CreateEventHandler("6c3d358f-3ecc-4c92-a91e-5fc34cacf27e")
                        .InScope("808ddde4-c937-4f5c-9dc2-140580f6919e")
                        .Partitioned()
                        .Handle<DishPrepared>((@event, context) => Console.WriteLine($"Handled event {@event} from public stream")))
                .Build();
            // Blocks until the EventHandlers are finished, i.e. forever
            client.Start().Wait();
        }
    }
}

// index.ts
import { Client } from '@dolittle/sdk';
import { TenantId } from '@dolittle/sdk.execution';
import { PartitionId } from '@dolittle/sdk.events';
import { DishPrepared } from './DishPrepared';

const client = Client
    .forMicroservice('a14bb24e-51f3-4d83-9eba-44c4cffe6bb9')
    .withRuntimeOn('localhost', 50055)
    .withEventTypes(eventTypes =>
        eventTypes.register(DishPrepared))
    .withEventHorizons(_ => {
        _.forTenant(TenantId.development, ts =>
            ts.fromProducerMicroservice('f39b1f61-d360-4675-b859-53c05c87c0e6')
                .fromProducerTenant(TenantId.development)
                .fromProducerStream('2c087657-b318-40b1-ae92-a400de44e507')
                .fromProducerPartition(PartitionId.unspecified.value)
                .toScope('808ddde4-c937-4f5c-9dc2-140580f6919e'))})
    .withEventHandlers(eventHandlers =>
        eventHandlers
            .createEventHandler("6c3d358f-3ecc-4c92-a91e-5fc34cacf27e", _ =>
                _.inScope("808ddde4-c937-4f5c-9dc2-140580f6919e")
                .partitioned()
                .handle(DishPrepared, (event, context) => console.log(`Handled event ${JSON.stringify(event)} from public stream`))))
    .build();

Now we have a consumer microservice that:

  • Connects to another Runtime running on port 50055
  • Subscribes to the producer’s public stream with the id of 2c087657-b318-40b1-ae92-a400de44e507 (same as the producer’s public filter)
  • Puts those events into a Scope with id of 808ddde4-c937-4f5c-9dc2-140580f6919e
  • Handles the incoming events in a scoped event handler with an id of 6c3d358f-3ecc-4c92-a91e-5fc34cacf27e

There’s a lot going on in the code, so let’s break it down:

Connection to the Runtime

// Program.cs
.WithRuntimeOn("localhost", 50055)
// Rest of builder here...

// index.ts
.withRuntimeOn('localhost', 50055)
// Rest of builder here...

This line configures the hostname and port of the Runtime for this client. By default, it connects to the Runtime’s default port of 50053 on localhost.

Since this tutorial ends up with two running instances of the Runtime, they have to run on different ports. The producer Runtime will be running on the default 50053 port, and the consumer Runtime will be running on port 50055. We’ll see this reflected in the docker-compose.yml file later in this tutorial.

Event Horizon

// Program.cs
.WithEventHorizons(eventHorizons =>
    eventHorizons.ForTenant(TenantId.Development, subscriptions =>
        subscriptions
            .FromProducerMicroservice("f39b1f61-d360-4675-b859-53c05c87c0e6")
            .FromProducerTenant(TenantId.Development)
            .FromProducerStream("2c087657-b318-40b1-ae92-a400de44e507")
            .FromProducerPartition(PartitionId.Unspecified)
            .ToScope("808ddde4-c937-4f5c-9dc2-140580f6919e")))
// Rest of builder here...

// index.ts
.withEventHorizons(_ =>
    _.forTenant(TenantId.development, ts =>
        ts.fromProducerMicroservice('f39b1f61-d360-4675-b859-53c05c87c0e6')
            .fromProducerTenant(TenantId.development)
            .fromProducerStream('2c087657-b318-40b1-ae92-a400de44e507')
            .fromProducerPartition(PartitionId.unspecified.value)
            .toScope('808ddde4-c937-4f5c-9dc2-140580f6919e')))
// Rest of builder here...

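Here we define an event horizon subscription. Each subscription is submitted to and managed by the Runtime. A subscription defines:

  • the consumer’s tenant
  • the producer microservice, tenant, public stream and partition in that stream to get events from
  • the scope in the consumer’s event store to put the events into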

When the consumer’s Runtime receives a subscription, it will send a subscription request to the producer’s Runtime. If the producer accepts that request, the producer’s Runtime will start sending the public stream over to the consumer’s Runtime, one event at a time.

The acceptance depends on two things:

  • The consumer needs to know where to access the other microservices, i.e. the URL address.
  • The producer needs to give formal Consent for a tenant in another microservice to subscribe to public streams of a tenant.

We’ll set up the consent later.

The consumer will receive events from the producer and put those events in a specialized event-log that is identified by the scope’s id, so that events received over the event horizon don’t mix with private events. We’ll talk more about the scope when we talk about the scoped event handler.

Scoped Event Handler

// Program.cs
.WithEventHandlers(_ =>
    _.CreateEventHandler("6c3d358f-3ecc-4c92-a91e-5fc34cacf27e")
        .InScope("808ddde4-c937-4f5c-9dc2-140580f6919e")
        .Partitioned()
        .Handle<DishPrepared>((@event, context) => Console.WriteLine($"Handled event {@event} from public stream")))
// Rest of builder here...

// index.ts
.withEventHandlers(eventHandlers =>
    eventHandlers
        .createEventHandler("6c3d358f-3ecc-4c92-a91e-5fc34cacf27e", _ =>
            _.inScope("808ddde4-c937-4f5c-9dc2-140580f6919e")
            .partitioned()
            .handle(DishPrepared, (event, context) => console.log(`Handled event ${JSON.stringify(event)} from public stream`))))
// Rest of builder here...

Here we use the opportunity to create an event handler inline by using the client’s builder function. This way we don’t need to create a class and register it as an event handler.

This code will create a partitioned event handler with id 6c3d358f-3ecc-4c92-a91e-5fc34cacf27e (same as from getting started) in a specific scope.

Remember that the events from an event horizon subscription get put into a scoped event-log that is identified by the scope id. Having the scope id defined when creating an event handler signifies that it will only handle events in that scope and no other.
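The separation between scopes can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `ScopedEventLogs`, `StoredDish` and the `PRIVATE_LOG` label are hypothetical names invented here, and nothing in it reflects how the Runtime actually stores events. It only shows that a handler registered in a scope sees the events appended to that scope and nothing else.

```typescript
// Illustration only: an in-memory stand-in for scoped event logs.
const PRIVATE_LOG = 'private-event-log'; // hypothetical label for the unscoped log
const HORIZON_SCOPE = '808ddde4-c937-4f5c-9dc2-140580f6919e';

interface StoredDish {
    dish: string;
    chef: string;
}

type DishCallback = (stored: StoredDish) => void;

class ScopedEventLogs {
    private logs: Record<string, StoredDish[]> = {};
    private handlers: Record<string, DishCallback[]> = {};

    // Registers a handler that only sees events appended to the given scope.
    registerHandler(scope: string, callback: DishCallback): void {
        const existing = this.handlers[scope] || [];
        existing.push(callback);
        this.handlers[scope] = existing;
    }

    // Appends an event to one scope and notifies that scope's handlers only.
    append(scope: string, stored: StoredDish): void {
        const log = this.logs[scope] || [];
        log.push(stored);
        this.logs[scope] = log;
        (this.handlers[scope] || []).forEach(callback => callback(stored));
    }
}

const logs = new ScopedEventLogs();
const handledInScope: string[] = [];
logs.registerHandler(HORIZON_SCOPE, stored => handledInScope.push(stored.dish));

// An event committed locally is not seen by the scoped handler...
logs.append(PRIVATE_LOG, { dish: 'Private Pizza', chef: 'Ms. Pizza' });
// ...while an event received over the event horizon is.
logs.append(HORIZON_SCOPE, { dish: 'Bean Blaster Taco', chef: 'Mr. Taco' });

console.log(handledInScope);
```

Only the taco reaches the handler, because it is the only event appended to the scope the handler was registered in.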

Setup your environment

Now that we have the producer and consumer microservices’ Heads coded, we need to set up the environment for them to run in and configure their Runtimes to be connected.

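Let’s go to the environment folder we created in the beginning of this tutorial. Here we’ll need to configure:

  • resources.json
  • endpoints.json
  • microservices.json
  • event-horizon-consents.json
  • docker-compose.yml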

Resources

resources.json defines a microservice’s event store. We have 2 microservices, so they each need their own event store database. By default the database is called event_store.

Let’s create 2 files, consumer-resources.json and producer-resources.json:

//consumer-resources.json
{
    // the tenant to define this resource for
    "445f8ea8-1a6f-40d7-b2fc-796dba92dc44": {
        "eventStore": {
            "servers": [
                // hostname of the mongodb
                "mongo"
            ],
            // the database name for the event store
            "database": "consumer_event_store"
        }
    }
}

//producer-resources.json
{
    // the tenant to define this resource for
    "445f8ea8-1a6f-40d7-b2fc-796dba92dc44": {
        "eventStore": {
            "servers": [
                // hostname of the mongodb
                "mongo"
            ],
            // the database name for the event store
            "database": "producer_event_store"
        }
    }
}

Note that the development tenant is 445f8ea8-1a6f-40d7-b2fc-796dba92dc44 (same as TenantId.Development).

Endpoints

endpoints.json defines the private port (where the SDK connects) and the public port (where other Runtimes can connect) of the Runtime.

We can leave the producer with the default ports (50052 for public, 50053 for private), but let’s create consumer-endpoints.json to change the consumer’s ports:

//consumer-endpoints.json
{
    "public": {
        "port": 50054
    },
    "private": {
        "port": 50055
    }
}

Port 50055 is the same port that we configured the consumer microservice to connect to earlier with the withRuntimeOn() method.
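As a rough illustration of how the defaults and the overrides fit together, the sketch below resolves the ports for both Runtimes and checks that none of them collide. It is a stand-alone model, not Runtime code: `resolvePorts`, `findCollisions` and the type names are hypothetical helpers, and the only facts taken from this tutorial are the default ports (50052 public, 50053 private) and the consumer’s overrides.

```typescript
// Illustration only: models how endpoints.json overrides the default ports.
interface RuntimePorts {
    publicPort: number;
    privatePort: number;
}

// The shape of an endpoints.json file; a missing section means "use the default".
interface EndpointsFile {
    public?: { port: number };
    private?: { port: number };
}

const DEFAULT_PORTS: RuntimePorts = { publicPort: 50052, privatePort: 50053 };

function resolvePorts(file: EndpointsFile): RuntimePorts {
    return {
        publicPort: file.public !== undefined ? file.public.port : DEFAULT_PORTS.publicPort,
        privatePort: file.private !== undefined ? file.private.port : DEFAULT_PORTS.privatePort,
    };
}

// Returns every port that is claimed more than once across the given Runtimes.
function findCollisions(runtimes: RuntimePorts[]): number[] {
    const seen: Record<number, number> = {};
    runtimes.forEach(runtime => {
        [runtime.publicPort, runtime.privatePort].forEach(port => {
            seen[port] = (seen[port] || 0) + 1;
        });
    });
    return Object.keys(seen).map(Number).filter(port => (seen[port] || 0) > 1);
}

// The producer has no endpoints.json; the consumer uses consumer-endpoints.json.
const producerPorts = resolvePorts({});
const consumerPorts = resolvePorts({ public: { port: 50054 }, private: { port: 50055 } });

console.log(producerPorts, consumerPorts);
console.log(findCollisions([producerPorts, consumerPorts]));
```

Running two Runtimes with the default configuration on the same host would report both default ports as collisions, which is why the consumer gets its own endpoints file.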

Microservices

microservices.json defines where the producer microservices are so that the consumer can subscribe to them.

Let’s create a consumer-microservices.json file to define where the consumer can find the producer:

// consumer-microservices.json
{
    // the producer microservice's id, hostname and port
    "f39b1f61-d360-4675-b859-53c05c87c0e6": {
        "host": "producer-runtime",
        "port": 50052
    }
}

Consents

event-horizon-consents.json defines the Consents that the producer gives to consumers.

Let’s create producer-event-horizon-consents.json where we give a consumer consent to subscribe to our public stream.

// producer-event-horizon-consents.json
{
    // the producer's tenant that gives the consent
    "445f8ea8-1a6f-40d7-b2fc-796dba92dc44": [
        {
            // the consumer's microservice and tenant to give consent to
            "microservice": "a14bb24e-51f3-4d83-9eba-44c4cffe6bb9",
            "tenant": "445f8ea8-1a6f-40d7-b2fc-796dba92dc44",
            // the producer's public stream and partition to give consent to subscribe to
            "stream": "2c087657-b318-40b1-ae92-a400de44e507",
            "partition": "00000000-0000-0000-0000-000000000000",
            // an identifier for this consent. This is random
            "consent": "ad57aa2b-e641-4251-b800-dd171e175d1f"
        }
    ]
}
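The two conditions for a subscription to be accepted (a known producer address and a matching consent) can be checked against the contents of the two files above. The following is a simplified model, not the Runtime’s implementation: `canSubscribe`, `HorizonSubscription` and the other type names are assumptions introduced for this sketch, and the second consumer id is made up.

```typescript
// Local model for illustration only; not the Runtime's actual implementation.
interface ProducerAddress {
    host: string;
    port: number;
}

interface ConsentEntry {
    microservice: string; // the consumer's microservice
    tenant: string;       // the consumer's tenant
    stream: string;       // the producer's public stream
    partition: string;    // the partition in that stream
    consent: string;      // identifier of the consent itself
}

interface HorizonSubscription {
    consumerMicroservice: string;
    consumerTenant: string;
    producerMicroservice: string;
    producerTenant: string;
    stream: string;
    partition: string;
}

const DEVELOPMENT_TENANT = '445f8ea8-1a6f-40d7-b2fc-796dba92dc44';

// Same content as consumer-microservices.json.
const knownProducers: Record<string, ProducerAddress> = {
    'f39b1f61-d360-4675-b859-53c05c87c0e6': { host: 'producer-runtime', port: 50052 },
};

// Same content as producer-event-horizon-consents.json.
const producerConsents: Record<string, ConsentEntry[]> = {
    [DEVELOPMENT_TENANT]: [{
        microservice: 'a14bb24e-51f3-4d83-9eba-44c4cffe6bb9',
        tenant: DEVELOPMENT_TENANT,
        stream: '2c087657-b318-40b1-ae92-a400de44e507',
        partition: '00000000-0000-0000-0000-000000000000',
        consent: 'ad57aa2b-e641-4251-b800-dd171e175d1f',
    }],
};

function canSubscribe(
    sub: HorizonSubscription,
    addresses: Record<string, ProducerAddress>,
    consents: Record<string, ConsentEntry[]>,
): boolean {
    // Condition 1: the consumer must know where the producer's Runtime is.
    if (addresses[sub.producerMicroservice] === undefined) {
        return false;
    }
    // Condition 2: the producer's tenant must have consented to this subscription.
    const given = consents[sub.producerTenant] || [];
    return given.some(entry =>
        entry.microservice === sub.consumerMicroservice &&
        entry.tenant === sub.consumerTenant &&
        entry.stream === sub.stream &&
        entry.partition === sub.partition);
}

// The subscription defined in the consumer's withEventHorizons() call.
const tutorialSubscription: HorizonSubscription = {
    consumerMicroservice: 'a14bb24e-51f3-4d83-9eba-44c4cffe6bb9',
    consumerTenant: DEVELOPMENT_TENANT,
    producerMicroservice: 'f39b1f61-d360-4675-b859-53c05c87c0e6',
    producerTenant: DEVELOPMENT_TENANT,
    stream: '2c087657-b318-40b1-ae92-a400de44e507',
    partition: '00000000-0000-0000-0000-000000000000',
};

// A hypothetical consumer that the producer never gave consent to.
const strangerSubscription: HorizonSubscription = {
    ...tutorialSubscription,
    consumerMicroservice: '11111111-1111-1111-1111-111111111111',
};

console.log(canSubscribe(tutorialSubscription, knownProducers, producerConsents));
console.log(canSubscribe(strangerSubscription, knownProducers, producerConsents));
```

The tutorial’s subscription satisfies both conditions, while the made-up consumer is rejected because no consent entry matches it. A typo in any of the ids in either file has the same effect, so these ids are the first thing to compare if the consumer never receives events.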

Configure docker-compose.yml

Now we can glue all the configuration files together in the docker-compose.yml. The configuration files are mounted inside /app/.dolittle/ inside the dolittle/runtime image.

version: '3.1'
services:
  mongo:
    image: dolittle/mongodb
    hostname: mongo
    ports:
      - 27017:27017
    logging:
      driver: none
 
  consumer-runtime:
    image: dolittle/runtime
    volumes:
      - ./consumer-resources.json:/app/.dolittle/resources.json
      - ./consumer-endpoints.json:/app/.dolittle/endpoints.json
      - ./consumer-microservices.json:/app/.dolittle/microservices.json
    ports:
      - 50054:50054
      - 50055:50055

  producer-runtime:
    image: dolittle/runtime
    volumes:
      - ./producer-resources.json:/app/.dolittle/resources.json
      - ./producer-event-horizon-consents.json:/app/.dolittle/event-horizon-consents.json
    ports:
      - 50052:50052
      - 50053:50053

Start the environment

Start the environment by running this command from the environment folder:

$ docker-compose up

This will spin up a MongoDB container and two Runtimes.

Run your microservices

Run both the consumer and producer microservices in their respective folders, and see the consumer handle the events from the producer:

Producer (C#)

$ dotnet run
Filtering event EventHorizon.Producer.DishPrepared to public stream
Mr. Taco has prepared Bean Blaster Taco. Yummm!

Consumer (C#)

$ dotnet run
Handled event EventHorizon.Consumer.DishPrepared from public stream

Producer (TypeScript)

$ npx ts-node index.ts
Filtering event {"Dish":"Bean Blaster Taco","Chef":"Mr. Taco"} to public stream
Mr. Taco has prepared Bean Blaster Taco. Yummm!

Consumer (TypeScript)

$ npx ts-node index.ts
Handled event {"Dish":"Bean Blaster Taco","Chef":"Mr. Taco"} from public stream

What’s next

Last modified March 22, 2021: Remove extra event horizon header (65cf7e5)