feat(mqtt): re-publish in a way home-assistant understands better
All checks were successful
Build / build (push) Successful in 4m23s
All checks were successful
Build / build (push) Successful in 4m23s
This commit is contained in:
parent
b72605185b
commit
42edf668a4
4 changed files with 49 additions and 16 deletions
|
@ -6,15 +6,11 @@ config :weewx_proxy,
|
||||||
mqtt_weewx_user: System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_USER"),
|
mqtt_weewx_user: System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_USER"),
|
||||||
mqtt_weewx_password: System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_PASSWORD"),
|
mqtt_weewx_password: System.fetch_env!("WEEWX_PROXY_MQTT_WEEWX_PASSWORD"),
|
||||||
mqtt_weewx_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_WEEWX_CLIENT_ID", "WeewxBroker")),
|
mqtt_weewx_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_WEEWX_CLIENT_ID", "WeewxBroker")),
|
||||||
mqtt_sdr_host: elem(:inet.parse_address(to_charlist(System.fetch_env!("WEEWX_PROXY_MQTT_SDR_HOST"))), 1),
|
mqtt_local_host: elem(:inet.parse_address(to_charlist(System.fetch_env!("WEEWX_PROXY_MQTT_LOCAL_HOST"))), 1),
|
||||||
mqtt_sdr_port: elem(Integer.parse(System.get_env("WEEWX_PROXY_MQTT_SDR_PORT", "1883")), 0),
|
mqtt_local_port: elem(Integer.parse(System.get_env("WEEWX_PROXY_MQTT_LOCAL_PORT", "1883")), 0),
|
||||||
mqtt_sdr_user: System.fetch_env!("WEEWX_PROXY_MQTT_SDR_USER"),
|
mqtt_local_user: System.fetch_env!("WEEWX_PROXY_MQTT_LOCAL_USER"),
|
||||||
mqtt_sdr_password: System.fetch_env!("WEEWX_PROXY_MQTT_SDR_PASSWORD"),
|
mqtt_local_password: System.fetch_env!("WEEWX_PROXY_MQTT_LOCAL_PASSWORD"),
|
||||||
mqtt_sdr_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_SDR_CLIENT_ID", "SdrIngestLocal")),
|
mqtt_sdr_client_id: String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_SDR_CLIENT_ID", "SdrIngestLocal")),
|
||||||
mqtt_modbus_host: elem(:inet.parse_address(to_charlist(System.fetch_env!("WEEWX_PROXY_MQTT_MODBUS_HOST"))), 1),
|
|
||||||
mqtt_modbus_port: elem(Integer.parse(System.get_env("WEEWX_PROXY_MQTT_MODBUS_PORT", "1884")), 0),
|
|
||||||
mqtt_modbus_user: System.fetch_env!("WEEWX_PROXY_MQTT_MODBUS_USER"),
|
|
||||||
mqtt_modbus_password: System.fetch_env!("WEEWX_PROXY_MQTT_MODBUS_PASSWORD"),
|
|
||||||
mqtt_modbus_client_id:
|
mqtt_modbus_client_id:
|
||||||
String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_MODBUS_CLIENT_ID", "ModbusIngestLocal")),
|
String.to_atom("Elixir." <> System.get_env("WEEWX_PROXY_MQTT_MODBUS_CLIENT_ID", "ModbusIngestLocal")),
|
||||||
purpleair_url: System.fetch_env!("WEEWX_PROXY_PURPLEAIR_URL")
|
purpleair_url: System.fetch_env!("WEEWX_PROXY_PURPLEAIR_URL")
|
||||||
|
|
|
@ -75,6 +75,7 @@ defmodule WeewxProxy.Modbus.Deye do
|
||||||
}
|
}
|
||||||
|
|
||||||
:ok = Publisher.publish("weewx/ingest_si", mqtt_data)
|
:ok = Publisher.publish("weewx/ingest_si", mqtt_data)
|
||||||
|
:ok = Publisher.publish_value_map("hadata/bitShake", mqtt_data)
|
||||||
|
|
||||||
{:ok, state}
|
{:ok, state}
|
||||||
else
|
else
|
||||||
|
@ -120,6 +121,7 @@ defmodule WeewxProxy.Modbus.Deye do
|
||||||
}
|
}
|
||||||
|
|
||||||
:ok = Publisher.publish("weewx/ingest_si", data)
|
:ok = Publisher.publish("weewx/ingest_si", data)
|
||||||
|
:ok = Publisher.publish_value_map("hadata/deye", data)
|
||||||
|
|
||||||
nil
|
nil
|
||||||
end
|
end
|
||||||
|
@ -138,6 +140,7 @@ defmodule WeewxProxy.Modbus.Deye do
|
||||||
}
|
}
|
||||||
|
|
||||||
:ok = Publisher.publish("weewx/ingest_si", data)
|
:ok = Publisher.publish("weewx/ingest_si", data)
|
||||||
|
:ok = Publisher.publish_value_map("hadata/deye", data)
|
||||||
|
|
||||||
nil
|
nil
|
||||||
end
|
end
|
||||||
|
|
|
@ -38,16 +38,28 @@ defmodule WeewxProxy.Mqtt do
|
||||||
password: Application.fetch_env!(:weewx_proxy, :mqtt_weewx_password),
|
password: Application.fetch_env!(:weewx_proxy, :mqtt_weewx_password),
|
||||||
handler: {Tortoise311.Handler.Logger, []}
|
handler: {Tortoise311.Handler.Logger, []}
|
||||||
]},
|
]},
|
||||||
|
{Tortoise311.Connection,
|
||||||
|
[
|
||||||
|
name: WeewxProxy.Mqtt.LocalBroker,
|
||||||
|
client_id: LocalBroker,
|
||||||
|
server:
|
||||||
|
{Tortoise311.Transport.Tcp,
|
||||||
|
host: Application.fetch_env!(:weewx_proxy, :mqtt_local_host),
|
||||||
|
port: Application.fetch_env!(:weewx_proxy, :mqtt_local_port)},
|
||||||
|
user_name: Application.fetch_env!(:weewx_proxy, :mqtt_local_user),
|
||||||
|
password: Application.fetch_env!(:weewx_proxy, :mqtt_local_password),
|
||||||
|
handler: {Tortoise311.Handler.Logger, []}
|
||||||
|
]},
|
||||||
{Tortoise311.Connection,
|
{Tortoise311.Connection,
|
||||||
[
|
[
|
||||||
name: WeewxProxy.Mqtt.SdrIngest,
|
name: WeewxProxy.Mqtt.SdrIngest,
|
||||||
client_id: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_client_id),
|
client_id: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_client_id),
|
||||||
server:
|
server:
|
||||||
{Tortoise311.Transport.Tcp,
|
{Tortoise311.Transport.Tcp,
|
||||||
host: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_host),
|
host: Application.fetch_env!(:weewx_proxy, :mqtt_local_host),
|
||||||
port: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_port)},
|
port: Application.fetch_env!(:weewx_proxy, :mqtt_local_port)},
|
||||||
user_name: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_user),
|
user_name: Application.fetch_env!(:weewx_proxy, :mqtt_local_user),
|
||||||
password: Application.fetch_env!(:weewx_proxy, :mqtt_sdr_password),
|
password: Application.fetch_env!(:weewx_proxy, :mqtt_local_password),
|
||||||
subscriptions: ["rtl433"],
|
subscriptions: ["rtl433"],
|
||||||
handler: {WeewxProxy.Sdr.Ecowitt, []}
|
handler: {WeewxProxy.Sdr.Ecowitt, []}
|
||||||
]},
|
]},
|
||||||
|
@ -57,10 +69,10 @@ defmodule WeewxProxy.Mqtt do
|
||||||
client_id: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_client_id),
|
client_id: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_client_id),
|
||||||
server:
|
server:
|
||||||
{Tortoise311.Transport.Tcp,
|
{Tortoise311.Transport.Tcp,
|
||||||
host: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_host),
|
host: Application.fetch_env!(:weewx_proxy, :mqtt_local_host),
|
||||||
port: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_port)},
|
port: Application.fetch_env!(:weewx_proxy, :mqtt_local_port)},
|
||||||
user_name: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_user),
|
user_name: Application.fetch_env!(:weewx_proxy, :mqtt_local_user),
|
||||||
password: Application.fetch_env!(:weewx_proxy, :mqtt_modbus_password),
|
password: Application.fetch_env!(:weewx_proxy, :mqtt_local_password),
|
||||||
subscriptions: ["deye/#", "bitshake/#"],
|
subscriptions: ["deye/#", "bitshake/#"],
|
||||||
handler: {WeewxProxy.Modbus.Deye, []}
|
handler: {WeewxProxy.Modbus.Deye, []}
|
||||||
]}
|
]}
|
||||||
|
|
|
@ -41,6 +41,19 @@ defmodule WeewxProxy.Publisher do
|
||||||
GenServer.cast(@name, {:publish, topic, filtered_data})
|
GenServer.cast(@name, {:publish, topic, filtered_data})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@spec publish_value_map(String.t(), data()) :: :ok
|
||||||
|
def publish_value_map(topic, data) do
|
||||||
|
filtered_data = :maps.filter(fn _k, v -> not is_nil(v) end, data)
|
||||||
|
|
||||||
|
_ =
|
||||||
|
for {key, value} <- filtered_data do
|
||||||
|
full_topic = "#{topic}/#{key}"
|
||||||
|
GenServer.cast(@name, {:publish_value, full_topic, to_string(value)})
|
||||||
|
end
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
# Callbacks
|
# Callbacks
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
|
@ -57,4 +70,13 @@ defmodule WeewxProxy.Publisher do
|
||||||
|
|
||||||
{:noreply, %State{state | last_update: Utils.utc_timestamp()}}
|
{:noreply, %State{state | last_update: Utils.utc_timestamp()}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_cast({:publish_value, topic, data}, state) do
|
||||||
|
_ = Logger.info("Publishing value to #{topic}")
|
||||||
|
_ = Tortoise311.publish(LocalBroker, topic, data, qos: 0, timeout: 1000)
|
||||||
|
_ = Logger.debug("Published value: #{inspect(data)}")
|
||||||
|
|
||||||
|
{:noreply, %State{state | last_update: Utils.utc_timestamp()}}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue