about summary refs log tree commit diff
path: root/lib
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2020-07-14 18:10:35 +0200
committerGitHub <noreply@github.com>2020-07-14 18:10:35 +0200
commit4abe3be321a620641b9a316b168ce754d7438eff (patch)
tree2ec72381f94c258a707268b31df2cadf0efcadac /lib
parent98b3b80d6bc6b8549448bb7fadc1449ba68a52ea (diff)
Change `tootctl search deploy` algorithm (#14300)
Diffstat (limited to 'lib')
-rw-r--r--lib/mastodon/cli_helper.rb1
-rw-r--r--lib/mastodon/search_cli.rb142
2 files changed, 127 insertions, 16 deletions
diff --git a/lib/mastodon/cli_helper.rb b/lib/mastodon/cli_helper.rb
index 4a20fa8d6..ed22f44b2 100644
--- a/lib/mastodon/cli_helper.rb
+++ b/lib/mastodon/cli_helper.rb
@@ -7,6 +7,7 @@ ActiveRecord::Base.logger    = dev_null
 ActiveJob::Base.logger       = dev_null
 HttpLog.configuration.logger = dev_null
 Paperclip.options[:log]      = false
+Chewy.logger                 = dev_null
 
 module Mastodon
   module CLIHelper
diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb
index 8bd5f9543..22a0acec8 100644
--- a/lib/mastodon/search_cli.rb
+++ b/lib/mastodon/search_cli.rb
@@ -6,8 +6,19 @@ require_relative 'cli_helper'
 
 module Mastodon
   class SearchCLI < Thor
-    option :processes, default: 2, aliases: [:p]
-    desc 'deploy', 'Create or update an ElasticSearch index and populate it'
+    include CLIHelper
+
+    # Indices are sorted by amount of data to be expected in each, so that
+    # smaller indices can go online sooner
+    INDICES = [
+      AccountsIndex,
+      TagsIndex,
+      StatusesIndex,
+    ].freeze
+
+    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
+    option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
+    desc 'deploy', 'Create or upgrade ElasticSearch indices and populate them'
     long_desc <<~LONG_DESC
       If ElasticSearch is empty, this command will create the necessary indices
       and then import data from the database into those indices.
@@ -15,27 +26,126 @@ module Mastodon
       This command will also upgrade indices if the underlying schema has been
       changed since the last run.
 
-      With the --processes option, parallelize execution of the command. The
-      default is 2. If "auto" is specified, the number is automatically
-      derived from available CPUs.
+      Even if creating or upgrading indices is not necessary, data from the
+      database will be imported into the indices.
     LONG_DESC
     def deploy
-      processed = Chewy::RakeHelper.upgrade(parallel: processes)
-      Chewy::RakeHelper.sync(except: processed, parallel: processes)
-    end
+      if options[:concurrency] < 1
+        say('Cannot run with this concurrency setting, must be at least 1', :red)
+        exit(1)
+      end
+
+      indices = begin
+        if options[:only]
+          options[:only].map { |str| "#{str.camelize}Index".constantize }
+        else
+          INDICES
+        end
+      end
+
+      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
+
+      # First, ensure all indices are created and have the correct
+      # structure, so that live data can already be written
+      indices.select { |index| index.specification.changed? }.each do |index|
+        progress.title = "Upgrading #{index} "
+        index.purge
+        index.specification.lock!
+      end
+
+      ActiveRecord::Base.configurations[Rails.env]['pool'] = options[:concurrency] + 1
+
+      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
+      added   = Concurrent::AtomicFixnum.new(0)
+      removed = Concurrent::AtomicFixnum.new(0)
+
+      progress.title = 'Estimating workload '
+
+      # Estimate the amount of data that has to be imported first
+      indices.each do |index|
+        index.types.each do |type|
+          progress.total = (progress.total || 0) + type.adapter.default_scope.count
+        end
+      end
+
+      # Now import all the actual data. Mind that unlike chewy:sync, we don't
+      # fetch and compare all record IDs from the database and the index to
+      # find out which to add and which to remove from the index. Because with
+      # potentially millions of rows, the memory footprint of such a calculation
+      # is uneconomical. So we only ever add.
+      indices.each do |index|
+        progress.title = "Importing #{index} "
+        batch_size     = 1_000
+        slice_size     = (batch_size / options[:concurrency]).ceil
+
+        index.types.each do |type|
+          type.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
+            futures = []
+
+            batch.each_slice(slice_size) do |records|
+              futures << Concurrent::Future.execute(executor: pool) do
+                begin
+                  if !progress.total.nil? && progress.progress + records.size > progress.total
+                    # The number of items has changed between start and now,
+                    # since there is no good way to predict the final count from
+                    # here, just change the progress bar to an indeterminate one
+
+                    progress.total = nil
+                  end
+
+                  grouped_records = nil
+                  bulk_body       = nil
+                  index_count     = 0
+                  delete_count    = 0
+
+                  ActiveRecord::Base.connection_pool.with_connection do
+                    grouped_records = type.adapter.send(:grouped_objects, records)
+                    bulk_body       = Chewy::Type::Import::BulkBuilder.new(type, grouped_records).bulk_body
+                  end
 
-    private
+                  index_count  = grouped_records[:index].size  if grouped_records.key?(:index)
+                  delete_count = grouped_records[:delete].size if grouped_records.key?(:delete)
 
-    def processes
-      return true if options[:processes] == 'auto'
+                  # The following is an optimization for statuses specifically, since
+                  # we want to de-index statuses that cannot be searched by anybody,
+                  # but can't use Chewy's delete_if logic because it doesn't use
+                  # crutches and our searchable_by logic depends on them
+                  if type == StatusesIndex::Status
+                    bulk_body.map! do |entry|
+                      if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
+                        index_count  -= 1
+                        delete_count += 1
 
-      num = options[:processes].to_i
+                        { delete: entry[:index].except(:data) }
+                      else
+                        entry
+                      end
+                    end
+                  end
 
-      if num < 2
-        nil
-      else
-        num
+                  Chewy::Type::Import::BulkRequest.new(type).perform(bulk_body)
+
+                  progress.progress += records.size
+
+                  added.increment(index_count)
+                  removed.increment(delete_count)
+
+                  sleep 1
+                rescue => e
+                  progress.log pastel.red("Error importing #{index}: #{e}")
+                end
+              end
+            end
+
+            futures.map(&:value)
+          end
+        end
       end
+
+      progress.title = ''
+      progress.stop
+
+      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
     end
   end
 end