Event Source Architecture: Exploring Concepts with Code

By Chris Sonnenberg (Architecture, Development Technologies)

Event Sourcing is an implementation of the Single Source of Truth architecture. At its core, it stores every state change as an Event in an ordered sequence.

Typically, there is an Event Store that houses multiple Event Streams. Each Event Stream is an ordered sequence of Events, with each Event being a state change for the system. A Projection is the system’s state after a projection function has been applied to an Event Stream, usually to every event in the stream but sometimes to only a subset.

Having a persistent sequence of events means any new feature can be applied retroactively to all previous events, and bugs can be fixed by changing the code and re-applying the projection function. Want to check how the platform was used the first year without later data? Just filter the Events before applying the projection function.
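
As a rough sketch of that "filter first, then project" idea, the module below keeps only the events recorded before a cutoff and then counts them. It assumes a hypothetical event shape with a timestamp, `{vote, User, Choice, Timestamp}`; the voting example later in this article does not record timestamps, and the module and function names are made up for illustration.

```erlang
-module(poll_window).
-export([events_before/2, count_votes/1]).

%% Hypothetical event shape: {vote, User, Choice, Timestamp}, where
%% Timestamp is an integer (for example, Unix seconds). This shape is an
%% assumption; the article's own events carry no timestamp.

%% Keep only the vote events recorded strictly before Cutoff.
events_before(Cutoff, Events) ->
    [E || {vote, _User, _Choice, Ts} = E <- Events, Ts < Cutoff].

%% A small projection function: number of votes per choice.
count_votes(Events) ->
    lists:foldl(
        fun({vote, _User, Choice, _Ts}, Counts) ->
            maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts)
        end,
        #{},
        Events).
```

Calling `poll_window:count_votes(poll_window:events_before(Cutoff, Events))` projects only the earlier part of the stream, while the stored events stay untouched.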

To showcase Event Sourcing, we’ll make a simplified voting Event Stream in Erlang. Many parts will be simplified just so the core can be easily shown. Our example will just allow a single Event Stream, where we can either Vote or Stop Voting. After voting stops, we’ll then allow anyone to get all the events.

Event Stores and Message Brokers

Event Stores (e.g. EventStoreDB) are built specifically to handle Event Sourcing. Log-based streaming platforms such as Apache Kafka are often used as one too, and Message Brokers (e.g. RabbitMQ) can also act as an Event Store if messages are persisted permanently.

In our example, an Erlang process will handle a single Event Stream. If you aren’t familiar with Erlang, a quick summary is: a distributed, concurrent, functional programming language built around asynchronous message passing. Its concurrency model is loosely comparable to that of Google’s Go programming language, with more fault tolerance and a distributed system baked in. It was originally made for high-availability telecom platforms back in the 1980s.

Event Stream

An Event Stream is an ordered sequence of Events, and each stream generally handles only a subset of all events. A stream can also have multiple instances, which keeps data separated between clients or actors. For example, a voting poll Event Stream could have one instance per poll station.
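
To make the "one instance per poll station" idea concrete, here is a minimal sketch that starts one process per station, each holding its own list of events. The module name `station_streams`, the station names, and the message shapes are assumptions made for this illustration; they are not part of the poll module built below.

```erlang
-module(station_streams).
-export([start/1, vote/4, events/2]).

%% Hypothetical sketch: one stream process per poll station.

%% Start one stream process per station; returns #{Station => Pid}.
start(Stations) ->
    maps:from_list([{S, spawn(fun() -> stream([]) end)} || S <- Stations]).

%% Append a vote event to the stream that belongs to Station.
vote(Streams, Station, User, Choice) ->
    maps:get(Station, Streams) ! {vote, User, Choice},
    ok.

%% Ask one station's stream for its events, oldest first.
events(Streams, Station) ->
    Pid = maps:get(Station, Streams),
    Ref = make_ref(),
    Pid ! {get_events, self(), Ref},
    receive
        {Ref, Events} -> Events
    after 1000 ->
        timeout
    end.

%% Each instance keeps its own events (newest first) and never sees
%% the events of another station.
stream(Events) ->
    receive
        {vote, _User, _Choice} = Event ->
            stream([Event | Events]);
        {get_events, From, Ref} ->
            From ! {Ref, lists:reverse(Events)},
            stream(Events)
    end.
```

Because every station has its own process, a vote sent to one station never shows up in another station's events.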

Events

Events are individual actions that happen within the system. In this case, we’ll just have a single simple voting stream. It accepts two events: vote and stop_voting (a separate stop message simply ends the process). While voting is open, the process collects every vote and ignores all other messages. After voting has stopped, we’ll allow requesting all of the events. In many systems, you will update the current state as new events come in, or if you have a batch ETL process, you can re-apply the projection function after the batch process finishes.

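voting(Events) ->
    receive
        % shut the process down
        stop -> ok;
        % close the poll; Events is newest-first, so reverse it once here
        stop_voting -> stopped(lists:reverse(Events));
        % record the vote as an Event
        {vote, _User, _Choice}=Msg ->
            voting([Msg|Events]);
        % ignore anything else
        _ -> voting(Events)
    end.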

stopped(Events) ->
    receive
        stop -> ok;
        {get_events, Sender} ->
            Sender ! Events,
            stopped(Events);
        _ -> stopped(Events)
    end.
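
The loop above only collects events. For systems that update the current state as each event arrives, one possible sketch keeps a running tally next to the stream, so the projection is always up to date. The module name `live_tally` and its functions are hypothetical and not part of the poll module.

```erlang
-module(live_tally).
-export([new/0, apply_event/2, counts/1]).

%% Hypothetical sketch. State is {Events, Counts}: the stream itself
%% (newest first) plus a projection that is updated on every event.
new() ->
    {[], #{}}.

%% Store the event and update the projection in the same step.
apply_event({vote, _User, Choice} = Event, {Events, Counts}) ->
    NewCounts = maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts),
    {[Event | Events], NewCounts};
%% Anything that is not a vote leaves the state unchanged.
apply_event(_Other, State) ->
    State.

%% Read the current projection without re-folding the stream.
counts({_Events, Counts}) ->
    Counts.
```

The stream is still kept in full, so a different projection function can be applied to it later.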

Then, we’ll need some way to interact with the system: starting it, stopping it, voting, stopping the voting, and getting the events.

start_link() -> spawn_link(fun () -> voting([]) end).
stop(Pid) -> Pid ! stop.

vote(Pid, User, Choice) -> Pid ! {vote, User, Choice}.
stop_voting(Pid) -> Pid ! stop_voting.
get_events(Pid) ->
    Pid ! {get_events, self()},
    receive Msg -> Msg end.

For simulation and testing, we’ll also need a “voter” and a simple test function. Our voters will also misbehave: 50% of them vote twice and 20% submit an invalid choice. These specific problems will be fixed by our projection function. We’ll have just 1,000 voters to start.

test() ->
    Choices = [
        "ice cream",
        "hot dogs",
        "chips",
        "steak",
        "salad"
    ],
    Pid = start_link(),
    [start_voter(Pid, U, Choices) || U <- lists:seq(1, 1000)],
    receive after 1_000 ->
        poll:stop_voting(Pid),
        Events = poll:get_events(Pid),
        poll:stop(Pid),
        % look over events here
        Events
    end.

start_voter(Server, User, Choices) ->
    spawn(
        fun () ->
            Choice = case rand:uniform(5) of
                1 -> lists:map(fun([H|_]) -> H end, Choices);
                _ -> lists:nth(rand:uniform(length(Choices)), Choices)
            end,
            poll:vote(Server, User, Choice),
            case rand:uniform(2) of
                1 -> poll:vote(Server, User, Choice);
                _ -> ok
            end
        end
    ).

Remember that Events are fixed actions. Getting them right the first time will save a big headache later. Adding and ignoring events is generally easy, but removing events requires some thought.

Removing events takes two steps. First, stop generating the Event. Then decide how to handle the events that already exist, using one of these options:

  1. In the projection function, ignore the Event.
  2. Before the projection function, have a filter that “removes” the Event.
  3. (Not Recommended, not always possible) destructively modify the Event Stream to delete the Events.

We avoid changing the Event Stream because it’s supposed to be an immutable data structure. Once we get into the habit of changing it, we’ll start losing all the good things about Event Sourced systems, like having multiple projection functions to see the data for different purposes.
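
The first two options can be sketched as follows, assuming a hypothetical retired event type `{comment, User, Text}` that the poll example never actually produces. The module and function names are illustrative only.

```erlang
-module(retired_events).
-export([drop_retired/1, count_votes/1, tally/1]).

%% Hypothetical retired event type: {comment, User, Text}.

%% Option 2: a filter that runs before the projection function.
drop_retired(Events) ->
    lists:filter(
        fun({comment, _User, _Text}) -> false;
           (_Event) -> true
        end,
        Events).

%% Option 1: the projection function itself skips anything
%% that is not a vote.
count_votes(Events) ->
    lists:foldl(
        fun({vote, _User, Choice}, Counts) ->
                maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts);
           (_Ignored, Counts) ->
                Counts
        end,
        #{},
        Events).

%% Filter composed with projection; the stored stream is never modified.
tally(Events) ->
    count_votes(drop_retired(Events)).
```

Either way, the retired events remain in the stream, so an older projection that still needs them keeps working.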

Projections

A Projection is the view of the data after a projection function has been applied. One way to build it is to start with an empty db table that will hold the current state, then fold over the Event Stream, updating the table for each Event.

Once the projection function has run, switch queries over to the new table and drop the old one (if applicable). The projection function can be changed to fix bugs or add new features easily, and a filter function can be composed with the projection function to remove ignored Events.
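
As one possible illustration of the table approach, the sketch below uses an in-memory ETS table in place of a database table. That substitution, the module name `projection_table`, and the table name `vote_counts` are all assumptions made for this example.

```erlang
-module(projection_table).
-export([rebuild/1, lookup/2]).

%% Hypothetical sketch: an ETS table stands in for the "db table".

%% Fold the whole Event Stream into a fresh, empty table and return it.
rebuild(Events) ->
    Tab = ets:new(vote_counts, [set, public]),
    lists:foreach(
        fun({vote, _User, Choice}) ->
                %% Add 1 to the counter for Choice; if the key is missing,
                %% {Choice, 0} is inserted first and then incremented.
                ets:update_counter(Tab, Choice, 1, {Choice, 0});
           (_Other) ->
                ok
        end,
        Events),
    Tab.

%% Query the projection; choices with no votes count as 0.
lookup(Tab, Choice) ->
    case ets:lookup(Tab, Choice) of
        [{_Choice, N}] -> N;
        [] -> 0
    end.
```

After a rebuild, readers would be pointed at the table returned by `rebuild/1`, and the previous table can be dropped with `ets:delete/1`.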

We’ll start with a simple summation of all the votes in our Event Stream.

basic_sum(Events) ->
    lists:foldl(
        fun({vote, _, Choice}, Counts) ->
            maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts)
        end,
        #{},
        Events).

Giving a possible return of:

#{
    "chips" => 216,
    "hot dogs" => 267,
    "ice cream" => 225,
    "ihcss" => 292,
    "salad" => 256,
    "steak" => 250
}

That doesn’t look quite right. The counts add up to 1,506 votes, far too many for our expected 1,000 voters. Let’s go ahead and change our projection function to make sure everyone gets only one vote.

one_vote_sum(Events) ->
    {_, Counts} = lists:foldl(
        fun({vote, User, Choice}, {Users, Counts}) ->
            maybe
                false ?= maps:is_key(User, Users),
                Cs = maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts),
                {Users#{User => true}, Cs}
            else
                _ -> {Users, Counts}
            end
        end,
        {#{}, #{}},
        Events),
    Counts.

This now changes our results to:

#{
    "chips" => 145,
    "hot dogs" => 173,
    "ice cream" => 159,
    "ihcss" => 187,
    "salad" => 169,
    "steak" => 167
}

Those numbers look much better. Unfortunately, there’s a choice in there that wasn’t part of our list of valid choices. Let’s do one final fix on this projection function:

valid_choice_sum(Events, Choices) ->
    {_, Counts} = lists:foldl(
        fun({vote, User, Choice}, {Users, Counts}) ->
            maybe
                false ?= maps:is_key(User, Users),
                true ?= lists:member(Choice, Choices),
                Cs = maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts),
                {Users#{User => true}, Cs}
            else
                _ -> {Users, Counts}
            end
        end,
        {#{}, #{}},
        Events),
    Counts.

Which gives us our final result:

#{
    "chips" => 145,
    "hot dogs" => 173,
    "ice cream" => 159,
    "salad" => 169,
    "steak" => 167
}

Those results look good. Our projection is on the right path.

As long as we have the data in the Events, we can modify our projection function to use it however we want. We can also take portions of the Event Stream to get historical data, answering questions such as, “what were the first 100 votes?”
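
A question like "what were the first 100 votes?" could be answered with a small helper such as the one below. It assumes the events are ordered oldest first, which is how `get_events/1` returns them once voting has stopped; the module name `poll_history` is hypothetical.

```erlang
-module(poll_history).
-export([first_votes/2]).

%% Hypothetical helper: return at most N events from the start of the
%% stream. Events is assumed to be ordered oldest first.
first_votes(N, Events) ->
    lists:sublist(Events, N).
```

The result is still a list of events, so it can be passed to any of the projection functions above, for example `valid_choice_sum(poll_history:first_votes(100, Events), Choices)`.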

Conclusion

Event Sourcing is a flexible Single Source of Truth architecture with good extensibility and bug-fixing capabilities. Event Streams handle a grouping of events in an isolated way. Events capture each business-domain change as a specific, recorded action. All of this can be distributed across many Event Stores, which helps with scalability and fault tolerance.

Of course, it’s not all roses. Event Streams having Events from all time is both a blessing and a curse. Event Streams would eventually take up all the storage in the universe if allowed. However, having events from all time means you can also use it as a data warehouse, analyzing all the data from the beginning of time for trends and insights, answering questions and helping you make informed decisions.

Thanks for reading; I hope you found this summary and demonstrative example helpful. If you’re interested in learning more, take a look at what we have on the Keyhole Dev Blog.

Complete Code

-module(poll).
-feature(maybe_expr,enable).
-export([start_link/0, stop/1, vote/3, stop_voting/1, get_events/1]).
-export([test/0, basic_sum/1, one_vote_sum/1, valid_choice_sum/2]).

% testing
test() ->
    Choices = [
        "ice cream",
        "hot dogs",
        "chips",
        "steak",
        "salad"
    ],
    Pid = start_link(),
    [start_voter(Pid, U, Choices) || U <- lists:seq(1, 1000)],
    receive after 1_000 ->
        poll:stop_voting(Pid),
        Events = poll:get_events(Pid),
        poll:stop(Pid),
        Basic = basic_sum(Events),
        One = one_vote_sum(Events),
        Valid = valid_choice_sum(Events, Choices),
        {Basic, One, Valid}
    end.

start_voter(Server, User, Choices) ->
    spawn(
        fun () ->
            Choice = case rand:uniform(5) of
                1 -> lists:map(fun([H|_]) -> H end, Choices);
                _ -> lists:nth(rand:uniform(length(Choices)), Choices)
            end,
            poll:vote(Server, User, Choice),
            case rand:uniform(2) of
                1 -> poll:vote(Server, User, Choice);
                _ -> ok
            end
        end
    ).

% projections
basic_sum(Events) ->
    lists:foldl(
        fun({vote, _, Choice}, Counts) ->
            maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts)
        end,
        #{},
        Events).

one_vote_sum(Events) ->
    {_, Counts} = lists:foldl(
        fun({vote, User, Choice}, {Users, Counts}) ->
            maybe
                false ?= maps:is_key(User, Users),
                Cs = maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts),
                {Users#{User => true}, Cs}
            else
                _ -> {Users, Counts}
            end
        end,
        {#{}, #{}},
        Events),
    Counts.

valid_choice_sum(Events, Choices) ->
    {_, Counts} = lists:foldl(
        fun({vote, User, Choice}, {Users, Counts}) ->
            maybe
                false ?= maps:is_key(User, Users),
                true ?= lists:member(Choice, Choices),
                Cs = maps:update_with(Choice, fun(V) -> V + 1 end, 1, Counts),
                {Users#{User => true}, Cs}
            else
                _ -> {Users, Counts}
            end
        end,
        {#{}, #{}},
        Events),
    Counts.

% exported helpers
start_link() -> spawn_link(fun () -> voting([]) end).
stop(Pid) -> Pid ! stop.

vote(Pid, User, Choice) -> Pid ! {vote, User, Choice}.
stop_voting(Pid) -> Pid ! stop_voting.
get_events(Pid) ->
    Pid ! {get_events, self()},
    receive Msg -> Msg end.

% event stream capturing
voting(Events) ->
    receive
        stop -> ok;
        stop_voting -> stopped(lists:reverse(Events));
        {vote, _User, _Choice}=Msg ->
            voting([Msg|Events]);
        _ -> voting(Events)
    end.

stopped(Events) ->
    receive
        stop -> ok;
        {get_events, Sender} ->
            Sender ! Events,
            stopped(Events);
        _ -> stopped(Events)
    end.