Emitting events from your application without breaking everything
There are times when you'll need to emit events from your application code for consumption by other applications. Sometimes you'll want to schedule tasks to run in the background with Resque, or notify integration partners about workflow events so that they can query your API. There are some concurrency issues that you'll need to deal with, though.
The naïve approach
The first quick and dirty approach is to just emit your events straight from your application code. It's simple and at first glance, it works.
```ruby
def create_widget(params)
  widget = Widget.new(params)
  WidgetRepository.persist!(widget)
  NotificationService.notify("widget_created", widget)
  return widget
end
```
Everything should work. But its effectiveness is limited to extremely simple cases, because bad things will happen if we try to use this method in a more complicated workflow.
Eventually an application will grow to the point where doing only one thing in a transaction is no longer sufficient. This is where we can start to introduce concurrency problems.
Consider the case where we re-use the previous method in the context of a contrived elaborate workflow:
```ruby
module InventoryManagement
  extend self

  def store_widgets(widgets)
    raise InsufficentSpace unless sufficent_space_for?(widgets)
    InventoryRepository.persist!(widgets)
  end

  def sufficent_space_for?(widgets)
    # ...
  end

  class InsufficentSpace < StandardError
  end
end

def produce_widgets(requested_widgets)
  Database.transaction do
    produced_widgets = requested_widgets.map do |w|
      create_widget(w) # Persist the widgets, and also notify interested parties
    end
    InventoryManagement.store_widgets(produced_widgets) # whoops, this will raise and roll back everything if there isn't enough space!
  end
end
```
There are two problems that will arise when we transmit events before the database has had a chance to commit the transaction:
- If the transaction takes a long time to commit, the consumers will receive the notification about the event before the point of truth about that event has actually been saved. Consumers that attempt to react to that notification (e.g. by requesting additional information) won't be able to find any information yet. They'll error out and have to try again, either with manual intervention or a hacky auto-retry mechanism. Suboptimal.
- The transaction might roll back, but the notifications have already been sent. Unfortunately, this manifests in the same way as the above problem. The information that you're searching for isn't there! And it never will be!
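The second failure mode is easy to reproduce without a real database. The sketch below is a toy, in-memory stand-in: `FakeDatabase`, `FakeNotifier`, `produce_widgets_naively`, and the `capacity:` argument are all hypothetical names made up for illustration, not part of any real library or of the code above. It only shows the shape of the problem: the notifier has already been called by the time the transaction rolls back.

```ruby
# Toy in-memory stand-in for a transactional database (hypothetical).
# Writes are buffered and only become visible if the block finishes.
class FakeDatabase
  attr_reader :rows

  def initialize
    @rows = []
  end

  def transaction
    pending = []
    yield pending
    @rows.concat(pending) # commit: buffered writes become visible
    true
  rescue StandardError
    false # rollback: buffered writes are dropped
  end
end

# Stand-in for the external service: it just records what it was sent.
class FakeNotifier
  attr_reader :sent

  def initialize
    @sent = []
  end

  def notify(event_name, payload)
    @sent << [event_name, payload]
  end
end

def produce_widgets_naively(db, notifier, names, capacity:)
  db.transaction do |pending|
    names.each do |name|
      pending << name
      notifier.notify("widget_created", name) # sent immediately, mid-transaction
    end
    raise "insufficient space" if names.size > capacity
  end
end

db = FakeDatabase.new
notifier = FakeNotifier.new
committed = produce_widgets_naively(db, notifier, %w[a b c], capacity: 2)

puts committed          # => false (the transaction rolled back)
puts db.rows.size       # => 0     (no widgets were saved)
puts notifier.sent.size # => 3     (but three notifications went out anyway)
```

Consumers now hold three notifications that point at widgets which will never exist.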
There are also some problems related to tying directly into an external service:
- Requests to the external service will block the user's request, which can mean a very significant delay when that external service experiences degraded performance.
- When that external dependency goes down, your app will go down with it, but only for requests that need to communicate with the notification service.
- If the external service fails to deliver the messages, we have no record of the events that should have been sent, and no way to retransmit them.
We can avoid all these problems with an architectural change.
The notification store
Instead of emitting the notification right away, we can store the notification data in the same transactional database. These events will be written atomically along with the rest of the application's transaction.
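```ruby
module NotificationService
  extend self

  def store(event_name, item)
    content = notification_content_for(item)
    # Name the target column explicitly so the value can't land in another
    # column (such as id or processed) by position.
    Database.execute("INSERT INTO notifications (content) VALUES ($1)", [content])
  end

  def notification_content_for(item)
    # ...
  end
end
```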
We're free to litter calls to NotificationService.store all throughout our workflow code, confident that it will integrate seamlessly into any complex transactional actions. This implicitly means we can also call functions that use NotificationService.store with the same confidence. There aren't any unsynchronized side effects to consider anymore.
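To see why storing beats sending, here is a minimal in-memory sketch. `ToyDatabase`, its two "tables", and the `capacity:` argument are hypothetical and exist only for this illustration; a real database gives you the same all-or-nothing behaviour through its own transactions. The point is that the notification row shares the fate of the widget row.

```ruby
# Toy in-memory database with two "tables" (hypothetical, for illustration).
# Writes staged inside `transaction` are applied only if the block finishes.
class ToyDatabase
  attr_reader :widgets, :notifications

  def initialize
    @widgets = []
    @notifications = []
  end

  def transaction
    staged = { widgets: [], notifications: [] }
    yield staged
    @widgets.concat(staged[:widgets])
    @notifications.concat(staged[:notifications])
    true
  rescue StandardError
    false # rollback: nothing staged is applied
  end
end

def produce_widgets(db, names, capacity:)
  db.transaction do |tx|
    names.each do |name|
      tx[:widgets] << name
      # Stored, not sent: this row is written in the same transaction.
      tx[:notifications] << { event: "widget_created", widget: name }
    end
    raise "insufficient space" if names.size > capacity
  end
end

db = ToyDatabase.new

produce_widgets(db, %w[a b c], capacity: 2) # too many widgets: rolls back
puts db.notifications.size # => 0

produce_widgets(db, %w[d e], capacity: 2)   # fits: commits
puts db.notifications.size # => 2
```

After the rollback there is nothing to retract, because nothing ever left the database.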
Getting the data to its destination (aka The Shovel)
Now that we're sure that we're only generating events for transactions that have finished, how do we actually notify our consumers?
We've got a worker polling like this:
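```ruby
# batch_size is an assumed parameter: the original call left the value
# bound to $1 (the LIMIT) unspecified, and the default here is arbitrary.
def tick(batch_size = 100)
  Database.transaction do
    results = Database.execute(<<-SQL, [batch_size])
      UPDATE notifications SET processed = true
      WHERE id IN (
        SELECT id FROM notifications
        WHERE processed = false
        ORDER BY id ASC
        LIMIT $1
      )
      RETURNING *
    SQL

    results.each do |row|
      NotificationService.send_notification!(row["content"])
    end
  end
end
```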
In this example, we're talking to the external service from within the context of a database transaction. If anything goes wrong during transmission, like a network error, the UPDATE will be rolled back and we'll have the opportunity to retransmit the events. Since this is a background task, if something goes wrong, we'll have the opportunity to retry it until the service comes back, with nothing lost. The users may experience a delay in delivery until the external service recovers, but the app won't crash in their face and no notifications will be lost.
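The retry behaviour can be sketched without a database. Everything below is a hypothetical stand-in: `Outbox#claim_batch` mimics the UPDATE ... RETURNING inside a transaction (rows are marked processed only if the block completes), `FlakySender` fails on one chosen call, and this `tick` takes its collaborators as arguments rather than using the real `Database` and `NotificationService`.

```ruby
# Toy outbox (hypothetical): each row is a hash with :id, :content, :processed.
class Outbox
  attr_reader :rows

  def initialize(contents)
    @rows = contents.each_with_index.map do |content, i|
      { id: i + 1, content: content, processed: false }
    end
  end

  # Rows are marked processed only if the block completes. If the block
  # raises, nothing is marked, which plays the role of a rollback.
  def claim_batch(limit)
    batch = @rows.reject { |r| r[:processed] }.sort_by { |r| r[:id] }.first(limit)
    yield batch
    batch.each { |r| r[:processed] = true }
    batch.size
  end
end

# Sender that raises on one specific call, then behaves normally.
class FlakySender
  attr_reader :delivered

  def initialize(fail_on_call:)
    @fail_on_call = fail_on_call
    @calls = 0
    @delivered = []
  end

  def send_notification!(content)
    @calls += 1
    raise IOError, "service unavailable" if @calls == @fail_on_call
    @delivered << content
  end
end

def tick(outbox, sender, batch_size: 10)
  outbox.claim_batch(batch_size) do |batch|
    batch.each { |row| sender.send_notification!(row[:content]) }
  end
rescue IOError
  0 # nothing was marked processed; the next tick retries the same rows
end

outbox = Outbox.new(%w[a b c])
sender = FlakySender.new(fail_on_call: 2)

puts tick(outbox, sender)     # => 0 ("a" went out, "b" failed, batch rolled back)
puts tick(outbox, sender)     # => 3 (the whole batch is retried and succeeds)
puts sender.delivered.inspect # => ["a", "a", "b", "c"]
```

Nothing was lost, but "a" was delivered twice, which is why duplicate deliveries have to be safe for consumers.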
There are a couple of implementation details that are noteworthy:
- The consumers of the notifications in this example must be *idempotent*. They should be able to safely process duplicate notifications. We achieved this in practice by adding a uuid to each notification. For the purposes of our example, this uuid would have been added by NotificationService.notification_content_for. If a consumer receives a notification with a uuid they've seen before, they can safely discard it.
- The notifications table has a processed flag that's been set to false by default. This is not the only way to maintain this state, but it is probably the simplest. Whatever you choose to do, make sure that you use a set-based approach. Either maintain a list of notifications you have already completed, or a list of notifications that you haven't completed and use inclusion or exclusion to query for the notifications that still need to be processed. Don't rely on the ids of events to be gapless or in chronological order, because that will also introduce concurrency problems.
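On the consumer side, discarding duplicates by uuid can be as small as the sketch below. `IdempotentConsumer` and the notification hash layout (`:uuid`, `:event`) are assumptions made for this example; a real consumer would keep its seen-set somewhere durable rather than in memory.

```ruby
require "securerandom"
require "set"

# Hypothetical consumer that remembers the uuids it has already handled.
class IdempotentConsumer
  attr_reader :handled

  def initialize
    @seen = Set.new
    @handled = []
  end

  # Returns true if the notification was processed, false if it was a duplicate.
  def receive(notification)
    # Set#add? returns nil when the element is already present.
    return false unless @seen.add?(notification[:uuid])

    @handled << notification[:event]
    true
  end
end

consumer = IdempotentConsumer.new
notification = { uuid: SecureRandom.uuid, event: "widget_created" }

puts consumer.receive(notification) # => true
puts consumer.receive(notification) # => false (same uuid, safely discarded)
puts consumer.handled.size          # => 1
```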
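The warning about ids is worth a concrete picture. Ids are typically handed out when a row is inserted, not when its transaction commits, so a row with a lower id can become visible after a row with a higher one. The toy simulation below uses made-up names (`Row`, `CursorPoller`, `FlagPoller`, `simulate`) to compare a "remember the last id I saw" poller with the set-based processed flag.

```ruby
# committed stands in for "visible to other transactions".
Row = Struct.new(:id, :committed, :processed)

# Cursor-based poller: only looks at ids above the highest one it has seen.
class CursorPoller
  attr_reader :seen

  def initialize
    @last_id = 0
    @seen = []
  end

  def poll(rows)
    visible = rows.select { |r| r.committed && r.id > @last_id }.sort_by(&:id)
    visible.each { |r| @seen << r.id }
    @last_id = visible.last.id unless visible.empty?
  end
end

# Set-based poller: picks up anything committed that isn't flagged yet.
class FlagPoller
  attr_reader :seen

  def initialize
    @seen = []
  end

  def poll(rows)
    rows.select { |r| r.committed && !r.processed }.sort_by(&:id).each do |r|
      r.processed = true
      @seen << r.id
    end
  end
end

def simulate(poller)
  slow = Row.new(1, false, false) # id assigned first, commits last
  fast = Row.new(2, true, false)  # id assigned second, commits first
  rows = [slow, fast]

  poller.poll(rows)     # only id 2 is visible so far
  slow.committed = true # the slow transaction finally commits
  poller.poll(rows)
  poller.seen
end

puts simulate(CursorPoller.new).inspect # => [2]    (id 1 is skipped forever)
puts simulate(FlagPoller.new).inspect   # => [2, 1] (late commit is still picked up)
```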
Addendum: Don't like polling?
If you don't like polling, you could look into using something like PostgreSQL's LISTEN/NOTIFY. Relevant excerpt:
> if a NOTIFY is executed inside a transaction, the notify events are not delivered until and unless the transaction is committed
We haven't used this, because we find polling to be sufficient for our purposes. But it should be a good way to get both publish/subscribe and persistent, transaction-safe notifications, if you're in the market for that.