path: root/lib/mastodon/search_cli.rb
# frozen_string_literal: true

require_relative '../../config/boot'
require_relative '../../config/environment'
require_relative 'cli_helper'

module Mastodon
  class SearchCLI < Thor
    include CLIHelper

    # Indices are sorted by the amount of data expected in each, so that
    # smaller indices can go online sooner
    INDICES = [
      AccountsIndex,
      TagsIndex,
      StatusesIndex,
    ].freeze

    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
    option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
    desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
    long_desc <<~LONG_DESC
      If Elasticsearch is empty, this command will create the necessary indices
      and then import data from the database into those indices.

      This command will also upgrade indices if the underlying schema has been
      changed since the last run.

      Even if creating or upgrading indices is not necessary, data from the
      database will be imported into the indices.
    LONG_DESC
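    # This command is normally reached through the tootctl wrapper, e.g.:
    #
    #   tootctl search deploy --concurrency 4 --only statuses
    #
    # using the --concurrency and --only options declared above.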
    def deploy
      if options[:concurrency] < 1
        say('Cannot run with this concurrency setting, must be at least 1', :red)
        exit(1)
      end

      indices = if options[:only]
                  options[:only].map { |str| "#{str.camelize}Index".constantize }
                else
                  INDICES
                end

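      # Start with an indeterminate progress bar (total: nil); a proper total is
      # set once the workload has been estimated below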
      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)

      # First, ensure all indices are created and have the correct
      # structure, so that live data can already be written
      indices.select { |index| index.specification.changed? }.each do |index|
        progress.title = "Upgrading #{index} "
        index.purge
        index.specification.lock!
      end

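      # Make sure the connection pool is large enough for every worker thread
      # plus the main thread to hold a database connection at the same time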
      ActiveRecord::Base.configurations[Rails.env]['pool'] = options[:concurrency] + 1

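      # Thread pool that performs the actual indexing work, and thread-safe
      # counters for how many documents end up indexed or de-indexed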
      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
      added   = Concurrent::AtomicFixnum.new(0)
      removed = Concurrent::AtomicFixnum.new(0)

      progress.title = 'Estimating workload '

      # Estimate the amount of data that has to be imported first
      indices.each do |index|
        index.types.each do |type|
          progress.total = (progress.total || 0) + type.adapter.default_scope.count
        end
      end

      # Now import all the actual data. Mind that unlike chewy:sync, we don't
      # fetch and compare all record IDs from the database and the index to
      # find out which to add and which to remove from the index: with
      # potentially millions of rows, the memory footprint of such a comparison
      # would be uneconomical. So we only ever add.
      indices.each do |index|
        progress.title = "Importing #{index} "
        batch_size     = 1_000
        slice_size     = (batch_size / options[:concurrency].to_f).ceil

        index.types.each do |type|
          type.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
            futures = []

            batch.each_slice(slice_size) do |records|
              futures << Concurrent::Future.execute(executor: pool) do
                begin
                  if !progress.total.nil? && progress.progress + records.size > progress.total
                    # The number of items has changed since the workload was
                    # estimated. Since there is no good way to predict the final
                    # count from here, switch the progress bar to an indeterminate one

                    progress.total = nil
                  end

                  grouped_records = nil
                  bulk_body       = nil
                  index_count     = 0
                  delete_count    = 0

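                  # grouped_objects is a private Chewy adapter method that splits
                  # the records into :index and :delete groups; a database
                  # connection is held only while building the bulk payload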
                  ActiveRecord::Base.connection_pool.with_connection do
                    grouped_records = type.adapter.send(:grouped_objects, records)
                    bulk_body       = Chewy::Type::Import::BulkBuilder.new(type, grouped_records).bulk_body
                  end

                  index_count  = grouped_records[:index].size  if grouped_records.key?(:index)
                  delete_count = grouped_records[:delete].size if grouped_records.key?(:delete)

                  # The following is an optimization for statuses specifically, since
                  # we want to de-index statuses that cannot be searched by anybody,
                  # but can't use Chewy's delete_if logic because it doesn't use
                  # crutches and our searchable_by logic depends on them
                  if type == StatusesIndex::Status
                    bulk_body.map! do |entry|
                      if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
                        index_count  -= 1
                        delete_count += 1

                        { delete: entry[:index].except(:data) }
                      else
                        entry
                      end
                    end
                  end

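                  # Send the accumulated index/delete operations to Elasticsearch
                  # in a single bulk request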
                  Chewy::Type::Import::BulkRequest.new(type).perform(bulk_body)

                  progress.progress += records.size

                  added.increment(index_count)
                  removed.increment(delete_count)
                rescue => e
                  progress.log pastel.red("Error importing #{index}: #{e}")
                end
              end
            end

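            # Wait for every worker to finish this batch before the next batch
            # is loaded from the database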
            futures.map(&:value)
          end
        end
      end

      progress.title = ''
      progress.stop

      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
    end
  end
end