Skip to content

A message broker implementation from scratch in elixir

Notifications You must be signed in to change notification settings

nichitaa/ex_message_broker

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

99 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Pasecinic Nichita

Real-Time Programming in Elixir

A message broker implementation from scratch in Elixir

This mono-repo contains several mix projects for a message broker implementation and simulation. More information can be found in dedicated readme.md and /docs directory for each project

  • rtp_sse - scalable tweets SSE handler with multiple worker pools with MongoDB bulk operations
  • message_broker - message broker implementation with persistent messages (saved in JSON logs), events priority, subscriber acknowledgements
  • change_stream - CDC project that acts as publisher for topics (database collections) from rtp_sse
  • subscriber - client subscriber for message_broker that subscribes to tweets and users topics and automatically acknowledges the received events (for a message_broker stress test)

Getting Started

Prerequisites: A running MongoDB replica set instance on mongodb://localhost:27017/rtp_sse_db

  1. Pull Tweets SSE server
# pull the docker image
$ docker pull alexburlacu/rtp-server:faf18x
# start the docker container on port 4000
$ docker run -p 4000:4000 alexburlacu/rtp-server:faf18x
  1. Clone & dependency install for each project
$ git clone https://github.com/nichitaa/rtp_sse 
$ # similarly for `change_stream`, `rtp_sse` and `subscriber`
$ cd message_broker # cd in each project root directory
$ mix deps.get # and install required dependices
  1. Start a new iex session for each project in the next order:

    1. message_broker

    2. change_stream

    3. rtp_sse

    4. subscriber

$ # similarly for `change_stream`, `rtp_sse` and `subscriber`
$ cd message_broker
$ iex -S mix
  1. You can inspect the processes with elixirs' powerful observer tool (for each projects separatelly)
# Start the builtin observer tool
iex(1)> :observer.start()

After you've got all 4 apps up and running you can connect to message broker and subscribe to a topic:

$ telnet localhost 8000
$ sub tweets # subscribe to `tweets` topic
$ # other available topic to subscribe to: `logger_stats`, `users`, `tweets_stats`, `users_stats`

Once created a subscriber connection to the message broker you can similarly connect and send publish commands

$ telnet localhost 8000
$ pub tweets {"id":"1", "priority": 3, "msg":"tweets topic test message"}
$ pub users {"id":"2", "priority": 4, "msg":"users topic test message"}

To start subscribers that will automatically ack each received events:

  1. Start receiving tweets withrtp_sse project
$ telnet localhost 8080 # connect to `rtp_sse` server
$ twitter # send `twitter` command to start receiving tweets from pulled docker container

To run it with docker-compose up change localhost s from each project config.exs file to related docker service from docker-compose.yml. For example in change_stream/config/config.exs replace:

mongo_srv: "mongodb://localhost:27017/rtp_sse_db?replicaSet=rs0",
mb_host: 'localhost'

with:

mongo_srv: "mongodb://mongodb_service:27017/rtp_sse_db?replicaSet=rs0",
mb_host: 'message_broker'

Obviously, there is much more to the project itself, but I'm sure you can

figure-it-out