diff options
Diffstat (limited to 'app/services')
39 files changed, 933 insertions, 179 deletions
diff --git a/app/services/activitypub/fetch_collection_items_service.rb b/app/services/activitypub/fetch_collection_items_service.rb new file mode 100644 index 000000000..ef54321de --- /dev/null +++ b/app/services/activitypub/fetch_collection_items_service.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +class ActivityPub::FetchCollectionItemsService < BaseService + include JsonLdHelper + + COOLDOWN = 30.minutes + + # Fetches objects in a collection from a URI or hash and queues them for processing. + # @param collection [Hash, String] Collection hash or URI + # @param account [Account] Owner of the collection + # @param page_limit [Integer] (10) Maximum number of pages to fetch from the collection. + # @param item_limit [Integer] (100) Maximum number of items to fetch from the collection. + # @option options [Boolean] :every_page (false) Whether to fetch every page in the collection, + # even if its items have been previously fetched. By default, fetching will stop if all the + # items on any page have already been fetched. + # @option options [Boolean] :look_ahead (false) Whether to check the next page for unfetched + # items if the current page's items have been previously fetched. If there are unfetched + # items on the next page, fetching will continue. + # @option options [Boolean] :skip_cooldown (false) Skip the fetch cooldown period on the a + # collection URI (e.g., for account migration). + # @option options [Boolean] :include_boosts (false) Whether to skip boosts. Including these + # will cause a LOT of server traffic. + # @return [void] + # @raise [Mastodon::RaceConditionError] Collection is already being fetched. + # @raise [Mastodon::UnexpectedResponseError] Server returned an error while fetching a page. + def call(collection, account, page_limit: 10, item_limit: 100, **options) + uri = value_or_id(collection) + return if uri.blank? || ActivityPub::TagManager.instance.local_uri?(uri) + + uri = collection['partOf'] if collection.is_a?(Hash) && collection['partOf'].present? + + @account = account + @account = account_from_uri(uri) if @account.blank? + set_fetch_account + + return if !options[:skip_cooldown] && Redis.current.get("fetch_collection_cooldown:#{uri}") + + collection = fetch_collection(collection) + return if collection.blank? + + if @account.blank? + @account = account_from_uri(collection['partOf'].presence || collection['id']) + set_fetch_account + end + + fetch_collection_pages(collection, page_limit, item_limit, **options) + end + + private + + def lock_options(uri) + { redis: Redis.current, key: "fetch_collection:#{uri}" } + end + + def set_fetch_account + @on_behalf_of = @account.present? ? @account.followers.local.random.first : nil + end + + def account_from_uri(uri) + ActivityPub::TagManager.instance.uri_to_resource(uri, Account) + end + + def account_id_from_uri(uri) + return if uri.blank? + + Rails.cache.fetch("account_id_from_uri:#{uri}", expires_in: 10.minutes) do + account_from_uri(uri)&.id + end + end + + def valid_item?(item) + item.is_a?(Hash) && + !invalid_uri?(item['id']) && + (item['attributedTo'].present? || item['actor'].present?) && ( + item['object'].blank? || item['type'] == 'Create' && !invalid_uri?(value_or_id(item['object'])) + ) + end + + def uri_with_account_id(item) + object = item['object'].presence || item + [value_or_id(object), object.is_a?(Hash) ? account_id_from_uri(object['attributedTo']) : account_id_from_uri(item['actor'])] + end + + def invalid_uri?(uri) + unsupported_uri_scheme?(uri) || !uri_allowed?(uri) || ActivityPub::TagManager.instance.local_uri?(uri) + end + + def fetch_collection(collection_or_uri) + return (collection_or_uri['id'].present? ? collection_or_uri : nil) if collection_or_uri.is_a?(Hash) + return if !collection_or_uri.is_a?(String) || invalid_origin?(collection_or_uri) + + fetch_resource_without_id_validation(collection_or_uri, @on_behalf_of, true) + end + + def fetch_collection_pages(collection, page_limit, item_limit, **options) + uri = collection['partOf'].presence || collection['id'] + cooldown_key = "fetch_collection_cooldown:#{uri}" + + return if !options[:skip_cooldown] && Redis.current.get(cooldown_key) + + Redis.current.set(cooldown_key, 1, ex: COOLDOWN) + + RedisLock.acquire(lock_options(uri)) do |lock| + raise Mastodon::RaceConditionError unless lock.acquired? + + page = CollectionPage.find_or_create_by(uri: uri, account: @account) + every_page = options[:every_page] + + if page.next.present? + collection = fetch_collection(page.next) + fetch_collection_items(collection, page, page_limit, item_limit, **options) + every_page = false + end + + uri = collection['first'].presence || collection['id'] + page.update!(next: uri) + collection = fetch_collection(uri) if collection['id'] != uri + fetch_collection_items(collection, page, page_limit, item_limit, **options.merge({ every_page: every_page })) + end + end + + def fetch_collection_items(collection, page, page_limit, item_limit, **options) + page_count = 0 + item_count = 0 + seen_pages = Set[page.next] + have_items = false + + while collection.present? && collection['type'].present? + batch = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + + break unless batch.is_a?(Array) + + batch_size = [batch.count, item_limit - item_count].min + batch = batch.take(batch_size).select { |item| valid_item?(item) }.map { |item| uri_with_account_id(item) } + result = CollectionItem.import([:uri, :account_id], batch, validate: false, on_duplicate_key_ignore: true) + + if !options[:every_page] && result.ids.blank? + break if have_items || !options[:look_ahead] + + have_items = true + elsif have_items + have_items = false + end + + item_count += result.ids.count + page_count += 1 + + next_page = collection['next'] + break unless item_count < item_limit && page_count < page_limit && next_page.present? + break if seen_pages.include?(next_page) + + sleep [page_count.to_f / 5, 1].min + + seen_pages << next_page + page.update!(next: next_page) + collection = fetch_collection(next_page) + end + + page.delete + ActivityPub::ProcessCollectionItemsWorker.perform_async + end +end diff --git a/app/services/activitypub/fetch_featured_collection_service.rb b/app/services/activitypub/fetch_featured_collection_service.rb index 2c2770466..0a20f5edc 100644 --- a/app/services/activitypub/fetch_featured_collection_service.rb +++ b/app/services/activitypub/fetch_featured_collection_service.rb @@ -22,9 +22,10 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService private def process_items(items) + first_local_follower = @account.followers.local.random.first status_ids = items.map { |item| value_or_id(item) } .reject { |uri| ActivityPub::TagManager.instance.local_uri?(uri) } - .map { |uri| ActivityPub::FetchRemoteStatusService.new.call(uri) } + .map { |uri| ActivityPub::FetchRemoteStatusService.new.call(uri, on_behalf_of: first_local_follower) } .compact .select { |status| status.account_id == @account.id } .map(&:id) @@ -43,7 +44,7 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService StatusPin.where(account: @account, status_id: to_remove).delete_all unless to_remove.empty? to_add.each do |status_id| - StatusPin.create!(account: @account, status_id: status_id) + StatusPin.create(account: @account, status_id: status_id) end end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 8cb309e52..e36ca9f39 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -1,49 +1,27 @@ # frozen_string_literal: true class ActivityPub::FetchRepliesService < BaseService - include JsonLdHelper - - def call(parent_status, collection_or_uri, allow_synchronous_requests = true) + def call(parent_status, collection, **options) @account = parent_status.account - @allow_synchronous_requests = allow_synchronous_requests - - @items = collection_items(collection_or_uri) - return if @items.nil? - - FetchReplyWorker.push_bulk(filtered_replies) - - @items + fetch_collection_items(collection, **options) + rescue ActiveRecord::RecordNotFound + nil end private - def collection_items(collection_or_uri) - collection = fetch_collection(collection_or_uri) - return unless collection.is_a?(Hash) - - collection = fetch_collection(collection['first']) if collection['first'].present? - return unless collection.is_a?(Hash) - - case collection['type'] - when 'Collection', 'CollectionPage' - collection['items'] - when 'OrderedCollection', 'OrderedCollectionPage' - collection['orderedItems'] - end - end - - def fetch_collection(collection_or_uri) - return collection_or_uri if collection_or_uri.is_a?(Hash) - return unless @allow_synchronous_requests - return if invalid_origin?(collection_or_uri) - fetch_resource_without_id_validation(collection_or_uri, nil, true) - end - - def filtered_replies - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. - - # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| invalid_origin?(uri) }.take(5) + def fetch_collection_items(collection, **options) + ActivityPub::FetchCollectionItemsService.new.call( + collection, + @account, + page_limit: 1, + item_limit: 20, + **options + ) + rescue Mastodon::RaceConditionError, Mastodon::UnexpectedResponseError + collection_uri = collection.is_a?(Hash) ? collection['id'] : collection + return unless collection_uri.present? && collection_uri.is_a?(String) + + ActivityPub::FetchRepliesWorker.perform_async(@account.id, collection_uri) end end diff --git a/app/services/activitypub/process_account_service.rb b/app/services/activitypub/process_account_service.rb index 85b915ec6..7f17e460c 100644 --- a/app/services/activitypub/process_account_service.rb +++ b/app/services/activitypub/process_account_service.rb @@ -35,12 +35,13 @@ class ActivityPub::ProcessAccountService < BaseService return if @account.nil? after_protocol_change! if protocol_changed? - after_key_change! if key_changed? && !@options[:signed_with_known_key] clear_tombstones! if key_changed? + return after_key_change! if key_changed? && !@options[:signed_with_known_key] unless @options[:only_key] check_featured_collection! if @account.featured_collection_url.present? check_links! unless @account.fields.empty? + process_sync end @account @@ -86,6 +87,11 @@ class ActivityPub::ProcessAccountService < BaseService @account.also_known_as = as_array(@json['alsoKnownAs'] || []).map { |item| value_or_id(item) } @account.actor_type = actor_type @account.discoverable = @json['discoverable'] || false + @account.require_dereference = @json['requireDereference'] || false + @account.show_replies = @json['showReplies'] || true + @account.show_unlisted = @json['showUnlisted'] || true + @account.private = @json['private'] || false + @account.require_auth = @json['require_auth'] || false end def set_fetchable_attributes! @@ -104,7 +110,8 @@ class ActivityPub::ProcessAccountService < BaseService end def after_key_change! - RefollowWorker.perform_async(@account.id) + ResetAccountWorker.perform_async(@account.id) + nil end def check_featured_collection! @@ -288,4 +295,8 @@ class ActivityPub::ProcessAccountService < BaseService @account.identity_proofs.where(provider: provider, provider_username: provider_username).find_or_create_by(provider: provider, provider_username: provider_username, token: token) end + + def process_sync + ActivityPub::SyncAccountWorker.perform_async(@account.id) + end end diff --git a/app/services/activitypub/process_collection_items_service.rb b/app/services/activitypub/process_collection_items_service.rb new file mode 100644 index 000000000..9c30d81e9 --- /dev/null +++ b/app/services/activitypub/process_collection_items_service.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +class ActivityPub::ProcessCollectionItemsService < BaseService + def call(account_id, on_behalf_of) + RedisLock.acquire(lock_options(account_id)) do |lock| + if lock.acquired? + CollectionItem.unprocessed.where(account_id: account_id).find_each do |item| + # Avoid failing servers holding up the rest of the queue. + next if item.retries.positive? && rand(3).positive? + + begin + FetchRemoteStatusService.new.call(item.uri, nil, on_behalf_of) + rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound + nil + rescue HTTP::TimeoutError + item.increment!(:retries) + end + + item.update!(processed: true) if item.retries.zero? || item.retries > 4 + end + end + end + end + + private + + def lock_options(account_id) + { redis: Redis.current, key: "process_collection_items:#{account_id}" } + end +end diff --git a/app/services/after_block_domain_from_account_service.rb b/app/services/after_block_domain_from_account_service.rb index 89d007c1c..30f925bda 100644 --- a/app/services/after_block_domain_from_account_service.rb +++ b/app/services/after_block_domain_from_account_service.rb @@ -11,6 +11,7 @@ class AfterBlockDomainFromAccountService < BaseService @domain = domain clear_notifications! + defederate_from_domain! remove_follows! reject_existing_followers! reject_pending_follow_requests! @@ -18,6 +19,10 @@ class AfterBlockDomainFromAccountService < BaseService private + def defederate_from_domain! + DefederateAccountService.new.call(@account, domain) + end + def remove_follows! @account.active_relationships.where(target_account: Account.where(domain: @domain)).includes(:target_account).reorder(nil).find_each do |follow| UnfollowService.new.call(@account, follow.target_account) diff --git a/app/services/after_block_service.rb b/app/services/after_block_service.rb index 314919df8..3ee7e2e56 100644 --- a/app/services/after_block_service.rb +++ b/app/services/after_block_service.rb @@ -8,6 +8,9 @@ class AfterBlockService < BaseService clear_home_feed! clear_notifications! clear_conversations! + + defederate_interactions! + unlink_interactions! end private @@ -23,4 +26,27 @@ class AfterBlockService < BaseService def clear_notifications! Notification.where(account: @account).where(from_account: @target_account).in_batches.delete_all end + + def unlink_interactions! + @target_account.statuses.where(in_reply_to_account_id: @account.id).in_batches.update_all(in_reply_to_account_id: nil) + @target_account.mentions.where(account_id: @account.id).in_batches.destroy_all + end + + def defederate_interactions! + defederate_statuses!(@account.statuses.where(in_reply_to_account_id: @target_account.id)) + defederate_statuses!(@account.statuses.joins(:mentions).where(mentions: { account_id: @target_account.id })) + defederate_statuses!(@account.statuses.joins(:reblog).where(reblogs_statuses: { account_id: @target_account.id })) + defederate_favourites! + end + + def defederate_statuses!(statuses) + statuses.find_each { |status| RemovalWorker.perform_async(status.id, unpublish: true, blocking: @target_account.id) } + end + + def defederate_favourites! + favourites = @account.favourites.joins(:status).where(statuses: { account_id: @target_account.id }) + favourites.pluck(:status_id).each do |status_id| + UnfavouriteWorker.perform_async(@account.id, status_id) + end + end end diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index 707672ee0..ef68dc5bc 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -73,18 +73,12 @@ class BatchedRemoveStatusService < BaseService redis.pipelined do redis.publish('timeline:public', payload) - if status.local? - redis.publish('timeline:public:local', payload) - else - redis.publish('timeline:public:remote', payload) - end + redis.publish('timeline:public:local', payload) if status.local? + redis.publish('timeline:public:remote', payload) if status.media_attachments.any? redis.publish('timeline:public:media', payload) - if status.local? - redis.publish('timeline:public:local:media', payload) - else - redis.publish('timeline:public:remote:media', payload) - end + redis.publish('timeline:public:local:media', payload) if status.local? + redis.publish('timeline:public:remote:media', payload) end @tags[status.id].each do |hashtag| diff --git a/app/services/block_domain_service.rb b/app/services/block_domain_service.rb index 1cf3382b3..98af0fdee 100644 --- a/app/services/block_domain_service.rb +++ b/app/services/block_domain_service.rb @@ -23,6 +23,7 @@ class BlockDomainService < BaseService if domain_block.silence? silence_accounts! elsif domain_block.suspend? + DefederateDomainService.new.call(domain_block.domain) suspend_accounts! end diff --git a/app/services/block_service.rb b/app/services/block_service.rb index 266a0f4b9..0b8ecd3e0 100644 --- a/app/services/block_service.rb +++ b/app/services/block_service.rb @@ -3,16 +3,16 @@ class BlockService < BaseService include Payloadable - def call(account, target_account) + def call(account, target_account, softblock: false) return if account.id == target_account.id - UnfollowService.new.call(account, target_account) if account.following?(target_account) - UnfollowService.new.call(target_account, account) if target_account.following?(account) + UnfollowService.new.call(account, target_account, force: softblock) if softblock || account.following?(target_account) + UnfollowService.new.call(target_account, account, force: softblock) if softblock || target_account.following?(account) RejectFollowService.new.call(target_account, account) if target_account.requested?(account) block = account.block!(target_account) - BlockWorker.perform_async(account.id, target_account.id) + BlockWorker.perform_async(account.id, target_account.id) unless softblock create_notification(block) if !target_account.local? && target_account.activitypub? block end diff --git a/app/services/concerns/payloadable.rb b/app/services/concerns/payloadable.rb index 3e45570c3..ba94539c8 100644 --- a/app/services/concerns/payloadable.rb +++ b/app/services/concerns/payloadable.rb @@ -15,6 +15,6 @@ module Payloadable end def signing_enabled? - ENV['AUTHORIZED_FETCH'] != 'true' && !Rails.configuration.x.whitelist_mode + true end end diff --git a/app/services/defederate_account_serivice.rb b/app/services/defederate_account_serivice.rb new file mode 100644 index 000000000..5d9cc1597 --- /dev/null +++ b/app/services/defederate_account_serivice.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +class DefederateAccountService < BaseService + include Payloadable + + def call(account, domains) + @account = account + @domains = domains + + return if account.blank? || !account.local? || domains.blank? + + distribute_delete_actor! + end + + private + + def distribute_delete_actor! + ActivityPub::DeliveryWorker.push_bulk(delivery_inboxes) do |inbox_url| + [delete_actor_json, @account.id, inbox_url] + end + + ActivityPub::LowPriorityDeliveryWorker.push_bulk(low_priority_delivery_inboxes) do |inbox_url| + [delete_actor_json, @account.id, inbox_url] + end + end + + def delete_actor_json + @delete_actor_json ||= Oj.dump(serialize_payload(@account, ActivityPub::DeleteActorSerializer, signer: @account)) + end + + def delivery_inboxes + @delivery_inboxes ||= @account.followers.where(domain: @domains).inboxes + end + + def low_priority_delivery_inboxes + Account.where(domain: @domains).inboxes - delivery_inboxes + end +end diff --git a/app/services/defederate_domain_service.rb b/app/services/defederate_domain_service.rb new file mode 100644 index 000000000..d40f88e3f --- /dev/null +++ b/app/services/defederate_domain_service.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class DefederateDomainService < BaseService + def call(domains) + return if domains.blank? + + Account.local.find_each do |account| + DefederateAccountService.new.call(account, domains) + end + end +end diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 6fa98ce12..1fa8b2520 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -3,10 +3,11 @@ class FanOutOnWriteService < BaseService # Push a status into home and mentions feeds # @param [Status] status - def call(status) + def call(status, only_to_self: false) raise Mastodon::RaceConditionError if status.visibility.nil? deliver_to_self(status) if status.account.local? + return if only_to_self || !status.published? if status.direct_visibility? deliver_to_mentioned_followers(status) @@ -14,18 +15,21 @@ class FanOutOnWriteService < BaseService deliver_to_own_conversation(status) elsif status.limited_visibility? deliver_to_mentioned_followers(status) + deliver_to_lists(status) else deliver_to_followers(status) deliver_to_lists(status) end - return if status.account.silenced? || !status.public_visibility? + return if status.account.silenced? return if status.reblog? && !Setting.show_reblogs_in_public_timelines - render_anonymous_payload(status) - - deliver_to_hashtags(status) + if status.distributable? + render_anonymous_payload(status) + deliver_to_hashtags(status) + end + return unless status.public_visibility? return if status.reply? && status.in_reply_to_account_id != status.account_id && !Setting.show_replies_in_public_timelines deliver_to_public(status) @@ -87,29 +91,23 @@ class FanOutOnWriteService < BaseService def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - Redis.current.publish('timeline:public', @payload) - if status.local? - Redis.current.publish('timeline:public:local', @payload) - else - Redis.current.publish('timeline:public:remote', @payload) - end + Redis.current.publish('timeline:public', @payload) if status.curated? + Redis.current.publish('timeline:public:local', @payload) if status.local? + Redis.current.publish('timeline:public:remote', @payload) end def deliver_to_media(status) Rails.logger.debug "Delivering status #{status.id} to media timeline" - Redis.current.publish('timeline:public:media', @payload) - if status.local? - Redis.current.publish('timeline:public:local:media', @payload) - else - Redis.current.publish('timeline:public:remote:media', @payload) - end + Redis.current.publish('timeline:public:media', @payload) if status.curated? + Redis.current.publish('timeline:public:local:media', @payload) if status.local? + Redis.current.publish('timeline:public:remote:media', @payload) end def deliver_to_direct_timelines(status) Rails.logger.debug "Delivering status #{status.id} to direct timelines" - FeedInsertWorker.push_bulk(status.mentions.includes(:account).map(&:account).select { |mentioned_account| mentioned_account.local? }) do |account| + FeedInsertWorker.push_bulk(status.mentions.includes(:account).map(&:account).select(&:local?)) do |account| [status.id, account.id, :direct] end end diff --git a/app/services/fetch_remote_status_service.rb b/app/services/fetch_remote_status_service.rb index eafde4d4a..4f98b51f6 100644 --- a/app/services/fetch_remote_status_service.rb +++ b/app/services/fetch_remote_status_service.rb @@ -1,14 +1,20 @@ # frozen_string_literal: true class FetchRemoteStatusService < BaseService - def call(url, prefetched_body = nil) + def call(url, prefetched_body = nil, on_behalf_of = nil) + status = ActivityPub::TagManager.instance.uri_to_resource(url, Status) + return status if status.present? + if prefetched_body.nil? - resource_url, resource_options = FetchResourceService.new.call(url) + resource_url, resource_options = FetchResourceService.new.call(url, on_behalf_of: on_behalf_of) else resource_url = url resource_options = { prefetched_body: prefetched_body } end - ActivityPub::FetchRemoteStatusService.new.call(resource_url, **resource_options) unless resource_url.nil? + return if resource_url.blank? + + resource_options ||= {} + ActivityPub::FetchRemoteStatusService.new.call(resource_url, **resource_options.merge({ on_behalf_of: on_behalf_of })) end end diff --git a/app/services/fetch_resource_service.rb b/app/services/fetch_resource_service.rb index 6c0093cd4..17e8024de 100644 --- a/app/services/fetch_resource_service.rb +++ b/app/services/fetch_resource_service.rb @@ -7,9 +7,11 @@ class FetchResourceService < BaseService attr_reader :response_code - def call(url) + def call(url, on_behalf_of: nil) return if url.blank? + @on_behalf_of = on_behalf_of || Account.representative + process(url) rescue HTTP::Error, OpenSSL::SSL::SSLError, Addressable::URI::InvalidURIError, Mastodon::HostValidationError, Mastodon::LengthValidationError => e Rails.logger.debug "Error fetching resource #{@url}: #{e}" @@ -18,8 +20,9 @@ class FetchResourceService < BaseService private - def process(url, terminal = false) + def process(url, terminal = false, retry_as_server = false) @url = url + @retry_as_server ||= retry_as_server perform_request { |response| process_response(response, terminal) } end @@ -35,13 +38,14 @@ class FetchResourceService < BaseService # and prevents even public resources from being fetched, so # don't do it - request.on_behalf_of(Account.representative) unless Rails.env.development? + request.on_behalf_of(@retry_as_server ? Account.representative : @on_behalf_of) unless Rails.env.development? end.perform(&block) end def process_response(response, terminal = false) @response_code = response.code - return nil if response.code != 200 + skip_retry = @retry_as_server || Rails.env.development? || @on_behalf_of.id == -99 + return (skip_retry ? nil : process(response.uri, terminal, true)) if response.code != 200 if ['application/activity+json', 'application/ld+json'].include?(response.mime_type) body = response.body_with_limit @@ -67,13 +71,13 @@ class FetchResourceService < BaseService page = Nokogiri::HTML(response.body_with_limit) json_link = page.xpath('//link[@rel="alternate"]').find { |link| ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(link['type']) } - process(json_link['href'], terminal: true) unless json_link.nil? + process(json_link['href'], true) unless json_link.nil? end def process_link_headers(link_header) json_link = link_header.find_link(%w(rel alternate), %w(type application/activity+json)) || link_header.find_link(%w(rel alternate), ['type', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"']) - process(json_link.href, terminal: true) unless json_link.nil? + process(json_link.href, true) unless json_link.nil? end def parse_link_header(response) diff --git a/app/services/keys/query_service.rb b/app/services/keys/query_service.rb index 286fbd834..f48dafb61 100644 --- a/app/services/keys/query_service.rb +++ b/app/services/keys/query_service.rb @@ -63,7 +63,7 @@ class Keys::QueryService < BaseService json = fetch_resource(@account.devices_url) - return if json['items'].blank? + return if json.blank? || json['items'].blank? @devices = json['items'].map do |device| Device.new(device_id: device['id'], name: device['name'], identity_key: device.dig('identityKey', 'publicKeyBase64'), fingerprint_key: device.dig('fingerprintKey', 'publicKeyBase64'), claim_url: device['claim']) diff --git a/app/services/mute_conversation_service.rb b/app/services/mute_conversation_service.rb new file mode 100644 index 000000000..a12bf9533 --- /dev/null +++ b/app/services/mute_conversation_service.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +class MuteConversationService < BaseService + def call(account, conversation) + return if account.blank? || conversation.blank? + + account.mute_conversation!(conversation) + MuteConversationWorker.perform_async(account.id, conversation.id) + end +end diff --git a/app/services/mute_service.rb b/app/services/mute_service.rb index 9ae9afd62..67df92f5c 100644 --- a/app/services/mute_service.rb +++ b/app/services/mute_service.rb @@ -1,10 +1,10 @@ # frozen_string_literal: true class MuteService < BaseService - def call(account, target_account, notifications: nil, duration: 0) + def call(account, target_account, notifications: nil, timelines_only: nil, duration: 0) return if account.id == target_account.id - mute = account.mute!(target_account, notifications: notifications, duration: duration) + mute = account.mute!(target_account, notifications: notifications, timelines_only: timelines_only, duration: duration) if mute.hide_notifications? BlockWorker.perform_async(account.id, target_account.id) diff --git a/app/services/mute_status_service.rb b/app/services/mute_status_service.rb new file mode 100644 index 000000000..bdf99232c --- /dev/null +++ b/app/services/mute_status_service.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +class MuteStatusService < BaseService + def call(account, status) + return if account.blank? || status.blank? + + account.mute_status!(status) + FeedManager.instance.unpush_status(account, status) + end +end diff --git a/app/services/notify_service.rb b/app/services/notify_service.rb index fc187db40..c241c3ca0 100644 --- a/app/services/notify_service.rb +++ b/app/services/notify_service.rb @@ -51,6 +51,11 @@ class NotifyService < BaseService @following_sender = @recipient.following?(@notification.from_account) || @recipient.requested?(@notification.from_account) end + def following_recipient? + return @following_recipient if defined?(@following_recipient) + @following_recipient = @notification.from_account.following?(@recipient) + end + def optional_non_follower? @recipient.user.settings.interactions['must_be_follower'] && !@notification.from_account.following?(@recipient) end @@ -83,7 +88,7 @@ class NotifyService < BaseService end def hellbanned? - @notification.from_account.silenced? && !following_sender? + @notification.from_account.silenced? && !(following_sender? || following_recipient?) end def from_self? diff --git a/app/services/post_status_service.rb b/app/services/post_status_service.rb index 250d0e8ed..216dedeac 100644 --- a/app/services/post_status_service.rb +++ b/app/services/post_status_service.rb @@ -2,6 +2,7 @@ class PostStatusService < BaseService include Redisable + include ImgProxyHelper MIN_SCHEDULE_OFFSET = 5.minutes.freeze @@ -12,7 +13,10 @@ class PostStatusService < BaseService # @option [Status] :thread Optional status to reply to # @option [Boolean] :sensitive # @option [String] :visibility + # @option [Boolean] :local_only # @option [String] :spoiler_text + # @option [String] :title + # @option [String] :footer # @option [String] :language # @option [String] :scheduled_at # @option [Hash] :poll Optional poll to attach @@ -20,12 +24,31 @@ class PostStatusService < BaseService # @option [Doorkeeper::Application] :application # @option [String] :idempotency Optional idempotency key # @option [Boolean] :with_rate_limit + # @option [Status] :status Edit an existing status + # @option [Enumerable] :mentions Optional array of Mentions to include + # @option [Enumerable] :tags Option array of tag names to include + # @option [Boolean] :publish If true, status will be published + # @option [Boolean] :notify If false, status will not be delivered to local timelines or mentions + # @option [String] :expires_at If set, automatically delete at this time (UTC) + # @option [String] :publish_at If set, automatically publish at this time (UTC) # @return [Status] def call(account, options = {}) @account = account @options = options @text = @options[:text] || '' @in_reply_to = @options[:thread] + @expires_at = @options[:expires_at]&.to_datetime + @publish_at = @options[:publish_at]&.to_datetime + + @expires_at ||= Time.now.utc + @account.user&.setting_unpublish_in.to_i.minutes if @account.user&.setting_unpublish_in.to_i.positive? + @publish_at ||= Time.now.utc + @account.user&.setting_publish_in.to_i.minutes if @account.user&.setting_publish_in.to_i.positive? + + @options[:publish] ||= !(account.user&.setting_manual_publish || @publish_at.present?) + + raise Mastodon::NotPermittedError if different_author? + + @tag_names = (@options[:tags] || []).select { |tag| tag =~ /\A(#{Tag::HASHTAG_NAME_RE})\z/i } + @mentions = @options[:mentions] || [] return idempotency_duplicate if idempotency_given? && idempotency_duplicate? @@ -34,10 +57,12 @@ class PostStatusService < BaseService if scheduled? schedule_status! + elsif @options[:status].present? && status_exists? + update_status! else process_status! postprocess_status! - bump_potential_friendship! + bump_potential_friendship! if @options[:publish] end redis.setex(idempotency_key, 3_600, @status.id) if idempotency_given? @@ -49,14 +74,14 @@ class PostStatusService < BaseService def preprocess_attributes! if @text.blank? && @options[:spoiler_text].present? - @text = '.' - if @media&.find(&:video?) || @media&.find(&:gifv?) - @text = '📹' - elsif @media&.find(&:audio?) - @text = '🎵' - elsif @media&.find(&:image?) - @text = '🖼' - end + @text = '.' + if @media&.find(&:video?) || @media&.find(&:gifv?) + @text = '📹' + elsif @media&.find(&:audio?) + @text = '🎵' + elsif @media&.find(&:image?) + @text = '🖼' + end end @sensitive = (@options[:sensitive].nil? ? @account.user&.setting_default_sensitive : @options[:sensitive]) || @options[:spoiler_text].present? @visibility = @options[:visibility] || @account.user&.setting_default_privacy @@ -75,8 +100,11 @@ class PostStatusService < BaseService @status = @account.statuses.create!(status_attributes) end - process_hashtags_service.call(@status) - process_mentions_service.call(@status) + @status.notify = @options[:notify] if @options[:notify].present? + + process_command_tags_service.call(@account, @status) + process_hashtags_service.call(@status, nil, @tag_names) + process_mentions_service.call(@status, mentions: @mentions, deliver: @options[:publish]) end def schedule_status! @@ -99,16 +127,25 @@ class PostStatusService < BaseService def postprocess_status! LinkCrawlWorker.perform_async(@status.id) unless @status.spoiler_text? DistributionWorker.perform_async(@status.id) + + return unless @options[:publish] + ActivityPub::DistributionWorker.perform_async(@status.id) unless @status.local_only? PollExpirationNotifyWorker.perform_at(@status.poll.expires_at, @status.poll.id) if @status.poll end + def update_status! + tags = Tag.find_or_create_by_names(@tag_names) + @status = UpdateStatusService.new.call(@options[:status], status_attributes, @mentions, tags) + end + def validate_media! return if @options[:media_ids].blank? || !@options[:media_ids].is_a?(Enumerable) raise Mastodon::ValidationError, I18n.t('media_attachments.validations.too_many') if @options[:media_ids].size > 4 || @options[:poll].present? - @media = @account.media_attachments.where(status_id: nil).where(id: @options[:media_ids].take(4).map(&:to_i)) + @media = @options[:status].present? ? @account.media_attachments.where(status_id: [nil, @options[:status].id]) : @account.media_attachments.where(status_id: nil) + @media = @media.where(id: @options[:media_ids].take(4).map(&:to_i)) raise Mastodon::ValidationError, I18n.t('media_attachments.validations.images_and_video') if @media.size > 1 && @media.find(&:audio_or_video?) raise Mastodon::ValidationError, I18n.t('media_attachments.validations.not_ready') if @media.any?(&:not_processed?) @@ -126,6 +163,10 @@ class PostStatusService < BaseService ProcessHashtagsService.new end + def process_command_tags_service + ProcessCommandTagsService.new + end + def scheduled? @scheduled_at.present? end @@ -156,24 +197,33 @@ class PostStatusService < BaseService def bump_potential_friendship! return if !@status.reply? || @account.id == @status.in_reply_to_account_id + ActivityTracker.increment('activity:interactions') return if @account.following?(@status.in_reply_to_account_id) + PotentialFriendshipTracker.record(@account.id, @status.in_reply_to_account_id, :reply) end def status_attributes { text: @text, + original_text: @text, media_attachments: @media || [], thread: @in_reply_to, poll_attributes: poll_attributes, sensitive: @sensitive, spoiler_text: @options[:spoiler_text] || '', + title: @options[:title], + footer: @options[:footer], visibility: @visibility, + local_only: @options[:local_only], language: language_from_option(@options[:language]) || @account.user&.setting_default_language&.presence || LanguageDetector.instance.detect(@text, @account), application: @options[:application], + published: @options[:publish], content_type: @options[:content_type] || @account.user&.setting_default_content_type, rate_limit: @options[:with_rate_limit], + expires_at: @expires_at, + publish_at: @publish_at, }.compact end @@ -198,6 +248,16 @@ class PostStatusService < BaseService options_hash[:scheduled_at] = nil options_hash[:idempotency] = nil options_hash[:with_rate_limit] = false + options_hash[:mention_ids] = options_hash.delete(:mentions)&.pluck(:id) + options_hash[:status_id] = options_hash.delete(:status)&.id end end + + def different_author? + @options[:status].present? && @options[:status].account_id != @account.id + end + + def status_exists? + !(@options[:status].discarded? || @options[:status].destroyed?) + end end diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb index b4fa70710..56d9d2f68 100644 --- a/app/services/precompute_feed_service.rb +++ b/app/services/precompute_feed_service.rb @@ -4,6 +4,7 @@ class PrecomputeFeedService < BaseService def call(account) FeedManager.instance.populate_home(account) FeedManager.instance.populate_direct_feed(account) + FeedManager.instance.populate_lists(account) ensure Redis.current.del("account:#{account.id}:regeneration") end diff --git a/app/services/process_command_tags_service.rb b/app/services/process_command_tags_service.rb new file mode 100644 index 000000000..6b6d46662 --- /dev/null +++ b/app/services/process_command_tags_service.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +class ProcessCommandTagsService < BaseService + def call(account, status, raise_if_no_output: true) + CommandTag::Processor.new(account, status).process! + raise Mastodon::LengthValidationError, 'Text commands were processed successfully.' if raise_if_no_output && status.destroyed? + + status + end +end diff --git a/app/services/process_hashtags_service.rb b/app/services/process_hashtags_service.rb index e8e139b05..5ec5ea0c2 100644 --- a/app/services/process_hashtags_service.rb +++ b/app/services/process_hashtags_service.rb @@ -1,15 +1,19 @@ # frozen_string_literal: true class ProcessHashtagsService < BaseService - def call(status, tags = []) - tags = Extractor.extract_hashtags(status.text) if status.local? + def call(status, tags = nil, extra_tags = []) + tags ||= extra_tags | (status.local? ? Extractor.extract_hashtags(status.text) : []) records = [] + tag_ids = status.tag_ids.to_set + Tag.find_or_create_by_names(tags) do |tag| + next if tag_ids.include?(tag.id) + status.tags << tag records << tag - TrendingTags.record_use!(tag, status.account, status.created_at) if status.public_visibility? + TrendingTags.record_use!(tag, status.account, status.created_at) if status.distributable? end return unless status.distributable? diff --git a/app/services/process_mentions_service.rb b/app/services/process_mentions_service.rb index d5ea69da1..e4aad7147 100644 --- a/app/services/process_mentions_service.rb +++ b/app/services/process_mentions_service.rb @@ -7,70 +7,37 @@ class ProcessMentionsService < BaseService # local mention pointers, send Salmon notifications to mentioned # remote users # @param [Status] status - def call(status) - return unless status.local? + # @option [Enumerable] :mentions Mentions to include + # @option [Boolean] :deliver Deliver mention notifications + def call(status, mentions: [], deliver: true) + return unless status.local? && !(status.frozen? || status.destroyed?) - @status = status - mentions = [] + @status = status + @status.text, mentions = ResolveMentionsService.new.call(@status, mentions: mentions) + @status.save! - status.text = status.text.gsub(Account::MENTION_RE) do |match| - username, domain = Regexp.last_match(1).split('@') + return unless deliver - domain = begin - if TagManager.instance.local_domain?(domain) - nil - else - TagManager.instance.normalize_domain(domain) - end - end - - mentioned_account = Account.find_remote(username, domain) - - if mention_undeliverable?(mentioned_account) - begin - mentioned_account = resolve_account_service.call(Regexp.last_match(1)) - rescue Webfinger::Error, HTTP::Error, OpenSSL::SSL::SSLError, Mastodon::UnexpectedResponseError - mentioned_account = nil - end - end - - next match if mention_undeliverable?(mentioned_account) || mentioned_account&.suspended? - - mention = mentioned_account.mentions.new(status: status) - mentions << mention if mention.save - - "@#{mentioned_account.acct}" - end - - status.save! check_for_spam(status) + @activitypub_json = {} mentions.each { |mention| create_notification(mention) } end private - def mention_undeliverable?(mentioned_account) - mentioned_account.nil? || (!mentioned_account.local? && mentioned_account.ostatus?) - end - def create_notification(mention) mentioned_account = mention.account if mentioned_account.local? - LocalNotificationWorker.perform_async(mentioned_account.id, mention.id, mention.class.name, :mention) + LocalNotificationWorker.perform_async(mentioned_account.id, mention.id, mention.class.name, :mention) unless !@status.notify? || mention.silent? elsif mentioned_account.activitypub? && !@status.local_only? - ActivityPub::DeliveryWorker.perform_async(activitypub_json, mention.status.account_id, mentioned_account.inbox_url) + ActivityPub::DeliveryWorker.perform_async(activitypub_json(mentioned_account.domain), mention.status.account_id, mentioned_account.inbox_url) end end - def activitypub_json - return @activitypub_json if defined?(@activitypub_json) - @activitypub_json = Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account)) - end - - def resolve_account_service - ResolveAccountService.new + def activitypub_json(domain) + @activitypub_json[domain] ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status, embed: false), ActivityPub::ActivitySerializer, signer: @status.account, target_domain: domain)) end def check_for_spam(status) diff --git a/app/services/publish_status_service.rb b/app/services/publish_status_service.rb new file mode 100644 index 000000000..e95c3dacd --- /dev/null +++ b/app/services/publish_status_service.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true +class PublishStatusService < BaseService + include Redisable + + def call(status) + return if status.published? + + @status = status + + update_status! + reset_status_caches + distribute + bump_potential_friendship! + end + + private + + def update_status! + @status.update!(published: true, publish_at: nil, expires_at: @status.expires_at.blank? ? nil : Time.now.utc + (@status.expires_at - @status.created_at)) + ProcessMentionsService.new.call(@status) + end + + def reset_status_caches + Rails.cache.delete_matched("statuses/#{@status.id}-*") + Rails.cache.delete("statuses/#{@status.id}") + Rails.cache.delete(@status) + Rails.cache.delete_matched("format:#{@status.id}:*") + redis.zremrangebyscore("spam_check:#{@status.account.id}", @status.id, @status.id) + end + + def distribute + LinkCrawlWorker.perform_in(rand(1..30).seconds, @status.id) unless @status.spoiler_text? + DistributionWorker.perform_async(@status.id) + ActivityPub::DistributionWorker.perform_async(@status.id) if @status.local? && !@status.local_only? + end + + def bump_potential_friendship! + return if !@status.reply? || @status.account.id == @status.in_reply_to_account_id + + ActivityTracker.increment('activity:interactions') + return if @status.account.following?(@status.in_reply_to_account_id) + + PotentialFriendshipTracker.record(@status.account.id, @status.in_reply_to_account_id, :reply) + end +end diff --git a/app/services/reblog_service.rb b/app/services/reblog_service.rb index 6f018e24b..3188bbb69 100644 --- a/app/services/reblog_service.rb +++ b/app/services/reblog_service.rb @@ -28,7 +28,8 @@ class ReblogService < BaseService end end - reblog = account.statuses.create!(reblog: reblogged_status, text: '', visibility: visibility, rate_limit: options[:with_rate_limit]) + reblog = account.statuses.create!(reblog: reblogged_status, text: '', visibility: visibility, rate_limit: options[:with_rate_limit], sensitive: true, spoiler_text: options[:spoiler_text] || '', published: true) + curate!(reblogged_status) unless reblogged_status.curated? || !reblogged_status.published? DistributionWorker.perform_async(reblog.id) ActivityPub::DistributionWorker.perform_async(reblog.id) unless reblogged_status.local_only? @@ -60,6 +61,11 @@ class ReblogService < BaseService end def build_json(reblog) - Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(reblog), ActivityPub::ActivitySerializer, signer: reblog.account)) + Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(reblog, embed: false), ActivityPub::ActivitySerializer, signer: reblog.account, target_domain: reblog.account.domain)) + end + + def curate!(status) + status.curate! + DistributionWorker.perform_async(status.id) end end diff --git a/app/services/remove_hashtags_service.rb b/app/services/remove_hashtags_service.rb new file mode 100644 index 000000000..6bf77a068 --- /dev/null +++ b/app/services/remove_hashtags_service.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +class RemoveHashtagsService < BaseService + def call(status, tags) + tags = status.tags.matching_name(tags) if tags.is_a?(Array) + + status.account.featured_tags.where(tag: tags).each do |featured_tag| + featured_tag.decrement(status.id) + end + + if status.distributable? + delete_payload = Oj.dump(event: :delete, payload: status.id.to_s) + tags.pluck(:name).each do |hashtag| + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", delete_payload) + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", delete_payload) if status.local? + end + end + + status.tags -= tags + end +end diff --git a/app/services/remove_media_attachments_service.rb b/app/services/remove_media_attachments_service.rb new file mode 100644 index 000000000..de3cd9afb --- /dev/null +++ b/app/services/remove_media_attachments_service.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class RemoveMediaAttachmentsService < BaseService + # Remove a list of media attachments by their IDs + # @param [Enumerable] attachment_ids + def call(attachment_ids) + media_attachments = MediaAttachment.where(id: attachment_ids) + media_attachments.map(&:id).each { |id| Rails.cache.delete_matched("statuses/#{id}-*") } + media_attachments.destroy_all + end +end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index a5aafee21..4d07632d3 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -15,13 +15,13 @@ class RemoveStatusService < BaseService @status = status @account = status.account @tags = status.tags.pluck(:name).to_a - @mentions = status.active_mentions.includes(:account).to_a + @mentions = status.mentions.includes(:account).to_a @reblogs = status.reblogs.includes(:account).to_a @options = options RedisLock.acquire(lock_options) do |lock| if lock.acquired? - remove_from_self if status.account.local? + remove_from_self if status.account.local? && !@options[:unpublish] remove_from_followers remove_from_lists remove_from_affected @@ -30,10 +30,10 @@ class RemoveStatusService < BaseService remove_from_public remove_from_media if status.media_attachments.any? remove_from_direct if status.direct_visibility? - remove_from_spam_check - remove_media + remove_from_spam_check unless @options[:unpublish] + remove_media unless @options[:unpublish] - @status.destroy! if @options[:immediate] || !@status.reported? + @status.destroy! if @options[:immediate] || !((@options[:unpublish] && @status.local?) || @status.reported?) else raise Mastodon::RaceConditionError end @@ -44,10 +44,18 @@ class RemoveStatusService < BaseService # original object being removed implicitly removes reblogs # of it. The Delete activity of the original is forwarded # separately. - return if !@account.local? || @options[:original_removed] + return if !@account.local? || @options[:original_removed] || !(status.published? || @options[:unpublished]) remove_from_remote_followers remove_from_remote_affected + remove_from_remote_shared + + @status.mentions.where(account_id: @options[:blocking]).destroy_all if @options[:blocking] + + return unless @options[:unpublish] + + @status.update(published: false, expires_at: nil, local_only: @status.local?) + DistributionWorker.perform_async(@status.id) if @status.local? end private @@ -107,12 +115,18 @@ class RemoveStatusService < BaseService def relay! ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url| + [signed_activity_json(Addressable::URI.parse(inbox_url).host), @account.id, inbox_url] + end + end + + def remove_from_remote_shared + ActivityPub::DeliveryWorker.push_bulk(Account.remote.activitypub.where.not(shared_inbox_url: '').distinct.select(:shared_inbox_url).pluck(:shared_inbox_url)) do |inbox_url| [signed_activity_json, @account.id, inbox_url] end end def signed_activity_json - @signed_activity_json ||= Oj.dump(serialize_payload(@status, @status.reblog? ? ActivityPub::UndoAnnounceSerializer : ActivityPub::DeleteSerializer, signer: @account)) + @signed_activity_json ||= Oj.dump(serialize_payload(@status, @status.reblog? && @status.spoiler_text.blank? ? ActivityPub::UndoAnnounceSerializer : ActivityPub::DeleteSerializer, signer: @account)) end def remove_reblogs @@ -130,7 +144,7 @@ class RemoveStatusService < BaseService featured_tag.decrement(@status.id) end - return unless @status.public_visibility? + return unless @status.distributable? @tags.each do |hashtag| redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", @payload) @@ -139,25 +153,19 @@ class RemoveStatusService < BaseService end def remove_from_public - return unless @status.public_visibility? + return unless @status.distributable? redis.publish('timeline:public', @payload) - if @status.local? - redis.publish('timeline:public:local', @payload) - else - redis.publish('timeline:public:remote', @payload) - end + redis.publish('timeline:public:local', @payload) if @status.local? + redis.publish('timeline:public:remote', @payload) end def remove_from_media - return unless @status.public_visibility? + return unless @status.distributable? redis.publish('timeline:public:media', @payload) - if @status.local? - redis.publish('timeline:public:local:media', @payload) - else - redis.publish('timeline:public:remote:media', @payload) - end + redis.publish('timeline:public:local:media', @payload) if @status.local? + redis.publish('timeline:public:remote:media', @payload) end def remove_from_direct diff --git a/app/services/resolve_mentions_service.rb b/app/services/resolve_mentions_service.rb new file mode 100644 index 000000000..6478dc902 --- /dev/null +++ b/app/services/resolve_mentions_service.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +class ResolveMentionsService < BaseService + # Scan text for mentions and create local mention pointers + # @param [Status] status Status to attach to mention pointers + # @option [String] :text Text containing mentions to resolve (default: use status text) + # @option [Enumerable] :mentions Additional mentions to include + # @return [Array] Array containing text with mentions resolved (String) and mention pointers (Set) + def call(status, text: nil, mentions: []) + mentions = Mention.includes(:account).where(id: mentions.pluck(:id), accounts: { suspended_at: nil }).or(status.mentions.includes(:account)) + implicit_mention_acct_ids = mentions.active.pluck(:account_id).to_set + text = status.text if text.nil? + mentions = mentions.to_set + + text.gsub(Account::MENTION_RE) do |match| + username, domain = Regexp.last_match(1).split('@') + + domain = begin + if TagManager.instance.local_domain?(domain) + nil + else + TagManager.instance.normalize_domain(domain) + end + end + + mentioned_account = Account.find_remote(username, domain) + + if mention_undeliverable?(mentioned_account) + begin + mentioned_account = resolve_account_service.call(Regexp.last_match(1)) + rescue Webfinger::Error, HTTP::Error, OpenSSL::SSL::SSLError, Mastodon::UnexpectedResponseError + mentioned_account = nil + end + end + + next match if mention_undeliverable?(mentioned_account) || mentioned_account&.suspended? + + mention = mentioned_account.mentions.where(status: status).first_or_create(status: status, silent: false) + mention.update(silent: false) if mention.silent? + + mentions << mention + implicit_mention_acct_ids.delete(mentioned_account.id) + + "@#{mentioned_account.acct}" + end + + Mention.where(id: implicit_mention_acct_ids).update_all(silent: true) + + [text, mentions] + end + + private + + def mention_undeliverable?(mentioned_account) + mentioned_account.nil? || (!mentioned_account.local? && mentioned_account.ostatus?) + end + + def resolve_account_service + ResolveAccountService.new + end +end diff --git a/app/services/resolve_url_service.rb b/app/services/resolve_url_service.rb index 78080d878..bac41f961 100644 --- a/app/services/resolve_url_service.rb +++ b/app/services/resolve_url_service.rb @@ -23,7 +23,7 @@ class ResolveURLService < BaseService if equals_or_includes_any?(type, ActivityPub::FetchRemoteAccountService::SUPPORTED_TYPES) ActivityPub::FetchRemoteAccountService.new.call(resource_url, prefetched_body: body) elsif equals_or_includes_any?(type, ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES) - status = FetchRemoteStatusService.new.call(resource_url, body) + status = FetchRemoteStatusService.new.call(resource_url, body, @on_behalf_of) authorize_with @on_behalf_of, status, :show? unless status.nil? status end @@ -42,7 +42,7 @@ class ResolveURLService < BaseService end def fetched_resource - @fetched_resource ||= fetch_resource_service.call(@url) + @fetched_resource ||= fetch_resource_service.call(@url, on_behalf_of: @on_behalf_of) end def fetch_resource_service diff --git a/app/services/revoke_status_service.rb b/app/services/revoke_status_service.rb new file mode 100644 index 000000000..f4762631c --- /dev/null +++ b/app/services/revoke_status_service.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +class RevokeStatusService < BaseService + include Redisable + include Payloadable + + # Unpublish a status from a given set of local accounts' timelines and public, if visibility changed. + # @param [Status] status + # @param [Enumerable] account_ids + def call(status, account_ids) + @payload = Oj.dump(event: :delete, payload: status.id.to_s) + @status = status + @account = status.account + @account_ids = account_ids + @mentions = status.mentions.where(account_id: account_ids) + @reblogs = status.reblogs.where(account_id: account_ids) + + RedisLock.acquire(lock_options) do |lock| + if lock.acquired? + remove_from_followers + remove_from_lists + remove_from_affected + remove_reblogs + remove_from_hashtags + remove_from_public + remove_from_media + remove_from_direct if status.direct_visibility? + else + raise Mastodon::RaceConditionError + end + end + end + + private + + def remove_from_followers + @account.followers_for_local_distribution.where(id: @account_ids).reorder(nil).find_each do |follower| + FeedManager.instance.unpush_from_home(follower, @status) + end + end + + def remove_from_lists + @account.lists_for_local_distribution.where(account_id: @account_ids).select(:id, :account_id).reorder(nil).find_each do |list| + FeedManager.instance.unpush_from_list(list, @status) + end + end + + def remove_from_affected + @mentions.map(&:account).select(&:local?).each do |account| + redis.publish("timeline:#{account.id}", @payload) + end + end + + def remove_reblogs + @reblogs.each do |reblog| + RemoveStatusService.new.call(reblog) + end + end + + def remove_from_hashtags + @account.featured_tags.where(tag_id: @status.tags.pluck(:id)).each do |featured_tag| + featured_tag.decrement(@status.id) + end + + return unless @status.distributable? + + @tags.each do |hashtag| + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", @payload) + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", @payload) if @status.local? + end + end + + def remove_from_public + return if @status.distributable? + + redis.publish('timeline:public', @payload) + redis.publish('timeline:public:local', @payload) if @status.local? + redis.publish('timeline:public:remote', @payload) + end + + def remove_from_media + return if @status.distributable? + + redis.publish('timeline:public:media', @payload) + redis.publish('timeline:public:local:media', @payload) if @status.local? + redis.publish('timeline:public:remote:media', @payload) + end + + def remove_from_direct + @mentions.each do |mention| + FeedManager.instance.unpush_from_direct(mention.account, @status) if mention.account.local? + end + end + + def lock_options + { redis: Redis.current, key: "distribute:#{@status.id}" } + end +end diff --git a/app/services/search_service.rb b/app/services/search_service.rb index 19500a8d4..819ce2c16 100644 --- a/app/services/search_service.rb +++ b/app/services/search_service.rb @@ -53,7 +53,7 @@ class SearchService < BaseService account_domains = results.map(&:account_domain) preloaded_relations = relations_map_for_account(@account, account_ids, account_domains) - results.reject { |status| StatusFilter.new(status, @account, preloaded_relations).filtered? } + results.reject { |status| StatusFilter.new(status, @account, true, preloaded_relations).filtered? } rescue Faraday::ConnectionFailed, Parslet::ParseFailed [] end diff --git a/app/services/suspend_account_service.rb b/app/services/suspend_account_service.rb index 5a079c3ac..7ad4777ee 100644 --- a/app/services/suspend_account_service.rb +++ b/app/services/suspend_account_service.rb @@ -18,7 +18,7 @@ class SuspendAccountService < BaseService def unmerge_from_home_timelines! @account.followers_for_local_distribution.find_each do |follower| - FeedManager.instance.unmerge_from_timeline(@account, follower) + FeedManager.instance.unmerge_from_home(@account, follower) end end @@ -37,9 +37,11 @@ class SuspendAccountService < BaseService styles = [:original] | attachment.styles.keys styles.each do |style| + next if attachment.path(style).blank? + case Paperclip::Attachment.default_options[:storage] when :s3 - attachment.s3_object(style).acl.put(:private) + attachment.s3_object(style).acl.put({ acl: 'private' }) when :fog # Not supported when :filesystem diff --git a/app/services/unallow_domain_service.rb b/app/services/unallow_domain_service.rb index fc5260761..0d2d8f254 100644 --- a/app/services/unallow_domain_service.rb +++ b/app/services/unallow_domain_service.rb @@ -4,7 +4,7 @@ class UnallowDomainService < BaseService include DomainControlHelper def call(domain_allow) - suspend_accounts!(domain_allow.domain) if whitelist_mode? + suspend_accounts!(domain_allow.domain) domain_allow.destroy end @@ -12,6 +12,7 @@ class UnallowDomainService < BaseService private def suspend_accounts!(domain) + DomainDefederationWorker.perform_async(domain) Account.where(domain: domain).in_batches.update_all(suspended_at: Time.now.utc) AfterUnallowDomainWorker.perform_async(domain) end diff --git a/app/services/unfollow_service.rb b/app/services/unfollow_service.rb index 151f3674f..c3e70d414 100644 --- a/app/services/unfollow_service.rb +++ b/app/services/unfollow_service.rb @@ -13,13 +13,15 @@ class UnfollowService < BaseService @target_account = target_account @options = options - unfollow! || undo_follow_request! + unfollow! + undo_follow_request! end private def unfollow! follow = Follow.find_by(account: @source_account, target_account: @target_account) + follow = Follow.create!(account: @source_account, target_account: @target_account) if follow.blank? && @options[:force] return unless follow @@ -34,6 +36,7 @@ class UnfollowService < BaseService def undo_follow_request! follow_request = FollowRequest.find_by(account: @source_account, target_account: @target_account) + follow_request = FollowRequest.create!(account: @source_account, target_account: @target_account) if follow_request.blank? && @options[:force] return unless follow_request diff --git a/app/services/update_status_service.rb b/app/services/update_status_service.rb new file mode 100644 index 000000000..9dc4fbbcd --- /dev/null +++ b/app/services/update_status_service.rb @@ -0,0 +1,161 @@ +# frozen_string_literal: true + +class UpdateStatusService < BaseService + include Redisable + include ImgProxyHelper + + ALLOWED_ATTRIBUTES = %i( + spoiler_text + title + text + original_text + footer + content_type + language + sensitive + visibility + local_only + media_attachments + media_attachment_ids + application + expires_at + ).freeze + + # Updates the content of an existing status. + # @param [Status] status The status to update. + # @param [Hash] params The attributes of the new status. + # @param [Enumerable] mentions Additional mentions added to the status. + # @param [Enumerable] tags New tags for the status to belong to (implicit tags are preserved). + def call(status, params, mentions = nil, tags = nil) + raise ActiveRecord::RecordNotFound if status.blank? || status.discarded? || status.destroyed? + return status if params.blank? + + @status = status + @account = @status.account + @params = params.with_indifferent_access.slice(*ALLOWED_ATTRIBUTES).compact + @mentions = (@status.mentions | (mentions || [])).to_set + @tags = (tags.nil? ? @status.tags : (tags || [])).to_set + + @params[:text] ||= '' + @params[:original_text] = @params[:text] + @params[:published] = true if @status.published? + @params[:local_only] = @status.local_only? if @params[:local_only] == true && (@status.edited.positive? || @status.published?) + @params[:edited] ||= 1 + @status.edited if @params[:published].presence || @status.published? + @params[:expires_at] ||= Time.now.utc + (@status.expires_at - @status.created_at) if @status.expires_at.present? + + @params[:originally_local_only] = @params[:local_only] unless @status.published? + + update_tags if @status.local? + + @delete_payload = Oj.dump(event: :delete, payload: @status.id.to_s) + @deleted_tag_ids = @status.tags.pluck(:id) - @tags.pluck(:id) + @deleted_tag_names = @status.tags.pluck(:name) - @tags.pluck(:name) + @deleted_attachment_ids = @status.media_attachment_ids - (@params[:media_attachment_ids] || @params[:media_attachments]&.pluck(:id) || []) + + ApplicationRecord.transaction do + @status.update!(@params) + + if @account.local? + ProcessCommandTagsService.new.call(@account, @status) + else + process_inline_images! + end + + update_mentions + @status.save! + + detach_deleted_tags + attach_updated_tags + end + + prune_tags + prune_attachments + reset_status_caches + + SpamCheck.perform(@status) if @status.published? + distribute + + @status + end + + private + + def prune_attachments + @new_inline_ids = @status.inlined_attachments.pluck(:media_attachment_id) + RemoveMediaAttachmentsWorker.perform_async(@deleted_attachment_ids) if @deleted_attachment_ids.present? + end + + def detach_deleted_tags + @status.tags -= Tag.where(id: @deleted_tag_ids) if @deleted_tag_ids.present? + end + + def prune_tags + @account.featured_tags.where(tag_id: @deleted_tag_ids).each do |featured_tag| + featured_tag.decrement(@status.id) + end + + return unless @status.distributable? && @deleted_tag_names.present? + + @deleted_tag_names.each do |hashtag| + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", @delete_payload) + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", @delete_payload) if @status.local? + end + end + + def update_tags + old_explicit_tags = Tag.matching_name(Extractor.extract_hashtags(@status.text)) + @tags |= Tag.find_or_create_by_names(Extractor.extract_hashtags(@params[:text])) + + # Preserve implicit tags attached to the original status. + # TODO: Let locals remove them from edits. + @tags |= @status.tags.where.not(id: old_explicit_tags.select(:id)) + end + + def update_mentions + @new_mention_ids = @mentions.pluck(:id) - @status.mention_ids + @status.text, @mentions = ResolveMentionsService.new.call(@status, mentions: @mentions) + @new_mention_ids |= (@mentions.pluck(:id) - @new_mention_ids) + end + + def attach_updated_tags + tag_ids = @status.tag_ids.to_set + new_tag_ids = [] + now = Time.now.utc + + @tags.each do |tag| + next if tag_ids.include?(tag.id) || /\A(#{Tag::HASHTAG_NAME_RE})\z/i =~ $LAST_READ_LINE + + @status.tags << tag + new_tag_ids << tag.id + TrendingTags.record_use!(tag, @account, now) if @status.distributable? + end + + return unless @status.local? && @status.distributable? + + @account.featured_tags.where(tag_id: new_tag_ids).each do |featured_tag| + featured_tag.increment(now) + end + end + + def reset_status_caches + Rails.cache.delete_matched("statuses/#{@status.id}-*") + Rails.cache.delete("statuses/#{@status.id}") + Rails.cache.delete(@status) + Rails.cache.delete_matched("format:#{@status.id}:*") + redis.zremrangebyscore("spam_check:#{@account.id}", @status.id, @status.id) + end + + def distribute + LinkCrawlWorker.perform_in(rand(1..30).seconds, @status.id) unless @status.spoiler_text? + DistributionWorker.perform_async(@status.id) + + return unless @status.published? + + ActivityPub::DistributionWorker.perform_async(@status.id) if @status.local? && !@status.local_only? + + return unless @status.notify? + + mentions = @status.active_mentions.includes(:account).where(id: @new_mention_ids, accounts: { domain: nil }) + mentions.each { |mention| LocalNotificationWorker.perform_async(mention.account.id, mention.id, mention.class.name) } + end +end |