Decomposing Asynchronous Workers in Ruby

Kevin Buchanan

May 04, 2015

I'm a proponent of building small, composable pieces over one large piece that does many things. The large piece usually works well, but it eliminates the possibility of using its components to do something new and unforeseen. Smaller pieces leave open the potential for a system that truly is greater than the sum of its parts. If we take a look at some common pieces of our systems, we can potentially come away with new ways to compose the same functionality with multiple smaller pieces.

Take asynchronous workers, for example. We often have work that we want done asynchronously. In Ruby, using Redis through Sidekiq is a great way to do this. Sidekiq is simple to work with, hides the implementation details of Redis, and does a great job managing your worker processes. If we wanted to import data from some type of file as a background process, we could write a Sidekiq worker that looks like this:

class FileImportWorker
		include Sidekiq::Worker

		sidekiq_options retry: 5

		sidekiq_retry_in { |count| count + 30 }

		sidekiq_retries_exhausted do |message|
				Logger.error("Out of retries!", error: message["error_message"])
		end

		def perform(file_id)
				file = PaymentFile.find(file_id)
				# Code that imports all the data
		end
end

And we could use Sidekiq's simple API to enqueue a job:

file = PaymentFile.create(data: params[:data])

FileImportWorker.perform_async(file.id)

That was easy. You'll notice that we've configured our worker to retry a file import job 5 times if we encounter exceptions when performing the job. We've also configured how we calculate the backoff, or interval between our retries, and what to do when we've exhausted our retry attempts for a specific job. So, Sidekiq will call the perform method, retry our job 5 times if it fails, each time backing off 30 seconds plus 1 second for each time we've already tried, and when we're out of retries we want to simply log that the job has failed.
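To make the backoff concrete, here is a minimal sketch in plain Ruby that applies the same count + 30 formula outside of Sidekiq. The method names retry_in and retry_delays are made up for illustration, and the sketch assumes, as the description above implies, that the count starts at zero on the first retry.

```ruby
# Hypothetical stand-in for the block passed to sidekiq_retry_in above.
def retry_in(count)
  count + 30
end

# Delay, in seconds, before each retry, assuming the count starts at zero.
def retry_delays(max_retries)
  (0...max_retries).map { |count| retry_in(count) }
end

puts retry_delays(5).inspect # prints [30, 31, 32, 33, 34]
```

With five retries, the last attempt would be scheduled 34 seconds after the failure before it.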

But, that's starting to seem like a lot of behavior, and maybe that behavior is crucial enough to our application that we want to have more control over it, or want one place to go to change how we retry asynchronous tasks in our application. If we consider this retry, backoff, failure logic a key feature of our app, we probably don't want to be coupled to using Sidekiq for this. What if tomorrow we want to switch to a different background processing framework? What if we want to perform the file import outside of our Sidekiq worker? What if we want to start our retry process without queueing a job first?

Let's consider how we can break up our worker into smaller, composable pieces. We could think about this process as four separate pieces. I like to do this by thinking about the what, when, where, and how of our feature. The where is Sidekiq and Redis, which is where we want to put and take jobs. The what is the business logic of importing a file, previously implemented in the worker itself. The how is our retry logic, and the when is our backoff interval. We probably have other notions of when in our domain as well, like performing in a certain amount of time, or performing at a specified time.

What

Let's move the file import code into a separate Command object.

class FileImport
		def self.perform(file)
				new(file).perform
		end

		def initialize(file)
				@file = file
		end

		def perform
				# Code that imports all the data
		end
end
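One payoff of the Command object is that we can run the import directly, with no queue involved. Here is a rough sketch of that idea. FakeFile is a hypothetical stand-in for a PaymentFile record, and the body of perform is a placeholder that just counts non-empty lines, since the real import code isn't shown here.

```ruby
# Hypothetical stand-in for a PaymentFile record; only #data is assumed.
FakeFile = Struct.new(:id, :data)

class FileImport
  def self.perform(file)
    new(file).perform
  end

  def initialize(file)
    @file = file
  end

  # Placeholder import: counts the non-empty lines of data instead of
  # persisting anything.
  def perform
    @file.data.lines.map(&:strip).reject(&:empty?).size
  end
end

file = FakeFile.new(1, "alice,10\nbob,20\n")
puts FileImport.perform(file) # prints 2
```

Nothing in this class knows about Sidekiq, so the same command can be called from a console, a test, or a worker.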

When

We can also create some small classes that are solely responsible for determining when things get done.

class PerformAt
		def initialize(options)
				@options = options
				@queue = options.fetch(:queue, FileQueue)
				@time = options.fetch(:time, Time.now)
		end

		def perform(command, *args)
				@queue.schedule_job(command, *args, @options.merge({ time: @time }))
		end
end

class PerformIn
		def initialize(options)
				@options = options
				@queue = options.fetch(:queue, FileQueue)
				@interval = options.fetch(:interval, 5)
		end

		def perform(command, *args)
				time = Time.now + @interval
				@queue.schedule_job(command, *args, @options.merge({ time: time }))
		end
end

class Backoff
		def initialize(options)
				@options = options
				@queue = options.fetch(:queue, FileQueue)
		end

		def perform(command, *args)
				backoff = 30 + @options.fetch(:count, 0)
				time = Time.now + backoff.seconds
				@queue.schedule_job(command, *args, @options.merge({ time: time }))
		end
end

These classes are very similar, so there is potential to refactor towards reducing duplication between them.
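One possible direction for that refactoring, sketched under a few assumptions: a hypothetical Scheduler base class owns the queue and the call to schedule_job, and each subclass only answers the question of when. The run_at method name and the RecordingQueue class are invented for this sketch, plain integer seconds replace ActiveSupport durations so it runs without Rails, and the queue must be passed in rather than defaulting to FileQueue. Backoff uses the 30-second base from the original Sidekiq worker.

```ruby
# Hypothetical base class: subclasses only decide *when* the job should run.
class Scheduler
  def initialize(options = {})
    @options = options
    # Unlike the classes above, this sketch requires a queue to be passed in.
    @queue = options.fetch(:queue)
  end

  def perform(command, *args)
    @queue.schedule_job(command, *args, @options.merge(time: run_at))
  end
end

class PerformAt < Scheduler
  def run_at
    @options.fetch(:time) { Time.now }
  end
end

class PerformIn < Scheduler
  def run_at
    Time.now + @options.fetch(:interval, 5)
  end
end

class Backoff < Scheduler
  def run_at
    Time.now + 30 + @options.fetch(:count, 0)
  end
end

# In-memory queue used only to make the sketch runnable.
class RecordingQueue
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  def schedule_job(command, file, options)
    @jobs << { command: command, file: file, time: options.fetch(:time) }
  end
end

queue = RecordingQueue.new
Backoff.new(queue: queue, count: 2).perform(:import, :some_file)
puts((queue.jobs.first[:time] - Time.now).round) # prints 32
```

Whether the inheritance is worth it is a judgment call; three small classes with a little duplication may be just as easy to read.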

How

How we execute a command involves performing the command with the arguments, rescuing exceptions, and either retrying if there are remaining retries or failing if there are no remaining retries.

class PerformWithRetry
		DEFAULT_RETRIES = 5

		def initialize(options)
				@options = options
				@retries = options.fetch(:retries, DEFAULT_RETRIES)
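				# Attempts made so far, carried through the options so the backoff
				# keeps growing even when :retries differs from the default
				@count = options.fetch(:count, 0)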
		end

		def perform(command, *args)
				command.perform(*args)
		rescue => e
				if @retries > 0
						Logger.info("Retrying...", error: e, command: command, args: args)
						new_options = @options.merge({ retries: @retries - 1, count: @count + 1 })
						Backoff.new(new_options).perform(command, *args)
				else
						Logger.error("Out of retries!", error: e, command: command, args: args)
				end
		end
end
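To see the retry flow end to end without Redis, here is a simplified, self-contained sketch. It makes several assumptions that aren't in the code above: InlineQueue is a hypothetical queue that runs the job immediately instead of scheduling it, Backoff skips the delay entirely, logging is left out, the attempt count is read from the options, and FlakyImport is an invented command that fails a set number of times before succeeding.

```ruby
# Hypothetical queue: runs the job right away instead of scheduling it.
class InlineQueue
  def self.schedule_job(command, file, options)
    PerformWithRetry.new(options).perform(command, file)
  end
end

# Simplified Backoff: hands the job to the queue; the delay is omitted.
class Backoff
  def initialize(options)
    @options = options
    @queue = options.fetch(:queue)
  end

  def perform(command, *args)
    @queue.schedule_job(command, *args, @options)
  end
end

class PerformWithRetry
  DEFAULT_RETRIES = 5

  def initialize(options)
    @options = options
    @retries = options.fetch(:retries, DEFAULT_RETRIES)
    @count = options.fetch(:count, 0)
  end

  def perform(command, *args)
    command.perform(*args)
  rescue StandardError
    if @retries > 0
      new_options = @options.merge(retries: @retries - 1, count: @count + 1)
      Backoff.new(new_options).perform(command, *args)
    else
      :out_of_retries
    end
  end
end

# Hypothetical command that fails a fixed number of times, then succeeds.
class FlakyImport
  class << self
    attr_accessor :failures_left, :attempts
  end

  def self.perform(_file)
    self.attempts += 1
    if failures_left > 0
      self.failures_left -= 1
      raise "temporary failure"
    end
    :imported
  end
end

FlakyImport.failures_left = 2
FlakyImport.attempts = 0
result = PerformWithRetry.new(queue: InlineQueue, retries: 5).perform(FlakyImport, :some_file)
puts "#{result} after #{FlakyImport.attempts} attempts" # prints "imported after 3 attempts"
```

Because the queue is just an option, the same retry logic can be exercised in a test with no background processor running.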

Where

We can simplify our job enqueuing by creating a worker that takes a command, a file, and some options, and simply serializes and enqueues, then dequeues and deserializes our job data. Here we'll use Sidekiq's .perform_at method to actually enqueue the job and Sidekiq will call the worker's perform method when it dequeues a job.

class FileQueue
		include Sidekiq::Worker

		sidekiq_options retry: false

		def self.schedule_job(command, file, options)
				file_data = { id: file.id }
				time = options.fetch(:time, Time.now)
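				# Pass the class name as a string so it serializes cleanly
				# and can be constantized when the job is dequeued
				perform_at(time, command.name, file_data, options)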
		end

		def perform(command_name, file_data, options)
				file = PaymentFile.find(file_data["id"])
				options = options.reduce({}) { |m, (k, v)| m[k.to_sym] = v; m }
				PerformWithRetry.new(options).perform(command_name.constantize, file)
		end
end
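The reduce in the worker's perform method is there because Sidekiq stores job arguments as JSON, so symbol keys come back as strings. A small sketch of that round trip, using only Ruby's standard JSON library (the symbolize_keys helper name is our own for this example):

```ruby
require "json"

# Symbolize the top-level keys of a hash, as the worker's perform method does.
def symbolize_keys(hash)
  hash.reduce({}) { |memo, (key, value)| memo[key.to_sym] = value; memo }
end

options = { retries: 3, count: 2 }

# Simulate the trip through the queue: arguments are serialized to JSON
# when enqueued and parsed again when dequeued.
round_tripped = JSON.parse(JSON.generate(options))

puts round_tripped.key?("retries")                 # prints true
puts round_tripped.key?(:retries)                  # prints false
puts symbolize_keys(round_tripped).key?(:retries)  # prints true
```

Without this step, options.fetch(:retries, DEFAULT_RETRIES) would silently fall back to the default on every dequeued job.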

Now, when we want to queue an asynchronous job, we just use PerformAt, PerformIn, Backoff, or PerformWithRetry, with a file command and its arguments:

file = PaymentFile.create(data: params[:data])

PerformAt.new({ time: Time.now + 1.day }).perform(FileImport, file)
PerformIn.new({ interval: 10.minutes }).perform(FileImport, file)
Backoff.new({}).perform(FileImport, file)
PerformWithRetry.new({ retries: 10 }).perform(FileExport, file)

There are a few benefits here. First, we now have a coherent interface for performing an abstract piece of work at an abstract time, either asynchronously or synchronously with asynchronous retries. We've now completely hidden the specifics of our job queueing implementation in Sidekiq behind our own domain objects. We could swap out Sidekiq for something else tomorrow and know that our core app logic is going to stay the same, without having to make more changes. We also get the benefit of making it clear to everyone working in our codebase how our retry logic works, and where to go if we want to change how we back off or handle failed jobs.

There are more pieces here, and the overhead might not be worth it for domains that aren't very dependent on this behavior. But if we have a crucial feature coupled to a larger piece of our codebase, refactoring toward smaller pieces can often insulate our code from changes to our lower-level implementation details, and allow us to compose different behavior from these small pieces.