diff options
Diffstat (limited to 'app/workers/activitypub')
3 files changed, 104 insertions, 0 deletions
diff --git a/app/workers/activitypub/process_collection_items_for_account_worker.rb b/app/workers/activitypub/process_collection_items_for_account_worker.rb new file mode 100644 index 000000000..4b5710c1d --- /dev/null +++ b/app/workers/activitypub/process_collection_items_for_account_worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true +class ActivityPub::ProcessCollectionItemsForAccountWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', retry: 3 + + def perform(account_id) + @account_id = account_id + on_behalf_of = nil + + if account_id.present? + account = Account.find(account_id) + on_behalf_of = account.followers.local.random.first + end + + ActivityPub::ProcessCollectionItemsService.new.call(account_id, on_behalf_of) + rescue ActiveRecord::RecordNotFound + nil + end +end diff --git a/app/workers/activitypub/process_collection_items_worker.rb b/app/workers/activitypub/process_collection_items_worker.rb new file mode 100644 index 000000000..d830edaec --- /dev/null +++ b/app/workers/activitypub/process_collection_items_worker.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true +class ActivityPub::ProcessCollectionItemsWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', retry: 0 + + def perform + return if Sidekiq::Stats.new.workers_size > 3 + + RedisLock.acquire(lock_options) do |lock| + if lock.acquired? + account_id = random_unprocessed_account_id + ActivityPub::ProcessCollectionItemsForAccountWorker.perform_async(account_id) if account_id.present? + end + end + end + + private + + def random_unprocessed_account_id + CollectionItem.unprocessed.pluck(:account_id).sample + end + + def lock_options + { redis: Redis.current, key: 'process_collection_items' } + end +end diff --git a/app/workers/activitypub/sync_account_worker.rb b/app/workers/activitypub/sync_account_worker.rb new file mode 100644 index 000000000..18825b20d --- /dev/null +++ b/app/workers/activitypub/sync_account_worker.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true +class ActivityPub::SyncAccountWorker + include Sidekiq::Worker + include ExponentialBackoff + + sidekiq_options queue: 'pull', retry: 5 + + def perform(account_id, every_page = false, skip_cooldown = false) + @account = Account.find(account_id) + return if @account.local? + + @from_migrated_account = @account.moved_to_account&.local? + return unless @from_migrated_account || @account.followers.local.exists? + + RedisLock.acquire(lock_options) do |lock| + if lock.acquired? + fetch_collection_items(every_page, skip_cooldown) + elsif @from_migrated_account + # Cause a retry so server-to-server migrations can complete. + raise Mastodon::RaceConditionError + end + end + rescue ActiveRecord::RecordNotFound + nil + end + + private + + def lock_options + { redis: Redis.current, key: "account_sync:#{@account.id}" } + end + + # Limits for an account moving to this server. + def limits_migrated + { + page_limit: 2_000, + item_limit: 40_000, + look_ahead: true, + } + end + + # Limits for an account someone locally follows. + def limits_followed + { + page_limit: 25, + item_limit: 500, + look_ahead: @account.last_synced_at.blank?, + } + end + + def fetch_collection_items(every_page, skip_cooldown) + opts = @from_migrated_account && every_page ? limits_migrated : limits_followed + opts.merge!({ every_page: every_page, skip_cooldown: skip_cooldown }) + ActivityPub::FetchCollectionItemsService.new.call(@account.outbox_url, @account, **opts) + @account.update(last_synced_at: Time.now.utc) + end +end |