about summary refs log tree commit diff
path: root/lib
diff options
context:
space:
mode:
authorClaire <claire.github-309c@sitedethib.com>2022-05-24 13:59:21 +0200
committerClaire <claire.github-309c@sitedethib.com>2022-05-24 13:59:21 +0200
commit22111914bfc838a43bb2e20244c965467f127de2 (patch)
tree8f6301da32a36a3ebeec0b965a74f67c02632f7c /lib
parentc279dbd47082e908d98dc8cf869c0ff7fc19f1ae (diff)
parente5997a195602624efdb366e9f09ffa377e859580 (diff)
Merge branch 'main' into glitch-soc/merge-upstream
Diffstat (limited to 'lib')
-rw-r--r--lib/chewy/strategy/custom_sidekiq.rb11
-rw-r--r--lib/chewy/strategy/mastodon.rb27
-rw-r--r--lib/mastodon/search_cli.rb129
3 files changed, 67 insertions, 100 deletions
diff --git a/lib/chewy/strategy/custom_sidekiq.rb b/lib/chewy/strategy/custom_sidekiq.rb
deleted file mode 100644
index 794ae4ed4..000000000
--- a/lib/chewy/strategy/custom_sidekiq.rb
+++ /dev/null
@@ -1,11 +0,0 @@
-# frozen_string_literal: true
-
-module Chewy
-  class Strategy
-    class CustomSidekiq < Sidekiq
-      def update(_type, _objects, _options = {})
-        super if Chewy.enabled?
-      end
-    end
-  end
-end
diff --git a/lib/chewy/strategy/mastodon.rb b/lib/chewy/strategy/mastodon.rb
new file mode 100644
index 000000000..ee8b92186
--- /dev/null
+++ b/lib/chewy/strategy/mastodon.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+module Chewy
+  class Strategy
+    class Mastodon < Base
+      def initialize
+        super
+
+        @stash = Hash.new { |hash, key| hash[key] = [] }
+      end
+
+      def update(type, objects, _options = {})
+        @stash[type].concat(type.root.id ? Array.wrap(objects) : type.adapter.identify(objects)) if Chewy.enabled?
+      end
+
+      def leave
+        RedisConfiguration.with do |redis|
+          redis.pipelined do |pipeline|
+            @stash.each do |type, ids|
+              pipeline.sadd("chewy:queue:#{type.name}", ids)
+            end
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb
index 74f980ba1..b579ebc14 100644
--- a/lib/mastodon/search_cli.rb
+++ b/lib/mastodon/search_cli.rb
@@ -16,19 +16,21 @@ module Mastodon
       StatusesIndex,
     ].freeze
 
-    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
-    option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch'
+    option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
+    option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
     option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
+    option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
+    option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
     desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
     long_desc <<~LONG_DESC
       If Elasticsearch is empty, this command will create the necessary indices
       and then import data from the database into those indices.
 
       This command will also upgrade indices if the underlying schema has been
-      changed since the last run.
+      changed since the last run. Index upgrades erase index data.
 
       Even if creating or upgrading indices is not necessary, data from the
-      database will be imported into the indices.
+      database will be imported into the indices, unless overriden with --no-import.
     LONG_DESC
     def deploy
       if options[:concurrency] < 1
@@ -49,7 +51,9 @@ module Mastodon
         end
       end
 
-      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
+      pool      = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
+      importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
+      progress  = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
 
       # First, ensure all indices are created and have the correct
       # structure, so that live data can already be written
@@ -59,99 +63,46 @@ module Mastodon
         index.specification.lock!
       end
 
+      progress.title = 'Estimating workload '
+      progress.total = indices.sum { |index| importers[index].estimate! }
+
       reset_connection_pools!
 
-      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
-      added   = Concurrent::AtomicFixnum.new(0)
-      removed = Concurrent::AtomicFixnum.new(0)
+      added   = 0
+      removed = 0
 
-      progress.title = 'Estimating workload '
+      indices.each do |index|
+        importer = importers[index]
+        importer.optimize_for_import!
+
+        importer.on_progress do |(indexed, deleted)|
+          progress.total = nil if progress.progress + indexed + deleted > progress.total
+          progress.progress += indexed + deleted
+          added   += indexed
+          removed += deleted
+        end
 
-      # Estimate the amount of data that has to be imported first
-      progress.total = indices.sum { |index| index.adapter.default_scope.count }
+        importer.on_failure do |reason|
+          progress.log(pastel.red("Error while importing #{index}: #{reason}"))
+        end
 
-      # Now import all the actual data. Mind that unlike chewy:sync, we don't
-      # fetch and compare all record IDs from the database and the index to
-      # find out which to add and which to remove from the index. Because with
-      # potentially millions of rows, the memory footprint of such a calculation
-      # is uneconomical. So we only ever add.
-      indices.each do |index|
-        progress.title = "Importing #{index} "
-        batch_size     = options[:batch_size]
-        slice_size     = (batch_size / options[:concurrency]).ceil
-
-        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
-          futures = []
-
-          batch.each_slice(slice_size) do |records|
-            futures << Concurrent::Future.execute(executor: pool) do
-              begin
-                if !progress.total.nil? && progress.progress + records.size > progress.total
-                  # The number of items has changed between start and now,
-                  # since there is no good way to predict the final count from
-                  # here, just change the progress bar to an indeterminate one
-
-                  progress.total = nil
-                end
-
-                grouped_records = nil
-                bulk_body       = nil
-                index_count     = 0
-                delete_count    = 0
-
-                ActiveRecord::Base.connection_pool.with_connection do
-                  grouped_records = records.to_a.group_by do |record|
-                    index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
-                  end
-
-                  bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
-                end
-
-                index_count  = grouped_records[:to_index].size  if grouped_records.key?(:to_index)
-                delete_count = grouped_records[:delete].size    if grouped_records.key?(:delete)
-
-                # The following is an optimization for statuses specifically, since
-                # we want to de-index statuses that cannot be searched by anybody,
-                # but can't use Chewy's delete_if logic because it doesn't use
-                # crutches and our searchable_by logic depends on them
-                if index == StatusesIndex
-                  bulk_body.map! do |entry|
-                    if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
-                      index_count  -= 1
-                      delete_count += 1
-
-                      { delete: entry[:to_index].except(:data) }
-                    else
-                      entry
-                    end
-                  end
-                end
-
-                Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)
-
-                progress.progress += records.size
-
-                added.increment(index_count)
-                removed.increment(delete_count)
-
-                sleep 1
-              rescue => e
-                progress.log pastel.red("Error importing #{index}: #{e}")
-              ensure
-                RedisConfiguration.pool.checkin if Thread.current[:redis]
-                Thread.current[:redis] = nil
-              end
-            end
-          end
-
-          futures.map(&:value)
+        if options[:import]
+          progress.title = "Importing #{index} "
+          importer.import!
+        end
+
+        if options[:clean]
+          progress.title = "Cleaning #{index} "
+          importer.clean_up!
         end
+      ensure
+        importer.optimize_for_search!
       end
 
-      progress.title = ''
-      progress.stop
+      progress.title = 'Done! '
+      progress.finish
 
-      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
+      say("Indexed #{added} records, de-indexed #{removed}", :green, true)
     end
   end
 end