From 23aeef52cc4540b4514e9f3b935b21f0530a3746 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Sat, 6 Jul 2019 23:26:16 +0200 Subject: Remove Salmon and PubSubHubbub (#11205) * Remove Salmon and PubSubHubbub endpoints * Add error when trying to follow OStatus accounts * Fix new accounts not being created in ResolveAccountService --- app/workers/after_remote_follow_request_worker.rb | 24 +------ app/workers/after_remote_follow_worker.rb | 24 +------ app/workers/notification_worker.rb | 4 +- app/workers/processing_worker.rb | 4 +- app/workers/pubsubhubbub/confirmation_worker.rb | 75 +--------------------- app/workers/pubsubhubbub/delivery_worker.rb | 74 +-------------------- app/workers/pubsubhubbub/distribution_worker.rb | 25 +------- .../pubsubhubbub/raw_distribution_worker.rb | 15 +---- app/workers/pubsubhubbub/subscribe_worker.rb | 27 +------- app/workers/pubsubhubbub/unsubscribe_worker.rb | 8 +-- app/workers/remote_profile_update_worker.rb | 6 +- app/workers/salmon_worker.rb | 6 +- app/workers/scheduler/subscriptions_scheduler.rb | 10 +-- 13 files changed, 13 insertions(+), 289 deletions(-) (limited to 'app/workers') diff --git a/app/workers/after_remote_follow_request_worker.rb b/app/workers/after_remote_follow_request_worker.rb index 84eb6ade2..ce9c65834 100644 --- a/app/workers/after_remote_follow_request_worker.rb +++ b/app/workers/after_remote_follow_request_worker.rb @@ -5,27 +5,5 @@ class AfterRemoteFollowRequestWorker sidekiq_options queue: 'pull', retry: 5 - attr_reader :follow_request - - def perform(follow_request_id) - @follow_request = FollowRequest.find(follow_request_id) - process_follow_service if processing_required? - rescue ActiveRecord::RecordNotFound - true - end - - private - - def process_follow_service - follow_request.destroy - FollowService.new.call(follow_request.account, updated_account.acct) - end - - def processing_required? - !updated_account.nil? && !updated_account.locked? - end - - def updated_account - @_updated_account ||= FetchRemoteAccountService.new.call(follow_request.target_account.remote_url) - end + def perform(follow_request_id); end end diff --git a/app/workers/after_remote_follow_worker.rb b/app/workers/after_remote_follow_worker.rb index edab83f85..d9719f2bf 100644 --- a/app/workers/after_remote_follow_worker.rb +++ b/app/workers/after_remote_follow_worker.rb @@ -5,27 +5,5 @@ class AfterRemoteFollowWorker sidekiq_options queue: 'pull', retry: 5 - attr_reader :follow - - def perform(follow_id) - @follow = Follow.find(follow_id) - process_follow_service if processing_required? - rescue ActiveRecord::RecordNotFound - true - end - - private - - def process_follow_service - follow.destroy - FollowService.new.call(follow.account, updated_account.acct) - end - - def updated_account - @_updated_account ||= FetchRemoteAccountService.new.call(follow.target_account.remote_url) - end - - def processing_required? - !updated_account.nil? && updated_account.locked? - end + def perform(follow_id); end end diff --git a/app/workers/notification_worker.rb b/app/workers/notification_worker.rb index da1d6ab45..1c0f001cf 100644 --- a/app/workers/notification_worker.rb +++ b/app/workers/notification_worker.rb @@ -5,7 +5,5 @@ class NotificationWorker sidekiq_options queue: 'push', retry: 5 - def perform(xml, source_account_id, target_account_id) - SendInteractionService.new.call(xml, Account.find(source_account_id), Account.find(target_account_id)) - end + def perform(xml, source_account_id, target_account_id); end end diff --git a/app/workers/processing_worker.rb b/app/workers/processing_worker.rb index 978c3aba2..cf3bd8397 100644 --- a/app/workers/processing_worker.rb +++ b/app/workers/processing_worker.rb @@ -5,7 +5,5 @@ class ProcessingWorker sidekiq_options backtrace: true - def perform(account_id, body) - ProcessFeedService.new.call(body, Account.find(account_id), override_timestamps: true) - end + def perform(account_id, body); end end diff --git a/app/workers/pubsubhubbub/confirmation_worker.rb b/app/workers/pubsubhubbub/confirmation_worker.rb index c0e7b677e..783a8c95f 100644 --- a/app/workers/pubsubhubbub/confirmation_worker.rb +++ b/app/workers/pubsubhubbub/confirmation_worker.rb @@ -2,81 +2,8 @@ class Pubsubhubbub::ConfirmationWorker include Sidekiq::Worker - include RoutingHelper sidekiq_options queue: 'push', retry: false - attr_reader :subscription, :mode, :secret, :lease_seconds - - def perform(subscription_id, mode, secret = nil, lease_seconds = nil) - @subscription = Subscription.find(subscription_id) - @mode = mode - @secret = secret - @lease_seconds = lease_seconds - process_confirmation - end - - private - - def process_confirmation - prepare_subscription - - callback_get_with_params - logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{@callback_response_body}" - - update_subscription - end - - def update_subscription - if successful_subscribe? - subscription.save! - elsif successful_unsubscribe? - subscription.destroy! - end - end - - def successful_subscribe? - subscribing? && response_matches_challenge? - end - - def successful_unsubscribe? - (unsubscribing? && response_matches_challenge?) || !subscription.confirmed? - end - - def response_matches_challenge? - @callback_response_body == challenge - end - - def subscribing? - mode == 'subscribe' - end - - def unsubscribing? - mode == 'unsubscribe' - end - - def callback_get_with_params - Request.new(:get, subscription.callback_url, params: callback_params).perform do |response| - @callback_response_body = response.body_with_limit - end - end - - def callback_params - { - 'hub.topic': account_url(subscription.account, format: :atom), - 'hub.mode': mode, - 'hub.challenge': challenge, - 'hub.lease_seconds': subscription.lease_seconds, - } - end - - def prepare_subscription - subscription.secret = secret - subscription.lease_seconds = lease_seconds - subscription.confirmed = true - end - - def challenge - @_challenge ||= SecureRandom.hex - end + def perform(subscription_id, mode, secret = nil, lease_seconds = nil); end end diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb index 619bfa48a..1260060bd 100644 --- a/app/workers/pubsubhubbub/delivery_worker.rb +++ b/app/workers/pubsubhubbub/delivery_worker.rb @@ -2,80 +2,8 @@ class Pubsubhubbub::DeliveryWorker include Sidekiq::Worker - include RoutingHelper sidekiq_options queue: 'push', retry: 3, dead: false - sidekiq_retry_in do |count| - 5 * (count + 1) - end - - attr_reader :subscription, :payload - - def perform(subscription_id, payload) - @subscription = Subscription.find(subscription_id) - @payload = payload - process_delivery unless blocked_domain? - rescue => e - raise e.class, "Delivery failed for #{subscription&.callback_url}: #{e.message}", e.backtrace[0] - end - - private - - def process_delivery - callback_post_payload do |payload_delivery| - raise Mastodon::UnexpectedResponseError, payload_delivery unless response_successful? payload_delivery - end - - subscription.touch(:last_successful_delivery_at) - end - - def callback_post_payload(&block) - request = Request.new(:post, subscription.callback_url, body: payload) - request.add_headers(headers) - request.perform(&block) - end - - def blocked_domain? - DomainBlock.blocked?(host) - end - - def host - Addressable::URI.parse(subscription.callback_url).normalized_host - end - - def headers - { - 'Content-Type' => 'application/atom+xml', - 'Link' => link_header, - }.merge(signature_headers.to_h) - end - - def link_header - LinkHeader.new([hub_link_header, self_link_header]).to_s - end - - def hub_link_header - [api_push_url, [%w(rel hub)]] - end - - def self_link_header - [account_url(subscription.account, format: :atom), [%w(rel self)]] - end - - def signature_headers - { 'X-Hub-Signature' => payload_signature } if subscription.secret? - end - - def payload_signature - "sha1=#{hmac_payload_digest}" - end - - def hmac_payload_digest - OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), subscription.secret, payload) - end - - def response_successful?(payload_delivery) - payload_delivery.code > 199 && payload_delivery.code < 300 - end + def perform(subscription_id, payload); end end diff --git a/app/workers/pubsubhubbub/distribution_worker.rb b/app/workers/pubsubhubbub/distribution_worker.rb index fed5e917d..75bac5d6f 100644 --- a/app/workers/pubsubhubbub/distribution_worker.rb +++ b/app/workers/pubsubhubbub/distribution_worker.rb @@ -5,28 +5,5 @@ class Pubsubhubbub::DistributionWorker sidekiq_options queue: 'push' - def perform(stream_entry_ids) - stream_entries = StreamEntry.where(id: stream_entry_ids).includes(:status).reject { |e| e.status.nil? || e.status.hidden? } - - return if stream_entries.empty? - - @account = stream_entries.first.account - @subscriptions = active_subscriptions.to_a - - distribute_public!(stream_entries) - end - - private - - def distribute_public!(stream_entries) - @payload = OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.feed(@account, stream_entries)) - - Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription_id| - [subscription_id, @payload] - end - end - - def active_subscriptions - Subscription.where(account: @account).active.pluck(:id) - end + def perform(stream_entry_ids); end end diff --git a/app/workers/pubsubhubbub/raw_distribution_worker.rb b/app/workers/pubsubhubbub/raw_distribution_worker.rb index 16962a623..ece9c80ac 100644 --- a/app/workers/pubsubhubbub/raw_distribution_worker.rb +++ b/app/workers/pubsubhubbub/raw_distribution_worker.rb @@ -5,18 +5,5 @@ class Pubsubhubbub::RawDistributionWorker sidekiq_options queue: 'push' - def perform(xml, source_account_id) - @account = Account.find(source_account_id) - @subscriptions = active_subscriptions.to_a - - Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription| - [subscription.id, xml] - end - end - - private - - def active_subscriptions - Subscription.where(account: @account).active.select('id, callback_url, domain') - end + def perform(xml, source_account_id); end end diff --git a/app/workers/pubsubhubbub/subscribe_worker.rb b/app/workers/pubsubhubbub/subscribe_worker.rb index 2e176d1c1..b861b5e67 100644 --- a/app/workers/pubsubhubbub/subscribe_worker.rb +++ b/app/workers/pubsubhubbub/subscribe_worker.rb @@ -5,30 +5,5 @@ class Pubsubhubbub::SubscribeWorker sidekiq_options queue: 'push', retry: 10, unique: :until_executed, dead: false - sidekiq_retry_in do |count| - case count - when 0 - 30.minutes.seconds - when 1 - 2.hours.seconds - when 2 - 12.hours.seconds - else - 24.hours.seconds * (count - 2) - end - end - - sidekiq_retries_exhausted do |msg, _e| - account = Account.find(msg['args'].first) - Sidekiq.logger.error "PuSH subscription attempts for #{account.acct} exhausted. Unsubscribing" - ::UnsubscribeService.new.call(account) - end - - def perform(account_id) - account = Account.find(account_id) - logger.debug "PuSH re-subscribing to #{account.acct}" - ::SubscribeService.new.call(account) - rescue => e - raise e.class, "Subscribe failed for #{account&.acct}: #{e.message}", e.backtrace[0] - end + def perform(account_id); end end diff --git a/app/workers/pubsubhubbub/unsubscribe_worker.rb b/app/workers/pubsubhubbub/unsubscribe_worker.rb index a271715b7..0c1c263f6 100644 --- a/app/workers/pubsubhubbub/unsubscribe_worker.rb +++ b/app/workers/pubsubhubbub/unsubscribe_worker.rb @@ -5,11 +5,5 @@ class Pubsubhubbub::UnsubscribeWorker sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false - def perform(account_id) - account = Account.find(account_id) - logger.debug "PuSH unsubscribing from #{account.acct}" - ::UnsubscribeService.new.call(account) - rescue ActiveRecord::RecordNotFound - true - end + def perform(account_id); end end diff --git a/app/workers/remote_profile_update_worker.rb b/app/workers/remote_profile_update_worker.rb index 03585ad2d..01e8daf8f 100644 --- a/app/workers/remote_profile_update_worker.rb +++ b/app/workers/remote_profile_update_worker.rb @@ -5,9 +5,5 @@ class RemoteProfileUpdateWorker sidekiq_options queue: 'pull' - def perform(account_id, body, resubscribe) - UpdateRemoteProfileService.new.call(body, Account.find(account_id), resubscribe) - rescue ActiveRecord::RecordNotFound - true - end + def perform(account_id, body, resubscribe); end end diff --git a/app/workers/salmon_worker.rb b/app/workers/salmon_worker.rb index d37d40432..10200b06c 100644 --- a/app/workers/salmon_worker.rb +++ b/app/workers/salmon_worker.rb @@ -5,9 +5,5 @@ class SalmonWorker sidekiq_options backtrace: true - def perform(account_id, body) - ProcessInteractionService.new.call(body, Account.find(account_id)) - rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound - true - end + def perform(account_id, body); end end diff --git a/app/workers/scheduler/subscriptions_scheduler.rb b/app/workers/scheduler/subscriptions_scheduler.rb index d5873bccb..6903cadc7 100644 --- a/app/workers/scheduler/subscriptions_scheduler.rb +++ b/app/workers/scheduler/subscriptions_scheduler.rb @@ -5,13 +5,5 @@ class Scheduler::SubscriptionsScheduler sidekiq_options unique: :until_executed, retry: 0 - def perform - Pubsubhubbub::SubscribeWorker.push_bulk(expiring_accounts.pluck(:id)) - end - - private - - def expiring_accounts - Account.expiring(1.day.from_now).partitioned - end + def perform; end end -- cgit From 406b46395d6f79e87b286585f6b6867374d198c1 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Sun, 7 Jul 2019 03:37:01 +0200 Subject: Fix URLs appearing twice in errors of ActivityPub::DeliveryWorker (#11231) --- app/lib/request.rb | 2 +- app/workers/activitypub/delivery_worker.rb | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) (limited to 'app/workers') diff --git a/app/lib/request.rb b/app/lib/request.rb index 5f7075a3c..322457ad7 100644 --- a/app/lib/request.rb +++ b/app/lib/request.rb @@ -59,7 +59,7 @@ class Request begin response = http_client.public_send(@verb, @url.to_s, @options.merge(headers: headers)) rescue => e - raise e.class, "#{e.message} on #{@url}", e.backtrace[0] + raise e.class, "#{e.message} on #{@url}", e.backtrace end begin diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index 818fd8f5d..8b52b8e49 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -18,13 +18,15 @@ class ActivityPub::DeliveryWorker @source_account = Account.find(source_account_id) @inbox_url = inbox_url @host = Addressable::URI.parse(inbox_url).normalized_site + @performed = false perform_request - - failure_tracker.track_success! - rescue => e - failure_tracker.track_failure! - raise e.class, "Delivery failed for #{inbox_url}: #{e.message}", e.backtrace[0] + ensure + if @performed + failure_tracker.track_success! + else + failure_tracker.track_failure! + end end private @@ -40,6 +42,8 @@ class ActivityPub::DeliveryWorker request_pool.with(@host) do |http_client| build_request(http_client).perform do |response| raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response) + + @performed = true end end end -- cgit From 5d3feed191bcbe2769512119752b426108152fe9 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 10 Jul 2019 18:59:28 +0200 Subject: Refactor fetching of remote resources (#11251) --- app/helpers/jsonld_helper.rb | 47 ++++++----- app/lib/request.rb | 2 +- .../activitypub/fetch_remote_status_service.rb | 20 ++--- app/services/fetch_atom_service.rb | 93 --------------------- app/services/fetch_link_card_service.rb | 2 +- app/services/fetch_remote_account_service.rb | 2 +- app/services/fetch_remote_status_service.rb | 2 +- app/services/fetch_resource_service.rb | 68 +++++++++++++++ app/services/resolve_url_service.rb | 47 ++++------- app/workers/activitypub/delivery_worker.rb | 16 ++-- spec/services/fetch_atom_service_spec.rb | 96 ---------------------- spec/services/fetch_remote_account_service_spec.rb | 1 + spec/services/fetch_resource_service_spec.rb | 96 ++++++++++++++++++++++ spec/services/resolve_url_service_spec.rb | 44 ++-------- 14 files changed, 231 insertions(+), 305 deletions(-) delete mode 100644 app/services/fetch_atom_service.rb create mode 100644 app/services/fetch_resource_service.rb delete mode 100644 spec/services/fetch_atom_service_spec.rb create mode 100644 spec/services/fetch_resource_service_spec.rb (limited to 'app/workers') diff --git a/app/helpers/jsonld_helper.rb b/app/helpers/jsonld_helper.rb index 5b4011275..34a657e06 100644 --- a/app/helpers/jsonld_helper.rb +++ b/app/helpers/jsonld_helper.rb @@ -16,13 +16,15 @@ module JsonLdHelper # The url attribute can be a string, an array of strings, or an array of objects. # The objects could include a mimeType. Not-included mimeType means it's text/html. def url_to_href(value, preferred_type = nil) - single_value = if value.is_a?(Array) && !value.first.is_a?(String) - value.find { |link| preferred_type.nil? || ((link['mimeType'].presence || 'text/html') == preferred_type) } - elsif value.is_a?(Array) - value.first - else - value - end + single_value = begin + if value.is_a?(Array) && !value.first.is_a?(String) + value.find { |link| preferred_type.nil? || ((link['mimeType'].presence || 'text/html') == preferred_type) } + elsif value.is_a?(Array) + value.first + else + value + end + end if single_value.nil? || single_value.is_a?(String) single_value @@ -64,7 +66,9 @@ module JsonLdHelper def fetch_resource(uri, id, on_behalf_of = nil) unless id json = fetch_resource_without_id_validation(uri, on_behalf_of) + return unless json + uri = json['id'] end @@ -74,24 +78,26 @@ module JsonLdHelper def fetch_resource_without_id_validation(uri, on_behalf_of = nil, raise_on_temporary_error = false) build_request(uri, on_behalf_of).perform do |response| - unless response_successful?(response) || response_error_unsalvageable?(response) || !raise_on_temporary_error - raise Mastodon::UnexpectedResponseError, response - end + raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response) || !raise_on_temporary_error + return body_to_json(response.body_with_limit) if response.code == 200 end + # If request failed, retry without doing it on behalf of a user return if on_behalf_of.nil? + build_request(uri).perform do |response| - unless response_successful?(response) || response_error_unsalvageable?(response) || !raise_on_temporary_error - raise Mastodon::UnexpectedResponseError, response - end + raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response) || !raise_on_temporary_error + response.code == 200 ? body_to_json(response.body_with_limit) : nil end end def body_to_json(body, compare_id: nil) json = body.is_a?(String) ? Oj.load(body, mode: :strict) : body + return if compare_id.present? && json['id'] != compare_id + json rescue Oj::ParseError nil @@ -105,35 +111,34 @@ module JsonLdHelper end end - private - def response_successful?(response) (200...300).cover?(response.code) end def response_error_unsalvageable?(response) - (400...500).cover?(response.code) && response.code != 429 + response.code == 501 || ((400...500).cover?(response.code) && ![401, 408, 429].include?(response.code)) end def build_request(uri, on_behalf_of = nil) - request = Request.new(:get, uri) - request.on_behalf_of(on_behalf_of) if on_behalf_of - request.add_headers('Accept' => 'application/activity+json, application/ld+json') - request + Request.new(:get, uri).tap do |request| + request.on_behalf_of(on_behalf_of) if on_behalf_of + request.add_headers('Accept' => 'application/activity+json, application/ld+json') + end end def load_jsonld_context(url, _options = {}, &_block) json = Rails.cache.fetch("jsonld:context:#{url}", expires_in: 30.days, raw: true) do request = Request.new(:get, url) request.add_headers('Accept' => 'application/ld+json') - request.perform do |res| raise JSON::LD::JsonLdError::LoadingDocumentFailed unless res.code == 200 && res.mime_type == 'application/ld+json' + res.body_with_limit end end doc = JSON::LD::API::RemoteDocument.new(url, json) + block_given? ? yield(doc) : doc end end diff --git a/app/lib/request.rb b/app/lib/request.rb index 322457ad7..1fd3f5190 100644 --- a/app/lib/request.rb +++ b/app/lib/request.rb @@ -41,7 +41,7 @@ class Request end def on_behalf_of(account, key_id_format = :acct, sign_with: nil) - raise ArgumentError unless account.local? + raise ArgumentError, 'account must be local' unless account&.local? @account = account @keypair = sign_with.present? ? OpenSSL::PKey::RSA.new(sign_with) : @account.keypair diff --git a/app/services/activitypub/fetch_remote_status_service.rb b/app/services/activitypub/fetch_remote_status_service.rb index 469821032..cf4f62899 100644 --- a/app/services/activitypub/fetch_remote_status_service.rb +++ b/app/services/activitypub/fetch_remote_status_service.rb @@ -5,18 +5,18 @@ class ActivityPub::FetchRemoteStatusService < BaseService # Should be called when uri has already been checked for locality def call(uri, id: true, prefetched_body: nil, on_behalf_of: nil) - @json = if prefetched_body.nil? - fetch_resource(uri, id, on_behalf_of) - else - body_to_json(prefetched_body, compare_id: id ? uri : nil) - end + @json = begin + if prefetched_body.nil? + fetch_resource(uri, id, on_behalf_of) + else + body_to_json(prefetched_body, compare_id: id ? uri : nil) + end + end - return unless supported_context? && expected_type? - - return if actor_id.nil? || !trustworthy_attribution?(@json['id'], actor_id) + return if !(supported_context? && expected_type?) || actor_id.nil? || !trustworthy_attribution?(@json['id'], actor_id) actor = ActivityPub::TagManager.instance.uri_to_resource(actor_id, Account) - actor = ActivityPub::FetchRemoteAccountService.new.call(actor_id, id: true) if actor.nil? || needs_update(actor) + actor = ActivityPub::FetchRemoteAccountService.new.call(actor_id, id: true) if actor.nil? || needs_update?(actor) return if actor.nil? || actor.suspended? @@ -46,7 +46,7 @@ class ActivityPub::FetchRemoteStatusService < BaseService equals_or_includes_any?(@json['type'], ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES) end - def needs_update(actor) + def needs_update?(actor) actor.possibly_stale? end end diff --git a/app/services/fetch_atom_service.rb b/app/services/fetch_atom_service.rb deleted file mode 100644 index d6508a988..000000000 --- a/app/services/fetch_atom_service.rb +++ /dev/null @@ -1,93 +0,0 @@ -# frozen_string_literal: true - -class FetchAtomService < BaseService - include JsonLdHelper - - def call(url) - return if url.blank? - - result = process(url) - - # retry without ActivityPub - result ||= process(url) if @unsupported_activity - - result - rescue OpenSSL::SSL::SSLError => e - Rails.logger.debug "SSL error: #{e}" - nil - rescue HTTP::ConnectionError => e - Rails.logger.debug "HTTP ConnectionError: #{e}" - nil - end - - private - - def process(url, terminal = false) - @url = url - perform_request { |response| process_response(response, terminal) } - end - - def perform_request(&block) - accept = 'text/html' - accept = 'application/activity+json, application/ld+json; profile="https://www.w3.org/ns/activitystreams", application/atom+xml, ' + accept unless @unsupported_activity - - Request.new(:get, @url).add_headers('Accept' => accept).perform(&block) - end - - def process_response(response, terminal = false) - return nil if response.code != 200 - - if response.mime_type == 'application/atom+xml' - [@url, { prefetched_body: response.body_with_limit }, :ostatus] - elsif ['application/activity+json', 'application/ld+json'].include?(response.mime_type) - body = response.body_with_limit - json = body_to_json(body) - if supported_context?(json) && equals_or_includes_any?(json['type'], ActivityPub::FetchRemoteAccountService::SUPPORTED_TYPES) && json['inbox'].present? - [json['id'], { prefetched_body: body, id: true }, :activitypub] - elsif supported_context?(json) && expected_type?(json) - [json['id'], { prefetched_body: body, id: true }, :activitypub] - else - @unsupported_activity = true - nil - end - elsif !terminal - link_header = response['Link'] && parse_link_header(response) - - if link_header&.find_link(%w(rel alternate)) - process_link_headers(link_header) - elsif response.mime_type == 'text/html' - process_html(response) - end - end - end - - def expected_type?(json) - equals_or_includes_any?(json['type'], ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES) - end - - def process_html(response) - page = Nokogiri::HTML(response.body_with_limit) - - json_link = page.xpath('//link[@rel="alternate"]').find { |link| ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(link['type']) } - atom_link = page.xpath('//link[@rel="alternate"]').find { |link| link['type'] == 'application/atom+xml' } - - result ||= process(json_link['href'], terminal: true) unless json_link.nil? || @unsupported_activity - result ||= process(atom_link['href'], terminal: true) unless atom_link.nil? - - result - end - - def process_link_headers(link_header) - json_link = link_header.find_link(%w(rel alternate), %w(type application/activity+json)) || link_header.find_link(%w(rel alternate), ['type', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"']) - atom_link = link_header.find_link(%w(rel alternate), %w(type application/atom+xml)) - - result ||= process(json_link.href, terminal: true) unless json_link.nil? || @unsupported_activity - result ||= process(atom_link.href, terminal: true) unless atom_link.nil? - - result - end - - def parse_link_header(response) - LinkHeader.parse(response['Link'].is_a?(Array) ? response['Link'].first : response['Link']) - end -end diff --git a/app/services/fetch_link_card_service.rb b/app/services/fetch_link_card_service.rb index 75fbd0e8c..4e75c370f 100644 --- a/app/services/fetch_link_card_service.rb +++ b/app/services/fetch_link_card_service.rb @@ -29,7 +29,7 @@ class FetchLinkCardService < BaseService end attach_card if @card&.persisted? - rescue HTTP::Error, Addressable::URI::InvalidURIError, Mastodon::HostValidationError, Mastodon::LengthValidationError => e + rescue HTTP::Error, OpenSSL::SSL::SSLError, Addressable::URI::InvalidURIError, Mastodon::HostValidationError, Mastodon::LengthValidationError => e Rails.logger.debug "Error fetching link #{@url}: #{e}" nil end diff --git a/app/services/fetch_remote_account_service.rb b/app/services/fetch_remote_account_service.rb index a7f95603d..3cd06e30f 100644 --- a/app/services/fetch_remote_account_service.rb +++ b/app/services/fetch_remote_account_service.rb @@ -3,7 +3,7 @@ class FetchRemoteAccountService < BaseService def call(url, prefetched_body = nil, protocol = :ostatus) if prefetched_body.nil? - resource_url, resource_options, protocol = FetchAtomService.new.call(url) + resource_url, resource_options, protocol = FetchResourceService.new.call(url) else resource_url = url resource_options = { prefetched_body: prefetched_body } diff --git a/app/services/fetch_remote_status_service.rb b/app/services/fetch_remote_status_service.rb index aac39dfd5..208dc7809 100644 --- a/app/services/fetch_remote_status_service.rb +++ b/app/services/fetch_remote_status_service.rb @@ -3,7 +3,7 @@ class FetchRemoteStatusService < BaseService def call(url, prefetched_body = nil, protocol = :ostatus) if prefetched_body.nil? - resource_url, resource_options, protocol = FetchAtomService.new.call(url) + resource_url, resource_options, protocol = FetchResourceService.new.call(url) else resource_url = url resource_options = { prefetched_body: prefetched_body } diff --git a/app/services/fetch_resource_service.rb b/app/services/fetch_resource_service.rb new file mode 100644 index 000000000..c0473f3ad --- /dev/null +++ b/app/services/fetch_resource_service.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +class FetchResourceService < BaseService + include JsonLdHelper + + ACCEPT_HEADER = 'application/activity+json, application/ld+json; profile="https://www.w3.org/ns/activitystreams", text/html' + + def call(url) + return if url.blank? + + process(url) + rescue HTTP::Error, OpenSSL::SSL::SSLError, Addressable::URI::InvalidURIError, Mastodon::HostValidationError, Mastodon::LengthValidationError => e + Rails.logger.debug "Error fetching resource #{@url}: #{e}" + nil + end + + private + + def process(url, terminal = false) + @url = url + + perform_request { |response| process_response(response, terminal) } + end + + def perform_request(&block) + Request.new(:get, @url).add_headers('Accept' => ACCEPT_HEADER).perform(&block) + end + + def process_response(response, terminal = false) + return nil if response.code != 200 + + if ['application/activity+json', 'application/ld+json'].include?(response.mime_type) + body = response.body_with_limit + json = body_to_json(body) + + [json['id'], { prefetched_body: body, id: true }, :activitypub] if supported_context?(json) && (equals_or_includes_any?(json['type'], ActivityPub::FetchRemoteAccountService::SUPPORTED_TYPES) || expected_type?(json)) + elsif !terminal + link_header = response['Link'] && parse_link_header(response) + + if link_header&.find_link(%w(rel alternate)) + process_link_headers(link_header) + elsif response.mime_type == 'text/html' + process_html(response) + end + end + end + + def expected_type?(json) + equals_or_includes_any?(json['type'], ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES) + end + + def process_html(response) + page = Nokogiri::HTML(response.body_with_limit) + json_link = page.xpath('//link[@rel="alternate"]').find { |link| ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(link['type']) } + + process(json_link['href'], terminal: true) unless json_link.nil? + end + + def process_link_headers(link_header) + json_link = link_header.find_link(%w(rel alternate), %w(type application/activity+json)) || link_header.find_link(%w(rel alternate), ['type', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"']) + + process(json_link.href, terminal: true) unless json_link.nil? + end + + def parse_link_header(response) + LinkHeader.parse(response['Link'].is_a?(Array) ? response['Link'].first : response['Link']) + end +end diff --git a/app/services/resolve_url_service.rb b/app/services/resolve_url_service.rb index f941b489a..80381c16b 100644 --- a/app/services/resolve_url_service.rb +++ b/app/services/resolve_url_service.rb @@ -4,64 +4,49 @@ class ResolveURLService < BaseService include JsonLdHelper include Authorization - attr_reader :url - def call(url, on_behalf_of: nil) - @url = url + @url = url @on_behalf_of = on_behalf_of - return process_local_url if local_url? - - process_url unless fetched_atom_feed.nil? + if local_url? + process_local_url + elsif !fetched_resource.nil? + process_url + end end private def process_url if equals_or_includes_any?(type, ActivityPub::FetchRemoteAccountService::SUPPORTED_TYPES) - FetchRemoteAccountService.new.call(atom_url, body, protocol) + FetchRemoteAccountService.new.call(resource_url, body, protocol) elsif equals_or_includes_any?(type, ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES) - FetchRemoteStatusService.new.call(atom_url, body, protocol) + FetchRemoteStatusService.new.call(resource_url, body, protocol) end end - def fetched_atom_feed - @_fetched_atom_feed ||= FetchAtomService.new.call(url) + def fetched_resource + @fetched_resource ||= FetchResourceService.new.call(@url) end - def atom_url - fetched_atom_feed.first + def resource_url + fetched_resource.first end def body - fetched_atom_feed.second[:prefetched_body] + fetched_resource.second[:prefetched_body] end def protocol - fetched_atom_feed.third + fetched_resource.third end def type return json_data['type'] if protocol == :activitypub - - case xml_root - when 'feed' - 'Person' - when 'entry' - 'Note' - end end def json_data - @_json_data ||= body_to_json(body) - end - - def xml_root - xml_data.root.name - end - - def xml_data - @_xml_data ||= Nokogiri::XML(body, nil, 'utf-8') + @json_data ||= body_to_json(body) end def local_url? @@ -83,10 +68,10 @@ class ResolveURLService < BaseService def check_local_status(status) return if status.nil? + authorize_with @on_behalf_of, status, :show? status rescue Mastodon::NotPermittedError - # Do not disclose the existence of status the user is not authorized to see nil end end diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index 8b52b8e49..5457d9d4b 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -2,6 +2,7 @@ class ActivityPub::DeliveryWorker include Sidekiq::Worker + include JsonLdHelper STOPLIGHT_FAILURE_THRESHOLD = 10 STOPLIGHT_COOLDOWN = 60 @@ -32,9 +33,10 @@ class ActivityPub::DeliveryWorker private def build_request(http_client) - request = Request.new(:post, @inbox_url, body: @json, http_client: http_client) - request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with]) - request.add_headers(HEADERS) + Request.new(:post, @inbox_url, body: @json, http_client: http_client).tap do |request| + request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with]) + request.add_headers(HEADERS) + end end def perform_request @@ -53,14 +55,6 @@ class ActivityPub::DeliveryWorker .run end - def response_successful?(response) - (200...300).cover?(response.code) - end - - def response_error_unsalvageable?(response) - response.code == 501 || ((400...500).cover?(response.code) && ![401, 408, 429].include?(response.code)) - end - def failure_tracker @failure_tracker ||= DeliveryFailureTracker.new(@inbox_url) end diff --git a/spec/services/fetch_atom_service_spec.rb b/spec/services/fetch_atom_service_spec.rb deleted file mode 100644 index 495540004..000000000 --- a/spec/services/fetch_atom_service_spec.rb +++ /dev/null @@ -1,96 +0,0 @@ -require 'rails_helper' - -RSpec.describe FetchAtomService, type: :service do - describe '#call' do - let(:url) { 'http://example.com' } - subject { FetchAtomService.new.call(url) } - - context 'url is blank' do - let(:url) { '' } - it { is_expected.to be_nil } - end - - context 'request failed' do - before do - WebMock.stub_request(:get, url).to_return(status: 500, body: '', headers: {}) - end - - it { is_expected.to be_nil } - end - - context 'raise OpenSSL::SSL::SSLError' do - before do - allow(Request).to receive_message_chain(:new, :add_headers, :perform).and_raise(OpenSSL::SSL::SSLError) - end - - it 'output log and return nil' do - expect_any_instance_of(ActiveSupport::Logger).to receive(:debug).with('SSL error: OpenSSL::SSL::SSLError') - is_expected.to be_nil - end - end - - context 'raise HTTP::ConnectionError' do - before do - allow(Request).to receive_message_chain(:new, :add_headers, :perform).and_raise(HTTP::ConnectionError) - end - - it 'output log and return nil' do - expect_any_instance_of(ActiveSupport::Logger).to receive(:debug).with('HTTP ConnectionError: HTTP::ConnectionError') - is_expected.to be_nil - end - end - - context 'response success' do - let(:body) { '' } - let(:headers) { { 'Content-Type' => content_type } } - let(:json) { - { id: 1, - '@context': ActivityPub::TagManager::CONTEXT, - type: 'Note', - }.to_json - } - - before do - WebMock.stub_request(:get, url).to_return(status: 200, body: body, headers: headers) - end - - context 'content type is application/atom+xml' do - let(:content_type) { 'application/atom+xml' } - - it { is_expected.to eq [url, { :prefetched_body => "" }, :ostatus] } - end - - context 'content_type is activity+json' do - let(:content_type) { 'application/activity+json; charset=utf-8' } - let(:body) { json } - - it { is_expected.to eq [1, { prefetched_body: body, id: true }, :activitypub] } - end - - context 'content_type is ld+json with profile' do - let(:content_type) { 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' } - let(:body) { json } - - it { is_expected.to eq [1, { prefetched_body: body, id: true }, :activitypub] } - end - - before do - WebMock.stub_request(:get, url).to_return(status: 200, body: body, headers: headers) - WebMock.stub_request(:get, 'http://example.com/foo').to_return(status: 200, body: json, headers: { 'Content-Type' => 'application/activity+json' }) - end - - context 'has link header' do - let(:headers) { { 'Link' => '; rel="alternate"; type="application/activity+json"', } } - - it { is_expected.to eq [1, { prefetched_body: json, id: true }, :activitypub] } - end - - context 'content type is text/html' do - let(:content_type) { 'text/html' } - let(:body) { '' } - - it { is_expected.to eq [1, { prefetched_body: json, id: true }, :activitypub] } - end - end - end -end diff --git a/spec/services/fetch_remote_account_service_spec.rb b/spec/services/fetch_remote_account_service_spec.rb index 37e9910d4..ee7325be2 100644 --- a/spec/services/fetch_remote_account_service_spec.rb +++ b/spec/services/fetch_remote_account_service_spec.rb @@ -4,6 +4,7 @@ RSpec.describe FetchRemoteAccountService, type: :service do let(:url) { 'https://example.com/alice' } let(:prefetched_body) { nil } let(:protocol) { :ostatus } + subject { FetchRemoteAccountService.new.call(url, prefetched_body, protocol) } let(:actor) do diff --git a/spec/services/fetch_resource_service_spec.rb b/spec/services/fetch_resource_service_spec.rb new file mode 100644 index 000000000..17c192c44 --- /dev/null +++ b/spec/services/fetch_resource_service_spec.rb @@ -0,0 +1,96 @@ +require 'rails_helper' + +RSpec.describe FetchResourceService, type: :service do + let!(:representative) { Fabricate(:account) } + + describe '#call' do + let(:url) { 'http://example.com' } + subject { described_class.new.call(url) } + + context 'url is blank' do + let(:url) { '' } + it { is_expected.to be_nil } + end + + context 'request failed' do + before do + WebMock.stub_request(:get, url).to_return(status: 500, body: '', headers: {}) + end + + it { is_expected.to be_nil } + end + + context 'raise OpenSSL::SSL::SSLError' do + before do + allow(Request).to receive_message_chain(:new, :add_headers, :perform).and_raise(OpenSSL::SSL::SSLError) + end + + it 'return nil' do + is_expected.to be_nil + end + end + + context 'raise HTTP::ConnectionError' do + before do + allow(Request).to receive_message_chain(:new, :add_headers, :perform).and_raise(HTTP::ConnectionError) + end + + it 'return nil' do + is_expected.to be_nil + end + end + + context 'response success' do + let(:body) { '' } + let(:headers) { { 'Content-Type' => content_type } } + let(:json) { + { id: 1, + '@context': ActivityPub::TagManager::CONTEXT, + type: 'Note', + }.to_json + } + + before do + WebMock.stub_request(:get, url).to_return(status: 200, body: body, headers: headers) + end + + context 'content type is application/atom+xml' do + let(:content_type) { 'application/atom+xml' } + + it { is_expected.to eq nil } + end + + context 'content_type is activity+json' do + let(:content_type) { 'application/activity+json; charset=utf-8' } + let(:body) { json } + + it { is_expected.to eq [1, { prefetched_body: body, id: true }, :activitypub] } + end + + context 'content_type is ld+json with profile' do + let(:content_type) { 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"' } + let(:body) { json } + + it { is_expected.to eq [1, { prefetched_body: body, id: true }, :activitypub] } + end + + before do + WebMock.stub_request(:get, url).to_return(status: 200, body: body, headers: headers) + WebMock.stub_request(:get, 'http://example.com/foo').to_return(status: 200, body: json, headers: { 'Content-Type' => 'application/activity+json' }) + end + + context 'has link header' do + let(:headers) { { 'Link' => '; rel="alternate"; type="application/activity+json"', } } + + it { is_expected.to eq [1, { prefetched_body: json, id: true }, :activitypub] } + end + + context 'content type is text/html' do + let(:content_type) { 'text/html' } + let(:body) { '' } + + it { is_expected.to eq [1, { prefetched_body: json, id: true }, :activitypub] } + end + end + end +end diff --git a/spec/services/resolve_url_service_spec.rb b/spec/services/resolve_url_service_spec.rb index 7bb5d1940..aa4204637 100644 --- a/spec/services/resolve_url_service_spec.rb +++ b/spec/services/resolve_url_service_spec.rb @@ -6,48 +6,14 @@ describe ResolveURLService, type: :service do subject { described_class.new } describe '#call' do - it 'returns nil when there is no atom url' do - url = 'http://example.com/missing-atom' + it 'returns nil when there is no resource url' do + url = 'http://example.com/missing-resource' service = double - allow(FetchAtomService).to receive(:new).and_return service - allow(service).to receive(:call).with(url).and_return(nil) - - result = subject.call(url) - expect(result).to be_nil - end - - it 'fetches remote accounts for feed types' do - url = 'http://example.com/atom-feed' - service = double - allow(FetchAtomService).to receive(:new).and_return service - feed_url = 'http://feed-url' - feed_content = 'contents' - allow(service).to receive(:call).with(url).and_return([feed_url, { prefetched_body: feed_content }]) - - account_service = double - allow(FetchRemoteAccountService).to receive(:new).and_return(account_service) - allow(account_service).to receive(:call) - - _result = subject.call(url) - expect(account_service).to have_received(:call).with(feed_url, feed_content, nil) - end - - it 'fetches remote statuses for entry types' do - url = 'http://example.com/atom-entry' - service = double - allow(FetchAtomService).to receive(:new).and_return service - feed_url = 'http://feed-url' - feed_content = 'contents' - allow(service).to receive(:call).with(url).and_return([feed_url, { prefetched_body: feed_content }]) - - account_service = double - allow(FetchRemoteStatusService).to receive(:new).and_return(account_service) - allow(account_service).to receive(:call) - - _result = subject.call(url) + allow(FetchResourceService).to receive(:new).and_return service + allow(service).to receive(:call).with(url).and_return(nil) - expect(account_service).to have_received(:call).with(feed_url, feed_content, nil) + expect(subject.call(url)).to be_nil end end end -- cgit From 402302776c82c3853e723fe0c0c4dc99c69da3d9 Mon Sep 17 00:00:00 2001 From: "han@highemelry" Date: Sat, 13 Jul 2019 01:46:21 +0900 Subject: Change the retry limit in error of web push notification (#11292) - Change the maximum count of retry for web push notification (Default -> 5). - In case of high load of subscribe server, the retries will be repeated many times. - Because the retries occupy the default queue, maximum retry count should be reduced. --- app/workers/web/push_notification_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/web/push_notification_worker.rb b/app/workers/web/push_notification_worker.rb index 8e8a35973..901043975 100644 --- a/app/workers/web/push_notification_worker.rb +++ b/app/workers/web/push_notification_worker.rb @@ -3,7 +3,7 @@ class Web::PushNotificationWorker include Sidekiq::Worker - sidekiq_options backtrace: true + sidekiq_options backtrace: true, retry: 5 def perform(subscription_id, notification_id) subscription = ::Web::PushSubscription.find(subscription_id) -- cgit From b3f44aa186487b4f93f3a457607870c90caaf5df Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Mon, 15 Jul 2019 07:50:14 +0200 Subject: Add periodic removal of older thumbnails for preview cards (#11304) --- app/workers/maintenance/uncache_preview_worker.rb | 18 ++++++++++++++++++ .../scheduler/preview_cards_cleanup_scheduler.rb | 22 ++++++++++++++++++++++ config/sidekiq.yml | 3 +++ 3 files changed, 43 insertions(+) create mode 100644 app/workers/maintenance/uncache_preview_worker.rb create mode 100644 app/workers/scheduler/preview_cards_cleanup_scheduler.rb (limited to 'app/workers') diff --git a/app/workers/maintenance/uncache_preview_worker.rb b/app/workers/maintenance/uncache_preview_worker.rb new file mode 100644 index 000000000..810ffd8cc --- /dev/null +++ b/app/workers/maintenance/uncache_preview_worker.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class Maintenance::UncachePreviewWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull' + + def perform(preview_card_id) + preview_card = PreviewCard.find(preview_card_id) + + return if preview_card.image.blank? + + preview_card.image.destroy + preview_card.save + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/scheduler/preview_cards_cleanup_scheduler.rb b/app/workers/scheduler/preview_cards_cleanup_scheduler.rb new file mode 100644 index 000000000..2b38792f0 --- /dev/null +++ b/app/workers/scheduler/preview_cards_cleanup_scheduler.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class Scheduler::PreviewCardsCleanupScheduler + include Sidekiq::Worker + + sidekiq_options unique: :until_executed, retry: 0 + + def perform + Maintenance::UncachePreviewWorker.push_bulk(recent_link_preview_cards.pluck(:id)) + Maintenance::UncachePreviewWorker.push_bulk(older_preview_cards.pluck(:id)) + end + + private + + def recent_link_preview_cards + PreviewCard.where(type: :link).where('updated_at < ?', 1.month.ago) + end + + def older_preview_cards + PreviewCard.where('updated_at < ?', 6.months.ago) + end +end diff --git a/config/sidekiq.yml b/config/sidekiq.yml index a16dea967..5c652792c 100644 --- a/config/sidekiq.yml +++ b/config/sidekiq.yml @@ -27,6 +27,9 @@ ip_cleanup_scheduler: cron: '<%= Random.rand(0..59) %> <%= Random.rand(3..5) %> * * *' class: Scheduler::IpCleanupScheduler + preview_cards_cleanup_scheduler: + cron: '<%= Random.rand(0..59) %> <%= Random.rand(3..5) %> * * *' + class: Scheduler::PreviewCardsCleanupScheduler email_scheduler: cron: '0 10 * * 2' class: Scheduler::EmailScheduler -- cgit