about summary refs log tree commit diff
path: root/app/services/activitypub/process_collection_items_service.rb
diff options
context:
space:
mode:
Diffstat (limited to 'app/services/activitypub/process_collection_items_service.rb')
-rw-r--r--app/services/activitypub/process_collection_items_service.rb30
1 files changed, 30 insertions, 0 deletions
diff --git a/app/services/activitypub/process_collection_items_service.rb b/app/services/activitypub/process_collection_items_service.rb
new file mode 100644
index 000000000..9c30d81e9
--- /dev/null
+++ b/app/services/activitypub/process_collection_items_service.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+class ActivityPub::ProcessCollectionItemsService < BaseService
+  def call(account_id, on_behalf_of)
+    RedisLock.acquire(lock_options(account_id)) do |lock|
+      if lock.acquired?
+        CollectionItem.unprocessed.where(account_id: account_id).find_each do |item|
+          # Avoid failing servers holding up the rest of the queue.
+          next if item.retries.positive? && rand(3).positive?
+
+          begin
+            FetchRemoteStatusService.new.call(item.uri, nil, on_behalf_of)
+          rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound
+            nil
+          rescue HTTP::TimeoutError
+            item.increment!(:retries)
+          end
+
+          item.update!(processed: true) if item.retries.zero? || item.retries > 4
+        end
+      end
+    end
+  end
+
+  private
+
+  def lock_options(account_id)
+    { redis: Redis.current, key: "process_collection_items:#{account_id}" }
+  end
+end