diff options
Diffstat (limited to 'app/lib/importer')
-rw-r--r-- | app/lib/importer/accounts_index_importer.rb | 30 | ||||
-rw-r--r-- | app/lib/importer/base_importer.rb | 87 | ||||
-rw-r--r-- | app/lib/importer/statuses_index_importer.rb | 89 | ||||
-rw-r--r-- | app/lib/importer/tags_index_importer.rb | 26 |
4 files changed, 232 insertions, 0 deletions
diff --git a/app/lib/importer/accounts_index_importer.rb b/app/lib/importer/accounts_index_importer.rb new file mode 100644 index 000000000..792a31b1b --- /dev/null +++ b/app/lib/importer/accounts_index_importer.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +class Importer::AccountsIndexImporter < Importer::BaseImporter + def import! + scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp) do |accounts| + bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body + + indexed = bulk.select { |entry| entry[:index] }.size + deleted = bulk.select { |entry| entry[:delete] }.size + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + + wait! + end + + private + + def index + AccountsIndex + end + + def scope + Account.searchable + end +end diff --git a/app/lib/importer/base_importer.rb b/app/lib/importer/base_importer.rb new file mode 100644 index 000000000..ea522c600 --- /dev/null +++ b/app/lib/importer/base_importer.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +class Importer::BaseImporter + # @param [Integer] batch_size + # @param [Concurrent::ThreadPoolExecutor] executor + def initialize(batch_size:, executor:) + @batch_size = batch_size + @executor = executor + @wait_for = Concurrent::Set.new + end + + # Callback to run when a concurrent work unit completes + # @param [Proc] + def on_progress(&block) + @on_progress = block + end + + # Callback to run when a concurrent work unit fails + # @param [Proc] + def on_failure(&block) + @on_failure = block + end + + # Reduce resource usage during and improve speed of indexing + def optimize_for_import! + Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } } + end + + # Restore original index settings + def optimize_for_search! + Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } } + end + + # Estimate the amount of documents that would be indexed. Not exact! + # @returns [Integer] + def estimate! + ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i } + end + + # Import data from the database into the index + def import! + raise NotImplementedError + end + + # Remove documents from the index that no longer exist in the database + def clean_up! + index.scroll_batches do |documents| + ids = documents.map { |doc| doc['_id'] } + existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true } + tmp = ids.reject { |id| existence_map[id] } + + next if tmp.empty? + + in_work_unit(tmp) do |deleted_ids| + bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [0, bulk.size] + end + end + + wait! + end + + protected + + def in_work_unit(*args, &block) + work_unit = Concurrent::Promises.future_on(@executor, *args, &block) + + work_unit.on_fulfillment!(&@on_progress) + work_unit.on_rejection!(&@on_failure) + work_unit.on_resolution! { @wait_for.delete(work_unit) } + + @wait_for << work_unit + rescue Concurrent::RejectedExecutionError + sleep(0.1) && retry # Backpressure + end + + def wait! + Concurrent::Promises.zip(*@wait_for).wait + end + + def index + raise NotImplementedError + end +end diff --git a/app/lib/importer/statuses_index_importer.rb b/app/lib/importer/statuses_index_importer.rb new file mode 100644 index 000000000..7c6532560 --- /dev/null +++ b/app/lib/importer/statuses_index_importer.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +class Importer::StatusesIndexImporter < Importer::BaseImporter + def import! + # The idea is that instead of iterating over all statuses in the database + # and calculating the searchable_by for each of them (majority of which + # would be empty), we approach the index from the other end + + scopes.each do |scope| + # We could be tempted to keep track of status IDs we have already processed + # from a different scope to avoid indexing them multiple times, but that + # could end up being a very large array + + scope.find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp.map(&:status_id)) do |status_ids| + bulk = ActiveRecord::Base.connection_pool.with_connection do + Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll).where(id: status_ids)).bulk_body + end + + indexed = 0 + deleted = 0 + + # We can't use the delete_if proc to do the filtering because delete_if + # is called before rendering the data and we need to filter based + # on the results of the filter, so this filtering happens here instead + bulk.map! do |entry| + new_entry = begin + if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank? + { delete: entry[:index].except(:data) } + else + entry + end + end + + if new_entry[:index] + indexed += 1 + else + deleted += 1 + end + + new_entry + end + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + end + + wait! + end + + private + + def index + StatusesIndex + end + + def scopes + [ + local_statuses_scope, + local_mentions_scope, + local_favourites_scope, + local_votes_scope, + local_bookmarks_scope, + ] + end + + def local_mentions_scope + Mention.where(account: Account.local, silent: false).select(:id, :status_id) + end + + def local_favourites_scope + Favourite.where(account: Account.local).select(:id, :status_id) + end + + def local_bookmarks_scope + Bookmark.select(:id, :status_id) + end + + def local_votes_scope + Poll.joins(:votes).where(votes: { account: Account.local }).select('polls.id, polls.status_id') + end + + def local_statuses_scope + Status.local.select('id, coalesce(reblog_of_id, id) as status_id') + end +end diff --git a/app/lib/importer/tags_index_importer.rb b/app/lib/importer/tags_index_importer.rb new file mode 100644 index 000000000..f5bd8f052 --- /dev/null +++ b/app/lib/importer/tags_index_importer.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +class Importer::TagsIndexImporter < Importer::BaseImporter + def import! + index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp| + in_work_unit(tmp) do |tags| + bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body + + indexed = bulk.select { |entry| entry[:index] }.size + deleted = bulk.select { |entry| entry[:delete] }.size + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [indexed, deleted] + end + end + + wait! + end + + private + + def index + TagsIndex + end +end |