diff options
-rw-r--r-- | app/services/process_feed_service.rb | 342 |
1 files changed, 147 insertions, 195 deletions
diff --git a/app/services/process_feed_service.rb b/app/services/process_feed_service.rb index e1125cf05..7ec46cac2 100644 --- a/app/services/process_feed_service.rb +++ b/app/services/process_feed_service.rb @@ -2,257 +2,209 @@ class ProcessFeedService < BaseService ACTIVITY_NS = 'http://activitystrea.ms/spec/1.0/'.freeze THREAD_NS = 'http://purl.org/syndication/thread/1.0'.freeze - # Create local statuses from an Atom feed - # @param [String] body Atom feed - # @param [Account] account Account this feed belongs to - # @return [Enumerable] created statuses def call(body, account) xml = Nokogiri::XML(body) - update_remote_profile_service.call(xml.at_xpath('/xmlns:feed/xmlns:author'), account) unless xml.at_xpath('/xmlns:feed').nil? - xml.xpath('//xmlns:entry').reverse_each.map { |entry| process_entry(account, entry) }.compact + + update_author(xml, account) + process_entries(xml, account) end private - def process_entry(account, entry) - return unless [:note, :comment, :activity].include? object_type(entry) - - status = Status.find_by(uri: activity_id(entry)) + def update_author(xml, account) + return if xml.at_xpath('/xmlns:feed').nil? + UpdateRemoteProfileService.new.call(xml.at_xpath('/xmlns:feed/xmlns:author'), account) + end - # If we already have a post and the verb is now "delete", we gotta delete it and move on! - if !status.nil? && verb(entry) == :delete - delete_post!(status) - return - end + def process_entries(xml, account) + xml.xpath('//xmlns:entry').reverse_each.map { |entry| ProcessEntry.new.call(entry, account) }.compact + end - return unless status.nil? + class ProcessEntry + def call(xml, account) + @account = account + @xml = xml - status = Status.new(uri: activity_id(entry), url: activity_link(entry), account: account, text: content(entry), created_at: published(entry), updated_at: updated(entry)) + return if skip_unsupported_type? - if verb(entry) == :share - add_reblog!(entry, status) - elsif verb(entry) == :post - if thread_id(entry).nil? - add_post!(entry, status) - else - add_reply!(entry, status) + case verb + when :post, :share + return create_status + when :delete + return delete_status end - else - return end - # If we added a status, go through accounts it mentions and create respective relations - # Also record all media attachments for the status and for the reblogged status if present - unless status.new_record? - record_remote_mentions(status, entry.xpath('./xmlns:link[@rel="mentioned"]')) - record_remote_mentions(status.reblog, entry.at_xpath('./activity:object', activity: ACTIVITY_NS).xpath('./xmlns:link[@rel="mentioned"]')) if status.reblog? + private - if status.reblog? - ProcessHashtagsService.new.call(status.reblog, entry.at_xpath('./activity:object', activity: ACTIVITY_NS).xpath('./xmlns:category').map { |category| category['term'] }) - else - ProcessHashtagsService.new.call(status, entry.xpath('./xmlns:category').map { |category| category['term'] }) - end + def create_status + Rails.logger.debug "Creating remote status #{id}" + status = status_from_xml(@xml) - process_attachments(entry, status) - process_attachments(entry.xpath('./activity:object', activity: ACTIVITY_NS), status.reblog) if status.reblog? + if verb == :share + original_status = status_from_xml(xml.at_xpath('.//activity:object', activity: ACTIVITY_NS)) + status.reblog = original_status + end - Rails.logger.debug "Queuing remote status #{status.id} for distribution" + status.save! + Rails.logger.debug "Queuing remote status #{status.id} (#{id}) for distribution" DistributionWorker.perform_async(status.id) - return status + status end - end - def record_remote_mentions(status, links) - return if status.local? + def delete_status + Rails.logger.debug "Deleting remote status #{id}" + status = Status.find_by(uri: id) + RemoveStatusService.new.call(status) unless status.nil? + nil + end - # Here we have to do a reverse lookup of local accounts by their URL! - # It's not pretty at all! I really wish all these protocols sticked to - # using acct:username@domain only! It would make things so much easier - # and tidier + def skip_unsupported_type? + !([:post, :share, :delete].include?(verb) && [:activity, :note, :comment].include?(type)) + end - links.each do |mention_link| - href_val = mention_link.attribute('href').value + def status_from_xml(entry) + # Return early if status already exists in db + status = find_status(id(entry)) + return status unless status.nil? + + status = Status.create!({ + uri: id(entry), + url: url(entry), + account: account?(entry) ? find_or_resolve_account(acct(entry)) : @account, + text: content(entry), + created_at: published(entry), + }) + + if thread?(entry) + status.thread = find_or_resolve_status(status, *thread(entry)) + end - next if href_val == 'http://activityschema.org/collection/public' + mentions_from_xml(status, entry) + hashtags_from_xml(status, entry) + media_from_xml(status, entry) - href = Addressable::URI.parse(href_val) + status + end - if TagManager.instance.local_domain?(href.host) - # A local user is mentioned - mentioned_account = Account.find_local(href.path.gsub('/users/', '')) + def find_or_resolve_account(acct) + FollowRemoteAccountService.new.call(acct) + end - unless mentioned_account.nil? - mentioned_account.mentions.where(status: status).first_or_create(status: status) - NotificationMailer.mention(mentioned_account, status).deliver_later unless mentioned_account.blocking?(status.account) - end - else - # What to do about remote user? - # This is kinda dodgy because URLs could change, we don't index them - mentioned_account = Account.find_by(url: href.to_s) + def find_or_resolve_status(parent, uri, url) + status = find_status(uri) + ThreadResolveWorker.perform_async(parent.id, url) if status.nil? - if mentioned_account.nil? - mentioned_account = FetchRemoteAccountService.new.call(href) - end + status + end - unless mentioned_account.nil? - mentioned_account.mentions.where(status: status).first_or_create(status: status) - end + def find_status(uri) + if TagManager.instance.local_id?(uri) + local_id = TagManager.instance.unique_tag_to_local_id(uri, 'Status') + return Status.find(local_id) end + + Status.find_by(uri: uri) end - end - def process_attachments(entry, status) - return if status.local? + def mentions_from_xml(parent, xml) + processed_account_ids = [] - entry.xpath('./xmlns:link[@rel="enclosure"]').each do |enclosure_link| - next if enclosure_link.attribute('href').nil? + xml.xpath('./xmlns:link[@rel="mentioned"]').each do |link| + next if link['href'] == 'http://activityschema.org/collection/public' - media = MediaAttachment.where(status: status, remote_url: enclosure_link.attribute('href').value).first + url = Addressable::URI.parse(link['href']) - next unless media.nil? + mentioned_account = if TagManager.instance.local_domain?(url.host) + Account.find_local(url.path.gsub('/users/', '')) + else + Account.find_by(url: link['href']) || FetchRemoteAccountService.new.call(link['href']) + end - begin - media = MediaAttachment.new(account: status.account, status: status, remote_url: enclosure_link.attribute('href').value) - media.file_remote_url = enclosure_link.attribute('href').value - media.save - rescue Paperclip::Errors::NotIdentifiedByImageMagickError - Rails.logger.debug "Error saving attachment from #{enclosure_link.attribute('href').value}" - next - end - end - end + next if mentioned_account.nil? || processed_account_ids.include?(mentioned_account.id) - def add_post!(_entry, status) - status.save! - end + if mentioned_account.local? + # Send notifications + NotificationMailer.mention(mentioned_account, parent).deliver_later unless mentioned_account.blocking?(parent.account) + end - def add_reblog!(entry, status) - status.reblog = find_original_status(entry, target_id(entry)) + mentioned_account.mentions.where(status: parent).first_or_create(status: parent) - if status.reblog.nil? - status.reblog = fetch_remote_status(entry) - end - - if !status.reblog.nil? - status.save! - NotificationMailer.reblog(status.reblog, status.account).deliver_later if status.reblog.local? && !status.reblog.account.blocking?(status.account) + # So we can skip duplicate mentions + processed_account_ids << mentioned_account.id + end end - end - def add_reply!(entry, status) - status.thread = find_original_status(entry, thread_id(entry)) - status.save! - - if status.thread.nil? && !thread_href(entry).nil? - ThreadResolveWorker.perform_async(status.id, thread_href(entry)) + def hashtags_from_xml(parent, xml) + tags = xml.xpath('./xmlns:category').map { |category| category['term'] } + ProcessHashtagsService.new.call(parent, tags) end - end - def delete_post!(status) - remove_status_service.call(status) - end + def media_from_xml(parent, xml) + xml.xpath('./xmlns:link[@rel="enclosure"]').each do |link| + next unless link['href'] - def find_original_status(_xml, id) - return nil if id.nil? + media = MediaAttachment.where(status: parent, remote_url: link['href']).first_or_initialize(account: parent.account, status: parent, remote_url: link['href']) - if TagManager.instance.local_id?(id) - Status.find(TagManager.instance.unique_tag_to_local_id(id, 'Status')) - else - Status.find_by(uri: id) + begin + media.file_remote_url = link['href'] + media.save + rescue Paperclip::Errors::NotIdentifiedByImageMagickError + next + end + end end - end - - def fetch_remote_status(xml) - username = xml.at_xpath('./activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:author/xmlns:name').content - url = xml.at_xpath('./activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:author/xmlns:uri').content - domain = Addressable::URI.parse(url).host - account = Account.find_remote(username, domain) - if account.nil? - account = follow_remote_account_service.call("#{username}@#{domain}") + def id(xml = @xml) + xml.at_xpath('./xmlns:id').content end - status = Status.new(account: account, uri: target_id(xml), text: target_content(xml), url: target_url(xml), created_at: published(xml), updated_at: updated(xml)) - status.thread = find_original_status(xml, thread_id(xml)) - - if status.save && status.thread.nil? && !thread_href(xml).nil? - ThreadResolveWorker.perform_async(status.id, thread_href(xml)) + def verb(xml = @xml) + raw = xml.at_xpath('./activity:verb', activity: ACTIVITY_NS).content + raw.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym + rescue + :post end - status - rescue Goldfinger::Error, HTTP::Error - nil - end - - def published(xml) - xml.at_xpath('./xmlns:published').content - end - - def updated(xml) - xml.at_xpath('./xmlns:updated').content - end - - def content(xml) - xml.at_xpath('./xmlns:content').try(:content) - end - - def thread_id(xml) - xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).attribute('ref').value - rescue - nil - end - - def thread_href(xml) - xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).attribute('href').value - rescue - nil - end - - def target_id(xml) - xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:id').content - rescue - nil - end - - def activity_id(xml) - xml.at_xpath('./xmlns:id').content - end + def type(xml = @xml) + raw = xml.at_xpath('./activity:object-type', activity: ACTIVITY_NS).content + raw.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym + rescue + :activity + end - def activity_link(xml) - xml.at_xpath('./xmlns:link[@rel="alternate"]').attribute('href').value - rescue - '' - end + def url(xml = @xml) + link = xml.at_xpath('./xmlns:link[@rel="alternate"]') + link['href'] + end - def target_content(xml) - xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:content').content - end + def content(xml = @xml) + xml.at_xpath('./xmlns:content').content + end - def target_url(xml) - xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:link[@rel="alternate"]').attribute('href').value - end + def published(xml = @xml) + xml.at_xpath('./xmlns:published').content + end - def object_type(xml) - xml.at_xpath('./activity:object-type', activity: ACTIVITY_NS).content.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym - rescue - :activity - end + def thread?(xml = @xml) + !xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).nil? + end - def verb(xml) - xml.at_xpath('./activity:verb', activity: ACTIVITY_NS).content.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym - rescue - :post - end + def thread(xml = @xml) + thr = xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS) + [thr['ref'], thr['href']] + end - def follow_remote_account_service - @follow_remote_account_service ||= FollowRemoteAccountService.new - end + def account?(xml = @xml) + !xml.at_xpath('./xmlns:author').nil? + end - def update_remote_profile_service - @update_remote_profile_service ||= UpdateRemoteProfileService.new - end + def acct(xml = @xml) + username = xml.at_xpath('./xmlns:author/xmlns:name').content + url = xml.at_xpath('./xmlns:author/xmlns:uri').content + domain = Addressable::URI.parse(url).host - def remove_status_service - @remove_status_service ||= RemoveStatusService.new + "#{username}@#{domain}" + end end end |