diff options
author | Eugen Rochko <eugen@zeonfederated.com> | 2022-05-22 22:16:43 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-22 22:16:43 +0200 |
commit | a9b64b24d6c076cb96a66307c07d4f0158dc07da (patch) | |
tree | 8d41ef354040e5825c99bb3f656606dcc8d61291 /app | |
parent | 54bb659ad14fda8d3427752d2c99716420997d6e (diff) |
Change algorithm of `tootctl search deploy` to improve performance (#18463)
Diffstat (limited to 'app')
-rw-r--r-- | app/chewy/accounts_index.rb | 6 | ||||
-rw-r--r-- | app/chewy/statuses_index.rb | 2 | ||||
-rw-r--r-- | app/chewy/tags_index.rb | 8 | ||||
-rw-r--r-- | app/lib/importer/accounts_index_importer.rb | 30 | ||||
-rw-r--r-- | app/lib/importer/base_importer.rb | 87 | ||||
-rw-r--r-- | app/lib/importer/statuses_index_importer.rb | 89 | ||||
-rw-r--r-- | app/lib/importer/tags_index_importer.rb | 26 | ||||
-rw-r--r-- | app/models/trends/history.rb | 20 |
8 files changed, 254 insertions, 14 deletions
diff --git a/app/chewy/accounts_index.rb b/app/chewy/accounts_index.rb index 763958a3f..e38e14a10 100644 --- a/app/chewy/accounts_index.rb +++ b/app/chewy/accounts_index.rb @@ -23,7 +23,7 @@ class AccountsIndex < Chewy::Index }, } - index_scope ::Account.searchable.includes(:account_stat), delete_if: ->(account) { account.destroyed? || !account.searchable? } + index_scope ::Account.searchable.includes(:account_stat) root date_detection: false do field :id, type: 'long' @@ -36,8 +36,8 @@ class AccountsIndex < Chewy::Index field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content' end - field :following_count, type: 'long', value: ->(account) { account.following.local.count } - field :followers_count, type: 'long', value: ->(account) { account.followers.local.count } + field :following_count, type: 'long', value: ->(account) { account.following_count } + field :followers_count, type: 'long', value: ->(account) { account.followers_count } field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at } end end diff --git a/app/chewy/statuses_index.rb b/app/chewy/statuses_index.rb index c20009879..6dd4fb18b 100644 --- a/app/chewy/statuses_index.rb +++ b/app/chewy/statuses_index.rb @@ -33,6 +33,8 @@ class StatusesIndex < Chewy::Index }, } + # We do not use delete_if option here because it would call a method that we + # expect to be called with crutches without crutches, causing n+1 queries index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll) crutch :mentions do |collection| diff --git a/app/chewy/tags_index.rb b/app/chewy/tags_index.rb index a5b139bca..df3d9e4cc 100644 --- a/app/chewy/tags_index.rb +++ b/app/chewy/tags_index.rb @@ -23,7 +23,11 @@ class TagsIndex < Chewy::Index }, } - index_scope ::Tag.listable, delete_if: ->(tag) { tag.destroyed? || !tag.listable? } + index_scope ::Tag.listable + + crutch :time_period do + 7.days.ago.to_date..0.days.ago.to_date + end root date_detection: false do field :name, type: 'text', analyzer: 'content' do @@ -31,7 +35,7 @@ class TagsIndex < Chewy::Index end field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? } - field :usage, type: 'long', value: ->(tag) { tag.history.reduce(0) { |total, day| total + day.accounts } } + field :usage, type: 'long', value: ->(tag, crutches) { tag.history.aggregate(crutches.time_period).accounts } field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at } end end diff --git a/app/lib/importer/accounts_index_importer.rb b/app/lib/importer/accounts_index_importer.rb new file mode 100644 index 000000000..792a31b1b --- /dev/null +++ b/app/lib/importer/accounts_index_importer.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +class Importer::AccountsIndexImporter < Importer::BaseImporter + def import! + scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp) do |accounts| + bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body + + indexed = bulk.select { |entry| entry[:index] }.size + deleted = bulk.select { |entry| entry[:delete] }.size + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + + wait! + end + + private + + def index + AccountsIndex + end + + def scope + Account.searchable + end +end diff --git a/app/lib/importer/base_importer.rb b/app/lib/importer/base_importer.rb new file mode 100644 index 000000000..ea522c600 --- /dev/null +++ b/app/lib/importer/base_importer.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +class Importer::BaseImporter + # @param [Integer] batch_size + # @param [Concurrent::ThreadPoolExecutor] executor + def initialize(batch_size:, executor:) + @batch_size = batch_size + @executor = executor + @wait_for = Concurrent::Set.new + end + + # Callback to run when a concurrent work unit completes + # @param [Proc] + def on_progress(&block) + @on_progress = block + end + + # Callback to run when a concurrent work unit fails + # @param [Proc] + def on_failure(&block) + @on_failure = block + end + + # Reduce resource usage during and improve speed of indexing + def optimize_for_import! + Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } } + end + + # Restore original index settings + def optimize_for_search! + Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } } + end + + # Estimate the amount of documents that would be indexed. Not exact! + # @returns [Integer] + def estimate! + ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i } + end + + # Import data from the database into the index + def import! + raise NotImplementedError + end + + # Remove documents from the index that no longer exist in the database + def clean_up! + index.scroll_batches do |documents| + ids = documents.map { |doc| doc['_id'] } + existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true } + tmp = ids.reject { |id| existence_map[id] } + + next if tmp.empty? + + in_work_unit(tmp) do |deleted_ids| + bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [0, bulk.size] + end + end + + wait! + end + + protected + + def in_work_unit(*args, &block) + work_unit = Concurrent::Promises.future_on(@executor, *args, &block) + + work_unit.on_fulfillment!(&@on_progress) + work_unit.on_rejection!(&@on_failure) + work_unit.on_resolution! { @wait_for.delete(work_unit) } + + @wait_for << work_unit + rescue Concurrent::RejectedExecutionError + sleep(0.1) && retry # Backpressure + end + + def wait! + Concurrent::Promises.zip(*@wait_for).wait + end + + def index + raise NotImplementedError + end +end diff --git a/app/lib/importer/statuses_index_importer.rb b/app/lib/importer/statuses_index_importer.rb new file mode 100644 index 000000000..7c6532560 --- /dev/null +++ b/app/lib/importer/statuses_index_importer.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +class Importer::StatusesIndexImporter < Importer::BaseImporter + def import! + # The idea is that instead of iterating over all statuses in the database + # and calculating the searchable_by for each of them (majority of which + # would be empty), we approach the index from the other end + + scopes.each do |scope| + # We could be tempted to keep track of status IDs we have already processed + # from a different scope to avoid indexing them multiple times, but that + # could end up being a very large array + + scope.find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp.map(&:status_id)) do |status_ids| + bulk = ActiveRecord::Base.connection_pool.with_connection do + Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll).where(id: status_ids)).bulk_body + end + + indexed = 0 + deleted = 0 + + # We can't use the delete_if proc to do the filtering because delete_if + # is called before rendering the data and we need to filter based + # on the results of the filter, so this filtering happens here instead + bulk.map! do |entry| + new_entry = begin + if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank? + { delete: entry[:index].except(:data) } + else + entry + end + end + + if new_entry[:index] + indexed += 1 + else + deleted += 1 + end + + new_entry + end + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + end + + wait! + end + + private + + def index + StatusesIndex + end + + def scopes + [ + local_statuses_scope, + local_mentions_scope, + local_favourites_scope, + local_votes_scope, + local_bookmarks_scope, + ] + end + + def local_mentions_scope + Mention.where(account: Account.local, silent: false).select(:id, :status_id) + end + + def local_favourites_scope + Favourite.where(account: Account.local).select(:id, :status_id) + end + + def local_bookmarks_scope + Bookmark.select(:id, :status_id) + end + + def local_votes_scope + Poll.joins(:votes).where(votes: { account: Account.local }).select('polls.id, polls.status_id') + end + + def local_statuses_scope + Status.local.select('id, coalesce(reblog_of_id, id) as status_id') + end +end diff --git a/app/lib/importer/tags_index_importer.rb b/app/lib/importer/tags_index_importer.rb new file mode 100644 index 000000000..f5bd8f052 --- /dev/null +++ b/app/lib/importer/tags_index_importer.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +class Importer::TagsIndexImporter < Importer::BaseImporter + def import! + index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp) do |tags| + bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body + + indexed = bulk.select { |entry| entry[:index] }.size + deleted = bulk.select { |entry| entry[:delete] }.size + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + + wait! + end + + private + + def index + TagsIndex + end +end diff --git a/app/models/trends/history.rb b/app/models/trends/history.rb index 608e33792..74723e35c 100644 --- a/app/models/trends/history.rb +++ b/app/models/trends/history.rb @@ -11,11 +11,11 @@ class Trends::History end def uses - redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum + with_redis { |redis| redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum } end def accounts - redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) + with_redis { |redis| redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) } end end @@ -33,19 +33,21 @@ class Trends::History attr_reader :day def accounts - redis.pfcount(key_for(:accounts)) + with_redis { |redis| redis.pfcount(key_for(:accounts)) } end def uses - redis.get(key_for(:uses))&.to_i || 0 + with_redis { |redis| redis.get(key_for(:uses))&.to_i || 0 } end def add(account_id) - redis.pipelined do - redis.incrby(key_for(:uses), 1) - redis.pfadd(key_for(:accounts), account_id) - redis.expire(key_for(:uses), EXPIRE_AFTER) - redis.expire(key_for(:accounts), EXPIRE_AFTER) + with_redis do |redis| + redis.pipelined do |pipeline| + pipeline.incrby(key_for(:uses), 1) + pipeline.pfadd(key_for(:accounts), account_id) + pipeline.expire(key_for(:uses), EXPIRE_AFTER) + pipeline.expire(key_for(:accounts), EXPIRE_AFTER) + end end end |