I'm a proponent of building small, composable pieces over one large piece that does many things. The large piece usually works well, but it eliminates the possibility of using its components to do something new and unforeseen. Smaller pieces leave open the potential for a system that truly is greater than the sum of its parts. If we look at some common pieces of our systems, we can come away with new ways to compose the same functionality from multiple smaller pieces.
Take asynchronous workers, for example. A lot of times, we have work that we want to get done asynchronously. In Ruby, using Redis through Sidekiq is a great way to do this. Sidekiq is simple to work with, hides the implementation details of Redis, and does a great job managing your worker processes. If we wanted to import data from some type of file as a background process, we could write a Sidekiq worker that looks like this:
class FileImportWorker
  include Sidekiq::Worker

  sidekiq_options retry: 5

  sidekiq_retry_in { |count| count + 30 }

  sidekiq_retries_exhausted do |message|
    Logger.error("Out of retries!", error: message["error_message"])
  end

  def perform(file_id)
    file = PaymentFile.find(file_id)
    # Code that imports all the data
  end
end
And we could use Sidekiq's simple API to enqueue a job:
file = PaymentFile.create(data: params[:data])
FileImportWorker.perform_async(file.id)
That was easy. You'll notice that we've configured our worker to retry a file import job 5 times if we encounter exceptions while performing it. We've also configured how we calculate the backoff, or the interval between retries, and what to do when we've exhausted our retry attempts for a specific job. So Sidekiq will call the perform method, retry our job 5 times if it fails, each time backing off 30 seconds plus 1 second for each time we've already tried, and, when we're out of retries, simply log that the job has failed.
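To see the schedule this produces, here's a sketch of the same backoff block as a plain lambda (the block receives the number of retries already attempted, starting at 0):

```ruby
# Mirrors sidekiq_retry_in { |count| count + 30 }: the delay grows by
# one second per attempt already made.
retry_in = ->(count) { count + 30 }
delays = (0...5).map { |count| retry_in.call(count) }
# delays == [30, 31, 32, 33, 34]
```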
But, that's starting to seem like a lot of behavior, and maybe that behavior is crucial enough to our application that we want to have more control over it, or want one place to go to change how we retry asynchronous tasks in our application. If we consider this retry, backoff, failure logic a key feature of our app, we probably don't want to be coupled to using Sidekiq for this. What if tomorrow we want to switch to a different background processing framework? What if we want to perform the file import outside of our Sidekiq worker? What if we want to start our retry process without queueing a job first?
Let's consider how we can break up our worker into smaller, composable pieces. We could think about this process as four separate pieces. I like to do this by thinking about the what, when, where, and how of our feature. The where is Sidekiq and Redis, which is where we want to put and take jobs. The what is the business logic of importing a file, previously implemented in the worker itself. The how is our retry logic, and the when is our backoff interval. We probably have other notions of when in our domain as well—like performing in a certain amount of time, or performing at a specified time.
What
Let's move the file import code into a separate Command object.
class FileImport
  def self.perform(file)
    new(file).perform
  end

  def initialize(file)
    @file = file
  end

  def perform
    # Code that imports all the data
  end
end
When
We can also create some small classes that are solely responsible for determining when things get done.
class PerformAt
  def initialize(options)
    @options = options
    @queue = options.fetch(:queue, FileQueue)
    @time = options.fetch(:time, Time.now)
  end

  def perform(command, *args)
    @queue.schedule_job(command, *args, @options.merge({ time: @time }))
  end
end

class PerformIn
  def initialize(options)
    @options = options
    @queue = options.fetch(:queue, FileQueue)
    @interval = options.fetch(:interval, 5)
  end

  def perform(command, *args)
    time = Time.now + @interval
    @queue.schedule_job(command, *args, @options.merge({ time: time }))
  end
end

class Backoff
  def initialize(options)
    @options = options
    @queue = options.fetch(:queue, FileQueue)
  end

  def perform(command, *args)
    backoff = 40 + @options.fetch(:count, 0)
    time = Time.now + backoff.seconds
    @queue.schedule_job(command, *args, @options.merge({ time: time }))
  end
end
These classes are very similar, so there is potential to refactor towards reducing duplication between them.
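One possible shape for that refactor: a shared base class holds the options and the queue, and each subclass supplies only its notion of time. This is a sketch, not the article's implementation—`InMemoryQueue` is a hypothetical stand-in for `FileQueue` so the code runs on its own, and it uses plain integer seconds instead of ActiveSupport durations.

```ruby
# Hypothetical stand-in for FileQueue: records scheduled jobs in memory.
class InMemoryQueue
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  def schedule_job(command, *args, options)
    @jobs << { command: command, args: args, options: options }
  end
end

# Shared base: stores options and queue, merges in the scheduled time.
class Scheduler
  def initialize(options = {})
    @options = options
    @queue = options.fetch(:queue)
  end

  def perform(command, *args)
    @queue.schedule_job(command, *args, @options.merge(time: scheduled_time))
  end
end

class PerformAt < Scheduler
  def scheduled_time
    @options.fetch(:time, Time.now)
  end
end

class PerformIn < Scheduler
  def scheduled_time
    Time.now + @options.fetch(:interval, 5)
  end
end

class Backoff < Scheduler
  def scheduled_time
    Time.now + 40 + @options.fetch(:count, 0)
  end
end

queue = InMemoryQueue.new
Backoff.new(queue: queue, count: 2).perform(:file_import, 1)
queue.jobs.first[:options][:time] # about 42 seconds from now
```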
How
How we execute a command involves performing the command with the arguments, rescuing exceptions, and either retrying if there are remaining retries or failing if there are no remaining retries.
class PerformWithRetry
  DEFAULT_RETRIES = 5

  def initialize(options)
    @options = options
    @retries = options.fetch(:retries, DEFAULT_RETRIES)
    @count = DEFAULT_RETRIES - @retries
  end

  def perform(command, *args)
    command.perform(*args)
  rescue => e
    if @retries > 0
      Logger.info("Retrying...", error: e, command: command, args: args)
      new_options = @options.merge({ retries: @retries - 1, count: @count + 1 })
      Backoff.new(new_options).perform(command, *args)
    else
      Logger.error("Out of retries!", error: e, command: command, args: args)
    end
  end
end
Where
We can simplify our job enqueuing by creating a worker that takes a command, a file, and some options; it serializes and enqueues the job data, then dequeues and deserializes it. Here we'll use Sidekiq's .perform_at method to actually enqueue the job, and Sidekiq will call the worker's perform method when it dequeues a job.
class FileQueue
  include Sidekiq::Worker

  sidekiq_options retry: false

  def self.schedule_job(command, file, options)
    file_data = { id: file.id }
    time = options.fetch(:time, Time.now)
    # Enqueue the command by name so it survives JSON serialization
    perform_at(time, command.to_s, file_data, options)
  end

  def perform(command_name, file_data, options)
    file = PaymentFile.find(file_data["id"])
    # Sidekiq round-trips arguments as JSON, so symbolize the string keys
    options = options.reduce({}) { |m, (k, v)| m[k.to_sym] = v; m }
    PerformWithRetry.new(options).perform(command_name.constantize, file)
  end
end
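The key symbolization in perform is there because Sidekiq stores job arguments as JSON, so any symbol keys we enqueue come back as strings when the job is dequeued. A small illustration of that round trip:

```ruby
require "json"

# Symbol keys don't survive a JSON round trip; they come back as strings.
options = { retries: 5, count: 0 }
round_tripped = JSON.parse(JSON.generate(options))
# round_tripped == { "retries" => 5, "count" => 0 }

# The same reduce used in FileQueue#perform restores symbol keys.
symbolized = round_tripped.reduce({}) { |m, (k, v)| m[k.to_sym] = v; m }
# symbolized == { retries: 5, count: 0 }
```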
Now, when we want to queue an asynchronous job, we just use PerformAt, PerformIn, Backoff, or PerformWithRetry with a file command and its arguments:
file = PaymentFile.create(data: params[:data])

PerformAt.new({ time: Time.now + 1.day }).perform(FileImport, file)
PerformIn.new({ interval: 10.minutes }).perform(FileImport, file)
Backoff.new({}).perform(FileImport, file)
PerformWithRetry.new({ retries: 10 }).perform(FileExport, file)
There are a few benefits here. First, we now have a coherent interface for performing an abstract piece of work at an abstract time—either asynchronously, or synchronously with asynchronous retries. We've completely hidden the specifics of our Sidekiq job queueing implementation behind our own domain objects. We could swap out Sidekiq for something else tomorrow and know that our core app logic will stay the same. We also get the benefit of making it clear to everyone working in our codebase how our retry logic works, and where to go to change how we back off or handle failed jobs.
There are more pieces here, and the overhead might not be worth it for domains that aren't very dependent on this behavior. But, if we have a crucial feature coupled to a larger piece of our codebase, refactoring toward smaller pieces can often insulate our code from changes to our lower level implementation details, and allow us to compose different behavior from these small pieces.