From e31a5ff4af04156b9db8f032cf184e1da540f025 Mon Sep 17 00:00:00 2001 From: Lain Iwakura Date: Wed, 6 Dec 2017 16:51:11 +0100 Subject: [PATCH] Priority queue fixes. --- lib/pleroma/web/activity_pub/utils.ex | 7 ++++++- lib/pleroma/web/federator/federator.ex | 12 ++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 51fac6fe2..ac20a2822 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -29,7 +29,12 @@ def generate_id(type) do Enqueues an activity for federation if it's local """ def maybe_federate(%Activity{local: true} = activity) do - Pleroma.Web.Federator.enqueue(:publish, activity) + priority = case activity.data["type"] do + "Delete" -> 10 + "Create" -> 1 + _ -> 5 + end + Pleroma.Web.Federator.enqueue(:publish, activity, priority) :ok end def maybe_federate(_), do: :ok diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f384b313c..b23ed5fcc 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -15,7 +15,7 @@ def start_link do enqueue(:refresh_subscriptions, nil) end) GenServer.start_link(__MODULE__, %{ - in: {:sets.new(), [], + in: {:sets.new(), []}, out: {:sets.new(), []} }, name: __MODULE__) end @@ -79,17 +79,17 @@ def handle(type, _) do {:error, "Don't know what do do with this"} end - def enqueue(type, payload) do + def enqueue(type, payload, priority \\ 1) do if Mix.env == :test do handle(type, payload) else - GenServer.cast(__MODULE__, {:enqueue, type, payload}) + GenServer.cast(__MODULE__, {:enqueue, type, payload, priority}) end end def maybe_start_job(running_jobs, queue) do if (:sets.size(running_jobs) < @max_jobs) && queue != [] do - {{:value, {type, payload}}, queue} = queue_pop(queue) + {{type, payload}, queue} = queue_pop(queue) {:ok, pid} = Task.start(fn -> handle(type, payload) end) mref = Process.monitor(pid) {:sets.add_element(mref, running_jobs), queue} @@ -98,14 +98,14 @@ def maybe_start_job(running_jobs, queue) do end end - def handle_cast({:enqueue, type, payload}, state) when type in [:incoming_doc] do + def handle_cast({:enqueue, type, payload, priority}, state) when type in [:incoming_doc] do %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state i_queue = enqueue_sorted(i_queue, {type, payload}, 1) {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} end - def handle_cast({:enqueue, type, payload}, state) do + def handle_cast({:enqueue, type, payload, priority}, state) do %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state o_queue = enqueue_sorted(o_queue, {type, payload}, 1) {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)