diff options
author | Thibaut Girka <thib@sitedethib.com> | 2020-07-15 15:32:40 +0200 |
---|---|---|
committer | Thibaut Girka <thib@sitedethib.com> | 2020-07-15 15:32:40 +0200 |
commit | 3f60b096b51b000905290d69ea05b874b60fa9e0 (patch) | |
tree | 0b8c7011810b4a97523c2c63816735e519f0682c /lib | |
parent | 7a23347db5be3f262dbcafbecf768588dc648bda (diff) | |
parent | d9cad44ca54374cafa0c97775083bb1bc0a5a3ce (diff) |
Merge branch 'master' into glitch-soc/merge-upstream
Conflicts:
- `config/routes.rb`: Upstream disabled the E2EE routes, which we had already done earlier in a slightly different way. Took upstream's version.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mastodon/cli_helper.rb | 1 |
-rw-r--r-- | lib/mastodon/search_cli.rb | 142 |
-rw-r--r-- | lib/mastodon/version.rb | 2 |
3 files changed, 128 insertions, 17 deletions
diff --git a/lib/mastodon/cli_helper.rb b/lib/mastodon/cli_helper.rb index 4a20fa8d6..ed22f44b2 100644 --- a/lib/mastodon/cli_helper.rb +++ b/lib/mastodon/cli_helper.rb @@ -7,6 +7,7 @@ ActiveRecord::Base.logger = dev_null ActiveJob::Base.logger = dev_null HttpLog.configuration.logger = dev_null Paperclip.options[:log] = false +Chewy.logger = dev_null module Mastodon module CLIHelper diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb index 8bd5f9543..22a0acec8 100644 --- a/lib/mastodon/search_cli.rb +++ b/lib/mastodon/search_cli.rb @@ -6,8 +6,19 @@ require_relative 'cli_helper' module Mastodon class SearchCLI < Thor - option :processes, default: 2, aliases: [:p] - desc 'deploy', 'Create or update an ElasticSearch index and populate it' + include CLIHelper + + # Indices are sorted by amount of data to be expected in each, so that + # smaller indices can go online sooner + INDICES = [ + AccountsIndex, + TagsIndex, + StatusesIndex, + ].freeze + + option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads' + option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices' + desc 'deploy', 'Create or upgrade ElasticSearch indices and populate them' long_desc <<~LONG_DESC If ElasticSearch is empty, this command will create the necessary indices and then import data from the database into those indices. @@ -15,27 +26,126 @@ module Mastodon This command will also upgrade indices if the underlying schema has been changed since the last run. - With the --processes option, parallelize execution of the command. The - default is 2. If "auto" is specified, the number is automatically - derived from available CPUs. + Even if creating or upgrading indices is not necessary, data from the + database will be imported into the indices. 
LONG_DESC def deploy - processed = Chewy::RakeHelper.upgrade(parallel: processes) - Chewy::RakeHelper.sync(except: processed, parallel: processes) - end + if options[:concurrency] < 1 + say('Cannot run with this concurrency setting, must be at least 1', :red) + exit(1) + end + + indices = begin + if options[:only] + options[:only].map { |str| "#{str.camelize}Index".constantize } + else + INDICES + end + end + + progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) + + # First, ensure all indices are created and have the correct + # structure, so that live data can already be written + indices.select { |index| index.specification.changed? }.each do |index| + progress.title = "Upgrading #{index} " + index.purge + index.specification.lock! + end + + ActiveRecord::Base.configurations[Rails.env]['pool'] = options[:concurrency] + 1 + + pool = Concurrent::FixedThreadPool.new(options[:concurrency]) + added = Concurrent::AtomicFixnum.new(0) + removed = Concurrent::AtomicFixnum.new(0) + + progress.title = 'Estimating workload ' + + # Estimate the amount of data that has to be imported first + indices.each do |index| + index.types.each do |type| + progress.total = (progress.total || 0) + type.adapter.default_scope.count + end + end + + # Now import all the actual data. Mind that unlike chewy:sync, we don't + # fetch and compare all record IDs from the database and the index to + # find out which to add and which to remove from the index. Because with + # potentially millions of rows, the memory footprint of such a calculation + # is uneconomical. So we only ever add. 
+ indices.each do |index| + progress.title = "Importing #{index} " + batch_size = 1_000 + slice_size = (batch_size / options[:concurrency]).ceil + + index.types.each do |type| + type.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch| + futures = [] + + batch.each_slice(slice_size) do |records| + futures << Concurrent::Future.execute(executor: pool) do + begin + if !progress.total.nil? && progress.progress + records.size > progress.total + # The number of items has changed between start and now, + # since there is no good way to predict the final count from + # here, just change the progress bar to an indeterminate one + + progress.total = nil + end + + grouped_records = nil + bulk_body = nil + index_count = 0 + delete_count = 0 + + ActiveRecord::Base.connection_pool.with_connection do + grouped_records = type.adapter.send(:grouped_objects, records) + bulk_body = Chewy::Type::Import::BulkBuilder.new(type, grouped_records).bulk_body + end - private + index_count = grouped_records[:index].size if grouped_records.key?(:index) + delete_count = grouped_records[:delete].size if grouped_records.key?(:delete) - def processes - return true if options[:processes] == 'auto' + # The following is an optimization for statuses specifically, since + # we want to de-index statuses that cannot be searched by anybody, + # but can't use Chewy's delete_if logic because it doesn't use + # crutches and our searchable_by logic depends on them + if type == StatusesIndex::Status + bulk_body.map! do |entry| + if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank? 
+ index_count -= 1 + delete_count += 1 - num = options[:processes].to_i + { delete: entry[:index].except(:data) } + else + entry + end + end + end - if num < 2 - nil - else - num + Chewy::Type::Import::BulkRequest.new(type).perform(bulk_body) + + progress.progress += records.size + + added.increment(index_count) + removed.increment(delete_count) + + sleep 1 + rescue => e + progress.log pastel.red("Error importing #{index}: #{e}") + end + end + end + + futures.map(&:value) + end + end end + + progress.title = '' + progress.stop + + say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true) end end end diff --git a/lib/mastodon/version.rb b/lib/mastodon/version.rb index fa85f8e37..87fc67deb 100644 --- a/lib/mastodon/version.rb +++ b/lib/mastodon/version.rb @@ -13,7 +13,7 @@ module Mastodon end def patch - 4 + 5 end def flags |