about summary refs log tree commit diff
path: root/app/services/process_feed_service.rb
blob: cd9bd8e93277bb658960e0a92a89233c083a6abe (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# Turns the entries of a remote account's Atom feed into local Status
# records (posts, replies and reblogs), handles deletions, and records the
# mentions and media attachments that each entry references.
class ProcessFeedService < BaseService
  ACTIVITY_NS = 'http://activitystrea.ms/spec/1.0/'.freeze
  THREAD_NS   = 'http://purl.org/syndication/thread/1.0'.freeze

  # Create local statuses from an Atom feed
  # @param [String] body Atom feed
  # @param [Account] account Account this feed belongs to
  # @return [Enumerable] created statuses
  def call(body, account)
    xml = Nokogiri::XML(body)

    # Only refresh the remote profile when the document actually has a <feed> root
    update_remote_profile_service.call(xml.at_xpath('/xmlns:feed/xmlns:author'), account) unless xml.at_xpath('/xmlns:feed').nil?

    # Entries are walked in reverse document order; entries that produce no status are dropped
    xml.xpath('//xmlns:entry').reverse_each.map { |entry| process_entry(account, entry) }.compact
  end

  private

  # Handle a single <entry>: delete an existing status, or create a new
  # post/reply/reblog.
  # @param [Account] account Account the feed belongs to
  # @param [Nokogiri::XML::Node] entry Atom entry node
  # @return [Status, nil] the newly persisted status, or nil when nothing was created
  def process_entry(account, entry)
    return unless [:note, :comment, :activity].include? object_type(entry)

    status = Status.find_by(uri: activity_id(entry))

    # If we already have a post and the verb is now "delete", we gotta delete it and move on!
    if !status.nil? && verb(entry) == :delete
      delete_post!(status)
      return
    end

    # Anything else we already know about is left untouched
    return unless status.nil?

    status = Status.new(uri: activity_id(entry), url: activity_link(entry), account: account, text: content(entry), created_at: published(entry), updated_at: updated(entry))

    if verb(entry) == :share
      add_reblog!(entry, status)
    elsif verb(entry) == :post
      if thread_id(entry).nil?
        add_post!(entry, status)
      else
        add_reply!(entry, status)
      end
    else
      return
    end

    # add_reblog! does not save when the reblogged status could not be resolved
    return if status.new_record?

    # If we added a status, go through accounts it mentions and create respective relations
    # Also record all media attachments for the status and for the reblogged status if present
    record_remote_mentions(status, entry.xpath('./xmlns:link[@rel="mentioned"]'))
    record_remote_mentions(status.reblog, entry.at_xpath('./activity:object', activity: ACTIVITY_NS).xpath('./xmlns:link[@rel="mentioned"]')) if status.reblog?

    process_attachments(entry, status)
    process_attachments(entry.xpath('./activity:object', activity: ACTIVITY_NS), status.reblog) if status.reblog?

    DistributionWorker.perform_async(status.id)
    status
  end

  # Create Mention records for every account referenced by the given
  # rel="mentioned" links.
  # @param [Status] status Status the mentions belong to
  # @param [Enumerable<Nokogiri::XML::Node>] links <link rel="mentioned"> nodes
  def record_remote_mentions(status, links)
    # Here we have to do a reverse lookup of local accounts by their URL!
    # It's not pretty at all! I really wish all these protocols stuck to
    # using acct:username@domain only! It would make things so much easier
    # and tidier

    links.each do |mention_link|
      href_attr = mention_link.attribute('href')

      # A link without an href cannot be resolved to an account (same guard as process_attachments)
      next if href_attr.nil?

      href_val = href_attr.value

      # The public collection is an addressing marker, not an account
      next if href_val == 'http://activityschema.org/collection/public'

      href = Addressable::URI.parse(href_val)

      if TagManager.instance.local_domain?(href.host)
        # A local user is mentioned
        mentioned_account = Account.find_local(href.path.gsub('/users/', ''))

        unless mentioned_account.nil?
          mentioned_account.mentions.where(status: status).first_or_create(status: status)
          NotificationMailer.mention(mentioned_account, status).deliver_later unless mentioned_account.blocking?(status.account)
        end
      else
        # What to do about remote user?
        # This is kinda dodgy because URLs could change, we don't index them
        mentioned_account = Account.find_by(url: href.to_s)
        mentioned_account = FetchRemoteAccountService.new.call(href) if mentioned_account.nil?

        mentioned_account.mentions.where(status: status).first_or_create(status: status) unless mentioned_account.nil?
      end
    end
  end

  # Create a MediaAttachment for every rel="enclosure" link that is not
  # already recorded for the status.
  # @param [Nokogiri::XML::Node, Nokogiri::XML::NodeSet] entry Node(s) to search for enclosure links
  # @param [Status] status Status the attachments belong to
  def process_attachments(entry, status)
    entry.xpath('./xmlns:link[@rel="enclosure"]').each do |enclosure_link|
      href_attr = enclosure_link.attribute('href')
      next if href_attr.nil?

      remote_url = href_attr.value

      # Skip enclosures already attached to this status
      next unless MediaAttachment.where(status: status, remote_url: remote_url).first.nil?

      media = MediaAttachment.new(account: status.account, status: status, remote_url: remote_url)
      media.file_remote_url = remote_url
      media.save
    end
  end

  # Persist a plain (non-reply, non-reblog) status.
  def add_post!(_entry, status)
    status.save!
  end

  # Resolve the reblogged status (locally first, then by building it from the
  # entry) and persist the reblog. Nothing is saved when the target cannot be
  # resolved.
  def add_reblog!(entry, status)
    status.reblog = find_original_status(entry, target_id(entry))
    status.reblog = fetch_remote_status(entry) if status.reblog.nil?

    return if status.reblog.nil?

    status.save!
    NotificationMailer.reblog(status.reblog, status.account).deliver_later if status.reblog.local? && !status.reblog.account.blocking?(status.account)
  end

  # Persist a reply. When the parent is not known locally but the entry
  # carries a thread href, resolution of the parent is queued on
  # ThreadResolveWorker.
  def add_reply!(entry, status)
    status.thread = find_original_status(entry, thread_id(entry))
    status.save!

    return unless status.thread.nil? && !thread_href(entry).nil?

    ThreadResolveWorker.perform_async(status.id, thread_href(entry))
  end

  def delete_post!(status)
    remove_status_service.call(status)
  end

  # Look up a status by its Atom id.
  # @param [String, nil] id Atom id of the status
  # @return [Status, nil] nil when the id is nil or no matching status exists
  def find_original_status(_xml, id)
    return nil if id.nil?

    if TagManager.instance.local_id?(id)
      # find_by instead of find: a feed may reference a local status that does
      # not exist, and that must not abort processing of the whole feed
      Status.find_by(id: TagManager.instance.unique_tag_to_local_id(id, 'Status'))
    else
      Status.find_by(uri: id)
    end
  end

  # Build (and try to save) the status embedded in a share entry's
  # activity:object, following its author first if the account is unknown.
  # @param [Nokogiri::XML::Node] xml Atom entry node
  # @return [Status, nil] nil when Goldfinger::Error or HTTP::Error is raised
  def fetch_remote_status(xml)
    object   = xml.at_xpath('./activity:object', activity: ACTIVITY_NS)
    username = object.at_xpath('./xmlns:author/xmlns:name').content
    url      = object.at_xpath('./xmlns:author/xmlns:uri').content
    domain   = Addressable::URI.parse(url).host
    account  = Account.find_remote(username, domain)

    account = follow_remote_account_service.call("#{username}@#{domain}") if account.nil?

    # NOTE(review): created_at/updated_at are read from the entry itself, not
    # from the embedded object -- confirm this is intended
    status = Status.new(account: account, uri: target_id(xml), text: target_content(xml), url: target_url(xml), created_at: published(xml), updated_at: updated(xml))
    status.thread = find_original_status(xml, thread_id(xml))

    if status.save && status.thread.nil? && !thread_href(xml).nil?
      ThreadResolveWorker.perform_async(status.id, thread_href(xml))
    end

    status
  rescue Goldfinger::Error, HTTP::Error
    nil
  end

  # --- XML accessors -------------------------------------------------------
  # The accessors with a bare `rescue` fall back to a default value when the
  # element or attribute is missing; the others raise on a missing element.

  def published(xml)
    xml.at_xpath('./xmlns:published').content
  end

  def updated(xml)
    xml.at_xpath('./xmlns:updated').content
  end

  def content(xml)
    xml.at_xpath('./xmlns:content').try(:content)
  end

  # @return [String, nil] the `ref` attribute of thr:in-reply-to, nil if absent
  def thread_id(xml)
    xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).attribute('ref').value
  rescue
    nil
  end

  # @return [String, nil] the `href` attribute of thr:in-reply-to, nil if absent
  def thread_href(xml)
    xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).attribute('href').value
  rescue
    nil
  end

  # @return [String, nil] the id of the embedded activity:object, nil if absent
  def target_id(xml)
    xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:id').content
  rescue
    nil
  end

  def activity_id(xml)
    xml.at_xpath('./xmlns:id').content
  end

  # @return [String] the rel="alternate" link href, or '' if absent
  def activity_link(xml)
    xml.at_xpath('./xmlns:link[@rel="alternate"]').attribute('href').value
  rescue
    ''
  end

  def target_content(xml)
    xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:content').content
  end

  # The namespace binding belongs on the lookup that uses the `activity:`
  # prefix; the nested lookup relies on the document's own namespaces, like
  # target_id and target_content do.
  def target_url(xml)
    xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:link[@rel="alternate"]').attribute('href').value
  end

  # @return [Symbol] object type with the schema URL prefix stripped; :activity if absent
  def object_type(xml)
    xml.at_xpath('./activity:object-type', activity: ACTIVITY_NS).content.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym
  rescue
    :activity
  end

  # @return [Symbol] verb with the schema URL prefix stripped; :post if absent
  def verb(xml)
    xml.at_xpath('./activity:verb', activity: ACTIVITY_NS).content.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym
  rescue
    :post
  end

  # --- Memoized collaborators ----------------------------------------------

  def follow_remote_account_service
    @follow_remote_account_service ||= FollowRemoteAccountService.new
  end

  def update_remote_profile_service
    @update_remote_profile_service ||= UpdateRemoteProfileService.new
  end

  def remove_status_service
    @remove_status_service ||= RemoveStatusService.new
  end
end