From 48b399cedb7d46ea0f08181cfbe4df222861f65b Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 16 Jan 2021 20:22:14 +0300 Subject: [PATCH] [#3213] Refactoring of HashtagsTableMigrator. Hashtag timeline performance optimization (auto switch to non-aggregate join strategy when efficient). --- CHANGELOG.md | 1 + config/description.exs | 6 ++ .../migrators/hashtags_table_migrator.ex | 47 +++++++----- .../hashtags_table_migrator/state.ex | 9 +-- lib/pleroma/web/activity_pub/activity_pub.ex | 72 +++++++++++-------- .../web/activity_pub/activity_pub_test.exs | 4 +- 6 files changed, 86 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25b24bf07..9a053156f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Search: When using Postgres 11+, Pleroma will use the `websearch_to_tsvector` function to parse search queries. - Emoji: Support the full Unicode 13.1 set of Emoji for reactions, plus regional indicators. - Admin API: Reports now ordered by newest +- Extracted object hashtags into separate table in order to improve hashtag timeline performance (via background migration in `Pleroma.Migrators.HashtagsTableMigrator`). ### Added diff --git a/config/description.exs b/config/description.exs index f438a88ab..c73d50f7d 100644 --- a/config/description.exs +++ b/config/description.exs @@ -941,6 +941,12 @@ key: :show_reactions, type: :boolean, description: "Let favourites and emoji reactions be viewed through the API." + }, + %{ + key: :improved_hashtag_timeline, + type: :keyword, + description: + "If `true` / `:prefer_aggregation` / `:avoid_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes." } ] }, diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 9f1a00f9c..b40578d50 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -45,25 +45,23 @@ def init(_) do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - put_stat(:status, :init) + update_status(:init) - dm = data_migration() + data_migration = data_migration() manual_migrations = Config.get([:instance, :manual_data_migrations], []) cond do Config.get(:env) == :test -> - put_stat(:status, :noop) + update_status(:noop) - is_nil(dm) -> - put_stat(:status, :halt) - put_stat(:message, "Data migration does not exist.") + is_nil(data_migration) -> + update_status(:halt, "Data migration does not exist.") - dm.state == :manual or dm.name in manual_migrations -> - put_stat(:status, :noop) - put_stat(:message, "Data migration is in manual execution state.") + data_migration.state == :manual or data_migration.name in manual_migrations -> + update_status(:noop, "Data migration is in manual execution state.") - dm.state == :complete -> - handle_success() + data_migration.state == :complete -> + handle_success(data_migration) true -> send(self(), :migrate_hashtags) @@ -81,7 +79,7 @@ def handle_info(:migrate_hashtags, state) do {:ok, data_migration} = DataMigration.update(data_migration, %{state: :running, data: persistent_data}) - put_stat(:status, :running) + update_status(:running) Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") @@ -146,13 +144,12 @@ def handle_info(:migrate_hashtags, state) do ) do _ = DataMigration.update_state(data_migration, :complete) - handle_success() + handle_success(data_migration) else _ -> _ = DataMigration.update_state(data_migration, :failed) - put_stat(:status, :failed) - put_stat(:message, "Please check data_migration_failed_ids records.") + update_status(:failed, "Please check data_migration_failed_ids records.") end {:noreply, state} @@ -196,16 +193,25 @@ defp persist_stats(data_migration) do _ = DataMigration.update(data_migration, %{data: runner_state}) end - defp handle_success do - put_stat(:status, :complete) + defp handle_success(data_migration) do + update_status(:complete) - unless Config.improved_hashtag_timeline() do + unless data_migration.feature_lock || Config.improved_hashtag_timeline() do Config.put(Config.improved_hashtag_timeline_path(), true) end :ok end + def failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id) + |> order_by([o], asc: o.id) + end + def force_continue do send(whereis(), :migrate_hashtags) end @@ -214,4 +220,9 @@ def force_restart do {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) force_continue() end + + defp update_status(status, message \\ nil) do + put_stat(:status, status) + put_stat(:message, message) + end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex index 79926892c..c1a2709fc 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -2,23 +2,24 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do use Agent @init_state %{} + @reg_name {:global, __MODULE__} def start_link(_) do - Agent.start_link(fn -> @init_state end, name: __MODULE__) + Agent.start_link(fn -> @init_state end, name: @reg_name) end def get do - Agent.get(__MODULE__, & &1) + Agent.get(@reg_name, & &1) end def put(key, value) do - Agent.update(__MODULE__, fn state -> + Agent.update(@reg_name, fn state -> Map.put(state, key, value) end) end def increment(key, increment \\ 1) do - Agent.update(__MODULE__, fn state -> + Agent.update(@reg_name, fn state -> updated_value = (state[key] || 0) + increment Map.put(state, key, updated_value) end) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index f5563b0fd..0609827ec 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -669,63 +669,66 @@ defp restrict_since(query, %{since_id: since_id}) do defp restrict_since(query, _), do: query - defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do + defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do from( [_activity, object] in query, where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) ) end - defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do - restrict_tag_reject(query, %{tag_reject: [tag_reject]}) + defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) + when is_binary(tag_reject) do + restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]}) end - defp restrict_tag_reject(query, _), do: query + defp restrict_embedded_tag_reject(query, _), do: query - defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do + defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do + defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) ) end - defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do - restrict_tag(query, %{tag: tag}) + defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do + restrict_embedded_tag(query, %{tag: tag}) end - defp restrict_tag_all(query, _), do: query + defp restrict_embedded_tag_all(query, _), do: query - defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do + defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_tag(query, %{tag: tag}) when is_list(tag) do + defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) ) end - defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do - restrict_tag(query, %{tag: [tag]}) + defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do + restrict_embedded_tag(query, %{tag: [tag]}) end - defp restrict_tag(query, _), do: query + defp restrict_embedded_tag(query, _), do: query - defp restrict_hashtag(query, opts) do - [tag_any, tag_all, tag_reject] = - [:tag, :tag_all, :tag_reject] - |> Enum.map(&opts[&1]) - |> Enum.map(&List.wrap(&1)) + defp hashtag_conditions(opts) do + [:tag, :tag_all, :tag_reject] + |> Enum.map(&opts[&1]) + |> Enum.map(&List.wrap(&1)) + end + defp restrict_hashtag_agg(query, opts) do + [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1)) cond do @@ -1275,15 +1278,19 @@ def fetch_activities_query(recipients, opts \\ %{}) do |> exclude_invisible_actors(opts) |> exclude_visibility(opts) - cond do - Config.object_embedded_hashtags?() -> - query - |> restrict_tag(opts) - |> restrict_tag_reject(opts) - |> restrict_tag_all(opts) + hashtag_timeline_strategy = Config.improved_hashtag_timeline() - # TODO: benchmark (initial approach preferring non-aggregate ops when possible) - Config.improved_hashtag_timeline() == :join -> + cond do + !hashtag_timeline_strategy -> + query + |> restrict_embedded_tag(opts) + |> restrict_embedded_tag_reject(opts) + |> restrict_embedded_tag_all(opts) + + hashtag_timeline_strategy == :prefer_aggregation -> + restrict_hashtag_agg(query, opts) + + hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) -> query |> distinct([activity], true) |> restrict_hashtag_any(opts) @@ -1291,10 +1298,17 @@ def fetch_activities_query(recipients, opts \\ %{}) do |> restrict_hashtag_reject_any(opts) true -> - restrict_hashtag(query, opts) + restrict_hashtag_agg(query, opts) end end + defp avoid_hashtags_aggregation?(opts) do + [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) + + joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0 + Enum.empty?(tag_reject) and joins_count <= 2 + end + def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do list_memberships = Pleroma.List.memberships(opts[:user]) diff --git a/test/pleroma/web/activity_pub/activity_pub_test.exs b/test/pleroma/web/activity_pub/activity_pub_test.exs index f86d0a265..36fd65c76 100644 --- a/test/pleroma/web/activity_pub/activity_pub_test.exs +++ b/test/pleroma/web/activity_pub/activity_pub_test.exs @@ -217,8 +217,8 @@ test "it fetches the appropriate tag-restricted posts" do {:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"}) {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"}) - for new_timeline_enabled <- [true, false] do - clear_config([:instance, :improved_hashtag_timeline], new_timeline_enabled) + for hashtag_timeline_strategy <- [true, :prefer_aggregation, :avoid_aggregation, false] do + clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy) fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})