Merge branch 'fix/streamer-timeout' into 'develop'

Fix streamer timeout

Closes #1735 and #1753

See merge request pleroma/pleroma!2509
This commit is contained in:
rinpatch 2020-05-11 17:07:55 +00:00
commit 768c18facb
1 changed files with 26 additions and 4 deletions

View File

@ -12,8 +12,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
@behaviour :cowboy_websocket @behaviour :cowboy_websocket
# Client ping period.
@tick :timer.seconds(30)
# Cowboy timeout period. # Cowboy timeout period.
@timeout :timer.seconds(30) @timeout :timer.seconds(60)
# Hibernate every X messages # Hibernate every X messages
@hibernate_every 100 @hibernate_every 100
@ -44,7 +46,8 @@ def init(%{qs: qs} = req, state) do
req req
end end
{:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}} {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil},
%{idle_timeout: @timeout}}
else else
{:error, code} -> {:error, code} ->
Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
@ -66,11 +69,18 @@ def websocket_init(state) do
) )
Streamer.add_socket(state.topic, state.user) Streamer.add_socket(state.topic, state.user)
{:ok, state} {:ok, %{state | timer: timer()}}
end
# Client's Pong frame.
def websocket_handle(:pong, state) do
if state.timer, do: Process.cancel_timer(state.timer)
{:ok, %{state | timer: timer()}}
end end
# We never receive messages. # We never receive messages.
def websocket_handle(_frame, state) do def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state} {:ok, state}
end end
@ -94,6 +104,14 @@ def websocket_info({:text, message}, state) do
end end
end end
# Ping tick. We don't re-queue a timer there, it is instead queued when :pong is received.
# As we hibernate there, reset the count to 0.
# If the client misses :pong, Cowboy will automatically timeout the connection after
# `@idle_timeout`.
def websocket_info(:tick, state) do
{:reply, :ping, %{state | timer: nil, count: 0}, :hibernate}
end
def terminate(reason, _req, state) do def terminate(reason, _req, state) do
Logger.debug( Logger.debug(
"#{__MODULE__} terminating websocket connection for user #{ "#{__MODULE__} terminating websocket connection for user #{
@ -149,4 +167,8 @@ defp expand_topic("list", params) do
end end
defp expand_topic(topic, _), do: topic defp expand_topic(topic, _), do: topic
defp timer do
Process.send_after(self(), :tick, @tick)
end
end end