diff --git a/lib/pigeon/apns/config.ex b/lib/pigeon/apns/config.ex index 83c8448d..891a78e7 100644 --- a/lib/pigeon/apns/config.ex +++ b/lib/pigeon/apns/config.ex @@ -9,7 +9,7 @@ defmodule Pigeon.APNS.Config do config = Application.get_env(:pigeon, :apns)[name] %{ name: name, - mode: config[:mode], + mode: mode(config[:mode]), reconnect: Map.get(config, :reconnect, true), cert: cert(config[:cert]), certfile: file_path(config[:cert]), @@ -20,6 +20,16 @@ defmodule Pigeon.APNS.Config do } end + def mode({:system, env_var}), do: to_mode(System.get_env(env_var)) + def mode(mode), do: mode + + def to_mode(nil), do: raise "APNS.Config mode is nil" + def to_mode("dev"), do: :dev + def to_mode(":dev"), do: :dev + def to_mode("prod"), do: :prod + def to_mode(":prod"), do: :prod + def to_mode(other), do: raise "APNS.Config mode is #{inspect(other)}" + def file_path(nil), do: nil def file_path(path) when is_binary(path) do cond do diff --git a/lib/pigeon/fcm.ex b/lib/pigeon/fcm.ex index fc8a4e71..3e56ede6 100644 --- a/lib/pigeon/fcm.ex +++ b/lib/pigeon/fcm.ex @@ -5,37 +5,26 @@ defmodule Pigeon.FCM do require Logger import Supervisor.Spec - alias Pigeon.FCM.NotificationResponse + alias Pigeon.FCM.{Notification, NotificationResponse} @default_timeout 5_000 @default_worker :fcm_default + @chunk_size 1_000 + @spec push(Notification.t, Keyword.t) :: {:ok, NotificationResponse.t} + @spec push([Notification.t, ...], Keyword.t) :: [NotificationResponse.t, ...] def push(notification, opts \\ []) def push(notification, opts) when is_list(notification) do case opts[:on_response] do nil -> - ref = :erlang.make_ref - pid = self() - for n <- notification do - on_response = fn(x) -> send pid, {ref, x} end - send_push(n, on_response, opts) - end - Enum.foldl(notification, %{}, fn(_n, acc) -> - receive do - {^ref, %NotificationResponse{message_id: id} = response} -> - if Map.has_key?(acc, id) do - %{acc | id => merge(response, acc[:message_id])} - else - Map.merge(%{id => response}, acc) - end - after 5_000 -> - acc - end - end) - on_response -> send_push(notification, on_response, opts) + tasks = for n <- notification, do: Task.async(fn -> do_sync_push(n, opts) end) + tasks + |> Task.yield_many(@default_timeout + 10_000) + |> Enum.map(&task_mapper(&1)) + on_response -> + for n <- notification, do: send_push(n, on_response, opts) end end - def push(notification, opts) do case opts[:on_response] do nil -> do_sync_push(notification, opts) @@ -43,6 +32,24 @@ defmodule Pigeon.FCM do end end + defp task_mapper({task, result}) do + case result do + nil -> Task.shutdown(task, :brutal_kill) + {:ok, {:ok, response}} -> {:ok, response} + {:ok, {:error, :timeout, notification}} -> {:error, notification} + end + end + + @doc """ + Sends a push over FCM. + """ + def send_push(notification, on_response, opts) do + worker_name = opts[:to] || @default_worker + notification + |> encode_requests() + |> Enum.map(& GenServer.cast(worker_name, generate_envelope(&1, on_response, opts))) + end + defp do_sync_push(notification, opts) do ref = :erlang.make_ref pid = self() @@ -73,24 +80,22 @@ defmodule Pigeon.FCM do end def encode_requests(notification) do notification.registration_id - |> Enum.chunk(1000, 1000, []) + |> chunk(@chunk_size, @chunk_size, []) |> Enum.map(& encode_requests(%{notification | registration_id: &1})) |> List.flatten end + defp chunk(collection, chunk_size, step, padding) do + if Kernel.function_exported?(Enum, :chunk_every, 4) do + Enum.chunk_every(collection, chunk_size, step, padding) + else + Enum.chunk(collection, chunk_size, step, padding) + end + end + defp recipient_attr([regid]), do: %{"to" => regid} defp recipient_attr(regid) when is_list(regid), do: %{"registration_ids" => regid} - @doc """ - Sends a push over FCM. - """ - def send_push(notification, on_response, opts) do - worker_name = opts[:to] || @default_worker - notification - |> encode_requests() - |> Enum.map(& GenServer.cast(worker_name, generate_envelope(&1, on_response, opts))) - end - def start_connection(opts \\ []) def start_connection(name) when is_atom(name) do config = Application.get_env(:pigeon, :fcm)[name] @@ -121,7 +126,8 @@ defmodule Pigeon.FCM do is_map(value_1) -> merge(value_1, value_2) is_nil(value_1) -> value_2 is_nil(value_2) -> value_1 - true -> value_1 ++ value_2 + is_list(value_1) && is_list(value_2) -> value_1 ++ value_2 + true -> [value_1] ++ [value_2] end end) end diff --git a/lib/pigeon/fcm/worker.ex b/lib/pigeon/fcm/worker.ex index cb228019..32a9ed10 100644 --- a/lib/pigeon/fcm/worker.ex +++ b/lib/pigeon/fcm/worker.ex @@ -184,32 +184,46 @@ defmodule Pigeon.FCM.Worker do end end - def process_end_stream(%Pigeon.Http2.Stream{id: stream_id, headers: headers, body: body}, + def process_end_stream(%Pigeon.Http2.Stream{id: stream_id, headers: headers, body: body, error: error} = stream, %{socket: _socket, queue: queue} = state) do - {registration_ids, on_response} = queue["#{stream_id}"] - case get_status(headers) do - "200" -> - result = Poison.decode!(body) - parse_result(registration_ids, result, on_response) - new_queue = Map.delete(queue, "#{stream_id}") - {:noreply, %{state | queue: new_queue}} - nil -> + cond do + queue["#{stream_id}"] == nil -> + Logger.error("Unknown stream_id: #{stream_id}, Error: #{inspect(error)}") {:noreply, state} - "401" -> - log_error("401", "Unauthorized") - unless on_response == nil do on_response.({:error, :unauthorized}) end - new_queue = Map.delete(queue, "#{stream_id}") - {:noreply, %{state | queue: new_queue}} - "400" -> - log_error("400", "Malformed JSON") - unless on_response == nil do on_response.({:error, :malformed_json}) end - new_queue = Map.delete(queue, "#{stream_id}") - {:noreply, %{state | queue: new_queue}} - code -> - reason = parse_error(body) - log_error(code, reason) - unless on_response == nil do on_response.({:error, reason}) end + error == nil -> + {registration_ids, on_response} = queue["#{stream_id}"] + case get_status(headers) do + nil -> + stream |> inspect() |> Logger.error + new_queue = Map.delete(queue, "#{stream_id}") + {:noreply, %{state | queue: new_queue}} + "200" -> + result = Poison.decode!(body) + parse_result(registration_ids, result, on_response) + new_queue = Map.delete(queue, "#{stream_id}") + {:noreply, %{state | queue: new_queue}} + "401" -> + log_error("401", "Unauthorized") + unless on_response == nil do on_response.({:error, :unauthorized}) end + new_queue = Map.delete(queue, "#{stream_id}") + {:noreply, %{state | queue: new_queue}} + "400" -> + log_error("400", "Malformed JSON") + unless on_response == nil do on_response.({:error, :malformed_json}) end + new_queue = Map.delete(queue, "#{stream_id}") + {:noreply, %{state | queue: new_queue}} + code -> + reason = parse_error(body) + log_error(code, reason) + unless on_response == nil do on_response.({:error, reason}) end + new_queue = Map.delete(queue, "#{stream_id}") + {:noreply, %{state | queue: new_queue}} + end + true -> + {_registration_ids, on_response} = queue["#{stream_id}"] + error |> inspect() |> Logger.error + unless on_response == nil do on_response.({:error, :unavailable}) end new_queue = Map.delete(queue, "#{stream_id}") {:noreply, %{state | queue: new_queue}} end @@ -222,6 +236,9 @@ defmodule Pigeon.FCM.Worker do ResultParser.parse(ids, results, on_response, %NotificationResponse{}) end + defp get_status(nil) do + nil + end defp get_status(headers) do case Enum.find(headers, fn({key, _val}) -> key == ":status" end) do {":status", status} -> status diff --git a/lib/pigeon/http2_client/kadabra.ex b/lib/pigeon/http2_client/kadabra.ex index b3705440..f59eed68 100644 --- a/lib/pigeon/http2_client/kadabra.ex +++ b/lib/pigeon/http2_client/kadabra.ex @@ -15,9 +15,10 @@ defmodule Pigeon.Http2.Client.Kadabra do def handle_end_stream({:end_stream, %{id: id, headers: headers, - body: body}}, _state) do + body: body, + error: error}}, _state) do - {:ok, %Pigeon.Http2.Stream{id: id, headers: headers, body: body}} + {:ok, %Pigeon.Http2.Stream{id: id, headers: headers, body: body, error: error}} end def handle_end_stream(msg, _state), do: msg end diff --git a/lib/pigeon/http2_stream.ex b/lib/pigeon/http2_stream.ex index d6b03156..72e2376d 100644 --- a/lib/pigeon/http2_stream.ex +++ b/lib/pigeon/http2_stream.ex @@ -1,3 +1,3 @@ defmodule Pigeon.Http2.Stream do - defstruct id: nil, headers: nil, body: nil + defstruct id: nil, headers: nil, body: nil, error: nil end diff --git a/mix.exs b/mix.exs index 42e9aff7..983206ec 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Pigeon.Mixfile do def project do [app: :pigeon, name: "Pigeon", - version: "1.0.2", + version: "1.0.3", elixir: "~> 1.2", source_url: "https://github.com/codedge-llc/pigeon", description: description(), @@ -31,7 +31,7 @@ defmodule Pigeon.Mixfile do defp deps do [{:poison, "~> 2.0 or ~> 3.0"}, {:httpoison, "~> 0.7"}, - {:kadabra, "~> 0.2.1", optional: true}, + {:kadabra, "~> 0.2.2", optional: true}, {:dogma, "~> 0.1", only: :dev}, {:earmark, "~> 1.0", only: :dev}, {:ex_doc, "~> 0.2", only: :dev},