weewx-proxy/lib/weewx_proxy/http/purple_air.ex

166 lines
4.2 KiB
Elixir

defmodule WeewxProxy.HTTP.PurpleAir do
require Logger
use GenServer
alias WeewxProxy.{Publisher, Utils}
@type parsed_body :: %{required(String.t()) => String.t() | float() | integer()}
defmodule State do
# credo:disable-for-previous-line Credo.Check.Readability.ModuleDoc
use TypedStruct
typedstruct do
field :last_update, non_neg_integer(), default: 0
end
end
@name __MODULE__
@spec child_spec(term) :: Supervisor.child_spec()
def child_spec(_arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
restart: :permanent,
shutdown: 5000,
type: :worker
}
end
@spec start_link :: GenServer.on_start()
def start_link do
GenServer.start_link(__MODULE__, [], name: @name)
end
# API
@spec recently_uploaded_keys :: [atom()]
def recently_uploaded_keys do
current_timestamp = Utils.utc_timestamp()
recently_uploaded_keys(current_timestamp)
end
@spec recently_uploaded_keys(non_neg_integer()) :: [atom()]
def recently_uploaded_keys(current_timestamp) do
case :ets.lookup(:purpleair, :last_update) do
[{_key, timestamp}] when current_timestamp - timestamp < 70 ->
[:pressure]
_ ->
[]
end
end
# Callbacks
@impl true
def init([]) do
{:ok, %State{}, {:continue, :init}}
end
@impl true
def handle_continue(:init, state) do
:ok = Process.send(self(), :fetch, [])
{:noreply, state}
end
@impl true
def handle_info(:fetch, state) do
data = fetch_data()
:ok = handle_reading(data)
_ = trigger_fetch()
{:noreply, %State{state | last_update: Utils.utc_timestamp()}}
end
@impl true
def handle_info(request, state) do
_ = Logger.error("Unexpected message: #{inspect(request)}")
{:noreply, state}
end
# Helper
@spec trigger_fetch :: reference()
defp trigger_fetch do
Process.send_after(self(), :fetch, 25_000)
end
@spec fetch_data :: parsed_body() | nil
defp fetch_data do
url = Application.fetch_env!(:weewx_proxy, :purpleair_url)
case HTTPoison.get(url) do
{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
{:ok, data} = Jason.decode(body)
if handle_reading?(data) do
data
else
_ = Logger.warning("Ignoring reading: #{inspect(data)}")
nil
end
{:ok, response} ->
_ = Logger.warning("Unexpected response: #{inspect(response)}")
nil
{:error, error} ->
_ = Logger.error("Unexpected error: #{inspect(error)}")
nil
end
end
@spec handle_reading?(parsed_body()) :: boolean
defp handle_reading?(data) do
is_number(data["pm1_0_atm"]) and is_number(data["pm2_5_atm"]) and is_number(data["pm10_0_atm"]) and
is_number(data["pm1_0_atm_b"]) and is_number(data["pm2_5_atm_b"]) and is_number(data["pm10_0_atm_b"]) and
abs(data["pm2_5_atm"] - data["pm2_5_atm_b"]) < 200 and
is_number(data["pressure"]) and is_number(data["pressure_680"]) and
is_integer(data["uptime"]) and data["uptime"] > 120
end
@spec handle_reading(parsed_body() | nil) :: :ok
defp handle_reading(nil), do: :ok
defp handle_reading(data) do
transformed_data = %{
dateTime: format_date_time(data),
pm1_0: calculate_mean(data, "pm1_0_atm", "pm1_0_atm_b"),
pm2_5: calculate_mean(data, "pm2_5_atm", "pm2_5_atm_b"),
pm10_0: calculate_mean(data, "pm10_0_atm", "pm10_0_atm_b"),
pressure: calculate_mean(data, "pressure", "pressure_680")
}
_ = :ets.insert(:purpleair, {:last_update, transformed_data.dateTime})
Publisher.publish("weewx/ingest_si", transformed_data)
end
@spec format_date_time(parsed_body()) :: non_neg_integer()
defp format_date_time(data) do
{:ok, dt, 0} =
data |> Map.get("DateTime") |> String.replace("/", "-") |> String.upcase(:ascii) |> DateTime.from_iso8601()
DateTime.to_unix(dt)
end
@spec calculate_mean(parsed_body(), String.t(), String.t()) :: float()
defp calculate_mean(data, key_a, key_b) do
data_a = data[key_a]
data_b = data[key_b]
raw_value =
case {data_a, data_b} do
{0.0, 0.0} -> 0.0
{_, 0.0} -> data_a
{0.0, _} -> data_b
{_, _} -> (data_a + data_b) / 2.0
end
Float.round(raw_value, 4)
end
end