about summary refs log tree commit diff
path: root/lib/mastodon/search_cli.rb
diff options
context:
space:
mode:
author Takeshi Umeda <noel.yoshiba@gmail.com> 2021-11-19 06:02:08 +0900
committer GitHub <noreply@github.com> 2021-11-18 22:02:08 +0100
commit 3419d3ec84c3aa4f450265642e0a85dcdd3c36d0 (patch)
tree dfd530fcfaed29a5421138087ef3a240d3c45a3c /lib/mastodon/search_cli.rb
parent 2b6a25c609e2d1814e5dd586e9f40868d944c8bc (diff)
Bump chewy from 5.2.0 to 7.2.3 (supports Elasticsearch 7.x) (#16915)
* Bump chewy from 5.2.0 to 7.2.2

* fix style (codeclimate)

* fix style

* fix style

* Bump chewy from 7.2.2 to 7.2.3
Diffstat (limited to 'lib/mastodon/search_cli.rb')
-rw-r--r-- lib/mastodon/search_cli.rb | 99
1 file changed, 48 insertions(+), 51 deletions(-)
diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb
index 0126dfcff..2d1ca1c05 100644
--- a/lib/mastodon/search_cli.rb
+++ b/lib/mastodon/search_cli.rb
@@ -64,11 +64,7 @@ module Mastodon
       progress.title = 'Estimating workload '
 
       # Estimate the amount of data that has to be imported first
-      indices.each do |index|
-        index.types.each do |type|
-          progress.total = (progress.total || 0) + type.adapter.default_scope.count
-        end
-      end
+      progress.total = indices.sum { |index| index.adapter.default_scope.count }
 
       # Now import all the actual data. Mind that unlike chewy:sync, we don't
       # fetch and compare all record IDs from the database and the index to
@@ -80,67 +76,68 @@ module Mastodon
         batch_size     = 1_000
         slice_size     = (batch_size / options[:concurrency]).ceil
 
-        index.types.each do |type|
-          type.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
-            futures = []
+        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
+          futures = []
 
-            batch.each_slice(slice_size) do |records|
-              futures << Concurrent::Future.execute(executor: pool) do
-                begin
-                  if !progress.total.nil? && progress.progress + records.size > progress.total
-                    # The number of items has changed between start and now,
-                    # since there is no good way to predict the final count from
-                    # here, just change the progress bar to an indeterminate one
+          batch.each_slice(slice_size) do |records|
+            futures << Concurrent::Future.execute(executor: pool) do
+              begin
+                if !progress.total.nil? && progress.progress + records.size > progress.total
+                  # The number of items has changed between start and now,
+                  # since there is no good way to predict the final count from
+                  # here, just change the progress bar to an indeterminate one
 
-                    progress.total = nil
-                  end
+                  progress.total = nil
+                end
 
-                  grouped_records = nil
-                  bulk_body       = nil
-                  index_count     = 0
-                  delete_count    = 0
+                grouped_records = nil
+                bulk_body       = nil
+                index_count     = 0
+                delete_count    = 0
 
-                  ActiveRecord::Base.connection_pool.with_connection do
-                    grouped_records = type.adapter.send(:grouped_objects, records)
-                    bulk_body       = Chewy::Type::Import::BulkBuilder.new(type, **grouped_records).bulk_body
+                ActiveRecord::Base.connection_pool.with_connection do
+                  grouped_records = records.to_a.group_by do |record|
+                    index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
                   end
 
-                  index_count  = grouped_records[:index].size  if grouped_records.key?(:index)
-                  delete_count = grouped_records[:delete].size if grouped_records.key?(:delete)
-
-                  # The following is an optimization for statuses specifically, since
-                  # we want to de-index statuses that cannot be searched by anybody,
-                  # but can't use Chewy's delete_if logic because it doesn't use
-                  # crutches and our searchable_by logic depends on them
-                  if type == StatusesIndex::Status
-                    bulk_body.map! do |entry|
-                      if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
-                        index_count  -= 1
-                        delete_count += 1
-
-                        { delete: entry[:index].except(:data) }
-                      else
-                        entry
-                      end
+                  bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
+                end
+
+                index_count  = grouped_records[:to_index].size  if grouped_records.key?(:to_index)
+                delete_count = grouped_records[:delete].size    if grouped_records.key?(:delete)
+
+                # The following is an optimization for statuses specifically, since
+                # we want to de-index statuses that cannot be searched by anybody,
+                # but can't use Chewy's delete_if logic because it doesn't use
+                # crutches and our searchable_by logic depends on them
+                if index == StatusesIndex
+                  bulk_body.map! do |entry|
+                    if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
+                      index_count  -= 1
+                      delete_count += 1
+
+                      { delete: entry[:to_index].except(:data) }
+                    else
+                      entry
                     end
                   end
+                end
 
-                  Chewy::Type::Import::BulkRequest.new(type).perform(bulk_body)
+                Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)
 
-                  progress.progress += records.size
+                progress.progress += records.size
 
-                  added.increment(index_count)
-                  removed.increment(delete_count)
+                added.increment(index_count)
+                removed.increment(delete_count)
 
-                  sleep 1
-                rescue => e
-                  progress.log pastel.red("Error importing #{index}: #{e}")
-                end
+                sleep 1
+              rescue => e
+                progress.log pastel.red("Error importing #{index}: #{e}")
               end
             end
-
-            futures.map(&:value)
           end
+
+          futures.map(&:value)
         end
       end