Up And Running With Faktory In Elixir

Faktory is a server daemon that allows us to queue the jobs we want to be performed in the background of our applications. It also handles other aspects of job processing such as failures, retries, and queue prioritisation. In this post, we are going to look at how we can use the faktory_worker library to connect to Faktory from an Elixir application and process jobs.

Before we start, let's quickly sum up what we can do with this library.

Faktory Worker allows us to perform any kind of work, whether it's sending emails, taking customer payments, or resizing images. Any task we can perform in Elixir can likely be run in the background via Faktory. The library takes care of all of the communication with Faktory for us, which means we can focus on writing code that gets work done without concerning ourselves with complicated error handling and retry logic.

Let's get started by setting up a Faktory server.

Starting Faktory

There are a couple of ways to get Faktory set up for local development. My favourite approach is using Docker, so let's get started there.

Getting Faktory running is as simple as the following command.

  $ docker run --rm -it -p 7419:7419 -p 7420:7420 contribsys/faktory:latest

Once Docker has started the Faktory server, you can visit http://localhost:7420 to view the dashboard and confirm the server has started correctly.

I find this approach to be the fastest and most flexible way of using Faktory for local development. It makes it easy to run multiple servers mapped to different ports, so each new project can work with its own instance of Faktory.
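
For example, a second instance for another project could map the same container ports to different ports on the host (the host ports here are arbitrary).

  $ docker run --rm -it -p 7421:7419 -p 7422:7420 contribsys/faktory:latest

The dashboard for that instance would then be available at http://localhost:7422, and the application would need to connect to port 7421 rather than the default 7419 (we'll come back to configuration once the app is set up).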

Alternatively, Faktory can be installed via Homebrew if you are on macOS, and there are binary packages available if you are running Linux. Further instructions can be found on the project's installation page.
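
At the time of writing, the installation page lists a Homebrew tap along these lines, but do check the page for the current instructions.

  $ brew tap contribsys/faktory
  $ brew install faktory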

We have Faktory running, what's next?

The first thing we need is an Elixir application. You can use an existing project you already have and apply the following steps where needed, or you can start from scratch and follow along.

The first step is to create our application.

  $ mix new example_app

Next, we need to add the faktory_worker dependency to our mix.exs file.

  defp deps do
    [
      {:faktory_worker, "~> 1.0"}
    ]
  end
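
With the dependency added, fetch it from Hex.

  $ mix deps.get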

With this in place, we are now ready to configure our application. The recommended way to start Faktory Worker is to include it in a supervision tree.

Let's add a new application module (for example, in lib/example_app/application.ex) to handle how our app gets started, and add the FaktoryWorker module to its list of children.

  defmodule ExampleApp.Application do
    use Application

    def start(_type, _args) do
      children = [
        FaktoryWorker
      ]

      Supervisor.start_link(children,
        strategy: :one_for_one,
        name: ExampleApp.Supervisor
      )
    end
  end

Once we have this in place, we need to update the application section of our mix.exs file to ensure that our app gets started.

  def application do
    [
      mod: {ExampleApp.Application, []},
      extra_applications: [:logger]
    ]
  end

Now head back to the terminal and run the iex -S mix command to start the app.

If you take a look at the Faktory dashboard we viewed earlier, you will see there are several connections reported at the bottom of the page. This confirms our application has successfully started and connected to Faktory using the default configuration.
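
By default, Faktory Worker connects to a Faktory server running locally on port 7419. If your server is elsewhere (for example, the second Docker instance mapped to a different port earlier), the library accepts options when it is started. The exact options are described in the faktory_worker documentation; a sketch based on its connection settings might look like this.

  children = [
    # The host and port shown here are the defaults; adjust them to match
    # your Faktory server. See the faktory_worker docs for the full list
    # of connection options.
    {FaktoryWorker, [connection: [host: "localhost", port: 7419]]}
  ]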

Let's perform some jobs!

Before we can perform our first job, we need to set up a worker module that will be responsible for doing the work.

A worker module has two requirements. It must pull in the Faktory Worker job functionality and define a function called perform that will be used to handle incoming jobs.

Let's get started by creating the simplest worker we can, the HelloWorldWorker.

  defmodule ExampleApp.HelloWorldWorker do
    use FaktoryWorker.Job

    def perform(name) do
      IO.puts("Hello #{name}!")
    end
  end

Here we used the FaktoryWorker.Job module, which pulls in the functionality required for sending and fetching jobs. We then defined the perform/1 function, which Faktory Worker will pass the job arguments into when it receives work to perform.

To send a job to Faktory we can call the perform_async/1 function that has been added to our worker by the FaktoryWorker.Job module.

Let's open up an iex session and say hello.

  iex(1)> ExampleApp.HelloWorldWorker.perform_async("Stuart")
  Hello Stuart!
  :ok

Great, we have just sent a job to Faktory, fetched it back and performed some work. If you take another look at the Faktory dashboard, you will see that it now reports 1 job has been processed.

This was a simple example, let's try something slightly more interesting.

So far we have only performed work that accepts a binary input. Faktory Worker supports any data type that is JSON serialisable, such as a map. Let's create another worker that allows us to do some basic calculations.

  defmodule ExampleApp.MathWorker do
    use FaktoryWorker.Job

    def perform(%{"op" => "+"} = params) do
      log(params["x"] + params["y"])
    end

    def perform(%{"op" => "-"} = params) do
      log(params["x"] - params["y"])
    end

    defp log(result), do: IO.puts("The answer is: #{result}")
  end

Now back in iex, we can calculate some numbers.

  iex(1)> ExampleApp.MathWorker.perform_async(%{op: "+", x: 3, y: 5})
  The answer is: 8
  :ok
  iex(2)> ExampleApp.MathWorker.perform_async(%{op: "-", x: 10, y: 6})
  The answer is: 4
  :ok

Here we can see how to make use of pattern matching with our perform functions, allowing us to keep the different variations of the job cleanly separated.

You might have noticed that when we sent the map to Faktory we used atom keys, but when we received the data back we got a map with binary keys. This is a side effect of the Faktory protocol, which uses JSON to transport data between the server and the client. It's worth keeping this in mind when building workers, as it should save some debugging time when match errors start to occur.
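
We can see the same effect by round-tripping a map through JSON ourselves in iex. This assumes Jason is available in the project, which it should be since faktory_worker uses it for encoding.

  iex(1)> %{op: "+", x: 3, y: 5} |> Jason.encode!() |> Jason.decode!()
  %{"op" => "+", "x" => 3, "y" => 5}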

Jobs with multiple arguments

So far we have only been sending a single value to Faktory. Let's take a look at how we can send multiple pieces of data.

To do this, we need to pass a list into the perform_async/1 function. When we use multiple arguments we need to make sure we have a perform function that accepts the same number of arguments.

For example, if we were to send three pieces of data.

  MyWorker.perform_async([1, 2, 3])

The worker would need to define a perform/3 function. Let's refactor our MathWorker to accept multiple arguments instead of a map.

In the MathWorker module, replace the perform/1 functions with the new perform/3 version.

  def perform("+", x, y) do
    log(x + y)
  end

  def perform("-", x, y) do
    log(x - y)
  end

Now in iex, we can pass in our calculations using a list.

  iex(1)> ExampleApp.MathWorker.perform_async(["+", 3, 5])
  The answer is: 8
  :ok
  iex(2)> ExampleApp.MathWorker.perform_async(["-", 10, 6])
  The answer is: 4
  :ok

The ability to pass in multiple pieces of data comes into its own when we consider more complex use cases. For example, imagine we were building a worker to handle customer payments. We could merge all of the information into a single map and pass that to Faktory, but that doesn't feel right. Using multiple arguments allows us to keep the data in the same shape our application already expects whilst still grouping it together for the payment job.

For example, we could send a job that looked like the following.

  iex(1)> PaymentWorker.perform_async([customer, payment_info, shipping_items])

And then cleanly handle it in our worker perform function.

  def perform(customer, payment_info, shipping_items) do
    # do work
  end
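
As with the earlier example, any maps nested inside these arguments will arrive with string keys. A rough sketch of such a worker might look like the following, where ExampleApp.Payments.process/3 is a hypothetical function standing in for whatever your application provides.

  defmodule ExampleApp.PaymentWorker do
    use FaktoryWorker.Job

    # The customer map arrives with string keys after the round trip
    # through Faktory, so we match on "id" rather than :id.
    def perform(%{"id" => customer_id}, payment_info, shipping_items) do
      ExampleApp.Payments.process(customer_id, payment_info, shipping_items)
    end
  end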

What happens when things go wrong?

The work we perform in our workers is eventually going to go wrong. In software, this is a reality we cannot escape, so let's see how Faktory Worker helps us out when it happens.

Faktory Worker is designed to be our failsafe and assumes that any job it performs can crash at any point. This allows our workers to be strict and only code for the happy path. Faktory Worker will catch exceptions and report them to Faktory as failures. When a job fails, Faktory will retry it using exponential backoff until it either succeeds or reaches the maximum number of retries. Because retries mean the same job can run more than once, we must make sure our workers are idempotent.
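
For example, a worker that sends a welcome email could check whether the email has already gone out before sending it, so that a retried job doesn't email the customer twice. The ExampleApp.Mailer functions below are hypothetical; the important part is the check, not the names.

  defmodule ExampleApp.WelcomeEmailWorker do
    use FaktoryWorker.Job

    def perform(user_id) do
      # Skip the send if a previous attempt already succeeded, making the
      # job safe to run more than once.
      unless ExampleApp.Mailer.welcome_email_sent?(user_id) do
        ExampleApp.Mailer.send_welcome_email(user_id)
      end

      :ok
    end
  end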

Let's build out an example of a job that sometimes fails. Create a new FailureWorker module like so.

  defmodule ExampleApp.FailureWorker do
    use FaktoryWorker.Job

    def perform(number) do
      random_number = Enum.random(1..3)

      if number == random_number do
        IO.puts("Yay, we found our number!")
      else
        raise "Nope, the number was #{random_number}"
      end
    end
  end

This worker will generate a random number (1, 2 or 3) and raise an exception if it doesn't match the number we provide as the job argument. This loosely mimics some of the strange blips we can sometimes see in production, such as a network connection dropping off or a database query timing out.

Let's jump back into iex and choose a number.

  iex(1)> ExampleApp.FailureWorker.perform_async(2)
  :ok
  20:36:12.551 [error] Task #PID<0.293.0> started from :worker_d117a98a82c04a50_8 terminating
  ** (RuntimeError) Nope, the number was 3
      (example_app) lib/example_app/failure_worker.ex:10: ExampleApp.FailureWorker.perform/1
      (elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
      (elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
      (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
  Function: &ExampleApp.FailureWorker.perform/1
      Args: [2]

It looks like we chose the wrong number.

Looking at the error output we can see there was a RuntimeError raised in a process that was started by the worker. This is how Faktory Worker can handle our errors for us. All of our work is run in a process that is being monitored by the worker. If the process exits unexpectedly, the worker catches it and reports the error to Faktory.

If we open up the Faktory dashboard, we will see there is 1 retry reported at the top of the page. Clicking on this lists all of the pending retries and allows us to click through to the failed job and view its details. Here we can see our error message listed under the error section along with a stack trace. If you don't see any retries listed, the job may have already retried and completed successfully.

It's worth noting that Faktory Worker only concerns itself with errors that occur whilst performing jobs. If the job completes successfully (i.e. does not crash with an exception) it will be treated as a successful job. Therefore returning some form of error tuple from the perform function will not fail the job. In this case, it's usually enough to be strict and match on the success tuple, allowing the match to fail if an error is returned. If you need to explicitly fail a job for any reason you can either raise an error like we did in the above example or call Process.exit/2 with an exit reason.
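
For example, rather than handling an error tuple, we can match on the result we expect and let anything else crash the job so that Faktory retries it. Here ship_order/1 is a hypothetical application function that returns either {:ok, tracking_number} or {:error, reason}.

  def perform(order_id) do
    # An {:error, reason} result raises a MatchError, which fails the job
    # and lets Faktory schedule a retry.
    {:ok, _tracking_number} = ExampleApp.Shipping.ship_order(order_id)
    :ok
  end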

Summary

This post has covered the basics of getting up and running with Faktory, how to perform work, and how to handle errors. There are many other aspects of the library that we have not been able to cover here, and I would recommend taking a look at the documentation for further details.

If you have any issues using the library, please feel free to open an issue on the GitHub repo, or you can find me on the Elixir Forum and Slack channel.