From 23d714ed3038a24eb78314d52807c46d8b8de2f3 Mon Sep 17 00:00:00 2001 From: href Date: Wed, 8 Jul 2020 13:22:42 +0200 Subject: [PATCH] Fix race in enforcer/reclaimer start --- lib/pleroma/gun/connection_pool/reclaimer.ex | 85 +++++++++++++++++++ .../gun/connection_pool/worker_supervisor.ex | 81 +----------------- 2 files changed, 89 insertions(+), 77 deletions(-) create mode 100644 lib/pleroma/gun/connection_pool/reclaimer.ex diff --git a/lib/pleroma/gun/connection_pool/reclaimer.ex b/lib/pleroma/gun/connection_pool/reclaimer.ex new file mode 100644 index 000000000..1793ac3ee --- /dev/null +++ b/lib/pleroma/gun/connection_pool/reclaimer.ex @@ -0,0 +1,85 @@ +defmodule Pleroma.Gun.ConnectionPool.Reclaimer do + use GenServer, restart: :temporary + + @registry Pleroma.Gun.ConnectionPool + + def start_monitor() do + pid = + case :gen_server.start(__MODULE__, [], name: {:via, Registry, {@registry, "reclaimer"}}) do + {:ok, pid} -> + pid + + {:error, {:already_registered, pid}} -> + pid + end + + {pid, Process.monitor(pid)} + end + + @impl true + def init(_) do + {:ok, nil, {:continue, :reclaim}} + end + + @impl true + def handle_continue(:reclaim, _) do + max_connections = Pleroma.Config.get([:connections_pool, :max_connections]) + + reclaim_max = + [:connections_pool, :reclaim_multiplier] + |> Pleroma.Config.get() + |> Kernel.*(max_connections) + |> round + |> max(1) + + :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{ + max_connections: max_connections, + reclaim_max: reclaim_max + }) + + # :ets.fun2ms( + # fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] -> + # {worker_pid, crf, last_reference} end) + unused_conns = + Registry.select( + @registry, + [ + {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}], [{{:"$1", :"$3", :"$4"}}]} + ] + ) + + case unused_conns do + [] -> + :telemetry.execute( + [:pleroma, :connection_pool, :reclaim, :stop], + %{reclaimed_count: 0}, + %{ + max_connections: max_connections + } + ) + + {:stop, :no_unused_conns, nil} + + unused_conns -> + reclaimed = + unused_conns + |> Enum.sort(fn {_pid1, crf1, last_reference1}, {_pid2, crf2, last_reference2} -> + crf1 <= crf2 and last_reference1 <= last_reference2 + end) + |> Enum.take(reclaim_max) + + reclaimed + |> Enum.each(fn {pid, _, _} -> + DynamicSupervisor.terminate_child(Pleroma.Gun.ConnectionPool.WorkerSupervisor, pid) + end) + + :telemetry.execute( + [:pleroma, :connection_pool, :reclaim, :stop], + %{reclaimed_count: Enum.count(reclaimed)}, + %{max_connections: max_connections} + ) + + {:stop, :normal, nil} + end + end +end diff --git a/lib/pleroma/gun/connection_pool/worker_supervisor.ex b/lib/pleroma/gun/connection_pool/worker_supervisor.ex index 5cb8d488a..39615c956 100644 --- a/lib/pleroma/gun/connection_pool/worker_supervisor.ex +++ b/lib/pleroma/gun/connection_pool/worker_supervisor.ex @@ -29,89 +29,16 @@ def start_worker(opts, retry \\ false) do end end - @registry Pleroma.Gun.ConnectionPool - @enforcer_key "enforcer" defp free_pool do - case Registry.lookup(@registry, @enforcer_key) do - [] -> - pid = - spawn(fn -> - {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil) - max_connections = Pleroma.Config.get([:connections_pool, :max_connections]) - - reclaim_max = - [:connections_pool, :reclaim_multiplier] - |> Pleroma.Config.get() - |> Kernel.*(max_connections) - |> round - |> max(1) - - :telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{ - max_connections: max_connections, - reclaim_max: reclaim_max - }) - - # :ets.fun2ms( - # fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] -> - # {worker_pid, crf, last_reference} end) - unused_conns = - Registry.select( - @registry, - [ - {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}], - [{{:"$1", :"$3", :"$4"}}]} - ] - ) - - case unused_conns do - [] -> - :telemetry.execute( - [:pleroma, :connection_pool, :reclaim, :stop], - %{reclaimed_count: 0}, - %{ - max_connections: max_connections - } - ) - - exit(:no_unused_conns) - - unused_conns -> - reclaimed = - unused_conns - |> Enum.sort(fn {_pid1, crf1, last_reference1}, - {_pid2, crf2, last_reference2} -> - crf1 <= crf2 and last_reference1 <= last_reference2 - end) - |> Enum.take(reclaim_max) - - reclaimed - |> Enum.each(fn {pid, _, _} -> - DynamicSupervisor.terminate_child(__MODULE__, pid) - end) - - :telemetry.execute( - [:pleroma, :connection_pool, :reclaim, :stop], - %{reclaimed_count: Enum.count(reclaimed)}, - %{max_connections: max_connections} - ) - end - end) - - wait_for_enforcer_finish(pid) - - [{pid, _}] -> - wait_for_enforcer_finish(pid) - end + wait_for_reclaimer_finish(Pleroma.Gun.ConnectionPool.Reclaimer.start_monitor()) end - defp wait_for_enforcer_finish(pid) do - ref = Process.monitor(pid) - + defp wait_for_reclaimer_finish({pid, mon}) do receive do - {:DOWN, ^ref, :process, ^pid, :no_unused_conns} -> + {:DOWN, ^mon, :process, ^pid, :no_unused_conns} -> :error - {:DOWN, ^ref, :process, ^pid, :normal} -> + {:DOWN, ^mon, :process, ^pid, :normal} -> :ok end end