There are times when you'll need to emit events from your application code for consumption by other applications. Sometimes you'll want to schedule tasks to run in the background with Resque, or notify integration partners about workflow events so that they can query your API. There are some concurrency issues that you'll need to deal with, though.

The naïve approach

The first quick and dirty approach is to just emit your events straight from your application code. It's simple and at first glance, it works.

def create_widget(params)
  widget = Widget.create(params) # persist the new widget
  NotificationService.notify("widget_created", widget)
  widget
end

Everything should work. But its effectiveness is limited to extremely simple cases, because bad things will happen if we try to use this method in a more complicated workflow.

Complicated transactions

Eventually an application will grow to the point where doing only one thing in a transaction is no longer sufficient. This is where we can start to introduce concurrency problems.

Consider the case where we re-use the previous method in the context of a contrived elaborate workflow:

module InventoryManagement
  extend self

  def store_widgets(widgets)
    raise InsufficientSpace unless sufficient_space_for?(widgets)
    # ...
  end

  def sufficient_space_for?(widgets)
    # ...
  end

  class InsufficientSpace < StandardError; end
end
def produce_widgets(requested_widgets)
  Database.transaction do
    produced_widgets = requested_widgets.map do |w|
      create_widget(w) # Persist the widgets, and also notify interested parties
    end
    InventoryManagement.store_widgets(produced_widgets)
    # whoops, this will raise and roll back everything if there isn't enough space!
  end
end

There are two problems that will arise when we transmit events before the database has had a chance to commit the transaction:

  1. If the transaction takes a long time to commit, the consumers will receive the notification about the event before the point of truth about that event has actually been saved. Consumers that attempt to react to that notification (e.g. by requesting additional information) won't be able to find any information yet. They'll error out and have to try again, either with manual intervention or a hacky auto-retry mechanism. Suboptimal.
  2. The transaction might roll back, but the notifications have already been sent. Unfortunately, this manifests in the same way as the above problem. The information that you're searching for isn't there! And it never will be!
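To make problem 1 concrete, here's a sketch of the "hacky auto-retry mechanism" a consumer ends up writing when notifications outrun the commit. `fetch_details` is a hypothetical stand-in for the consumer's follow-up API request, not anything from the code above.

```ruby
# Hypothetical consumer-side retry loop. `fetch_details` is an assumed
# helper that queries the producer's API and returns nil when the record
# isn't visible yet (i.e. the producer's transaction hasn't committed).
def fetch_with_retry(widget_id, attempts: 5, base_delay: 0.1)
  attempts.times do |i|
    result = fetch_details(widget_id)
    return result if result
    sleep(base_delay * (2**i)) # exponential backoff before trying again
  end
  raise "widget #{widget_id} never appeared; the transaction may have rolled back"
end
```

Note that if the producer's transaction rolled back (problem 2), this loop burns through every attempt and still fails; the consumer can't distinguish "not committed yet" from "never will be".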

There are also some problems related to tying directly into an external service:

  1. Requests to the external service will block the user's request, potentially introducing very significant delays when that external service experiences degraded performance
  2. When that external dependency goes down, your app will go down with it, but only for requests that need to communicate with the notification service
  3. If the external service fails to deliver the messages, we have no record of the events that should have been sent, and no way to retransmit them

We can avoid all these problems with an architectural change.

The notification store

Instead of emitting the notification right away, we can store the notification data to the same transactional database. These events will be written atomically along with the rest of the application's transaction.

module NotificationService
  extend self

  def store(event_name, item)
    content = notification_content_for(item)
    Database.execute("INSERT INTO notifications VALUES ($1)", [content])
  end

  def notification_content_for(item)
    # ...
  end
end

We're free to litter calls to NotificationService.store all throughout our workflow code, confident that they will integrate seamlessly into any complex transactional actions. This also implicitly means we can call functions that use NotificationService.store with the same confidence. There aren't any unsynchronized side effects to consider anymore.
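For illustration, here's one way notification_content_for might be filled in, assuming notifications are serialized as JSON. The event and timestamp fields are assumptions for the sketch, not part of any schema the article defines; note it also takes the event name, which the original elides.

```ruby
require "json"
require "time"

# Hypothetical serialization for a stored notification. The field names
# (event, payload, emitted_at) are assumptions for this sketch.
def notification_content_for(event_name, item)
  JSON.generate(
    event: event_name,                 # e.g. "widget_created"
    payload: item,                     # the record's attributes
    emitted_at: Time.now.utc.iso8601   # when the event was generated
  )
end
```

Because the content is computed before the INSERT, whatever you store is exactly what the shovel will later transmit, byte for byte.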

Getting the data to its destination (aka The Shovel)

Now that we're sure that we're only generating events for transactions that have finished, how do we actually notify our consumers?

We've got a worker polling like this:

def tick
  Database.transaction do
    results = Database.execute(<<-SQL, [1000])
      UPDATE notifications SET processed = true
      WHERE id IN
        (SELECT id FROM notifications
         WHERE processed = false
         ORDER BY id ASC
         LIMIT $1)
      RETURNING *
    SQL
    results.each do |row|
      # deliver `row` to the external notification service here
    end
  end
end

In this example, we're talking to the external service from within the context of a database transaction. If anything goes wrong during transmission, like a network error, the UPDATE will be rolled back and we'll have the opportunity to retransmit the events. Since this is a background task, we can simply retry until the service comes back, with nothing lost. Users may experience a delay in delivery until the external service recovers, but the app won't crash in their face and no notifications will be lost.
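Those retry semantics can be simulated in memory. FakeDatabase and the service objects below are stand-ins invented for this sketch; the point is that a failed delivery leaves the rows unclaimed, so the next tick picks them up again.

```ruby
# In-memory sketch of the shovel's rollback-and-retry behavior.
class FakeDatabase
  attr_reader :rows

  def initialize(rows)
    @rows = rows
  end

  # Mark up to `limit` unprocessed rows, yield them, and undo the marks if
  # the block raises -- mimicking UPDATE ... RETURNING inside a transaction.
  def claim_unprocessed(limit)
    claimed = @rows.reject { |r| r[:processed] }.first(limit)
    claimed.each { |r| r[:processed] = true }
    begin
      yield claimed
    rescue => e
      claimed.each { |r| r[:processed] = false } # "rollback"
      raise e
    end
  end
end

def tick(db, service)
  db.claim_unprocessed(1000) do |rows|
    rows.each { |row| service.deliver(row) }
  end
rescue
  # Swallow the error; the worker simply tries again on the next tick.
end
```

A delivery failure mid-batch rolls back every claim, so the whole batch is retransmitted next tick; consumers should therefore be prepared for at-least-once delivery.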

There are a couple of implementation details that are noteworthy:

Addendum: Don't like polling?

If you don't like polling, you could look into using something like PostgreSQL's LISTEN/NOTIFY. Relevant excerpt:

if a NOTIFY is executed inside a transaction, the notify events are not delivered until and unless the transaction is committed

We haven't used this, because we find polling to be sufficient for our purposes. But it should offer a good mechanism for both publish/subscribe and persistent, transaction-safe notifications, if you're in the market for that.