about summary refs log tree commit diff
path: root/lib/mastodon/search_cli.rb
blob: 2d1ca1c05425d557e429454d16a3ee80ca398150 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# frozen_string_literal: true

require_relative '../../config/boot'
require_relative '../../config/environment'
require_relative 'cli_helper'

module Mastodon
  class SearchCLI < Thor
    include CLIHelper

    # Indices are sorted by amount of data to be expected in each, so that
    # smaller indices can go online sooner
    INDICES = [
      AccountsIndex,
      TagsIndex,
      StatusesIndex,
    ].freeze

    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
    option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
    desc 'deploy', 'Create or upgrade ElasticSearch indices and populate them'
    long_desc <<~LONG_DESC
      If ElasticSearch is empty, this command will create the necessary indices
      and then import data from the database into those indices.

      This command will also upgrade indices if the underlying schema has been
      changed since the last run.

      Even if creating or upgrading indices is not necessary, data from the
      database will be imported into the indices.
    LONG_DESC
    def deploy
      if options[:concurrency] < 1
        say('Cannot run with this concurrency setting, must be at least 1', :red)
        exit(1)
      end

      indices = begin
        if options[:only]
          options[:only].map { |str| "#{str.camelize}Index".constantize }
        else
          INDICES
        end
      end

      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)

      # First, ensure all indices are created and have the correct
      # structure, so that live data can already be written
      indices.select { |index| index.specification.changed? }.each do |index|
        progress.title = "Upgrading #{index} "
        index.purge
        index.specification.lock!
      end

      db_config = ActiveRecord::Base.configurations[Rails.env].dup
      db_config['pool'] = options[:concurrency] + 1
      ActiveRecord::Base.establish_connection(db_config)

      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
      added   = Concurrent::AtomicFixnum.new(0)
      removed = Concurrent::AtomicFixnum.new(0)

      progress.title = 'Estimating workload '

      # Estimate the amount of data that has to be imported first
      progress.total = indices.sum { |index| index.adapter.default_scope.count }

      # Now import all the actual data. Mind that unlike chewy:sync, we don't
      # fetch and compare all record IDs from the database and the index to
      # find out which to add and which to remove from the index. Because with
      # potentially millions of rows, the memory footprint of such a calculation
      # is uneconomical. So we only ever add.
      indices.each do |index|
        progress.title = "Importing #{index} "
        batch_size     = 1_000
        slice_size     = (batch_size / options[:concurrency]).ceil

        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
          futures = []

          batch.each_slice(slice_size) do |records|
            futures << Concurrent::Future.execute(executor: pool) do
              begin
                if !progress.total.nil? && progress.progress + records.size > progress.total
                  # The number of items has changed between start and now,
                  # since there is no good way to predict the final count from
                  # here, just change the progress bar to an indeterminate one

                  progress.total = nil
                end

                grouped_records = nil
                bulk_body       = nil
                index_count     = 0
                delete_count    = 0

                ActiveRecord::Base.connection_pool.with_connection do
                  grouped_records = records.to_a.group_by do |record|
                    index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
                  end

                  bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
                end

                index_count  = grouped_records[:to_index].size  if grouped_records.key?(:to_index)
                delete_count = grouped_records[:delete].size    if grouped_records.key?(:delete)

                # The following is an optimization for statuses specifically, since
                # we want to de-index statuses that cannot be searched by anybody,
                # but can't use Chewy's delete_if logic because it doesn't use
                # crutches and our searchable_by logic depends on them
                if index == StatusesIndex
                  bulk_body.map! do |entry|
                    if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
                      index_count  -= 1
                      delete_count += 1

                      { delete: entry[:to_index].except(:data) }
                    else
                      entry
                    end
                  end
                end

                Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)

                progress.progress += records.size

                added.increment(index_count)
                removed.increment(delete_count)

                sleep 1
              rescue => e
                progress.log pastel.red("Error importing #{index}: #{e}")
              end
            end
          end

          futures.map(&:value)
        end
      end

      progress.title = ''
      progress.stop

      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
    end
  end
end