about summary refs log tree commit diff
path: root/app/workers
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/distribution_worker.rb9
-rw-r--r--app/workers/scheduler/indexing_scheduler.rb26
2 files changed, 29 insertions, 6 deletions
diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb
index 474b4daaf..59cdbc7b2 100644
--- a/app/workers/distribution_worker.rb
+++ b/app/workers/distribution_worker.rb
@@ -3,14 +3,11 @@
 class DistributionWorker
   include Sidekiq::Worker
   include Redisable
+  include Lockable
 
   def perform(status_id, options = {})
-    RedisLock.acquire(redis: redis, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock|
-      if lock.acquired?
-        FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("distribute:#{status_id}") do
+      FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
     end
   rescue ActiveRecord::RecordNotFound
     true
diff --git a/app/workers/scheduler/indexing_scheduler.rb b/app/workers/scheduler/indexing_scheduler.rb
new file mode 100644
index 000000000..3a6f47a29
--- /dev/null
+++ b/app/workers/scheduler/indexing_scheduler.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+class Scheduler::IndexingScheduler
+  include Sidekiq::Worker
+  include Redisable
+
+  sidekiq_options retry: 0
+
+  def perform
+    indexes.each do |type|
+      with_redis do |redis|
+        ids = redis.smembers("chewy:queue:#{type.name}")
+
+        type.import!(ids)
+
+        redis.pipelined do |pipeline|
+          ids.each { |id| pipeline.srem("chewy:queue:#{type.name}", id) }
+        end
+      end
+    end
+  end
+
+  def indexes
+    [AccountsIndex, TagsIndex, StatusesIndex]
+  end
+end