Life is a succession of asynchronous events which could depend on each other.

Some weeks ago my friends and I wanted to play a football match, so we booked a pitch at a sports centre for the following Saturday at 17:00. That Saturday I arrived at the sports centre at 16:44; some of the others were already there, having arrived between 16:31 and 16:42. Five minutes later more people arrived. At 16:50 only three people were missing: we were 19 and we needed 22 to enjoy a proper football match. We called them, but we could reach only one, and he arrived 3 minutes later. At 17:00 we were 20 people and we decided to wait 5 more minutes before starting. The other two did not arrive, so we played with teams of 10.

As you can see, the football match is an event that depends on other events, such as booking the pitch or people arriving. By the time the event has to be processed, you should have checked all its dependencies; if some are missing, you can wait for them or try to get them. Finally, you can perform the event, cancel it or adapt it.






Streaming Data

Lately I have been dealing with a new problem: working with a stream of data in which the received messages can be related to each other. When I work with a stream of data in Elixir, I try to take advantage of the concurrency offered by the Erlang virtual machine. That approach helps me parallelize as much as possible and improves performance, but it also introduces other challenges. For instance, the data is not ordered, so processing messages in concurrent flows can make things harder.

Meanwhile I was reading Designing for Scalability with Erlang/OTP, which explains how synchronous and asynchronous calls to a gen_server are implemented. It gave me an idea: if I am going to process each received event in a different process, why not check or request its dependencies there and let the process wait for a message with one of the following:
  • The dependency: when the received event needs the dependency itself in order to be processed.
  • A reference to the dependency: when the received event only needs to know that the dependency is there in order to be processed later.
  • (Optional but recommended) A timeout message: when the dependency does not arrive. In this case you decide whether to wait longer, request the dependency (if possible), continue processing without it (if possible), discard the event, and so on.
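
The three kinds of message listed above map naturally onto a plain `receive` with an `after` clause. The following is only a minimal sketch of that idea, not how Barenboim is implemented; the module name `DependencyWait`, the function `await/2` and the message shapes are assumptions made up for illustration.

```elixir
defmodule DependencyWait do
  # Hypothetical helper: blocks the calling process until a message about
  # `dependency_ref` arrives or `timeout_ms` milliseconds elapse.
  def await(dependency_ref, timeout_ms) do
    receive do
      # The dependency itself was delivered.
      {:data, ^dependency_ref, data} -> {:ok, data}
      # Only a reference was delivered: the dependency exists somewhere.
      {:reference, ^dependency_ref} -> {:ok, dependency_ref}
    after
      # Nothing arrived in time; the caller decides what to do next.
      timeout_ms -> {:timeout, dependency_ref}
    end
  end
end

# Usage: another process delivers the dependency a little later.
parent = self()

spawn(fn ->
  Process.sleep(50)
  send(parent, {:data, :order_42, %{total: 10}})
end)

{:ok, %{total: 10}} = DependencyWait.await(:order_42, 1_000)
{:timeout, :order_99} = DependencyWait.await(:order_99, 100)
```

The pin operator (`^dependency_ref`) makes the process ignore messages about other dependencies, which matters when several events are in flight at the same time.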


Conductor for our orchestra of processes

Barenboim is an OTP application that helps you with this task.

In the process that is handling the received event, define how to check or request your dependency and call the get_data() function of the Barenboim module.



fun = fn dependency_ref -> MyDataModule.get(dependency_ref) end
{:ok, data} = Barenboim.get_data(dependency_ref, fun)

The check is executed in the calling process. If the dependency is not available yet, the process waits for a message from Barenboim, which sends the dependency as soon as it is ready.

Meanwhile, another process receives the awaited event and processes it. Once the event has been processed, that process has to notify Barenboim that the event is ready.



# Only the reference
Barenboim.notify({:reference, dependency_ref})

# Or the data
Barenboim.notify({:data, dependency_ref, dependency_data})

I highly recommend passing a timeout to get_data(); otherwise it will wait forever (which may fit your needs if you are sure that all the events will arrive). With a timeout, you can receive a timeout message:

case Barenboim.get_data(dependency_ref, fun, 5_000) do
  {:ok, data} ->
    # go on with the processing
    data
  {:timeout, _any} ->
    # decide what you can do: wait longer, request it, continue or discard
    :timeout
end

Barenboim uses a pool of processes (poolboy), which can be configured (check the docs).
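
To make the whole flow concrete (check in the caller, wait for a message, notify from another process), here is a small self-contained sketch in plain Elixir. It is not Barenboim's code: `MiniConductor` is a hypothetical single GenServer with no pool, it assumes the lookup function returns `nil` when the dependency is missing, and it does not clean up waiters after a timeout.

```elixir
defmodule MiniConductor do
  # Hypothetical, simplified stand-in for the pattern described above.
  # It is NOT Barenboim's implementation: one process, no pool, no cleanup.
  use GenServer

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  # Runs `fun` in the caller. If it returns nil (assumed to mean "missing"),
  # registers the caller as a waiter, checks once more to narrow the window
  # in which a notification could be missed, and then waits for a message.
  def get_data(ref, fun, timeout \\ :infinity) do
    with nil <- fun.(ref),
         :ok <- GenServer.call(__MODULE__, {:wait, ref, self()}),
         nil <- fun.(ref) do
      receive do
        {:dependency_ready, ^ref, data} -> {:ok, data}
      after
        timeout -> {:timeout, ref}
      end
    else
      data -> {:ok, data}
    end
  end

  # Mirrors the two notification shapes used in this post.
  def notify({:reference, ref}), do: GenServer.cast(__MODULE__, {:ready, ref, ref})
  def notify({:data, ref, data}), do: GenServer.cast(__MODULE__, {:ready, ref, data})

  @impl true
  def init(waiters), do: {:ok, waiters}

  @impl true
  def handle_call({:wait, ref, pid}, _from, waiters) do
    # Several processes may wait for the same dependency.
    {:reply, :ok, Map.update(waiters, ref, [pid], &[pid | &1])}
  end

  @impl true
  def handle_cast({:ready, ref, data}, waiters) do
    # Wake up every process waiting for this dependency and forget them.
    {pids, rest} = Map.pop(waiters, ref, [])
    Enum.each(pids, &send(&1, {:dependency_ready, ref, data}))
    {:noreply, rest}
  end
end

# Usage: the lookup finds nothing, so the caller waits for a notification.
{:ok, _pid} = MiniConductor.start_link()
lookup = fn _ref -> nil end

spawn(fn ->
  Process.sleep(50)
  MiniConductor.notify({:data, :event_1, "payload"})
end)

{:ok, "payload"} = MiniConductor.get_data(:event_1, lookup, 1_000)
{:timeout, :event_2} = MiniConductor.get_data(:event_2, lookup, 100)
```

A single GenServer like this becomes a bottleneck under load, which is presumably one reason a real library would spread the work over a pool of processes.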

Enjoy streaming!