about summary refs log tree commit diff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/chewy/strategy/custom_sidekiq.rb11
-rw-r--r--lib/chewy/strategy/mastodon.rb27
-rw-r--r--lib/mastodon/search_cli.rb129
-rw-r--r--lib/mastodon/version.rb2
-rw-r--r--lib/tasks/mastodon.rake8
5 files changed, 76 insertions, 101 deletions
diff --git a/lib/chewy/strategy/custom_sidekiq.rb b/lib/chewy/strategy/custom_sidekiq.rb
deleted file mode 100644
index 794ae4ed4..000000000
--- a/lib/chewy/strategy/custom_sidekiq.rb
+++ /dev/null
@@ -1,11 +0,0 @@
-# frozen_string_literal: true
-
-module Chewy
-  class Strategy
-    class CustomSidekiq < Sidekiq
-      def update(_type, _objects, _options = {})
-        super if Chewy.enabled?
-      end
-    end
-  end
-end
diff --git a/lib/chewy/strategy/mastodon.rb b/lib/chewy/strategy/mastodon.rb
new file mode 100644
index 000000000..ee8b92186
--- /dev/null
+++ b/lib/chewy/strategy/mastodon.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+module Chewy
+  class Strategy
+    class Mastodon < Base
+      def initialize
+        super
+
+        @stash = Hash.new { |hash, key| hash[key] = [] }
+      end
+
+      def update(type, objects, _options = {})
+        @stash[type].concat(type.root.id ? Array.wrap(objects) : type.adapter.identify(objects)) if Chewy.enabled?
+      end
+
+      def leave
+        RedisConfiguration.with do |redis|
+          redis.pipelined do |pipeline|
+            @stash.each do |type, ids|
+              pipeline.sadd("chewy:queue:#{type.name}", ids)
+            end
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb
index 74f980ba1..b579ebc14 100644
--- a/lib/mastodon/search_cli.rb
+++ b/lib/mastodon/search_cli.rb
@@ -16,19 +16,21 @@ module Mastodon
       StatusesIndex,
     ].freeze
 
-    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
-    option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch'
+    option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
+    option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
     option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
+    option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
+    option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
     desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
     long_desc <<~LONG_DESC
       If Elasticsearch is empty, this command will create the necessary indices
       and then import data from the database into those indices.
 
       This command will also upgrade indices if the underlying schema has been
-      changed since the last run.
+      changed since the last run. Index upgrades erase index data.
 
       Even if creating or upgrading indices is not necessary, data from the
-      database will be imported into the indices.
+      database will be imported into the indices, unless overriden with --no-import.
     LONG_DESC
     def deploy
       if options[:concurrency] < 1
@@ -49,7 +51,9 @@ module Mastodon
         end
       end
 
-      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
+      pool      = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
+      importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
+      progress  = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
 
       # First, ensure all indices are created and have the correct
       # structure, so that live data can already be written
@@ -59,99 +63,46 @@ module Mastodon
         index.specification.lock!
       end
 
+      progress.title = 'Estimating workload '
+      progress.total = indices.sum { |index| importers[index].estimate! }
+
       reset_connection_pools!
 
-      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
-      added   = Concurrent::AtomicFixnum.new(0)
-      removed = Concurrent::AtomicFixnum.new(0)
+      added   = 0
+      removed = 0
 
-      progress.title = 'Estimating workload '
+      indices.each do |index|
+        importer = importers[index]
+        importer.optimize_for_import!
+
+        importer.on_progress do |(indexed, deleted)|
+          progress.total = nil if progress.progress + indexed + deleted > progress.total
+          progress.progress += indexed + deleted
+          added   += indexed
+          removed += deleted
+        end
 
-      # Estimate the amount of data that has to be imported first
-      progress.total = indices.sum { |index| index.adapter.default_scope.count }
+        importer.on_failure do |reason|
+          progress.log(pastel.red("Error while importing #{index}: #{reason}"))
+        end
 
-      # Now import all the actual data. Mind that unlike chewy:sync, we don't
-      # fetch and compare all record IDs from the database and the index to
-      # find out which to add and which to remove from the index. Because with
-      # potentially millions of rows, the memory footprint of such a calculation
-      # is uneconomical. So we only ever add.
-      indices.each do |index|
-        progress.title = "Importing #{index} "
-        batch_size     = options[:batch_size]
-        slice_size     = (batch_size / options[:concurrency]).ceil
-
-        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
-          futures = []
-
-          batch.each_slice(slice_size) do |records|
-            futures << Concurrent::Future.execute(executor: pool) do
-              begin
-                if !progress.total.nil? && progress.progress + records.size > progress.total
-                  # The number of items has changed between start and now,
-                  # since there is no good way to predict the final count from
-                  # here, just change the progress bar to an indeterminate one
-
-                  progress.total = nil
-                end
-
-                grouped_records = nil
-                bulk_body       = nil
-                index_count     = 0
-                delete_count    = 0
-
-                ActiveRecord::Base.connection_pool.with_connection do
-                  grouped_records = records.to_a.group_by do |record|
-                    index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
-                  end
-
-                  bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
-                end
-
-                index_count  = grouped_records[:to_index].size  if grouped_records.key?(:to_index)
-                delete_count = grouped_records[:delete].size    if grouped_records.key?(:delete)
-
-                # The following is an optimization for statuses specifically, since
-                # we want to de-index statuses that cannot be searched by anybody,
-                # but can't use Chewy's delete_if logic because it doesn't use
-                # crutches and our searchable_by logic depends on them
-                if index == StatusesIndex
-                  bulk_body.map! do |entry|
-                    if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
-                      index_count  -= 1
-                      delete_count += 1
-
-                      { delete: entry[:to_index].except(:data) }
-                    else
-                      entry
-                    end
-                  end
-                end
-
-                Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)
-
-                progress.progress += records.size
-
-                added.increment(index_count)
-                removed.increment(delete_count)
-
-                sleep 1
-              rescue => e
-                progress.log pastel.red("Error importing #{index}: #{e}")
-              ensure
-                RedisConfiguration.pool.checkin if Thread.current[:redis]
-                Thread.current[:redis] = nil
-              end
-            end
-          end
-
-          futures.map(&:value)
+        if options[:import]
+          progress.title = "Importing #{index} "
+          importer.import!
+        end
+
+        if options[:clean]
+          progress.title = "Cleaning #{index} "
+          importer.clean_up!
         end
+      ensure
+        importer.optimize_for_search!
       end
 
-      progress.title = ''
-      progress.stop
+      progress.title = 'Done! '
+      progress.finish
 
-      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
+      say("Indexed #{added} records, de-indexed #{removed}", :green, true)
     end
   end
 end
diff --git a/lib/mastodon/version.rb b/lib/mastodon/version.rb
index 4b4b4b262..75e283cce 100644
--- a/lib/mastodon/version.rb
+++ b/lib/mastodon/version.rb
@@ -13,7 +13,7 @@ module Mastodon
     end
 
     def patch
-      1
+      3
     end
 
     def flags
diff --git a/lib/tasks/mastodon.rake b/lib/tasks/mastodon.rake
index a89af6778..d652468b3 100644
--- a/lib/tasks/mastodon.rake
+++ b/lib/tasks/mastodon.rake
@@ -8,6 +8,14 @@ namespace :mastodon do
     prompt = TTY::Prompt.new
     env    = {}
 
+    # When the application code gets loaded, it runs `lib/mastodon/redis_configuration.rb`.
+    # This happens before application environment configuration and sets REDIS_URL etc.
+    # These variables are then used even when REDIS_HOST etc. are changed, so clear them
+    # out so they don't interfer with our new configuration.
+    ENV.delete('REDIS_URL')
+    ENV.delete('CACHE_REDIS_URL')
+    ENV.delete('SIDEKIQ_REDIS_URL')
+
     begin
       prompt.say('Your instance is identified by its domain name. Changing it afterward will break things.')
       env['LOCAL_DOMAIN'] = prompt.ask('Domain name:') do |q|