diff --git a/config/runtime.exs b/config/runtime.exs index 80a19c2..9c67ccd 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -6,15 +6,11 @@ config :weewx_proxy, mqtt_weewx_user: System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_USER"), mqtt_weewx_password: System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_PASSWORD"), mqtt_weewx_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_WEEWX_CLIENT_ID", "WeewxBroker")), - mqtt_sdr_host: elem(:inet.parse_address(to_charlist(System.fetch_env!("WEEWX_PROXY_MQTT_SDR_HOST"))), 1), - mqtt_sdr_port: elem(Integer.parse(System.get_env("WEEWX_PROXY_MQTT_SDR_PORT", "1883")), 0), - mqtt_sdr_user: System.fetch_env!("WEEWX_PROXY_MQTT_SDR_USER"), - mqtt_sdr_password: System.fetch_env!("WEEWX_PROXY_MQTT_SDR_PASSWORD"), + mqtt_local_host: elem(:inet.parse_address(to_charlist(System.fetch_env!("WEEWX_PROXY_MQTT_LOCAL_HOST"))), 1), + mqtt_local_port: elem(Integer.parse(System.get_env("WEEWX_PROXY_MQTT_LOCAL_PORT", "1883")), 0), + mqtt_local_user: System.fetch_env!("WEEWX_PROXY_MQTT_LOCAL_USER"), + mqtt_local_password: System.fetch_env!("WEEWX_PROXY_MQTT_LOCAL_PASSWORD"), mqtt_sdr_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_SDR_CLIENT_ID", "SdrIngestLocal")), - mqtt_modbus_host: elem(:inet.parse_address(to_charlist(System.fetch_env!("WEEWX_PROXY_MQTT_MODBUS_HOST"))), 1), - mqtt_modbus_port: elem(Integer.parse(System.get_env("WEEWX_PROXY_MQTT_MODBUS_PORT", "1884")), 0), - mqtt_modbus_user: System.fetch_env!("WEEWX_PROXY_MQTT_MODBUS_USER"), - mqtt_modbus_password: System.fetch_env!("WEEWX_PROXY_MQTT_MODBUS_PASSWORD"), mqtt_modbus_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_MODBUS_CLIENT_ID", "ModbusIngestLocal")), purpleair_url: System.fetch_env!("WEEWX_PROXY_PURPLEAIR_URL") diff --git a/lib/weewx_proxy/modbus/deye.ex b/lib/weewx_proxy/modbus/deye.ex index 12944d9..a648e73 100644 --- a/lib/weewx_proxy/modbus/deye.ex +++ b/lib/weewx_proxy/modbus/deye.ex @@ -75,6 +75,7 @@ defmodule WeewxProxy.Modbus.Deye do } :ok = Publisher.publish("weewx/ingest_si", mqtt_data) + :ok = Publisher.publish_value_map("hadata/bitShake", mqtt_data) {:ok, state} else @@ -120,6 +121,7 @@ defmodule WeewxProxy.Modbus.Deye do } :ok = Publisher.publish("weewx/ingest_si", data) + :ok = Publisher.publish_value_map("hadata/deye", data) nil end @@ -138,6 +140,7 @@ defmodule WeewxProxy.Modbus.Deye do } :ok = Publisher.publish("weewx/ingest_si", data) + :ok = Publisher.publish_value_map("hadata/deye", data) nil end diff --git a/lib/weewx_proxy/mqtt.ex b/lib/weewx_proxy/mqtt.ex index 2b2dfe4..94d2f8c 100644 --- a/lib/weewx_proxy/mqtt.ex +++ b/lib/weewx_proxy/mqtt.ex @@ -38,16 +38,28 @@ defmodule WeewxProxy.Mqtt do password: Application.fetch_env!(:weewx_proxy, :mqtt_weewx_password), handler: {Tortoise311.Handler.Logger, []} ]}, + {Tortoise311.Connection, + [ + name: WeewxProxy.Mqtt.LocalBroker, + client_id: LocalBroker, + server: + {Tortoise311.Transport.Tcp, + host: Application.fetch_env!(:weewx_proxy, :mqtt_local_host), + port: Application.fetch_env!(:weewx_proxy, :mqtt_local_port)}, + user_name: Application.fetch_env!(:weewx_proxy, :mqtt_local_user), + password: Application.fetch_env!(:weewx_proxy, :mqtt_local_password), + handler: {Tortoise311.Handler.Logger, []} + ]}, {Tortoise311.Connection, [ name: WeewxProxy.Mqtt.SdrIngest, client_id: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_client_id), server: {Tortoise311.Transport.Tcp, - host: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_host), - port: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_port)}, - user_name: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_user), - password: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_password), + host: Application.fetch_env!(:weewx_proxy, :mqtt_local_host), + port: Application.fetch_env!(:weewx_proxy, :mqtt_local_port)}, + user_name: Application.fetch_env!(:weewx_proxy, :mqtt_local_user), + password: Application.fetch_env!(:weewx_proxy, :mqtt_local_password), subscriptions: ["rtl433"], handler: {WeewxProxy.Sdr.Ecowitt, []} ]}, @@ -57,10 +69,10 @@ defmodule WeewxProxy.Mqtt do client_id: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_client_id), server: {Tortoise311.Transport.Tcp, - host: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_host), - port: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_port)}, - user_name: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_user), - password: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_password), + host: Application.fetch_env!(:weewx_proxy, :mqtt_local_host), + port: Application.fetch_env!(:weewx_proxy, :mqtt_local_port)}, + user_name: Application.fetch_env!(:weewx_proxy, :mqtt_local_user), + password: Application.fetch_env!(:weewx_proxy, :mqtt_local_password), subscriptions: ["deye/#", "bitshake/#"], handler: {WeewxProxy.Modbus.Deye, []} ]} diff --git a/lib/weewx_proxy/publisher.ex b/lib/weewx_proxy/publisher.ex index 00d4423..332ccb3 100644 --- a/lib/weewx_proxy/publisher.ex +++ b/lib/weewx_proxy/publisher.ex @@ -41,6 +41,19 @@ defmodule WeewxProxy.Publisher do GenServer.cast(@name, {:publish, topic, filtered_data}) end + @spec publish_value_map(String.t(), data()) :: :ok + def publish_value_map(topic, data) do + filtered_data = :maps.filter(fn _k, v -> not is_nil(v) end, data) + + _ = + for {key, value} <- filtered_data do + full_topic = "#{topic}/#{key}" + GenServer.cast(@name, {:publish_value, full_topic, to_string(value)}) + end + + :ok + end + # Callbacks @impl true @@ -57,4 +70,13 @@ defmodule WeewxProxy.Publisher do {:noreply, %State{state | last_update: Utils.utc_timestamp()}} end + + @impl true + def handle_cast({:publish_value, topic, data}, state) do + _ = Logger.info("Publishing value to #{topic}") + _ = Tortoise311.publish(LocalBroker, topic, data, qos: 0, timeout: 1000) + _ = Logger.debug("Published value: #{inspect(data)}") + + {:noreply, %State{state | last_update: Utils.utc_timestamp()}} + end end