about summary refs log tree commit diff
path: root/lib/mastodon/search_cli.rb
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2022-05-22 22:16:43 +0200
committerGitHub <noreply@github.com>2022-05-22 22:16:43 +0200
commita9b64b24d6c076cb96a66307c07d4f0158dc07da (patch)
tree8d41ef354040e5825c99bb3f656606dcc8d61291 /lib/mastodon/search_cli.rb
parent54bb659ad14fda8d3427752d2c99716420997d6e (diff)
Change algorithm of `tootctl search deploy` to improve performance (#18463)
Diffstat (limited to 'lib/mastodon/search_cli.rb')
-rw-r--r--lib/mastodon/search_cli.rb129
1 files changed, 40 insertions, 89 deletions
diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb
index 74f980ba1..b579ebc14 100644
--- a/lib/mastodon/search_cli.rb
+++ b/lib/mastodon/search_cli.rb
@@ -16,19 +16,21 @@ module Mastodon
       StatusesIndex,
     ].freeze
 
-    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
-    option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch'
+    option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
+    option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
     option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
+    option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
+    option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
     desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
     long_desc <<~LONG_DESC
       If Elasticsearch is empty, this command will create the necessary indices
       and then import data from the database into those indices.
 
       This command will also upgrade indices if the underlying schema has been
-      changed since the last run.
+      changed since the last run. Index upgrades erase index data.
 
       Even if creating or upgrading indices is not necessary, data from the
-      database will be imported into the indices.
+      database will be imported into the indices, unless overriden with --no-import.
     LONG_DESC
     def deploy
       if options[:concurrency] < 1
@@ -49,7 +51,9 @@ module Mastodon
         end
       end
 
-      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
+      pool      = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
+      importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
+      progress  = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
 
       # First, ensure all indices are created and have the correct
       # structure, so that live data can already be written
@@ -59,99 +63,46 @@ module Mastodon
         index.specification.lock!
       end
 
+      progress.title = 'Estimating workload '
+      progress.total = indices.sum { |index| importers[index].estimate! }
+
       reset_connection_pools!
 
-      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
-      added   = Concurrent::AtomicFixnum.new(0)
-      removed = Concurrent::AtomicFixnum.new(0)
+      added   = 0
+      removed = 0
 
-      progress.title = 'Estimating workload '
+      indices.each do |index|
+        importer = importers[index]
+        importer.optimize_for_import!
+
+        importer.on_progress do |(indexed, deleted)|
+          progress.total = nil if progress.progress + indexed + deleted > progress.total
+          progress.progress += indexed + deleted
+          added   += indexed
+          removed += deleted
+        end
 
-      # Estimate the amount of data that has to be imported first
-      progress.total = indices.sum { |index| index.adapter.default_scope.count }
+        importer.on_failure do |reason|
+          progress.log(pastel.red("Error while importing #{index}: #{reason}"))
+        end
 
-      # Now import all the actual data. Mind that unlike chewy:sync, we don't
-      # fetch and compare all record IDs from the database and the index to
-      # find out which to add and which to remove from the index. Because with
-      # potentially millions of rows, the memory footprint of such a calculation
-      # is uneconomical. So we only ever add.
-      indices.each do |index|
-        progress.title = "Importing #{index} "
-        batch_size     = options[:batch_size]
-        slice_size     = (batch_size / options[:concurrency]).ceil
-
-        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
-          futures = []
-
-          batch.each_slice(slice_size) do |records|
-            futures << Concurrent::Future.execute(executor: pool) do
-              begin
-                if !progress.total.nil? && progress.progress + records.size > progress.total
-                  # The number of items has changed between start and now,
-                  # since there is no good way to predict the final count from
-                  # here, just change the progress bar to an indeterminate one
-
-                  progress.total = nil
-                end
-
-                grouped_records = nil
-                bulk_body       = nil
-                index_count     = 0
-                delete_count    = 0
-
-                ActiveRecord::Base.connection_pool.with_connection do
-                  grouped_records = records.to_a.group_by do |record|
-                    index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
-                  end
-
-                  bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
-                end
-
-                index_count  = grouped_records[:to_index].size  if grouped_records.key?(:to_index)
-                delete_count = grouped_records[:delete].size    if grouped_records.key?(:delete)
-
-                # The following is an optimization for statuses specifically, since
-                # we want to de-index statuses that cannot be searched by anybody,
-                # but can't use Chewy's delete_if logic because it doesn't use
-                # crutches and our searchable_by logic depends on them
-                if index == StatusesIndex
-                  bulk_body.map! do |entry|
-                    if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
-                      index_count  -= 1
-                      delete_count += 1
-
-                      { delete: entry[:to_index].except(:data) }
-                    else
-                      entry
-                    end
-                  end
-                end
-
-                Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)
-
-                progress.progress += records.size
-
-                added.increment(index_count)
-                removed.increment(delete_count)
-
-                sleep 1
-              rescue => e
-                progress.log pastel.red("Error importing #{index}: #{e}")
-              ensure
-                RedisConfiguration.pool.checkin if Thread.current[:redis]
-                Thread.current[:redis] = nil
-              end
-            end
-          end
-
-          futures.map(&:value)
+        if options[:import]
+          progress.title = "Importing #{index} "
+          importer.import!
+        end
+
+        if options[:clean]
+          progress.title = "Cleaning #{index} "
+          importer.clean_up!
         end
+      ensure
+        importer.optimize_for_search!
       end
 
-      progress.title = ''
-      progress.stop
+      progress.title = 'Done! '
+      progress.finish
 
-      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
+      say("Indexed #{added} records, de-indexed #{removed}", :green, true)
     end
   end
 end