diff options
author | Claire <claire.github-309c@sitedethib.com> | 2022-05-24 14:49:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-24 14:49:13 +0200 |
commit | 1beb8db4e26d1bd4a801b432f466d948b3b562fc (patch) | |
tree | 39de9e2163c3737f182d67ba8718cde32d6a0296 /app/lib/importer/base_importer.rb | |
parent | c279dbd47082e908d98dc8cf869c0ff7fc19f1ae (diff) | |
parent | 228ec75048a5527237720cdbaa8d6ab086ea6c84 (diff) |
Merge pull request #1779 from ClearlyClaire/glitch-soc/merge-upstream
Merge upstream changes
Diffstat (limited to 'app/lib/importer/base_importer.rb')
-rw-r--r-- | app/lib/importer/base_importer.rb | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/app/lib/importer/base_importer.rb b/app/lib/importer/base_importer.rb new file mode 100644 index 000000000..ea522c600 --- /dev/null +++ b/app/lib/importer/base_importer.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +class Importer::BaseImporter + # @param [Integer] batch_size + # @param [Concurrent::ThreadPoolExecutor] executor + def initialize(batch_size:, executor:) + @batch_size = batch_size + @executor = executor + @wait_for = Concurrent::Set.new + end + + # Callback to run when a concurrent work unit completes + # @param [Proc] + def on_progress(&block) + @on_progress = block + end + + # Callback to run when a concurrent work unit fails + # @param [Proc] + def on_failure(&block) + @on_failure = block + end + + # Reduce resource usage during and improve speed of indexing + def optimize_for_import! + Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } } + end + + # Restore original index settings + def optimize_for_search! + Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } } + end + + # Estimate the amount of documents that would be indexed. Not exact! + # @returns [Integer] + def estimate! + ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i } + end + + # Import data from the database into the index + def import! + raise NotImplementedError + end + + # Remove documents from the index that no longer exist in the database + def clean_up! + index.scroll_batches do |documents| + ids = documents.map { |doc| doc['_id'] } + existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true } + tmp = ids.reject { |id| existence_map[id] } + + next if tmp.empty? + + in_work_unit(tmp) do |deleted_ids| + bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk) + + [0, bulk.size] + end + end + + wait! + end + + protected + + def in_work_unit(*args, &block) + work_unit = Concurrent::Promises.future_on(@executor, *args, &block) + + work_unit.on_fulfillment!(&@on_progress) + work_unit.on_rejection!(&@on_failure) + work_unit.on_resolution! { @wait_for.delete(work_unit) } + + @wait_for << work_unit + rescue Concurrent::RejectedExecutionError + sleep(0.1) && retry # Backpressure + end + + def wait! + Concurrent::Promises.zip(*@wait_for).wait + end + + def index + raise NotImplementedError + end +end |