diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/chewy/strategy/custom_sidekiq.rb | 11 | ||||
-rw-r--r-- | lib/chewy/strategy/mastodon.rb | 27 | ||||
-rw-r--r-- | lib/mastodon/search_cli.rb | 129 | ||||
-rw-r--r-- | lib/mastodon/version.rb | 2 | ||||
-rw-r--r-- | lib/tasks/mastodon.rake | 8 |
5 files changed, 76 insertions, 101 deletions
diff --git a/lib/chewy/strategy/custom_sidekiq.rb b/lib/chewy/strategy/custom_sidekiq.rb deleted file mode 100644 index 794ae4ed4..000000000 --- a/lib/chewy/strategy/custom_sidekiq.rb +++ /dev/null @@ -1,11 +0,0 @@ -# frozen_string_literal: true - -module Chewy - class Strategy - class CustomSidekiq < Sidekiq - def update(_type, _objects, _options = {}) - super if Chewy.enabled? - end - end - end -end diff --git a/lib/chewy/strategy/mastodon.rb b/lib/chewy/strategy/mastodon.rb new file mode 100644 index 000000000..ee8b92186 --- /dev/null +++ b/lib/chewy/strategy/mastodon.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Chewy + class Strategy + class Mastodon < Base + def initialize + super + + @stash = Hash.new { |hash, key| hash[key] = [] } + end + + def update(type, objects, _options = {}) + @stash[type].concat(type.root.id ? Array.wrap(objects) : type.adapter.identify(objects)) if Chewy.enabled? + end + + def leave + RedisConfiguration.with do |redis| + redis.pipelined do |pipeline| + @stash.each do |type, ids| + pipeline.sadd("chewy:queue:#{type.name}", ids) + end + end + end + end + end + end +end diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb index 74f980ba1..b579ebc14 100644 --- a/lib/mastodon/search_cli.rb +++ b/lib/mastodon/search_cli.rb @@ -16,19 +16,21 @@ module Mastodon StatusesIndex, ].freeze - option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads' - option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch' + option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads' + option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch' option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices' + option :import, type: :boolean, default: true, desc: 'Import data from the database to the index' + option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index' desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them' long_desc <<~LONG_DESC If Elasticsearch is empty, this command will create the necessary indices and then import data from the database into those indices. This command will also upgrade indices if the underlying schema has been - changed since the last run. + changed since the last run. Index upgrades erase index data. Even if creating or upgrading indices is not necessary, data from the - database will be imported into the indices. + database will be imported into the indices, unless overriden with --no-import. LONG_DESC def deploy if options[:concurrency] < 1 @@ -49,7 +51,9 @@ module Mastodon end end - progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) + pool = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10) + importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) } + progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) # First, ensure all indices are created and have the correct # structure, so that live data can already be written @@ -59,99 +63,46 @@ module Mastodon index.specification.lock! end + progress.title = 'Estimating workload ' + progress.total = indices.sum { |index| importers[index].estimate! } + reset_connection_pools! - pool = Concurrent::FixedThreadPool.new(options[:concurrency]) - added = Concurrent::AtomicFixnum.new(0) - removed = Concurrent::AtomicFixnum.new(0) + added = 0 + removed = 0 - progress.title = 'Estimating workload ' + indices.each do |index| + importer = importers[index] + importer.optimize_for_import! + + importer.on_progress do |(indexed, deleted)| + progress.total = nil if progress.progress + indexed + deleted > progress.total + progress.progress += indexed + deleted + added += indexed + removed += deleted + end - # Estimate the amount of data that has to be imported first - progress.total = indices.sum { |index| index.adapter.default_scope.count } + importer.on_failure do |reason| + progress.log(pastel.red("Error while importing #{index}: #{reason}")) + end - # Now import all the actual data. Mind that unlike chewy:sync, we don't - # fetch and compare all record IDs from the database and the index to - # find out which to add and which to remove from the index. Because with - # potentially millions of rows, the memory footprint of such a calculation - # is uneconomical. So we only ever add. - indices.each do |index| - progress.title = "Importing #{index} " - batch_size = options[:batch_size] - slice_size = (batch_size / options[:concurrency]).ceil - - index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch| - futures = [] - - batch.each_slice(slice_size) do |records| - futures << Concurrent::Future.execute(executor: pool) do - begin - if !progress.total.nil? && progress.progress + records.size > progress.total - # The number of items has changed between start and now, - # since there is no good way to predict the final count from - # here, just change the progress bar to an indeterminate one - - progress.total = nil - end - - grouped_records = nil - bulk_body = nil - index_count = 0 - delete_count = 0 - - ActiveRecord::Base.connection_pool.with_connection do - grouped_records = records.to_a.group_by do |record| - index.adapter.send(:delete_from_index?, record) ? :delete : :to_index - end - - bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body - end - - index_count = grouped_records[:to_index].size if grouped_records.key?(:to_index) - delete_count = grouped_records[:delete].size if grouped_records.key?(:delete) - - # The following is an optimization for statuses specifically, since - # we want to de-index statuses that cannot be searched by anybody, - # but can't use Chewy's delete_if logic because it doesn't use - # crutches and our searchable_by logic depends on them - if index == StatusesIndex - bulk_body.map! do |entry| - if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank? - index_count -= 1 - delete_count += 1 - - { delete: entry[:to_index].except(:data) } - else - entry - end - end - end - - Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body) - - progress.progress += records.size - - added.increment(index_count) - removed.increment(delete_count) - - sleep 1 - rescue => e - progress.log pastel.red("Error importing #{index}: #{e}") - ensure - RedisConfiguration.pool.checkin if Thread.current[:redis] - Thread.current[:redis] = nil - end - end - end - - futures.map(&:value) + if options[:import] + progress.title = "Importing #{index} " + importer.import! + end + + if options[:clean] + progress.title = "Cleaning #{index} " + importer.clean_up! end + ensure + importer.optimize_for_search! end - progress.title = '' - progress.stop + progress.title = 'Done! ' + progress.finish - say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true) + say("Indexed #{added} records, de-indexed #{removed}", :green, true) end end end diff --git a/lib/mastodon/version.rb b/lib/mastodon/version.rb index 4b4b4b262..75e283cce 100644 --- a/lib/mastodon/version.rb +++ b/lib/mastodon/version.rb @@ -13,7 +13,7 @@ module Mastodon end def patch - 1 + 3 end def flags diff --git a/lib/tasks/mastodon.rake b/lib/tasks/mastodon.rake index a89af6778..d652468b3 100644 --- a/lib/tasks/mastodon.rake +++ b/lib/tasks/mastodon.rake @@ -8,6 +8,14 @@ namespace :mastodon do prompt = TTY::Prompt.new env = {} + # When the application code gets loaded, it runs `lib/mastodon/redis_configuration.rb`. + # This happens before application environment configuration and sets REDIS_URL etc. + # These variables are then used even when REDIS_HOST etc. are changed, so clear them + # out so they don't interfer with our new configuration. + ENV.delete('REDIS_URL') + ENV.delete('CACHE_REDIS_URL') + ENV.delete('SIDEKIQ_REDIS_URL') + begin prompt.say('Your instance is identified by its domain name. Changing it afterward will break things.') env['LOCAL_DOMAIN'] = prompt.ask('Domain name:') do |q| |