
Run one-off tasks in Phoenix


If you run a web application with users in production, you have surely needed to run a task that changes the database for one or more users. These are usually called one-off tasks: a type of background job or batch process designed to run only once, or a limited number of times, to accomplish a specific goal.

You can set them up to run when the application boots, schedule them in a queue, or manually log into the server and run them. In this tutorial, I will show my preferred way of doing it: with the help of Oban, when the application deploys and boots.

Why Oban?

I use Oban rather than plain Elixir for two reasons. The first is that I can easily set up the job to be unique, which is important in a multi-node environment where I need to make sure the task only runs once. The second is that Oban has a (paid) web interface where I can see that the task has been executed.

Generate the Task schema and migration

To keep track of the one-off tasks, so they are not executed multiple times, I store each execution in the database. For that, I need a simple table with only a string column called module. The idea is to store just the module name of the task. That means the module names have to be unique, and if you delete the task files at some point, you can then clear the table.

The command for generating the schema and migration is:

mix phx.gen.schema OneOffs.Task one_off_tasks module

I want to make sure that the module field can’t be null, so I add null: false to the column. I also add a unique index on the module name.

defmodule Tutorial.Repo.Migrations.CreateOneOffTasks do
  use Ecto.Migration

  def change do
    create table(:one_off_tasks, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :module, :string, null: false

      timestamps(updated_at: false)
    end

    create unique_index(:one_off_tasks, [:module])
  end
end

One more thing: rows in this table should never be updated, so I skip the updated_at column.

The only thing I need to change in the schema file is to tell it to skip updated_at as well.

# lib/tutorial/one_offs/task.ex
schema "one_off_tasks" do
  field :module, :string

  timestamps(updated_at: false)
end
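The generated schema file also contains a changeset/2 function, which create_task/1 relies on later. Here is a minimal sketch of how the full file might look. It assumes the project uses binary IDs (as the migration does) and adds a unique_constraint/2 so that a duplicate module name comes back as a changeset error instead of raising. The code your Phoenix version generates may differ in detail.

```elixir
# lib/tutorial/one_offs/task.ex
defmodule Tutorial.OneOffs.Task do
  use Ecto.Schema
  import Ecto.Changeset

  # Assumption: binary IDs, matching the :binary_id primary key in the migration.
  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id

  schema "one_off_tasks" do
    field :module, :string

    timestamps(updated_at: false)
  end

  @doc false
  def changeset(task, attrs) do
    task
    |> cast(attrs, [:module])
    |> validate_required([:module])
    # Maps a violation of the unique index on :module to a changeset error.
    |> unique_constraint(:module)
  end
end
```

Without the unique_constraint/2 call, inserting the same module name twice would raise an Ecto.ConstraintError instead of returning an error tuple.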

With these changes in place, I can now run the migrations.

mix ecto.migrate

Create the OneOffs module

I only need the basic functions for this module to start with: listing the tasks that have already run and persisting tasks in the database.

# lib/tutorial/one_offs/one_offs.ex
defmodule Tutorial.OneOffs do
  import Ecto.Query, warn: false
  alias Tutorial.Repo
  alias Tutorial.OneOffs.Task

  def list_tasks() do
    Repo.all(Task)
  end

  def create_task(attrs \\ %{}) do
    %Task{}
    |> Task.changeset(attrs)
    |> Repo.insert()
  end
end

Add execute task logic and an example task

The next step is to add execute/0, the function responsible for looping through the tasks and executing each one that hasn’t already been executed.

# lib/tutorial/one_offs/one_offs.ex
@tasks_to_run [
  Tutorial.OneOffs.Tasks.ExampleTask,
  # other tasks that you want to run go here
]

def execute do
  executed_tasks = Enum.map(list_tasks(), & &1.module)

  Enum.each(@tasks_to_run, fn module ->
    module_string = inspect(module)

    if module_string not in executed_tasks do
      module.execute()

      create_task(%{module: module_string})
    end
  end)
end
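The core of execute/0 is a filter: keep every module in @tasks_to_run whose name is not yet stored in the table. Below is a small sketch of that step in isolation, without the database. The PendingSketch module, its pending/2 function and the OtherTask module are hypothetical names used only for illustration.

```elixir
defmodule PendingSketch do
  @moduledoc """
  Illustration only: the filtering step of execute/0 without the database.
  """

  # Returns the modules whose names are not in the list of executed names.
  def pending(tasks_to_run, executed_names) do
    Enum.reject(tasks_to_run, fn module -> inspect(module) in executed_names end)
  end
end

# inspect/1 turns a module atom into the string that is stored in the table.
name = inspect(Tutorial.OneOffs.Tasks.ExampleTask)
# name is "Tutorial.OneOffs.Tasks.ExampleTask"

# Tutorial.OneOffs.Tasks.OtherTask is a made-up second task.
tasks = [Tutorial.OneOffs.Tasks.ExampleTask, Tutorial.OneOffs.Tasks.OtherTask]
to_run = PendingSketch.pending(tasks, [name])
# to_run is [Tutorial.OneOffs.Tasks.OtherTask]
```

Because the stored value is the inspected module name, renaming a task module makes it look like a new task, and it will run again on the next boot.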

As you can see, I reference a task called Tutorial.OneOffs.Tasks.ExampleTask. I have set it up so these tasks should live in the same folder and all task modules should also implement execute/0.

# lib/tutorial/one_offs/tasks/example_task.ex
defmodule Tutorial.OneOffs.Tasks.ExampleTask do
  @moduledoc """
  This is just an example task that doesn't do anything.
  """

  def execute do
    # Add your logic here
    :ok
  end
end
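Since every task module is expected to implement execute/0, one option is to make that contract explicit with a behaviour, so the compiler warns when a task forgets the callback. This is an optional addition rather than part of the setup described here, and the Tutorial.OneOffs.Runnable name is hypothetical.

```elixir
defmodule Tutorial.OneOffs.Runnable do
  @moduledoc """
  Hypothetical behaviour describing what a one-off task module must provide.
  """

  @callback execute() :: any()
end

defmodule Tutorial.OneOffs.Tasks.ExampleTask do
  @moduledoc """
  Example task that declares the behaviour it implements.
  """

  @behaviour Tutorial.OneOffs.Runnable

  @impl Tutorial.OneOffs.Runnable
  def execute do
    # Add your logic here
    :ok
  end
end
```

The behaviour does not change how execute/0 in the OneOffs module calls the task. It only adds a compile-time check.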

Set up an Oban worker

Since Oban is responsible for running this, I need a worker. It is basically a default worker, with the only addition that it is unique for 120 seconds, so Oban will not enqueue a second copy of the job within that window. If you have a complex multi-node setup, you might need to look at the options that Oban Pro provides.

# lib/tutorial/one_offs/run_one_offs_worker.ex
defmodule Tutorial.OneOffs.RunOneOffsWorker do
  use Oban.Worker, unique: [period: 120]

  alias Tutorial.OneOffs

  @impl Oban.Worker
  def perform(_job) do
    OneOffs.execute()
    :ok
  end
end

After I have created the worker, I also need to add it to the config. In the Oban config, I add it to the crontab and tell it to run on @reboot.

# config/config.exs
plugins: [
  {Oban.Plugins.Cron, crontab: [
    {"@reboot", Tutorial.OneOffs.RunOneOffsWorker}, # <-- Add
  ]}
]
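For context, the plugins list sits inside the Oban configuration for the application. Here is a sketch of how the surrounding block might look. It assumes the OTP app is named :tutorial, the repo is Tutorial.Repo, and the worker runs on Oban's default queue. The queue limit of 10 is a placeholder.

```elixir
# config/config.exs
import Config

config :tutorial, Oban,
  repo: Tutorial.Repo,
  # The worker does not set a queue, so it uses Oban's :default queue.
  queues: [default: 10],
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       # "@reboot" enqueues the job once each time the Cron plugin starts.
       {"@reboot", Tutorial.OneOffs.RunOneOffsWorker}
     ]}
  ]
```

If your project already has an Oban config block, only the crontab entry needs to be added to it.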

Now, when you start the server, either in development or production, that Oban job will run and execute all the tasks in the @tasks_to_run list that have not been run before.

Conclusion

This is basically all I need to set up and organise one-off tasks in my Elixir and Phoenix applications. I have added this as an option to the boilerplates that you can download here:

https://livesaaskit.com/starterkit/new

Go ahead and try it out there. You also get to see how I tested this.
