diff options
-rw-r--r-- | app/chewy/accounts_index.rb | 6 | ||||
-rw-r--r-- | app/chewy/statuses_index.rb | 2 | ||||
-rw-r--r-- | app/chewy/tags_index.rb | 8 | ||||
-rw-r--r-- | app/lib/importer/accounts_index_importer.rb | 30 | ||||
-rw-r--r-- | app/lib/importer/base_importer.rb | 87 | ||||
-rw-r--r-- | app/lib/importer/statuses_index_importer.rb | 89 | ||||
-rw-r--r-- | app/lib/importer/tags_index_importer.rb | 26 | ||||
-rw-r--r-- | app/models/trends/history.rb | 20 | ||||
-rw-r--r-- | lib/mastodon/search_cli.rb | 129 |
9 files changed, 294 insertions, 103 deletions
diff --git a/app/chewy/accounts_index.rb b/app/chewy/accounts_index.rb index 763958a3f..e38e14a10 100644 --- a/app/chewy/accounts_index.rb +++ b/app/chewy/accounts_index.rb @@ -23,7 +23,7 @@ class AccountsIndex < Chewy::Index }, } - index_scope ::Account.searchable.includes(:account_stat), delete_if: ->(account) { account.destroyed? || !account.searchable? } + index_scope ::Account.searchable.includes(:account_stat) root date_detection: false do field :id, type: 'long' @@ -36,8 +36,8 @@ class AccountsIndex < Chewy::Index field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content' end - field :following_count, type: 'long', value: ->(account) { account.following.local.count } - field :followers_count, type: 'long', value: ->(account) { account.followers.local.count } + field :following_count, type: 'long', value: ->(account) { account.following_count } + field :followers_count, type: 'long', value: ->(account) { account.followers_count } field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at } end end diff --git a/app/chewy/statuses_index.rb b/app/chewy/statuses_index.rb index c20009879..6dd4fb18b 100644 --- a/app/chewy/statuses_index.rb +++ b/app/chewy/statuses_index.rb @@ -33,6 +33,8 @@ class StatusesIndex < Chewy::Index }, } + # We do not use delete_if option here because it would call a method that we + # expect to be called with crutches without crutches, causing n+1 queries index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll) crutch :mentions do |collection| diff --git a/app/chewy/tags_index.rb b/app/chewy/tags_index.rb index a5b139bca..df3d9e4cc 100644 --- a/app/chewy/tags_index.rb +++ b/app/chewy/tags_index.rb @@ -23,7 +23,11 @@ class TagsIndex < Chewy::Index }, } - index_scope ::Tag.listable, delete_if: ->(tag) { tag.destroyed? || !tag.listable? } + index_scope ::Tag.listable + + crutch :time_period do + 7.days.ago.to_date..0.days.ago.to_date + end root date_detection: false do field :name, type: 'text', analyzer: 'content' do @@ -31,7 +35,7 @@ class TagsIndex < Chewy::Index end field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? } - field :usage, type: 'long', value: ->(tag) { tag.history.reduce(0) { |total, day| total + day.accounts } } + field :usage, type: 'long', value: ->(tag, crutches) { tag.history.aggregate(crutches.time_period).accounts } field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at } end end diff --git a/app/lib/importer/accounts_index_importer.rb b/app/lib/importer/accounts_index_importer.rb new file mode 100644 index 000000000..792a31b1b --- /dev/null +++ b/app/lib/importer/accounts_index_importer.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +class Importer::AccountsIndexImporter < Importer::BaseImporter + def import! + scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp) do |accounts| + bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body + + indexed = bulk.select { |entry| entry[:index] }.size + deleted = bulk.select { |entry| entry[:delete] }.size + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + + wait! + end + + private + + def index + AccountsIndex + end + + def scope + Account.searchable + end +end diff --git a/app/lib/importer/base_importer.rb b/app/lib/importer/base_importer.rb new file mode 100644 index 000000000..ea522c600 --- /dev/null +++ b/app/lib/importer/base_importer.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +class Importer::BaseImporter + # @param [Integer] batch_size + # @param [Concurrent::ThreadPoolExecutor] executor + def initialize(batch_size:, executor:) + @batch_size = batch_size + @executor = executor + @wait_for = Concurrent::Set.new + end + + # Callback to run when a concurrent work unit completes + # @param [Proc] + def on_progress(&block) + @on_progress = block + end + + # Callback to run when a concurrent work unit fails + # @param [Proc] + def on_failure(&block) + @on_failure = block + end + + # Reduce resource usage during and improve speed of indexing + def optimize_for_import! + Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } } + end + + # Restore original index settings + def optimize_for_search! + Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } } + end + + # Estimate the amount of documents that would be indexed. Not exact! + # @returns [Integer] + def estimate! + ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i } + end + + # Import data from the database into the index + def import! + raise NotImplementedError + end + + # Remove documents from the index that no longer exist in the database + def clean_up! + index.scroll_batches do |documents| + ids = documents.map { |doc| doc['_id'] } + existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true } + tmp = ids.reject { |id| existence_map[id] } + + next if tmp.empty? + + in_work_unit(tmp) do |deleted_ids| + bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [0, bulk.size] + end + end + + wait! + end + + protected + + def in_work_unit(*args, &block) + work_unit = Concurrent::Promises.future_on(@executor, *args, &block) + + work_unit.on_fulfillment!(&@on_progress) + work_unit.on_rejection!(&@on_failure) + work_unit.on_resolution! { @wait_for.delete(work_unit) } + + @wait_for << work_unit + rescue Concurrent::RejectedExecutionError + sleep(0.1) && retry # Backpressure + end + + def wait! + Concurrent::Promises.zip(*@wait_for).wait + end + + def index + raise NotImplementedError + end +end diff --git a/app/lib/importer/statuses_index_importer.rb b/app/lib/importer/statuses_index_importer.rb new file mode 100644 index 000000000..7c6532560 --- /dev/null +++ b/app/lib/importer/statuses_index_importer.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +class Importer::StatusesIndexImporter < Importer::BaseImporter + def import! + # The idea is that instead of iterating over all statuses in the database + # and calculating the searchable_by for each of them (majority of which + # would be empty), we approach the index from the other end + + scopes.each do |scope| + # We could be tempted to keep track of status IDs we have already processed + # from a different scope to avoid indexing them multiple times, but that + # could end up being a very large array + + scope.find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp.map(&:status_id)) do |status_ids| + bulk = ActiveRecord::Base.connection_pool.with_connection do + Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll).where(id: status_ids)).bulk_body + end + + indexed = 0 + deleted = 0 + + # We can't use the delete_if proc to do the filtering because delete_if + # is called before rendering the data and we need to filter based + # on the results of the filter, so this filtering happens here instead + bulk.map! do |entry| + new_entry = begin + if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank? + { delete: entry[:index].except(:data) } + else + entry + end + end + + if new_entry[:index] + indexed += 1 + else + deleted += 1 + end + + new_entry + end + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + end + + wait! + end + + private + + def index + StatusesIndex + end + + def scopes + [ + local_statuses_scope, + local_mentions_scope, + local_favourites_scope, + local_votes_scope, + local_bookmarks_scope, + ] + end + + def local_mentions_scope + Mention.where(account: Account.local, silent: false).select(:id, :status_id) + end + + def local_favourites_scope + Favourite.where(account: Account.local).select(:id, :status_id) + end + + def local_bookmarks_scope + Bookmark.select(:id, :status_id) + end + + def local_votes_scope + Poll.joins(:votes).where(votes: { account: Account.local }).select('polls.id, polls.status_id') + end + + def local_statuses_scope + Status.local.select('id, coalesce(reblog_of_id, id) as status_id') + end +end diff --git a/app/lib/importer/tags_index_importer.rb b/app/lib/importer/tags_index_importer.rb new file mode 100644 index 000000000..f5bd8f052 --- /dev/null +++ b/app/lib/importer/tags_index_importer.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +class Importer::TagsIndexImporter < Importer::BaseImporter + def import! + index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp) do |tags| + bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body + + indexed = bulk.select { |entry| entry[:index] }.size + deleted = bulk.select { |entry| entry[:delete] }.size + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + + wait! + end + + private + + def index + TagsIndex + end +end diff --git a/app/models/trends/history.rb b/app/models/trends/history.rb index 608e33792..74723e35c 100644 --- a/app/models/trends/history.rb +++ b/app/models/trends/history.rb @@ -11,11 +11,11 @@ class Trends::History end def uses - redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum + with_redis { |redis| redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum } end def accounts - redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) + with_redis { |redis| redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) } end end @@ -33,19 +33,21 @@ class Trends::History attr_reader :day def accounts - redis.pfcount(key_for(:accounts)) + with_redis { |redis| redis.pfcount(key_for(:accounts)) } end def uses - redis.get(key_for(:uses))&.to_i || 0 + with_redis { |redis| redis.get(key_for(:uses))&.to_i || 0 } end def add(account_id) - redis.pipelined do - redis.incrby(key_for(:uses), 1) - redis.pfadd(key_for(:accounts), account_id) - redis.expire(key_for(:uses), EXPIRE_AFTER) - redis.expire(key_for(:accounts), EXPIRE_AFTER) + with_redis do |redis| + redis.pipelined do |pipeline| + pipeline.incrby(key_for(:uses), 1) + pipeline.pfadd(key_for(:accounts), account_id) + pipeline.expire(key_for(:uses), EXPIRE_AFTER) + pipeline.expire(key_for(:accounts), EXPIRE_AFTER) + end end end diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb index 74f980ba1..b579ebc14 100644 --- a/lib/mastodon/search_cli.rb +++ b/lib/mastodon/search_cli.rb @@ -16,19 +16,21 @@ module Mastodon StatusesIndex, ].freeze - option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads' - option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch' + option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads' + option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch' option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices' + option :import, type: :boolean, default: true, desc: 'Import data from the database to the index' + option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index' desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them' long_desc <<~LONG_DESC If Elasticsearch is empty, this command will create the necessary indices and then import data from the database into those indices. This command will also upgrade indices if the underlying schema has been - changed since the last run. + changed since the last run. Index upgrades erase index data. Even if creating or upgrading indices is not necessary, data from the - database will be imported into the indices. + database will be imported into the indices, unless overriden with --no-import. LONG_DESC def deploy if options[:concurrency] < 1 @@ -49,7 +51,9 @@ module Mastodon end end - progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) + pool = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10) + importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) } + progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) # First, ensure all indices are created and have the correct # structure, so that live data can already be written @@ -59,99 +63,46 @@ module Mastodon index.specification.lock! end + progress.title = 'Estimating workload ' + progress.total = indices.sum { |index| importers[index].estimate! } + reset_connection_pools! - pool = Concurrent::FixedThreadPool.new(options[:concurrency]) - added = Concurrent::AtomicFixnum.new(0) - removed = Concurrent::AtomicFixnum.new(0) + added = 0 + removed = 0 - progress.title = 'Estimating workload ' + indices.each do |index| + importer = importers[index] + importer.optimize_for_import! + + importer.on_progress do |(indexed, deleted)| + progress.total = nil if progress.progress + indexed + deleted > progress.total + progress.progress += indexed + deleted + added += indexed + removed += deleted + end - # Estimate the amount of data that has to be imported first - progress.total = indices.sum { |index| index.adapter.default_scope.count } + importer.on_failure do |reason| + progress.log(pastel.red("Error while importing #{index}: #{reason}")) + end - # Now import all the actual data. Mind that unlike chewy:sync, we don't - # fetch and compare all record IDs from the database and the index to - # find out which to add and which to remove from the index. Because with - # potentially millions of rows, the memory footprint of such a calculation - # is uneconomical. So we only ever add. - indices.each do |index| - progress.title = "Importing #{index} " - batch_size = options[:batch_size] - slice_size = (batch_size / options[:concurrency]).ceil - - index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch| - futures = [] - - batch.each_slice(slice_size) do |records| - futures << Concurrent::Future.execute(executor: pool) do - begin - if !progress.total.nil? && progress.progress + records.size > progress.total - # The number of items has changed between start and now, - # since there is no good way to predict the final count from - # here, just change the progress bar to an indeterminate one - - progress.total = nil - end - - grouped_records = nil - bulk_body = nil - index_count = 0 - delete_count = 0 - - ActiveRecord::Base.connection_pool.with_connection do - grouped_records = records.to_a.group_by do |record| - index.adapter.send(:delete_from_index?, record) ? :delete : :to_index - end - - bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body - end - - index_count = grouped_records[:to_index].size if grouped_records.key?(:to_index) - delete_count = grouped_records[:delete].size if grouped_records.key?(:delete) - - # The following is an optimization for statuses specifically, since - # we want to de-index statuses that cannot be searched by anybody, - # but can't use Chewy's delete_if logic because it doesn't use - # crutches and our searchable_by logic depends on them - if index == StatusesIndex - bulk_body.map! do |entry| - if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank? - index_count -= 1 - delete_count += 1 - - { delete: entry[:to_index].except(:data) } - else - entry - end - end - end - - Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body) - - progress.progress += records.size - - added.increment(index_count) - removed.increment(delete_count) - - sleep 1 - rescue => e - progress.log pastel.red("Error importing #{index}: #{e}") - ensure - RedisConfiguration.pool.checkin if Thread.current[:redis] - Thread.current[:redis] = nil - end - end - end - - futures.map(&:value) + if options[:import] + progress.title = "Importing #{index} " + importer.import! + end + + if options[:clean] + progress.title = "Cleaning #{index} " + importer.clean_up! end + ensure + importer.optimize_for_search! end - progress.title = '' - progress.stop + progress.title = 'Done! ' + progress.finish - say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true) + say("Indexed #{added} records, de-indexed #{removed}", :green, true) end end end |