Initial commit

This commit is contained in:
Daniel Kempkens 2023-06-06 16:41:34 +02:00
commit 4837ac0fbf
Signed by: daniel
SSH key fingerprint: SHA256:Ks/MyhQYcPRQiwMKLAKquWCdCPe3JXlb1WttgnAoSeM
21 changed files with 1313 additions and 0 deletions

2
.envrc Normal file
View file

@ -0,0 +1,2 @@
use flake
project elixir

8
.formatter.exs Normal file
View file

@ -0,0 +1,8 @@
[
inputs: [
"mix.exs",
"{config,lib,test}/**/*.{ex,exs}"
],
line_length: 120,
import_deps: [:plug, :typed_struct]
]

22
.gitignore vendored Normal file
View file

@ -0,0 +1,22 @@
# The directory Mix will write compiled artifacts to.
/_build
# If you run "mix test --cover", coverage assets end up here.
/cover
# The directory Mix downloads your dependencies sources to.
/deps
# Where 3rd-party dependencies like ExDoc output generated docs.
/doc
# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump
# Also ignore archive artifacts (built via "mix archive.build").
*.ez
# nix
/.direnv
/.elixir_ls
/result

5
LICENSE Normal file
View file

@ -0,0 +1,5 @@
Copyright 2023 Daniel Kempkens
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED “AS IS” AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

21
README.md Normal file
View file

@ -0,0 +1,21 @@
# WeewxProxy
**TODO: Add description**
## Installation
If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `weewx_proxy` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:weewx_proxy, "~> 0.1.0"}
]
end
```
Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at <https://hexdocs.pm/weewx_proxy>.

9
config/config.exs Normal file
View file

@ -0,0 +1,9 @@
import Config
config :elixir, :time_zone_database, Tz.TimeZoneDatabase
config :logger,
backends: [],
level: :warning,
handle_otp_reports: false,
handle_sasl_reports: false

14
config/runtime.exs Normal file
View file

@ -0,0 +1,14 @@
import Config
config :weewx_proxy,
mqtt_weewx_host: elem(:inet.parse_address(to_charlist(System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_HOST"))), 1),
mqtt_weewx_port: elem(Integer.parse(System.get_env("WEEWX_PROXY_MQTT_WEEWX_PORT", "1883")), 0),
mqtt_weewx_user: System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_USER"),
mqtt_weewx_password: System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_PASSWORD"),
mqtt_weewx_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_WEEWX_CLIENT_ID", "WeewxBroker")),
mqtt_sdr_host: elem(:inet.parse_address(to_charlist(System.fetch_env!("WEEWX_PROXY_MQTT_SDR_HOST"))), 1),
mqtt_sdr_port: elem(Integer.parse(System.get_env("WEEWX_PROXY_MQTT_SDR_PORT", "1883")), 0),
mqtt_sdr_user: System.fetch_env!("WEEWX_PROXY_MQTT_SDR_USER"),
mqtt_sdr_password: System.fetch_env!("WEEWX_PROXY_MQTT_SDR_PASSWORD"),
mqtt_sdr_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_SDR_CLIENT_ID", "SdrIngestLocal")),
purpleair_url: System.fetch_env!("WEEWX_PROXY_PURPLEAIR_URL")

64
flake.lock Normal file
View file

@ -0,0 +1,64 @@
{
"nodes": {
"flake-parts": {
"inputs": {
"nixpkgs-lib": "nixpkgs-lib"
},
"locked": {
"lastModified": 1680392223,
"narHash": "sha256-n3g7QFr85lDODKt250rkZj2IFS3i4/8HBU2yKHO3tqw=",
"owner": "hercules-ci",
"repo": "flake-parts",
"rev": "dcc36e45d054d7bb554c9cdab69093debd91a0b5",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "flake-parts",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1682566018,
"narHash": "sha256-HPzPRFiy2o/7k7mtnwfM1E6NVZHiFbPdmYCMoIpkHO4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "8e3b64db39f2aaa14b35ee5376bd6a2e707cadc2",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-lib": {
"locked": {
"dir": "lib",
"lastModified": 1680213900,
"narHash": "sha256-cIDr5WZIj3EkKyCgj/6j3HBH4Jj1W296z7HTcWj1aMA=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e3652e0735fbec227f342712f180f4f21f0594f2",
"type": "github"
},
"original": {
"dir": "lib",
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-parts": "flake-parts",
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

60
flake.nix Normal file
View file

@ -0,0 +1,60 @@
{
description = "weewx-proxy development environment";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
flake-parts.url = "github:hercules-ci/flake-parts";
};
outputs = inputs@{ flake-parts, ... }:
flake-parts.lib.mkFlake { inherit inputs; } {
systems = [ "aarch64-darwin" "x86_64-linux" "aarch64-linux" ];
perSystem = { pkgs, lib, self', ... }:
let
pname = "weewx-proxy";
version = "0.0.1";
erlang = pkgs.beam.interpreters.erlangR25;
beamPackages = pkgs.beam.packagesWith erlang;
elixir = beamPackages.elixir_1_14;
inherit (pkgs.stdenv) isDarwin;
in
{
devShells.default = pkgs.mkShell {
packages = (with pkgs; [
erlang
elixir
beamPackages.elixir-ls
mix2nix
mosquitto
]) ++ lib.optionals isDarwin (with pkgs.darwin.apple_sdk.frameworks; [
CoreFoundation
CoreServices
]);
ERL_INCLUDE_PATH = "${erlang}/lib/erlang/usr/include";
};
packages.default = beamPackages.mixRelease {
inherit pname version;
src = ./.;
mixNixDeps = import ./mix.nix { inherit lib beamPackages; };
};
packages.container = pkgs.dockerTools.buildLayeredImage {
name = pname;
tag = "v${version}";
config = {
ExposedPorts = { "4040/tcp" = { }; };
Entrypoint = [ "${self'.packages.default}/bin/weewx_proxy" ];
Cmd = [ "start" ];
};
};
apps.default = { type = "app"; program = "${self'.packages.default}/bin/weewx_proxy"; };
};
};
}

View file

@ -0,0 +1,17 @@
defmodule WeewxProxy.Application do
@moduledoc false
use Application
@impl true
def start(_type, _args) do
children = [
WeewxProxy.Mqtt,
WeewxProxy.Publisher,
WeewxProxy.Http
]
opts = [strategy: :one_for_one, name: WeewxProxy.Supervisor]
Supervisor.start_link(children, opts)
end
end

44
lib/weewx_proxy/http.ex Normal file
View file

@ -0,0 +1,44 @@
defmodule WeewxProxy.Http do
@moduledoc false
use Supervisor
@name __MODULE__
@spec child_spec(term) :: Supervisor.child_spec()
def child_spec(_arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
restart: :permanent,
shutdown: 5000,
type: :supervisor
}
end
@spec start_link :: Supervisor.on_start()
def start_link do
Supervisor.start_link(__MODULE__, [], name: @name)
end
# Callbacks
@impl true
def init([]) do
children = [
{Plug.Cowboy,
scheme: :http,
plug: WeewxProxy.HTTP.Ecowitt,
options: [
port: 4040,
transport_options: [
num_acceptors: 3
]
]},
{Plug.Cowboy.Drainer, refs: [WeewxProxy.HTTP.Ecowitt.HTTP]},
WeewxProxy.HTTP.PurpleAir
]
Supervisor.init(children, strategy: :one_for_one)
end
end

View file

@ -0,0 +1,116 @@
defmodule WeewxProxy.HTTP.Ecowitt do
require Logger
use Plug.Router
alias WeewxProxy.{Publisher, Utils}
alias WeewxProxy.Sdr.Ecowitt, as: Sdr
@type parsed_body :: %{required(String.t()) => String.t()}
plug Plug.Logger, log: :debug
plug Plug.Parsers, parsers: [:urlencoded]
plug :match
plug :dispatch
post "/update" do
body = conn.body_params
_ = Logger.debug("Incoming request body: #{inspect(body)}")
data = transform_data(body)
:ok =
if valid_data?(data) do
sdr_keys = Sdr.recently_uploaded_keys(data.dateTime)
_ = Logger.debug("Removing keys: `#{inspect(sdr_keys)}'")
partial_data = Map.drop(data, sdr_keys)
Publisher.publish("weewx/ingest_us", partial_data)
else
_ = Logger.error("Not publishing record because data appears invalid: #{inspect(data)}")
:ok
end
tz = System.get_env("TZ", "Europe/Berlin")
utc_offset = Utils.utc_offset_string(tz)
response = ~s({"errcode":"0","errmsg":"ok","UTC_offset":"#{utc_offset}"})
send_resp(conn, 200, response)
end
match _ do
send_resp(conn, 404, "Not Found")
end
# Private
@spec transform_data(parsed_body()) :: Publisher.data()
defp transform_data(data) do
# Fields with totals:
# - rain
# - lightning_strike_count
%{
dateTime: format_date_time(data),
# Outdoor
outTemp: Utils.parse_float(data["tempf"]),
outHumidity: Utils.parse_float(data["humidity"]),
pressure: Utils.parse_float(data["baromabsin"]),
windSpeed: Utils.parse_float(data["windspeedmph"]),
windGust: Utils.parse_float(data["windgustmph"]),
windDir: Utils.parse_float(data["winddir"]),
rain: Utils.parse_float(data["yearlyrainin"]),
rainRate: Utils.parse_float(data["rainratein"]),
UV: Utils.parse_float(data["uv"]),
radiation: Utils.parse_float(data["solarradiation"]),
soilMoist1: Utils.parse_float(data["soilmoisture1"]),
soilTemp1: Utils.parse_float(data["tf_ch1"]),
lightning_strike_count: calculate_lightning_strike_count(data),
lightning_last_det_time: Utils.parse_integer(data["lightning_time"]),
lightning_distance: calculate_lightning_distance(data),
# Indoor
inTemp: Utils.parse_float(data["tempinf"]),
inHumidity: Utils.parse_float(data["humidityin"]),
# Battery
soilMoistBatteryVoltage1: Utils.parse_float(data["soilbatt1"]),
soilTempBatteryVoltage1: Utils.parse_float(data["tf_batt1"])
}
end
@spec format_date_time(parsed_body()) :: non_neg_integer()
defp format_date_time(data) do
{:ok, dt, 0} =
data |> Map.get("dateutc") |> String.replace("+", "T") |> Utils.append_string("Z") |> DateTime.from_iso8601()
DateTime.to_unix(dt)
end
@spec calculate_lightning_strike_count(parsed_body()) :: float() | nil
defp calculate_lightning_strike_count(data) do
if Map.has_key?(data, "lightning_num") do
value = Utils.parse_float(data["lightning_num"])
if is_nil(value), do: 0.0, else: value
else
nil
end
end
@spec calculate_lightning_distance(parsed_body()) :: float() | nil
defp calculate_lightning_distance(data) do
distance_km = data["lightning"]
strikes = calculate_lightning_strike_count(data)
current_time = Utils.utc_timestamp()
lightning_time = Utils.parse_integer(data["lightning_time"])
time_diff = current_time - lightning_time
if is_binary(distance_km) and is_number(strikes) and byte_size(distance_km) > 0 and strikes > 0 and time_diff < 1200 do
0.62137119 * Utils.parse_float(distance_km)
else
nil
end
end
@spec valid_data?(Publisher.data()) :: boolean()
defp valid_data?(data) do
Map.has_key?(data, :outTemp) and is_number(data.outTemp)
end
end

View file

@ -0,0 +1,165 @@
defmodule WeewxProxy.HTTP.PurpleAir do
require Logger
use GenServer
alias WeewxProxy.{Publisher, Utils}
@type parsed_body :: %{required(String.t()) => String.t() | float() | integer()}
defmodule State do
# credo:disable-for-previous-line Credo.Check.Readability.ModuleDoc
use TypedStruct
typedstruct do
field :last_update, non_neg_integer(), default: 0
end
end
@name __MODULE__
@spec child_spec(term) :: Supervisor.child_spec()
def child_spec(_arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
restart: :permanent,
shutdown: 5000,
type: :worker
}
end
@spec start_link :: GenServer.on_start()
def start_link do
GenServer.start_link(__MODULE__, [], name: @name)
end
# API
@spec recently_uploaded_keys :: [atom()]
def recently_uploaded_keys do
current_timestamp = Utils.utc_timestamp()
recently_uploaded_keys(current_timestamp)
end
@spec recently_uploaded_keys(non_neg_integer()) :: [atom()]
def recently_uploaded_keys(current_timestamp) do
case :ets.lookup(:purpleair, :last_update) do
[{_key, timestamp}] when current_timestamp - timestamp < 70 ->
[:pressure]
_ ->
[]
end
end
# Callbacks
@impl true
def init([]) do
{:ok, %State{}, {:continue, :init}}
end
@impl true
def handle_continue(:init, state) do
:ok = Process.send(self(), :fetch, [])
{:noreply, state}
end
@impl true
def handle_info(:fetch, state) do
data = fetch_data()
:ok = handle_reading(data)
_ = trigger_fetch()
{:noreply, %State{state | last_update: Utils.utc_timestamp()}}
end
@impl true
def handle_info(request, state) do
_ = Logger.error("Unexpected message: #{inspect(request)}")
{:noreply, state}
end
# Helper
@spec trigger_fetch :: reference()
defp trigger_fetch do
Process.send_after(self(), :fetch, 25_000)
end
@spec fetch_data :: parsed_body() | nil
defp fetch_data do
url = Application.fetch_env!(:weewx_proxy, :purpleair_url)
case HTTPoison.get(url) do
{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
{:ok, data} = Jason.decode(body)
if handle_reading?(data) do
data
else
_ = Logger.warn("Ignoring reading: #{inspect(data)}")
nil
end
{:ok, response} ->
_ = Logger.warn("Unexpected response: #{inspect(response)}")
nil
{:error, error} ->
_ = Logger.error("Unexpected error: #{inspect(error)}")
nil
end
end
@spec handle_reading?(parsed_body()) :: boolean
defp handle_reading?(data) do
is_number(data["pm1_0_atm"]) and is_number(data["pm2_5_atm"]) and is_number(data["pm10_0_atm"]) and
is_number(data["pm1_0_atm_b"]) and is_number(data["pm2_5_atm_b"]) and is_number(data["pm10_0_atm_b"]) and
abs(data["pm2_5_atm"] - data["pm2_5_atm_b"]) < 200 and
is_number(data["pressure"]) and is_number(data["pressure_680"]) and
is_integer(data["uptime"]) and data["uptime"] > 120
end
@spec handle_reading(parsed_body() | nil) :: :ok
defp handle_reading(nil), do: :ok
defp handle_reading(data) do
transformed_data = %{
dateTime: format_date_time(data),
pm1_0: calculate_mean(data, "pm1_0_atm", "pm1_0_atm_b"),
pm2_5: calculate_mean(data, "pm2_5_atm", "pm2_5_atm_b"),
pm10_0: calculate_mean(data, "pm10_0_atm", "pm10_0_atm_b"),
pressure: calculate_mean(data, "pressure", "pressure_680")
}
_ = :ets.insert(:purpleair, {:last_update, transformed_data.dateTime})
Publisher.publish("weewx/ingest_si", transformed_data)
end
@spec format_date_time(parsed_body()) :: non_neg_integer()
defp format_date_time(data) do
{:ok, dt, 0} =
data |> Map.get("DateTime") |> String.replace("/", "-") |> String.upcase(:ascii) |> DateTime.from_iso8601()
DateTime.to_unix(dt)
end
@spec calculate_mean(parsed_body(), String.t(), String.t()) :: float()
defp calculate_mean(data, key_a, key_b) do
data_a = data[key_a]
data_b = data[key_b]
raw_value =
case {data_a, data_b} do
{0.0, 0.0} -> 0.0
{_, 0.0} -> data_a
{0.0, _} -> data_b
{_, _} -> (data_a + data_b) / 2.0
end
Float.round(raw_value, 4)
end
end

61
lib/weewx_proxy/mqtt.ex Normal file
View file

@ -0,0 +1,61 @@
defmodule WeewxProxy.Mqtt do
@moduledoc false
use Supervisor
@name __MODULE__
@spec child_spec(term) :: Supervisor.child_spec()
def child_spec(_arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
restart: :permanent,
shutdown: 5000,
type: :supervisor
}
end
@spec start_link :: Supervisor.on_start()
def start_link do
Supervisor.start_link(__MODULE__, [], name: @name)
end
# Callbacks
@impl true
def init([]) do
children = [
{Tortoise311.Connection,
[
name: WeewxProxy.Mqtt.WeewxBroker,
client_id: Application.fetch_env!(:weewx_proxy, :mqtt_weewx_client_id),
server:
{Tortoise311.Transport.Tcp,
host: Application.fetch_env!(:weewx_proxy, :mqtt_weewx_host),
port: Application.fetch_env!(:weewx_proxy, :mqtt_weewx_port)},
user_name: Application.fetch_env!(:weewx_proxy, :mqtt_weewx_user),
password: Application.fetch_env!(:weewx_proxy, :mqtt_weewx_password),
handler: {Tortoise311.Handler.Logger, []}
]},
{Tortoise311.Connection,
[
name: WeewxProxy.Mqtt.SdrIngest,
client_id: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_client_id),
server:
{Tortoise311.Transport.Tcp,
host: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_host),
port: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_port)},
user_name: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_user),
password: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_password),
subscriptions: ["rtl433"],
handler: {WeewxProxy.Sdr.Ecowitt, []}
]}
]
:sdr_ecowitt = :ets.new(:sdr_ecowitt, [:set, :public, :named_table, {:read_concurrency, true}])
:purpleair = :ets.new(:purpleair, [:set, :public, :named_table, {:read_concurrency, true}])
Supervisor.init(children, strategy: :one_for_one, max_restarts: 9, max_seconds: 5)
end
end

View file

@ -0,0 +1,60 @@
defmodule WeewxProxy.Publisher do
require Logger
use GenServer
alias WeewxProxy.Utils
@type data :: %{required(atom) => number() | nil}
defmodule State do
# credo:disable-for-previous-line Credo.Check.Readability.ModuleDoc
use TypedStruct
typedstruct do
field :last_update, non_neg_integer(), default: 0
end
end
@name __MODULE__
@spec child_spec(term) :: Supervisor.child_spec()
def child_spec(_arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
restart: :permanent,
shutdown: 5000,
type: :worker
}
end
@spec start_link :: GenServer.on_start()
def start_link do
GenServer.start_link(__MODULE__, [], name: @name)
end
@spec publish(String.t(), data()) :: :ok
def publish(topic, data) do
filtered_data = :maps.filter(fn _k, v -> not is_nil(v) end, data)
GenServer.cast(@name, {:publish, topic, filtered_data})
end
# Callbacks
@impl true
def init([]) do
{:ok, %State{}}
end
@impl true
def handle_cast({:publish, topic, data}, state) do
{:ok, json_data} = Jason.encode(data)
_ = Logger.info("Publishing record to #{topic}")
_ = Tortoise311.publish(WeewxBroker, topic, json_data, qos: 0, timeout: 5000)
_ = Logger.debug("Published record: #{inspect(data)}")
{:noreply, %State{state | last_update: Utils.utc_timestamp()}}
end
end

View file

@ -0,0 +1,199 @@
defmodule WeewxProxy.Sdr.Ecowitt do
@moduledoc false
require Logger
use Tortoise311.Handler
alias WeewxProxy.HTTP.PurpleAir
alias WeewxProxy.{Publisher, Utils}
@type parsed_body :: %{required(String.t()) => String.t() | float() | integer()}
# API
@spec recently_uploaded_keys :: [atom()]
def recently_uploaded_keys do
current_timestamp = Utils.utc_timestamp()
recently_uploaded_keys(current_timestamp)
end
@spec recently_uploaded_keys(non_neg_integer()) :: [atom()]
def recently_uploaded_keys(current_timestamp) do
wh65b_keys =
case :ets.lookup(:sdr_ecowitt, {:wh65b, :last_update}) do
[{_key, timestamp}] when current_timestamp - timestamp < 48 ->
[:outTemp, :outHumidity, :windSpeed, :windGust, :windDir]
_ ->
[]
end
wh32b_keys =
case :ets.lookup(:sdr_ecowitt, {:wh32b, :last_update}) do
[{_key, timestamp}] when current_timestamp - timestamp < 80 ->
[:inTemp, :inHumidity, :pressure]
_ ->
[]
end
wh65b_keys ++ wh32b_keys
end
# Callbacks
@impl true
def init(_opts) do
_ = Logger.info("Initializing handler")
{:ok, nil}
end
@impl true
def connection(:up, state) do
_ = Logger.info("Connection has been established")
{:ok, state}
end
@impl true
def connection(:down, state) do
_ = Logger.warn("Connection has been dropped")
{:ok, state}
end
@impl true
def connection(:terminating, state) do
_ = Logger.warn("Connection is terminating")
{:ok, state}
end
@impl true
def subscription(:up, topic, state) do
_ = Logger.info("Subscribed to `#{topic}'")
{:ok, state}
end
@impl true
def subscription({:warn, [requested: req, accepted: qos]}, topic, state) do
_ = Logger.warn("Subscribed to `#{topic}'; requested #{req} but got accepted with QoS #{qos}")
{:ok, state}
end
@impl true
def subscription({:error, reason}, topic, state) do
_ = Logger.error("Error subscribing to `#{topic}'; #{inspect(reason)}")
{:ok, state}
end
@impl true
def subscription(:down, topic, state) do
_ = Logger.info("Unsubscribed from `#{topic}'")
{:ok, state}
end
@impl true
def handle_message(topic, publish, state) do
full_topic = Enum.join(topic, "/")
parsed_message = parse_message(full_topic, publish)
:ok = handle_reading(parsed_message)
{:ok, state}
end
@impl true
def terminate(reason, _state) do
_ = Logger.warn("Client has been terminated with reason: `#{inspect(reason)}'")
:ok
end
# Helper
@spec parse_message(String.t(), String.t()) :: parsed_body() | nil
defp parse_message("rtl433", message) do
{:ok, body} = Jason.decode(message)
if handle_reading?(body) do
body
else
_ = Logger.warn("Ignoring reading: #{inspect(body)}")
nil
end
end
defp parse_message(_topic, _message), do: nil
@spec handle_reading?(parsed_body()) :: boolean()
defp handle_reading?(%{"model" => "Fineoffset-WH65B", "id" => 189}), do: true
defp handle_reading?(%{"model" => "Fineoffset-WH32B", "id" => 173}), do: true
defp handle_reading?(_reading), do: false
@spec handle_reading(parsed_body() | nil) :: :ok
defp handle_reading(nil), do: :ok
defp handle_reading(body) do
{type, data} = transform_data(body)
:ok =
if valid_data?(type, data) do
purpleair_keys = PurpleAir.recently_uploaded_keys(data.dateTime)
_ = Logger.debug("Removing keys: `#{inspect(purpleair_keys)}'")
partial_data = Map.drop(data, purpleair_keys)
true = :ets.insert(:sdr_ecowitt, {{type, :last_update}, data.dateTime})
Publisher.publish("weewx/ingest_si", partial_data)
else
_ = Logger.error("Not publishing record because data appears invalid: #{inspect(data)}")
:ok
end
:ok
end
@spec transform_data(parsed_body()) :: {:wh65b | :wh32b, Publisher.data()}
defp transform_data(%{"model" => "Fineoffset-WH65B"} = data) do
data = %{
dateTime: format_date_time(data),
outTemp: Utils.parse_float(data["temperature_C"]),
outHumidity: Utils.parse_float(data["humidity"]),
windSpeed: Utils.parse_float(data["wind_avg_m_s"]),
windGust: Utils.parse_float(data["wind_max_m_s"]),
windDir: Utils.parse_float(data["wind_dir_deg"]),
luminosity: Utils.parse_float(data["light_lux"])
}
{:wh65b, data}
end
defp transform_data(%{"model" => "Fineoffset-WH32B"} = data) do
data = %{
dateTime: format_date_time(data),
inTemp: Utils.parse_float(data["temperature_C"]),
inHumidity: Utils.parse_float(data["humidity"]),
pressure: Utils.parse_float(data["pressure_hPa"])
}
{:wh32b, data}
end
@spec format_date_time(parsed_body()) :: non_neg_integer()
defp format_date_time(data) do
{:ok, dt, 0} =
data
|> Map.get("time")
|> String.replace(" ", "T")
|> Utils.append_string("Z")
|> DateTime.from_iso8601()
DateTime.to_unix(dt)
end
@spec valid_data?(:wh65b | :wh32b, Publisher.data()) :: boolean()
defp valid_data?(:wh65b, data) do
Map.has_key?(data, :outTemp) and is_number(data.outTemp)
end
defp valid_data?(:wh32b, data) do
Map.has_key?(data, :inTemp) and is_number(data.inTemp)
end
end

47
lib/weewx_proxy/utils.ex Normal file
View file

@ -0,0 +1,47 @@
defmodule WeewxProxy.Utils do
@spec parse_integer(String.t()) :: integer() | nil
def parse_integer(str) when is_binary(str) and byte_size(str) > 0 do
case Integer.parse(str) do
{int, ""} -> int
_error -> nil
end
end
def parse_integer(_), do: nil
@spec parse_float(String.t() | integer() | float()) :: float() | nil
def parse_float(str) when is_binary(str) and byte_size(str) > 0 do
case Float.parse(str) do
{float, _rem} -> float
_error -> nil
end
end
def parse_float(float) when is_float(float), do: float
def parse_float(int) when is_integer(int), do: int / 1.0
def parse_float(_), do: nil
@spec utc_timestamp :: non_neg_integer()
def utc_timestamp do
:os.system_time(:seconds)
end
@spec utc_offset_string(String.t()) :: String.t()
def utc_offset_string(tz) do
{:ok, dt} = DateTime.now(tz)
offset = dt.utc_offset
if offset > 0 do
"+#{offset}"
else
to_string(offset)
end
end
@spec append_string(String.t(), String.t()) :: String.t()
def append_string(str, append) do
str <> append
end
end

34
mix.exs Normal file
View file

@ -0,0 +1,34 @@
defmodule WeewxProxy.MixProject do
use Mix.Project
def project do
[
app: :weewx_proxy,
version: "0.1.0",
elixir: "~> 1.13",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger],
mod: {WeewxProxy.Application, []}
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:plug_cowboy, "~> 2.6"},
{:tortoise311, "~> 0.11"},
{:httpoison, "~> 2.1"},
{:jason, "~> 1.4"},
{:tz, "~> 0.26"},
{:typed_struct, "~> 0.3.0", runtime: false},
{:dialyxir, "~> 1.3", only: [:dev], runtime: false}
]
end
end

27
mix.lock Normal file
View file

@ -0,0 +1,27 @@
%{
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
"cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"},
"dialyxir": {:hex, :dialyxir, "1.3.0", "fd1672f0922b7648ff9ce7b1b26fcf0ef56dda964a459892ad15f6b4410b5284", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "00b2a4bcd6aa8db9dcb0b38c1225b7277dca9bc370b6438715667071a304696f"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"httpoison": {:hex, :httpoison, "2.1.0", "655fd9a7b0b95ee3e9a3b535cf7ac8e08ef5229bab187fa86ac4208b122d934b", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "fc455cb4306b43827def4f57299b2d5ac8ac331cb23f517e734a4b78210a160c"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "2.0.3", "3676436d3d1f7b81b5a2d2bd8405f412c677558c81b1c92be58c00562bb59095", [:mix], [], "hexpm", "27a30bf0db44d25eecba73755acf4068cbfe26a4372f9eb3e4ea3a45956bff6b"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"plug": {:hex, :plug, "1.14.2", "cff7d4ec45b4ae176a227acd94a7ab536d9b37b942c8e8fa6dfc0fff98ff4d80", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "842fc50187e13cf4ac3b253d47d9474ed6c296a8732752835ce4a86acdf68d13"},
"plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"},
"plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"tortoise311": {:hex, :tortoise311, "0.11.5", "4d2850ea123987cfdd9ebdced510977cf37f6c392683d2ab862446fa14c171f3", [:mix], [{:gen_state_machine, "~> 2.0 or ~> 3.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "61279fcbdfd1b139020eb3644f94ebdfcdf27a538ee6ba992e4c8bc67b628ba5"},
"typed_struct": {:hex, :typed_struct, "0.3.0", "939789e3c1dca39d7170c87f729127469d1315dcf99fee8e152bb774b17e7ff7", [:mix], [], "hexpm", "c50bd5c3a61fe4e198a8504f939be3d3c85903b382bde4865579bc23111d1b6d"},
"tz": {:hex, :tz, "0.26.1", "773555ecb9c01c87fcf969b4c2d2140e63fe6b3d7d9520fa2134ac1072b540a8", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:mint, "~> 1.5", [hex: :mint, repo: "hexpm", optional: true]}], "hexpm", "da38cea41e9cfd0deaa7f634e167a30399dcc8b84fd3da32e1d972466053f57c"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
}

337
mix.nix Normal file
View file

@ -0,0 +1,337 @@
{ lib, beamPackages, overrides ? (x: y: {}) }:
let
buildRebar3 = lib.makeOverridable beamPackages.buildRebar3;
buildMix = lib.makeOverridable beamPackages.buildMix;
buildErlangMk = lib.makeOverridable beamPackages.buildErlangMk;
self = packages // (overrides self packages);
packages = with beamPackages; with self; {
certifi = buildRebar3 rec {
name = "certifi";
version = "2.9.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0ha6vmf5p3xlbf5w1msa89frhvfk535rnyfybz9wdmh6vdms8v96";
};
beamDeps = [];
};
cowboy = buildErlangMk rec {
name = "cowboy";
version = "2.10.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0sqxqjdykxc2ai9cvkc0xjwkvr80z98wzlqlrd1z3iiw32vwrz9s";
};
beamDeps = [ cowlib ranch ];
};
cowboy_telemetry = buildRebar3 rec {
name = "cowboy_telemetry";
version = "0.4.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "1pn90is3k9dq64wbijvzkqb6ldfqvwiqi7ymc8dx6ra5xv0vm63x";
};
beamDeps = [ cowboy telemetry ];
};
cowlib = buildRebar3 rec {
name = "cowlib";
version = "2.12.1";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "1c4dgi8canscyjgddp22mjc69znvwy44wk3r7jrl2wvs6vv76fqn";
};
beamDeps = [];
};
dialyxir = buildMix rec {
name = "dialyxir";
version = "1.3.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0vv90jip2w362n3l7dkhqfdwlz97nwji535kn3fbk3dassya9ch0";
};
beamDeps = [ erlex ];
};
erlex = buildMix rec {
name = "erlex";
version = "0.2.6";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0x8c1j62y748ldvlh46sxzv5514rpzm809vxn594vd7y25by5lif";
};
beamDeps = [];
};
gen_state_machine = buildMix rec {
name = "gen_state_machine";
version = "3.0.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "05gdy0cxsmg3j43xmd6vm3nzsi8vlk94kdzn15rypg5yfhjnan8a";
};
beamDeps = [];
};
hackney = buildRebar3 rec {
name = "hackney";
version = "1.18.1";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "13hja14kig5jnzcizpdghj68i88f0yd9wjdfjic9nzi98kzxmv54";
};
beamDeps = [ certifi idna metrics mimerl parse_trans ssl_verify_fun unicode_util_compat ];
};
httpoison = buildMix rec {
name = "httpoison";
version = "2.1.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "030n18hphjsafdz52gxj3hrsrj2s5ndjjmsgxxyq4hvb62s5qigw";
};
beamDeps = [ hackney ];
};
idna = buildRebar3 rec {
name = "idna";
version = "6.1.1";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "1sjcjibl34sprpf1dgdmzfww24xlyy34lpj7mhcys4j4i6vnwdwj";
};
beamDeps = [ unicode_util_compat ];
};
jason = buildMix rec {
name = "jason";
version = "1.4.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0891p2yrg3ri04p302cxfww3fi16pvvw1kh4r91zg85jhl87k8vr";
};
beamDeps = [];
};
metrics = buildRebar3 rec {
name = "metrics";
version = "1.0.1";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "05lz15piphyhvvm3d1ldjyw0zsrvz50d2m5f2q3s8x2gvkfrmc39";
};
beamDeps = [];
};
mime = buildMix rec {
name = "mime";
version = "2.0.3";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0szzdfalafpawjrrwbrplhkgxjv8837mlxbkpbn5xlj4vgq0p8r7";
};
beamDeps = [];
};
mimerl = buildRebar3 rec {
name = "mimerl";
version = "1.2.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "08wkw73dy449n68ssrkz57gikfzqk3vfnf264s31jn5aa1b5hy7j";
};
beamDeps = [];
};
parse_trans = buildRebar3 rec {
name = "parse_trans";
version = "3.3.1";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "12w8ai6b5s6b4hnvkav7hwxd846zdd74r32f84nkcmjzi1vrbk87";
};
beamDeps = [];
};
plug = buildMix rec {
name = "plug";
version = "1.14.2";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "04wdyv6nma74bj1m49vkm2bc5mjf8zclfg957fng8g71hw0wabw4";
};
beamDeps = [ mime plug_crypto telemetry ];
};
plug_cowboy = buildMix rec {
name = "plug_cowboy";
version = "2.6.1";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "04v6xc4v741dr2y38j66fmcc4xc037dnaxzkj2vih6j53yif2dny";
};
beamDeps = [ cowboy cowboy_telemetry plug ];
};
plug_crypto = buildMix rec {
name = "plug_crypto";
version = "1.2.5";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0hnqgzc3zas7j7wycgnkkdhaji5farkqccy2n4p1gqj5ccfrlm16";
};
beamDeps = [];
};
ranch = buildRebar3 rec {
name = "ranch";
version = "1.8.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "1rfz5ld54pkd2w25jadyznia2vb7aw9bclck21fizargd39wzys9";
};
beamDeps = [];
};
ssl_verify_fun = buildRebar3 rec {
name = "ssl_verify_fun";
version = "1.1.6";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "1026l1z1jh25z8bfrhaw0ryk5gprhrpnirq877zqhg253x3x5c5x";
};
beamDeps = [];
};
telemetry = buildRebar3 rec {
name = "telemetry";
version = "1.2.1";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "1mgyx9zw92g6w8fp9pblm3b0bghwxwwcbslrixq23ipzisfwxnfs";
};
beamDeps = [];
};
tortoise311 = buildMix rec {
name = "tortoise311";
version = "0.11.5";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "19cbc9xwd2sc5scvmrlfadxg5kfzxfa4yr5k1q13kcfivz5ry9v1";
};
beamDeps = [ gen_state_machine telemetry ];
};
typed_struct = buildMix rec {
name = "typed_struct";
version = "0.3.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0v8v3l8j7g3ran3f9gc2nc1mkj6kwfdr6kshm2cf3r0zlv1xa2y5";
};
beamDeps = [];
};
tz = buildMix rec {
name = "tz";
version = "0.26.1";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "0z7madh4cwnrw4rdmlsgp34dr683ldky2d7nlzm0vzcw3sjcwf6s";
};
beamDeps = [];
};
unicode_util_compat = buildRebar3 rec {
name = "unicode_util_compat";
version = "0.7.0";
src = fetchHex {
pkg = "${name}";
version = "${version}";
sha256 = "08952lw8cjdw8w171lv8wqbrxc4rcmb3jhkrdb7n06gngpbfdvi5";
};
beamDeps = [];
};
};
in self

1
test/test_helper.exs Normal file
View file

@ -0,0 +1 @@
ExUnit.start()