Tutorial

Run one-off tasks in Phoenix

This post was updated 27 Mar

oban

If you run a web application with users in production, you have surely had the need to run a task that changes the database for one or more users. This is something usually called one-off tasks. They are a type of background job or batch process that is designed to run only once, or a limited number of times, to accomplish a specific goal.

You can either set them up to run when the application boots, schedule them in a queue, or manually log into the server and run them. In this tutorial, I will show my preferred way of doing it with the help of Oban when the application deploys and boots.

Why Oban?

The reason I use Oban and not natively in Elixir is because I can easily set up the job to be unique, which is important if I have a multi-node environment so I can make sure the task is only run once. The other is that Oban has a (paid) web interface and I can see that the task has been executed.

Generate the Task schema and migration

To keep track of the one-off tasks, so they are not executed multiple times, I store the execution in the database. For that, I need a simple table that only has a string column called module. The idea is to store just the module name for the task. That means they should be unique, and if you delete the files at some point, then you can clear the database.

However, the command for generating the schema and migration is:

mix phx.gen.schema OneOffs.Task one_off_tasks module

I want to make sure that the module field can’t be null, so I am adding a not-null check. I am also adding a unique constraint on the module name.

# priv/repo/migrations/XXXXXXXXXX_create_one_off_tasks.exs
defmodule MyApp.Repo.Migrations.CreateOneOffTasks do
use Ecto.Migration
def change do
create table(:one_off_tasks, primary_key: false) do
add :id, :binary_id, primary_key: true
add :module, :string, null: false
timestamps(updated_at: false)
end
create unique_index(:one_off_tasks, [:module])
end
end

Also, one more thing. This table should never be updated, so I can just skip having the updated_at field.

The only thing I need to change in the schema file is to also tell it to ignore the updated_at.

# lib/my_app/one_offs/task.ex
schema "one_off_tasks" do
field :module, :string
timestamps(updated_at: false)
end

With these changes in place, I can now run the migrations.

mix ecto.migrate

Create the OneOffs module

I only need the basic functions for this module to start with. It’s listing tasks that have already run and persisting tasks in the database.

# lib/my_app/one_offs/one_offs.ex
defmodule MyApp.OneOffs do
import Ecto.Query, warn: false
alias MyApp.Repo
alias MyApp.OneOffs.Task
def list_tasks() do
Repo.all(Task)
end
def create_task(attrs \\ %{}) do
%Task{}
|> Task.changeset(attrs)
|> Repo.insert()
end
end

Add execute task logic and an example task

Next step is to add execute/0, which is the function that is responsible for looping through and executing each task that hasn’t already been executed.

# lib/my_app/one_offs/one_offs.ex
@tasks_to_run [
MyApp.OneOffs.Tasks.ExampleTask,
# add other tasks that you want to run here
]
def execute do
executed_tasks = Enum.map(list_tasks(), & &1.module)
Enum.each(@tasks_to_run, fn module ->
module_string = inspect(module)
if Enum.member?(executed_tasks, module_string) == false do
module.execute()
create_task(%{module: module_string})
end
end)
end

As you can see, I reference a task called MyApp.OneOffs.Tasks.ExampleTask. I have set it up so these tasks should live in the same folder and all task modules should also implement execute/0.

# lib/my_app/one_offs/tasks/example_task.ex
defmodule MyApp.OneOffs.Tasks.ExampleTask do
@moduledoc """
This is just an example task that doesn't do anything.
"""
def execute do
# Add your logic here
:ok
end
end

Set up an Oban worker

Since I have Oban to be responsible for this, I need a worker. It is basically a default worker with the only addition being unique for 120 seconds. If you have a complex setup with multi-nodes, you might need to look at the options that Oban Pro provides.

# lib/my_app/one_offs/run_one_offs_worker.ex
defmodule MyApp.OneOffs.RunOneOffsWorker do
use Oban.Worker, unique: [period: 120]
alias MyApp.OneOffs
@impl Oban.Worker
def perform(_job) do
OneOffs.execute()
:ok
end
end

After I have created the worker, I also need to add it to the config. In the Oban config, I need to add this to the crontab. And I tell it to run on @reboot.

# config/config.exs
plugins: [
{Oban.Plugins.Cron, crontab: [
{"@reboot", MyApp.OneOffs.RunOneOffsWorker}, # <-- Add
]}
]

Now, when you start the server, either in development or production, that Oban job will run and execute all the tasks that you have added to the @tasks_to_run list. And of course, only those that have not been run before.

Conclusion

This is basically all I need to set up and organise one-off tasks in my Elixir and Phoenix applications. I have added this as an option to the boilerplates that you can download here:

https://livesaaskit.com/starterkit/new

Go ahead and try it out there and then you also get to see how I tested this.

Related Tutorials

Published 05 May - 2021
Updated 27 Mar

How to setup recurring jobs with Oban in Elixir

Oban has proven itself to be the most versatile job processing library in Elixir and Phoenix. Coming from Sidekiq, it feels very familiar and suppor..