about summary refs log tree commit diff
diff options
authorEugen Rochko <eugen@zeonfederated.com>2022-05-22 22:16:43 +0200
committerGitHub <noreply@github.com>2022-05-22 22:16:43 +0200
commita9b64b24d6c076cb96a66307c07d4f0158dc07da (patch)
parent54bb659ad14fda8d3427752d2c99716420997d6e (diff)
Change algorithm of `tootctl search deploy` to improve performance (#18463)
9 files changed, 294 insertions, 103 deletions
diff --git a/app/chewy/accounts_index.rb b/app/chewy/accounts_index.rb
index 763958a3f..e38e14a10 100644
--- a/app/chewy/accounts_index.rb
+++ b/app/chewy/accounts_index.rb
@@ -23,7 +23,7 @@ class AccountsIndex < Chewy::Index
-  index_scope ::Account.searchable.includes(:account_stat), delete_if: ->(account) { account.destroyed? || !account.searchable? }
+  index_scope ::Account.searchable.includes(:account_stat)
   root date_detection: false do
     field :id, type: 'long'
@@ -36,8 +36,8 @@ class AccountsIndex < Chewy::Index
       field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content'
-    field :following_count, type: 'long', value: ->(account) { account.following.local.count }
-    field :followers_count, type: 'long', value: ->(account) { account.followers.local.count }
+    field :following_count, type: 'long', value: ->(account) { account.following_count }
+    field :followers_count, type: 'long', value: ->(account) { account.followers_count }
     field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at }
diff --git a/app/chewy/statuses_index.rb b/app/chewy/statuses_index.rb
index c20009879..6dd4fb18b 100644
--- a/app/chewy/statuses_index.rb
+++ b/app/chewy/statuses_index.rb
@@ -33,6 +33,8 @@ class StatusesIndex < Chewy::Index
+  # We do not use delete_if option here because it would call a method that we
+  # expect to be called with crutches without crutches, causing n+1 queries
   index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll)
   crutch :mentions do |collection|
diff --git a/app/chewy/tags_index.rb b/app/chewy/tags_index.rb
index a5b139bca..df3d9e4cc 100644
--- a/app/chewy/tags_index.rb
+++ b/app/chewy/tags_index.rb
@@ -23,7 +23,11 @@ class TagsIndex < Chewy::Index
-  index_scope ::Tag.listable, delete_if: ->(tag) { tag.destroyed? || !tag.listable? }
+  index_scope ::Tag.listable
+  crutch :time_period do
+    7.days.ago.to_date..0.days.ago.to_date
+  end
   root date_detection: false do
     field :name, type: 'text', analyzer: 'content' do
@@ -31,7 +35,7 @@ class TagsIndex < Chewy::Index
     field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? }
-    field :usage, type: 'long', value: ->(tag) { tag.history.reduce(0) { |total, day| total + day.accounts } }
+    field :usage, type: 'long', value: ->(tag, crutches) { tag.history.aggregate(crutches.time_period).accounts }
     field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at }
diff --git a/app/lib/importer/accounts_index_importer.rb b/app/lib/importer/accounts_index_importer.rb
new file mode 100644
index 000000000..792a31b1b
--- /dev/null
+++ b/app/lib/importer/accounts_index_importer.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+class Importer::AccountsIndexImporter < Importer::BaseImporter
+  def import!
+    scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp|
+      in_work_unit(tmp) do |accounts|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body
+        indexed = bulk.select { |entry| entry[:index] }.size
+        deleted = bulk.select { |entry| entry[:delete] }.size
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+        [indexed, deleted]
+      end
+    end
+    wait!
+  end
+  private
+  def index
+    AccountsIndex
+  end
+  def scope
+    Account.searchable
+  end
diff --git a/app/lib/importer/base_importer.rb b/app/lib/importer/base_importer.rb
new file mode 100644
index 000000000..ea522c600
--- /dev/null
+++ b/app/lib/importer/base_importer.rb
@@ -0,0 +1,87 @@
+# frozen_string_literal: true
+class Importer::BaseImporter
+  # @param [Integer] batch_size
+  # @param [Concurrent::ThreadPoolExecutor] executor
+  def initialize(batch_size:, executor:)
+    @batch_size = batch_size
+    @executor   = executor
+    @wait_for   = Concurrent::Set.new
+  end
+  # Callback to run when a concurrent work unit completes
+  # @param [Proc]
+  def on_progress(&block)
+    @on_progress = block
+  end
+  # Callback to run when a concurrent work unit fails
+  # @param [Proc]
+  def on_failure(&block)
+    @on_failure = block
+  end
+  # Reduce resource usage during and improve speed of indexing
+  def optimize_for_import!
+    Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } }
+  end
+  # Restore original index settings
+  def optimize_for_search!
+    Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } }
+  end
+  # Estimate the amount of documents that would be indexed. Not exact!
+  # @returns [Integer]
+  def estimate!
+    ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i }
+  end
+  # Import data from the database into the index
+  def import!
+    raise NotImplementedError
+  end
+  # Remove documents from the index that no longer exist in the database
+  def clean_up!
+    index.scroll_batches do |documents|
+      ids           = documents.map { |doc| doc['_id'] }
+      existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true }
+      tmp           = ids.reject { |id| existence_map[id] }
+      next if tmp.empty?
+      in_work_unit(tmp) do |deleted_ids|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+        [0, bulk.size]
+      end
+    end
+    wait!
+  end
+  protected
+  def in_work_unit(*args, &block)
+    work_unit = Concurrent::Promises.future_on(@executor, *args, &block)
+    work_unit.on_fulfillment!(&@on_progress)
+    work_unit.on_rejection!(&@on_failure)
+    work_unit.on_resolution! { @wait_for.delete(work_unit) }
+    @wait_for << work_unit
+  rescue Concurrent::RejectedExecutionError
+    sleep(0.1) && retry # Backpressure
+  end
+  def wait!
+    Concurrent::Promises.zip(*@wait_for).wait
+  end
+  def index
+    raise NotImplementedError
+  end
diff --git a/app/lib/importer/statuses_index_importer.rb b/app/lib/importer/statuses_index_importer.rb
new file mode 100644
index 000000000..7c6532560
--- /dev/null
+++ b/app/lib/importer/statuses_index_importer.rb
@@ -0,0 +1,89 @@
+# frozen_string_literal: true
+class Importer::StatusesIndexImporter < Importer::BaseImporter
+  def import!
+    # The idea is that instead of iterating over all statuses in the database
+    # and calculating the searchable_by for each of them (majority of which
+    # would be empty), we approach the index from the other end
+    scopes.each do |scope|
+      # We could be tempted to keep track of status IDs we have already processed
+      # from a different scope to avoid indexing them multiple times, but that
+      # could end up being a very large array
+      scope.find_in_batches(batch_size: @batch_size) do |tmp|
+        in_work_unit(tmp.map(&:status_id)) do |status_ids|
+          bulk = ActiveRecord::Base.connection_pool.with_connection do
+            Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll).where(id: status_ids)).bulk_body
+          end
+          indexed = 0
+          deleted = 0
+          # We can't use the delete_if proc to do the filtering because delete_if
+          # is called before rendering the data and we need to filter based
+          # on the results of the filter, so this filtering happens here instead
+          bulk.map! do |entry|
+            new_entry = begin
+              if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
+                { delete: entry[:index].except(:data) }
+              else
+                entry
+              end
+            end
+            if new_entry[:index]
+              indexed += 1
+            else
+              deleted += 1
+            end
+            new_entry
+          end
+          Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+          [indexed, deleted]
+        end
+      end
+    end
+    wait!
+  end
+  private
+  def index
+    StatusesIndex
+  end
+  def scopes
+    [
+      local_statuses_scope,
+      local_mentions_scope,
+      local_favourites_scope,
+      local_votes_scope,
+      local_bookmarks_scope,
+    ]
+  end
+  def local_mentions_scope
+    Mention.where(account: Account.local, silent: false).select(:id, :status_id)
+  end
+  def local_favourites_scope
+    Favourite.where(account: Account.local).select(:id, :status_id)
+  end
+  def local_bookmarks_scope
+    Bookmark.select(:id, :status_id)
+  end
+  def local_votes_scope
+    Poll.joins(:votes).where(votes: { account: Account.local }).select('polls.id, polls.status_id')
+  end
+  def local_statuses_scope
+    Status.local.select('id, coalesce(reblog_of_id, id) as status_id')
+  end
diff --git a/app/lib/importer/tags_index_importer.rb b/app/lib/importer/tags_index_importer.rb
new file mode 100644
index 000000000..f5bd8f052
--- /dev/null
+++ b/app/lib/importer/tags_index_importer.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+class Importer::TagsIndexImporter < Importer::BaseImporter
+  def import!
+    index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp|
+      in_work_unit(tmp) do |tags|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body
+        indexed = bulk.select { |entry| entry[:index] }.size
+        deleted = bulk.select { |entry| entry[:delete] }.size
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+        [indexed, deleted]
+      end
+    end
+    wait!
+  end
+  private
+  def index
+    TagsIndex
+  end
diff --git a/app/models/trends/history.rb b/app/models/trends/history.rb
index 608e33792..74723e35c 100644
--- a/app/models/trends/history.rb
+++ b/app/models/trends/history.rb
@@ -11,11 +11,11 @@ class Trends::History
     def uses
-      redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum
+      with_redis { |redis| redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum }
     def accounts
-      redis.pfcount(*@days.map { |day| day.key_for(:accounts) })
+      with_redis { |redis| redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) }
@@ -33,19 +33,21 @@ class Trends::History
     attr_reader :day
     def accounts
-      redis.pfcount(key_for(:accounts))
+      with_redis { |redis| redis.pfcount(key_for(:accounts)) }
     def uses
-      redis.get(key_for(:uses))&.to_i || 0
+      with_redis { |redis| redis.get(key_for(:uses))&.to_i || 0 }
     def add(account_id)
-      redis.pipelined do
-        redis.incrby(key_for(:uses), 1)
-        redis.pfadd(key_for(:accounts), account_id)
-        redis.expire(key_for(:uses), EXPIRE_AFTER)
-        redis.expire(key_for(:accounts), EXPIRE_AFTER)
+      with_redis do |redis|
+        redis.pipelined do |pipeline|
+          pipeline.incrby(key_for(:uses), 1)
+          pipeline.pfadd(key_for(:accounts), account_id)
+          pipeline.expire(key_for(:uses), EXPIRE_AFTER)
+          pipeline.expire(key_for(:accounts), EXPIRE_AFTER)
+        end
diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb
index 74f980ba1..b579ebc14 100644
--- a/lib/mastodon/search_cli.rb
+++ b/lib/mastodon/search_cli.rb
@@ -16,19 +16,21 @@ module Mastodon
-    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
-    option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch'
+    option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
+    option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
     option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
+    option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
+    option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
     desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
     long_desc <<~LONG_DESC
       If Elasticsearch is empty, this command will create the necessary indices
       and then import data from the database into those indices.
       This command will also upgrade indices if the underlying schema has been
-      changed since the last run.
+      changed since the last run. Index upgrades erase index data.
       Even if creating or upgrading indices is not necessary, data from the
-      database will be imported into the indices.
+      database will be imported into the indices, unless overriden with --no-import.
     def deploy
       if options[:concurrency] < 1
@@ -49,7 +51,9 @@ module Mastodon
-      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
+      pool      = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
+      importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
+      progress  = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
       # First, ensure all indices are created and have the correct
       # structure, so that live data can already be written
@@ -59,99 +63,46 @@ module Mastodon
+      progress.title = 'Estimating workload '
+      progress.total = indices.sum { |index| importers[index].estimate! }
-      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
-      added   = Concurrent::AtomicFixnum.new(0)
-      removed = Concurrent::AtomicFixnum.new(0)
+      added   = 0
+      removed = 0
-      progress.title = 'Estimating workload '
+      indices.each do |index|
+        importer = importers[index]
+        importer.optimize_for_import!
+        importer.on_progress do |(indexed, deleted)|
+          progress.total = nil if progress.progress + indexed + deleted > progress.total
+          progress.progress += indexed + deleted
+          added   += indexed
+          removed += deleted
+        end
-      # Estimate the amount of data that has to be imported first
-      progress.total = indices.sum { |index| index.adapter.default_scope.count }
+        importer.on_failure do |reason|
+          progress.log(pastel.red("Error while importing #{index}: #{reason}"))
+        end
-      # Now import all the actual data. Mind that unlike chewy:sync, we don't
-      # fetch and compare all record IDs from the database and the index to
-      # find out which to add and which to remove from the index. Because with
-      # potentially millions of rows, the memory footprint of such a calculation
-      # is uneconomical. So we only ever add.
-      indices.each do |index|
-        progress.title = "Importing #{index} "
-        batch_size     = options[:batch_size]
-        slice_size     = (batch_size / options[:concurrency]).ceil
-        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
-          futures = []
-          batch.each_slice(slice_size) do |records|
-            futures << Concurrent::Future.execute(executor: pool) do
-              begin
-                if !progress.total.nil? && progress.progress + records.size > progress.total
-                  # The number of items has changed between start and now,
-                  # since there is no good way to predict the final count from
-                  # here, just change the progress bar to an indeterminate one
-                  progress.total = nil
-                end
-                grouped_records = nil
-                bulk_body       = nil
-                index_count     = 0
-                delete_count    = 0
-                ActiveRecord::Base.connection_pool.with_connection do
-                  grouped_records = records.to_a.group_by do |record|
-                    index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
-                  end
-                  bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
-                end
-                index_count  = grouped_records[:to_index].size  if grouped_records.key?(:to_index)
-                delete_count = grouped_records[:delete].size    if grouped_records.key?(:delete)
-                # The following is an optimization for statuses specifically, since
-                # we want to de-index statuses that cannot be searched by anybody,
-                # but can't use Chewy's delete_if logic because it doesn't use
-                # crutches and our searchable_by logic depends on them
-                if index == StatusesIndex
-                  bulk_body.map! do |entry|
-                    if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
-                      index_count  -= 1
-                      delete_count += 1
-                      { delete: entry[:to_index].except(:data) }
-                    else
-                      entry
-                    end
-                  end
-                end
-                Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)
-                progress.progress += records.size
-                added.increment(index_count)
-                removed.increment(delete_count)
-                sleep 1
-              rescue => e
-                progress.log pastel.red("Error importing #{index}: #{e}")
-              ensure
-                RedisConfiguration.pool.checkin if Thread.current[:redis]
-                Thread.current[:redis] = nil
-              end
-            end
-          end
-          futures.map(&:value)
+        if options[:import]
+          progress.title = "Importing #{index} "
+          importer.import!
+        end
+        if options[:clean]
+          progress.title = "Cleaning #{index} "
+          importer.clean_up!
+      ensure
+        importer.optimize_for_search!
-      progress.title = ''
-      progress.stop
+      progress.title = 'Done! '
+      progress.finish
-      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
+      say("Indexed #{added} records, de-indexed #{removed}", :green, true)