From f0e670d99f2dd8a176ec2b59588792b7c84e2b4e Mon Sep 17 00:00:00 2001 From: Daniel Kempkens Date: Mon, 14 Aug 2023 18:23:29 +0200 Subject: [PATCH] feat: Only re-import changed files --- .formatter.exs | 2 +- lib/bdfr_browser/http/plug.ex | 5 ++ lib/bdfr_browser/importer.ex | 103 ++++++++++++++++++++++++++++++++-- mix.exs | 2 + mix.lock | 2 + mix.nix | 26 +++++++++ 6 files changed, 135 insertions(+), 5 deletions(-) diff --git a/.formatter.exs b/.formatter.exs index 433fea5..d19e5ae 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -4,5 +4,5 @@ "{config,lib,test}/**/*.{ex,exs}" ], line_length: 120, - import_deps: [:ecto, :ecto_sql, :plug] + import_deps: [:ecto, :ecto_sql, :plug, :typed_struct] ] diff --git a/lib/bdfr_browser/http/plug.ex b/lib/bdfr_browser/http/plug.ex index 7165bf0..2c79592 100644 --- a/lib/bdfr_browser/http/plug.ex +++ b/lib/bdfr_browser/http/plug.ex @@ -117,6 +117,11 @@ defmodule BdfrBrowser.HTTP.Plug do send_resp(conn, 200, "IMPORTING") end + post "/_import_changes" do + :ok = BdfrBrowser.Importer.background_import_changes() + send_resp(conn, 200, "IMPORTING CHANGES") + end + get "/_ping" do send_resp(conn, 200, "PONG") end diff --git a/lib/bdfr_browser/importer.ex b/lib/bdfr_browser/importer.ex index 3b781d7..1597297 100644 --- a/lib/bdfr_browser/importer.ex +++ b/lib/bdfr_browser/importer.ex @@ -5,6 +5,16 @@ defmodule BdfrBrowser.Importer do alias BdfrBrowser.{Chat, Comment, Message, Post, Repo, Subreddit} + defmodule State do + use TypedStruct + + typedstruct do + field :fs_pid, pid + field :post_changes, [Path.t()], default: [] + field :chat_changes, [Path.t()], default: [] + end + end + def start_link([]) do GenServer.start_link(__MODULE__, [], name: __MODULE__) end @@ -69,11 +79,25 @@ defmodule BdfrBrowser.Importer do GenServer.cast(__MODULE__, :background_import) end + def background_import_changes do + GenServer.cast(__MODULE__, :background_import_changes) + end + # Callbacks @impl true def init([]) do - {:ok, nil} + {:ok, %State{}, {:continue, :setup_fs}} + end + + @impl true + def handle_continue(:setup_fs, state) do + base_directory = Application.fetch_env!(:bdfr_browser, :base_directory) + chat_directory = Application.fetch_env!(:bdfr_browser, :chat_directory) + + {:ok, pid} = FileSystem.start_link(dirs: [base_directory, chat_directory]) + :ok = FileSystem.subscribe(pid) + {:noreply, %State{state | fs_pid: pid}} end @impl true @@ -84,6 +108,41 @@ defmodule BdfrBrowser.Importer do {:noreply, state} end + @impl true + def handle_cast(:background_import_changes, state) do + _ = subreddits() + _ = changed_posts_and_comments(state.post_changes) + _ = changed_chats(state.chat_changes) + {:noreply, %State{state | post_changes: [], chat_changes: []}} + end + + @impl true + def handle_info({:file_event, pid, {path, events}}, %State{fs_pid: pid} = state) do + _ = Logger.info("Events `#{inspect(events)}' on file `#{path}'") + base_directory = Application.fetch_env!(:bdfr_browser, :base_directory) + chat_directory = Application.fetch_env!(:bdfr_browser, :chat_directory) + ext = Path.extname(path) + + new_state = + cond do + String.contains?(path, chat_directory) and ext == ".json" -> + %State{state | chat_changes: [path | state.chat_changes]} + + String.contains?(path, base_directory) and ext == ".json" -> + %State{state | post_changes: [path | state.post_changes]} + + true -> + state + end + + {:noreply, new_state} + end + + @impl true + def handle_info({:file_event, pid, :stop}, %State{fs_pid: pid} = state) do + {:noreply, %State{state | fs_pid: nil}, {:continue, :setup_fs}} + end + # Helper defp list_folders(args) do @@ -100,6 +159,42 @@ defmodule BdfrBrowser.Importer do |> Enum.sort_by(&String.downcase/1, sort) end + defp changed_posts_and_comments(posts) do + result = + for file_path <- posts do + _ = Logger.info("Importing changed file `#{file_path}' ...") + + post = file_path |> File.read!() |> Jason.decode!() + post = Map.put(post, "filename", Path.basename(file_path)) + + ["r", subreddit, "comments", _, _] = String.split(post["permalink"], "/", trim: true) + subreddit_record = Repo.get_by(Subreddit, name: subreddit) + {:ok, post_record} = import_post(post, subreddit_record) + comment_records = for comment <- post["comments"], do: import_comment(comment, post_record, nil) + + {post_record, List.flatten(comment_records)} + end + + List.flatten(result) + end + + defp changed_chats(chats) do + result = + for file_path <- chats do + _ = Logger.info("Importing changed file `#{file_path}' ...") + + chat = file_path |> File.read!() |> Jason.decode!() + chat = Map.put(chat, "filename", Path.basename(file_path)) + + {:ok, chat_record} = import_chat(chat) + message_records = for message <- chat["messages"], do: import_message(message, chat_record) + + {chat_record, List.flatten(message_records)} + end + + List.flatten(result) + end + defp read_posts(args) do posts = list_folders(args) sort = Keyword.get(args, :sort, :desc) @@ -156,7 +251,7 @@ defmodule BdfrBrowser.Importer do old_chats ++ new_chats end - defp import_post(post, subreddit) do + defp import_post(post, subreddit) when not is_nil(subreddit) do id = post["id"] %Post{ @@ -177,7 +272,7 @@ defmodule BdfrBrowser.Importer do ) end - defp import_comment(comment, post, parent) do + defp import_comment(comment, post, parent) when not is_nil(post) do id = comment["id"] {:ok, parent} = @@ -214,7 +309,7 @@ defmodule BdfrBrowser.Importer do ) end - defp import_message(message, chat) do + defp import_message(message, chat) when not is_nil(chat) do id = :sha3_256 |> :crypto.hash([chat.id, message["timestamp"]]) |> Base.encode16(case: :lower) {:ok, posted_at, 0} = DateTime.from_iso8601(message["timestamp"]) diff --git a/mix.exs b/mix.exs index d1b25c4..3753e62 100644 --- a/mix.exs +++ b/mix.exs @@ -25,6 +25,8 @@ defmodule BdfrBrowser.MixProject do {:postgrex, "~> 0.17"}, {:jason, "~> 1.4"}, {:earmark, "~> 1.4"}, + {:file_system, "~> 0.2.10"}, + {:typed_struct, "~> 0.3.0", runtime: false}, {:systemd, "~> 0.6"} ] end diff --git a/mix.lock b/mix.lock index 658afa0..0de9fac 100644 --- a/mix.lock +++ b/mix.lock @@ -9,6 +9,7 @@ "ecto": {:hex, :ecto, "3.10.3", "eb2ae2eecd210b4eb8bece1217b297ad4ff824b4384c0e3fdd28aaf96edd6135", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "44bec74e2364d491d70f7e42cd0d690922659d329f6465e89feb8a34e8cd3433"}, "ecto_sql": {:hex, :ecto_sql, "3.10.1", "6ea6b3036a0b0ca94c2a02613fd9f742614b5cfe494c41af2e6571bb034dd94c", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.10.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 0.17.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f6a25bdbbd695f12c8171eaff0851fa4c8e72eec1e98c7364402dda9ce11c56b"}, "enough": {:hex, :enough, "0.1.0", "0254710c52d324e2dadde54cb56fbb80a792c2eb285669b8379efd0752bf89f0", [:rebar3], [], "hexpm", "0460c7abda5f5e0ea592b12bc6976b8a5c4b96e42f332059cd396525374bf9a1"}, + "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "plug": {:hex, :plug, "1.14.2", "cff7d4ec45b4ae176a227acd94a7ab536d9b37b942c8e8fa6dfc0fff98ff4d80", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "842fc50187e13cf4ac3b253d47d9474ed6c296a8732752835ce4a86acdf68d13"}, @@ -18,4 +19,5 @@ "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "systemd": {:hex, :systemd, "0.6.2", "aaa24f1e3e6cb978c45369768b1abd766a0dbff637ed61254ca64797bcec9963", [:rebar3], [{:enough, "~> 0.1.0", [hex: :enough, repo: "hexpm", optional: false]}], "hexpm", "5062b911800c1ab05157c7bf9a9fbe23dd24c58891c87fd12d2e3ed8fc1708b8"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "typed_struct": {:hex, :typed_struct, "0.3.0", "939789e3c1dca39d7170c87f729127469d1315dcf99fee8e152bb774b17e7ff7", [:mix], [], "hexpm", "c50bd5c3a61fe4e198a8504f939be3d3c85903b382bde4865579bc23111d1b6d"}, } diff --git a/mix.nix b/mix.nix index afe92e7..7f99935 100644 --- a/mix.nix +++ b/mix.nix @@ -138,6 +138,19 @@ let beamDeps = [ ]; }; + file_system = buildMix rec { + name = "file_system"; + version = "0.2.10"; + + src = fetchHex { + pkg = "${name}"; + version = "${version}"; + sha256 = "1p0myxmnjjds8bbg69dd6fvhk8q3n7lb78zd4qvmjajnzgdmw6a1"; + }; + + beamDeps = [ ]; + }; + jason = buildMix rec { name = "jason"; version = "1.4.1"; @@ -254,6 +267,19 @@ let beamDeps = [ ]; }; + + typed_struct = buildMix rec { + name = "typed_struct"; + version = "0.3.0"; + + src = fetchHex { + pkg = "${name}"; + version = "${version}"; + sha256 = "0v8v3l8j7g3ran3f9gc2nc1mkj6kwfdr6kshm2cf3r0zlv1xa2y5"; + }; + + beamDeps = [ ]; + }; }; in self