Chunk the notification type backfill migration

Long-term we want that migration to be done entirely in SQL,
but for now this is a hotfix to not cause OOMs on large databases.

This is using a homegrown version of `Repo.stream`, it's worse in
terms of performance than the upstream since it doesn't use the same
prepared query for chunk queries, but unlike the upstream it supports
preloads.
This commit is contained in:
rinpatch 2020-06-16 23:45:59 +03:00
parent b536e57124
commit 5c0e1039ce
2 changed files with 29 additions and 1 deletions

View File

@ -18,7 +18,7 @@ def fill_in_notification_types do
) )
query query
|> Repo.all() |> Repo.chunk_stream(100)
|> Enum.each(fn notification -> |> Enum.each(fn notification ->
type = type =
notification.activity notification.activity

View File

@ -8,6 +8,7 @@ defmodule Pleroma.Repo do
adapter: Ecto.Adapters.Postgres, adapter: Ecto.Adapters.Postgres,
migration_timestamps: [type: :naive_datetime_usec] migration_timestamps: [type: :naive_datetime_usec]
import Ecto.Query
require Logger require Logger
defmodule Instrumenter do defmodule Instrumenter do
@ -78,6 +79,33 @@ def check_migrations_applied!() do
:ok :ok
end end
end end
def chunk_stream(query, chunk_size) do
# We don't actually need start and end funcitons of resource streaming,
# but it seems to be the only way to not fetch records one-by-one and
# have individual records be the elements of the stream, instead of
# lists of records
Stream.resource(
fn -> 0 end,
fn
last_id ->
query
|> order_by(asc: :id)
|> where([r], r.id > ^last_id)
|> limit(^chunk_size)
|> all()
|> case do
[] ->
{:halt, last_id}
records ->
last_id = List.last(records).id
{records, last_id}
end
end,
fn _ -> :ok end
)
end
end end
defmodule Pleroma.Repo.UnappliedMigrationsError do defmodule Pleroma.Repo.UnappliedMigrationsError do