Merge branch 'http-fixes' into 'develop'

Improvements and fixes for http requests

See merge request pleroma/pleroma!2904
This commit is contained in:
rinpatch 2020-09-02 09:36:56 +00:00
parent ea4b6c64d6
commit dccbed856d
13 changed files with 95 additions and 57 deletions

View File

@ -739,19 +739,23 @@
config :pleroma, :pools, config :pleroma, :pools,
federation: [ federation: [
size: 50, size: 50,
max_waiting: 10 max_waiting: 10,
timeout: 10_000
], ],
media: [ media: [
size: 50, size: 50,
max_waiting: 10 max_waiting: 10,
timeout: 10_000
], ],
upload: [ upload: [
size: 25, size: 25,
max_waiting: 5 max_waiting: 5,
timeout: 15_000
], ],
default: [ default: [
size: 10, size: 10,
max_waiting: 2 max_waiting: 2,
timeout: 5_000
] ]
config :pleroma, :hackney_pools, config :pleroma, :hackney_pools,

View File

@ -124,7 +124,9 @@ defp download_build(frontend_info, dest) do
url = String.replace(frontend_info["build_url"], "${ref}", frontend_info["ref"]) url = String.replace(frontend_info["build_url"], "${ref}", frontend_info["ref"])
with {:ok, %{status: 200, body: zip_body}} <- with {:ok, %{status: 200, body: zip_body}} <-
Pleroma.HTTP.get(url, [], timeout: 120_000, recv_timeout: 120_000) do Pleroma.HTTP.get(url, [],
adapter: [pool: :media, timeout: 120_000, recv_timeout: 120_000]
) do
unzip(zip_body, dest) unzip(zip_body, dest)
else else
e -> {:error, e} e -> {:error, e}

View File

@ -83,6 +83,11 @@ def handle_call(:remove_client, {client_pid, _}, %{key: key} = state) do
end) end)
{ref, state} = pop_in(state.client_monitors[client_pid]) {ref, state} = pop_in(state.client_monitors[client_pid])
# DOWN message can receive right after `remove_client` call and cause worker to terminate
state =
if is_nil(ref) do
state
else
Process.demonitor(ref) Process.demonitor(ref)
timer = timer =
@ -93,7 +98,10 @@ def handle_call(:remove_client, {client_pid, _}, %{key: key} = state) do
nil nil
end end
{:reply, :ok, %{state | timer: timer}, :hibernate} %{state | timer: timer}
end
{:reply, :ok, state, :hibernate}
end end
@impl true @impl true
@ -103,16 +111,21 @@ def handle_info(:idle_close, state) do
{:stop, :normal, state} {:stop, :normal, state}
end end
@impl true
def handle_info({:gun_up, _pid, _protocol}, state) do
{:noreply, state, :hibernate}
end
# Gracefully shutdown if the connection got closed without any streams left # Gracefully shutdown if the connection got closed without any streams left
@impl true @impl true
def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
{:stop, :normal, state} {:stop, :normal, state}
end end
# Otherwise, shutdown with an error # Otherwise, wait for retry
@impl true @impl true
def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}, state) do
{:stop, {:error, down_message}, state} {:noreply, state, :hibernate}
end end
@impl true @impl true

View File

@ -11,7 +11,6 @@ defmodule Pleroma.HTTP.AdapterHelper do
@type proxy_type() :: :socks4 | :socks5 @type proxy_type() :: :socks4 | :socks5
@type host() :: charlist() | :inet.ip_address() @type host() :: charlist() | :inet.ip_address()
alias Pleroma.Config
alias Pleroma.HTTP.AdapterHelper alias Pleroma.HTTP.AdapterHelper
require Logger require Logger
@ -44,27 +43,13 @@ def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy)
@spec options(URI.t(), keyword()) :: keyword() @spec options(URI.t(), keyword()) :: keyword()
def options(%URI{} = uri, opts \\ []) do def options(%URI{} = uri, opts \\ []) do
@defaults @defaults
|> put_timeout()
|> Keyword.merge(opts) |> Keyword.merge(opts)
|> adapter_helper().options(uri) |> adapter_helper().options(uri)
end end
# For Hackney, this is the time a connection can stay idle in the pool. @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
# For Gun, this is the timeout to receive a message from Gun.
defp put_timeout(opts) do
{config_key, default} =
if adapter() == Tesla.Adapter.Gun do
{:pools, Config.get([:pools, :default, :timeout], 5_000)}
else
{:hackney_pools, 10_000}
end
timeout = Config.get([config_key, opts[:pool], :timeout], default)
Keyword.merge(opts, timeout: timeout)
end
def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts) def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
defp adapter, do: Application.get_env(:tesla, :adapter) defp adapter, do: Application.get_env(:tesla, :adapter)
defp adapter_helper do defp adapter_helper do

View File

@ -5,6 +5,7 @@
defmodule Pleroma.HTTP.AdapterHelper.Gun do defmodule Pleroma.HTTP.AdapterHelper.Gun do
@behaviour Pleroma.HTTP.AdapterHelper @behaviour Pleroma.HTTP.AdapterHelper
alias Pleroma.Config
alias Pleroma.Gun.ConnectionPool alias Pleroma.Gun.ConnectionPool
alias Pleroma.HTTP.AdapterHelper alias Pleroma.HTTP.AdapterHelper
@ -14,31 +15,46 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
connect_timeout: 5_000, connect_timeout: 5_000,
domain_lookup_timeout: 5_000, domain_lookup_timeout: 5_000,
tls_handshake_timeout: 5_000, tls_handshake_timeout: 5_000,
retry: 0, retry: 1,
retry_timeout: 1000, retry_timeout: 1000,
await_up_timeout: 5_000 await_up_timeout: 5_000
] ]
@type pool() :: :federation | :upload | :media | :default
@spec options(keyword(), URI.t()) :: keyword() @spec options(keyword(), URI.t()) :: keyword()
def options(incoming_opts \\ [], %URI{} = uri) do def options(incoming_opts \\ [], %URI{} = uri) do
proxy = proxy =
Pleroma.Config.get([:http, :proxy_url]) [:http, :proxy_url]
|> Config.get()
|> AdapterHelper.format_proxy() |> AdapterHelper.format_proxy()
config_opts = Pleroma.Config.get([:http, :adapter], []) config_opts = Config.get([:http, :adapter], [])
@defaults @defaults
|> Keyword.merge(config_opts) |> Keyword.merge(config_opts)
|> add_scheme_opts(uri) |> add_scheme_opts(uri)
|> AdapterHelper.maybe_add_proxy(proxy) |> AdapterHelper.maybe_add_proxy(proxy)
|> Keyword.merge(incoming_opts) |> Keyword.merge(incoming_opts)
|> put_timeout()
end end
defp add_scheme_opts(opts, %{scheme: "http"}), do: opts defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
defp add_scheme_opts(opts, %{scheme: "https"}) do defp add_scheme_opts(opts, %{scheme: "https"}) do
opts Keyword.put(opts, :certificates_verification, true)
|> Keyword.put(:certificates_verification, true) end
defp put_timeout(opts) do
# this is the timeout to receive a message from Gun
Keyword.put_new(opts, :timeout, pool_timeout(opts[:pool]))
end
@spec pool_timeout(pool()) :: non_neg_integer()
def pool_timeout(pool) do
default = Config.get([:pools, :default, :timeout], 5_000)
Config.get([:pools, pool, :timeout], default)
end end
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()} @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
@ -51,11 +67,11 @@ def get_conn(uri, opts) do
@prefix Pleroma.Gun.ConnectionPool @prefix Pleroma.Gun.ConnectionPool
def limiter_setup do def limiter_setup do
wait = Pleroma.Config.get([:connections_pool, :connection_acquisition_wait]) wait = Config.get([:connections_pool, :connection_acquisition_wait])
retries = Pleroma.Config.get([:connections_pool, :connection_acquisition_retries]) retries = Config.get([:connections_pool, :connection_acquisition_retries])
:pools :pools
|> Pleroma.Config.get([]) |> Config.get([])
|> Enum.each(fn {name, opts} -> |> Enum.each(fn {name, opts} ->
max_running = Keyword.get(opts, :size, 50) max_running = Keyword.get(opts, :size, 50)
max_waiting = Keyword.get(opts, :max_waiting, 10) max_waiting = Keyword.get(opts, :max_waiting, 10)
@ -69,7 +85,6 @@ def limiter_setup do
case result do case result do
:ok -> :ok :ok -> :ok
{:error, :existing} -> :ok {:error, :existing} -> :ok
e -> raise e
end end
end) end)

View File

@ -11,6 +11,8 @@ defmodule Pleroma.HTTP.ExAws do
@impl true @impl true
def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do
http_opts = Keyword.put_new(http_opts, :adapter, pool: :upload)
case HTTP.request(method, url, body, headers, http_opts) do case HTTP.request(method, url, body, headers, http_opts) do
{:ok, env} -> {:ok, env} ->
{:ok, %{status_code: env.status, headers: env.headers, body: env.body}} {:ok, %{status_code: env.status, headers: env.headers, body: env.body}}

View File

@ -11,6 +11,8 @@ defmodule Pleroma.HTTP.Tzdata do
@impl true @impl true
def get(url, headers, options) do def get(url, headers, options) do
options = Keyword.put_new(options, :adapter, pool: :default)
with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do
{:ok, {env.status, env.headers, env.body}} {:ok, {env.status, env.headers, env.body}}
end end
@ -18,6 +20,8 @@ def get(url, headers, options) do
@impl true @impl true
def head(url, headers, options) do def head(url, headers, options) do
options = Keyword.put_new(options, :adapter, pool: :default)
with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do
{:ok, {env.status, env.headers}} {:ok, {env.status, env.headers}}
end end

View File

@ -150,7 +150,9 @@ def get_or_update_favicon(%URI{host: host} = instance_uri) do
defp scrape_favicon(%URI{} = instance_uri) do defp scrape_favicon(%URI{} = instance_uri) do
try do try do
with {:ok, %Tesla.Env{body: html}} <- with {:ok, %Tesla.Env{body: html}} <-
Pleroma.HTTP.get(to_string(instance_uri), [{:Accept, "text/html"}]), Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}],
adapter: [pool: :media]
),
favicon_rel <- favicon_rel <-
html html
|> Floki.parse_document!() |> Floki.parse_document!()

View File

@ -164,12 +164,12 @@ defp make_signature(id, date) do
date: date date: date
}) })
[{"signature", signature}] {"signature", signature}
end end
defp sign_fetch(headers, id, date) do defp sign_fetch(headers, id, date) do
if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
headers ++ make_signature(id, date) [make_signature(id, date) | headers]
else else
headers headers
end end
@ -177,7 +177,7 @@ defp sign_fetch(headers, id, date) do
defp maybe_date_fetch(headers, date) do defp maybe_date_fetch(headers, date) do
if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do
headers ++ [{"date", date}] [{"date", date} | headers]
else else
headers headers
end end

View File

@ -46,12 +46,23 @@ def put_file(%Pleroma.Upload{} = upload) do
op = op =
if streaming do if streaming do
op =
upload.tempfile upload.tempfile
|> ExAws.S3.Upload.stream_file() |> ExAws.S3.Upload.stream_file()
|> ExAws.S3.upload(bucket, s3_name, [ |> ExAws.S3.upload(bucket, s3_name, [
{:acl, :public_read}, {:acl, :public_read},
{:content_type, upload.content_type} {:content_type, upload.content_type}
]) ])
if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
# set s3 upload timeout to respect :upload pool timeout
# timeout should be slightly larger, so s3 can retry upload on fail
timeout = Pleroma.HTTP.AdapterHelper.Gun.pool_timeout(:upload) + 1_000
opts = Keyword.put(op.opts, :timeout, timeout)
Map.put(op, :opts, opts)
else
op
end
else else
{:ok, file_data} = File.read(upload.tempfile) {:ok, file_data} = File.read(upload.tempfile)

View File

@ -96,6 +96,6 @@ def rich_media_get(url) do
@rich_media_options @rich_media_options
end end
Pleroma.HTTP.get(url, headers, options) Pleroma.HTTP.get(url, headers, adapter: options)
end end
end end

View File

@ -136,12 +136,12 @@ def get_template_from_xml(body) do
def find_lrdd_template(domain) do def find_lrdd_template(domain) do
with {:ok, %{status: status, body: body}} when status in 200..299 <- with {:ok, %{status: status, body: body}} when status in 200..299 <-
HTTP.get("http://#{domain}/.well-known/host-meta", []) do HTTP.get("http://#{domain}/.well-known/host-meta") do
get_template_from_xml(body) get_template_from_xml(body)
else else
_ -> _ ->
with {:ok, %{body: body, status: status}} when status in 200..299 <- with {:ok, %{body: body, status: status}} when status in 200..299 <-
HTTP.get("https://#{domain}/.well-known/host-meta", []) do HTTP.get("https://#{domain}/.well-known/host-meta") do
get_template_from_xml(body) get_template_from_xml(body)
else else
e -> {:error, "Can't find LRDD template: #{inspect(e)}"} e -> {:error, "Can't find LRDD template: #{inspect(e)}"}

View File

@ -1350,11 +1350,11 @@ def get("https://relay.mastodon.host/actor", _, _, _) do
{:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/relay/relay.json")}} {:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/relay/relay.json")}}
end end
def get("http://localhost:4001/", _, "", Accept: "text/html") do def get("http://localhost:4001/", _, "", [{"accept", "text/html"}]) do
{:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/tesla_mock/7369654.html")}} {:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/tesla_mock/7369654.html")}}
end end
def get("https://osada.macgirvin.com/", _, "", Accept: "text/html") do def get("https://osada.macgirvin.com/", _, "", [{"accept", "text/html"}]) do
{:ok, {:ok,
%Tesla.Env{ %Tesla.Env{
status: 200, status: 200,