about summary refs log tree commit diff
path: root/app/lib/activitypub/activity.rb
blob: 3aeecb4ec00d9349b15c2507f42a08857cb71334 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# frozen_string_literal: true

class ActivityPub::Activity
  include JsonLdHelper
  include Redisable

  SUPPORTED_TYPES = %w(Note Question).freeze
  CONVERTED_TYPES = %w(Image Audio Video Article Page Event).freeze

  # Stores the activity payload and processing context on the instance.
  #
  # json    - the activity payload; its 'object' entry is cached in @object
  # account - kept as @account (presumably the actor the activity is
  #           attributed to; confirm against callers)
  # options - any extra keyword arguments, kept untouched in @options
  def initialize(json, account, **options)
    @json    = json
    @account = account
    @options = options
    @object  = json['object']
  end

  # Processes the activity. This base implementation has no behavior of its
  # own and always raises; it is meant to be overridden.
  #
  # Raises NotImplementedError naming the class that is missing an override.
  # Note that NotImplementedError descends from ScriptError, not
  # StandardError, so a bare `rescue` does not catch it.
  def perform
    raise NotImplementedError, "#{self.class.name} must implement #perform"
  end

  class << self
    # Builds the handler object for an activity payload.
    #
    # The handler class is picked from json['type']. The type is passed
    # straight to the lookup instead of being parked in a class-level
    # instance variable, so concurrent calls cannot see each other's payload.
    #
    # json    - activity payload; only its 'type' entry is inspected here
    # account - forwarded unchanged to the handler's constructor
    # options - forwarded unchanged to the handler's constructor
    #
    # Returns a new handler instance, or nil when the type has no handler.
    def factory(json, account, **options)
      klass(json['type'])&.new(json, account, **options)
    end

    private

    # Maps an activity type name to its handler class, or nil if unknown.
    #
    # A `case` is used rather than a constant Hash so that each handler
    # constant is only referenced when its branch is actually taken.
    def klass(type)
      case type
      when 'Create'   then ActivityPub::Activity::Create
      when 'Announce' then ActivityPub::Activity::Announce
      when 'Delete'   then ActivityPub::Activity::Delete
      when 'Follow'   then ActivityPub::Activity::Follow
      when 'Like'     then ActivityPub::Activity::Like
      when 'Block'    then ActivityPub::Activity::Block
      when 'Update'   then ActivityPub::Activity::Update
      when 'Undo'     then ActivityPub::Activity::Undo
      when 'Accept'   then ActivityPub::Activity::Accept
      when 'Reject'   then ActivityPub::Activity::Reject
      when 'Flag'     then ActivityPub::Activity::Flag
      when 'Add'      then ActivityPub::Activity::Add
      when 'Remove'   then ActivityPub::Activity::Remove
      when 'Move'     then ActivityPub::Activity::Move
      end
    end
  end

  protected

  # Looks up the Status for the given URI through the tag manager.
  # Returns whatever uri_to_resource returns for (uri, Status).
  def status_from_uri(uri)
    tag_manager = ActivityPub::TagManager.instance
    tag_manager.uri_to_resource(uri, Status)
  end

  # Looks up the Account for the given URI through the tag manager.
  # Returns whatever uri_to_resource returns for (uri, Account).
  def account_from_uri(uri)
    tag_manager = ActivityPub::TagManager.instance
    tag_manager.uri_to_resource(uri, Account)
  end

  # Returns the URI identifying the activity's object, memoized in
  # @object_uri once a non-nil value has been found.
  #
  # The raw value comes from value_or_id(@object). When it starts with
  # 'bear:' it is parsed as a URI and the 'u' query parameter is returned
  # instead; otherwise the raw value is returned as-is.
  #
  # Returns nil when there is no raw value, or when a 'bear:' value has no
  # query string or no 'u' parameter.
  def object_uri
    @object_uri ||= begin
      raw = value_or_id(@object)

      if raw&.start_with?('bear:')
        # query_values is nil for a URI without a query string, so the
        # lookup has to be nil-safe.
        Addressable::URI.parse(raw).query_values&.fetch('u', nil)
      else
        raw
      end
    end
  end

  # True when the object cannot be processed as a status: either it is a
  # bare String (not an embedded object), or its type is neither supported
  # nor convertible.
  def unsupported_object_type?
    return true if @object.is_a?(String)

    !supported_object_type? && !converted_object_type?
  end

  # Checks the object's 'type' entry against SUPPORTED_TYPES using
  # equals_or_includes_any? (a helper defined outside this file).
  def supported_object_type?
    object_type = @object['type']
    equals_or_includes_any?(object_type, SUPPORTED_TYPES)
  end

  # Checks the object's 'type' entry against CONVERTED_TYPES using
  # equals_or_includes_any? (a helper defined outside this file).
  def converted_object_type?
    object_type = @object['type']
    equals_or_includes_any?(object_type, CONVERTED_TYPES)
  end

  # Runs the follow-up work for a newly processed status: schedules link
  # crawling, sends reblog and mention notifications, and, when the status
  # counts as real-time, hands it to follower distribution.
  #
  # A reblog notification is skipped when the reblog comes from a group
  # account that the reblogged author follows.
  def distribute(status)
    crawl_links(status)

    wants_reblog_notice = reblog_of_local_account?(status) && !reblog_by_following_group_account?(status)
    notify_about_reblog(status) if wants_reblog_notice
    notify_about_mentions(status)

    # Only continue if the status is supposed to have arrived in real-time.
    # Note that if @options[:override_timestamps] isn't set, the status
    # may have a lower snowflake id than other existing statuses, potentially
    # "hiding" it from paginated API calls
    arrived_in_realtime = @options[:override_timestamps] || status.within_realtime_window?
    distribute_to_followers(status) if arrived_in_realtime
  end

  # True when the status is a reblog and the author of the reblogged
  # status is a local account.
  def reblog_of_local_account?(status)
    is_reblog = status.reblog?
    is_reblog && status.reblog.account.local?
  end

  # True when the status is a reblog made by a group account, and the
  # author of the reblogged status follows that group account.
  def reblog_by_following_group_account?(status)
    reblog_by_group = status.reblog? && status.account.group?
    reblog_by_group && status.reblog.account.following?(status.account)
  end

  # Sends a :reblog notification about the status to the author of the
  # status it reblogs.
  def notify_about_reblog(status)
    notifier = NotifyService.new
    notifier.call(status.reblog.account, :reblog, status)
  end

  # Sends a :mention notification for each active mention of the status
  # whose account is local and passes audience_includes? (a predicate
  # defined outside this file).
  def notify_about_mentions(status)
    status.active_mentions.includes(:account).each do |mention|
      recipient = mention.account

      if recipient.local? && audience_includes?(recipient)
        NotifyService.new.call(recipient, :mention, mention)
      end
    end
  end

  # Schedules LinkCrawlWorker for the status after a random delay of
  # 1 to 59 seconds.
  def crawl_links(status)
    # Spread out crawling randomly to avoid DDoSing the link
    delay = rand(1..59).seconds
    LinkCrawlWorker.perform_in(delay, status.id)
  end

  # Enqueues DistributionWorker with the status id.
  def distribute_to_followers(status)
    status_id = status.id
    ::DistributionWorker.perform_async(status_id)
  end

  # Asks Redis whether the "delete_upon_arrival" marker exists for this
  # account and URI (the marker written by delete_later!).
  def delete_arrived_first?(uri)
    marker_key = "delete_upon_arrival:#{@account.id}:#{uri}"
    redis.exists?(marker_key)
  end

  # Writes the "delete_upon_arrival" marker for this account and URI to
  # Redis with a six-hour expiry (the marker read by delete_arrived_first?).
  def delete_later!(uri)
    marker_key = "delete_upon_arrival:#{@account.id}:#{uri}"
    redis.setex(marker_key, 6.hours.seconds, true)
  end

  # Resolves the status the activity's object refers to.
  #
  # Order of attempts:
  # 1. a status already known for object_uri;
  # 2. if the object is embedded, of a usable type and attributed to
  #    @account itself, it is processed as a synthetic Create activity;
  # 3. otherwise the original status is fetched remotely.
  def status_from_object
    # If the status is already known, return it
    known_status = status_from_uri(object_uri)
    return known_status unless known_status.nil?

    return fetch_remote_original_status if unsupported_object_type?

    # If the boosted toot is embedded and it is a self-boost, handle it like a Create
    author_uri = value_or_id(first_of_value(@object['attributedTo']))
    return fetch_remote_original_status unless author_uri == @account.uri

    embedded_create = { 'type' => 'Create', 'actor' => author_uri, 'object' => @object }
    ActivityPub::Activity.factory(embedded_create, @account).perform
  end

  # When @object is a String, tries to replace it with the object obtained
  # from ActivityPub::Dereferencer. @object is left untouched when it is not
  # a String or when the dereferencer yields nil.
  def dereference_object!
    return unless @object.is_a?(String)

    fetcher = ActivityPub::Dereferencer.new(
      @object,
      permitted_origin: @account.uri,
      signature_account: signed_fetch_account
    )

    @object = fetcher.object unless fetcher.object.nil?
  end

  # Chooses the account passed to the dereferencer as signature_account.
  #
  # Preference order: the account named by @options[:delivered_to_account_id]
  # (looked up with Account.find), then the first local account mentioned
  # in the activity, then the first local follower of @account.
  def signed_fetch_account
    recipient_id = @options[:delivered_to_account_id]
    return Account.find(recipient_id) if recipient_id.present?

    first_mentioned_local_account || first_local_follower
  end

  # Finds the first local account addressed by the activity.
  #
  # Collects the 'to' and 'cc' entries of the payload, keeps the URIs the
  # tag manager reports as local, converts them to usernames and returns
  # the first matching local Account. Returns nil when no addressed URI
  # is local.
  def first_mentioned_local_account
    to_field = as_array(@json['to'])
    cc_field = as_array(@json['cc'])
    audience_uris = (to_field + cc_field).map { |entry| value_or_id(entry) }.uniq

    local_uris = audience_uris.select { |uri| ActivityPub::TagManager.instance.local_uri?(uri) }
    return if local_uris.empty?

    usernames = local_uris.map { |uri| ActivityPub::TagManager.instance.uri_to_local_id(uri, :username) }
    Account.local.where(username: usernames).first
  end

  # Returns the first local follower of @account (nil if `first` finds none).
  def first_local_follower
    local_followers = @account.followers.local
    local_followers.first
  end

  # Finds the FollowRequest targeting @account whose uri equals object_uri.
  # Returns nil without querying when object_uri is nil; a found record is
  # memoized in @follow_request.
  def follow_request_from_object
    return if object_uri.nil?

    @follow_request ||= FollowRequest.find_by(target_account: @account, uri: object_uri)
  end

  # Finds the Follow targeting @account whose uri equals object_uri.
  # Returns nil without querying when object_uri is nil; a found record is
  # memoized in @follow.
  def follow_from_object
    return if object_uri.nil?

    @follow ||= ::Follow.find_by(target_account: @account, uri: object_uri)
  end

  # Fetches the status the activity's object refers to from its origin.
  #
  # - object_uri starting with 'http': returns nil for local URIs, otherwise
  #   fetches through ActivityPub::FetchRemoteStatusService on behalf of the
  #   first local follower of @account.
  # - otherwise, when the object is embedded and has a 'url' entry, fetches
  #   that URL through the generic FetchRemoteStatusService.
  #
  # Returns the service result, or nil when nothing can be fetched.
  def fetch_remote_original_status
    # object_uri is nil for an embedded object without an id, hence `&.`.
    if object_uri&.start_with?('http')
      return if ActivityPub::TagManager.instance.local_uri?(object_uri)

      ActivityPub::FetchRemoteStatusService.new.call(object_uri, id: true, on_behalf_of: first_local_follower)
    elsif @object.is_a?(Hash) && @object['url'].present?
      # The Hash check matters: on a String, ['url'] is a substring lookup
      # and would yield the literal "url" rather than a URL.
      ::FetchRemoteStatusService.new.call(@object['url'])
    end
  end

  # Runs the block only if the Redis key can be claimed, and skips it
  # silently otherwise.
  #
  # key          - Redis key used as the lock
  # expire_after - expiry passed to Redis, so a crashed holder cannot keep
  #                the key forever
  #
  # The key is deleted once the block finishes (normally or by raising),
  # but only by the caller that actually claimed it. Deleting it after a
  # failed claim would release a lock that another worker still holds.
  #
  # Returns the block's value, or nil when the key was already taken.
  def lock_or_return(key, expire_after = 2.hours.seconds)
    return unless redis.set(key, true, nx: true, ex: expire_after)

    begin
      yield
    ensure
      redis.del(key)
    end
  end

  # Runs the block while holding a RedisLock on the key, and fails loudly
  # when the lock cannot be obtained.
  #
  # key          - lock key
  # expire_after - passed to RedisLock as :autorelease
  #
  # The connection comes from the `redis` helper, as in the other methods
  # of this class, instead of the global Redis.current.
  # NOTE(review): assumes `redis` targets the same server/namespace that
  # Redis.current did — confirm if other code takes RedisLock on these keys.
  #
  # Raises Mastodon::RaceConditionError when the lock is not acquired.
  def lock_or_fail(key, expire_after = 15.minutes.seconds)
    RedisLock.acquire({ redis: redis, key: key, autorelease: expire_after }) do |lock|
      raise Mastodon::RaceConditionError unless lock.acquired?

      yield
    end
  end

  # True unless the :delivery option is set to a truthy value.
  def fetch?
    delivered = @options[:delivery]
    !delivered
  end

  # Truthy when @account has any passive relationship, or, failing that,
  # when the account given in @options[:relayed_through_account] has one.
  # Returns nil when neither applies and no relay account was given.
  def followed_by_local_accounts?
    return true if @account.passive_relationships.exists?

    relay_account = @options[:relayed_through_account]
    relay_account&.passive_relationships&.exists?
  end

  # Truthy when the activity came through a relay account whose inbox URL
  # matches an enabled Relay record. Falsy when no relay account was given,
  # no Relay matches, or the Relay is not enabled.
  def requested_through_relay?
    relay_account = @options[:relayed_through_account]
    relay_account && Relay.find_by(inbox_url: relay_account.inbox_url)&.enabled?
  end

  # Logs, at info level, that the activity was rejected and returns nil.
  #
  # The message names the activity type, its id and the account URI. When
  # the activity was relayed, " via <relay uri>" is appended; the leading
  # space keeps the two URIs from running together.
  def reject_payload!
    relay_account = @options[:relayed_through_account]
    via = relay_account ? " via #{relay_account.uri}" : ''

    Rails.logger.info("Rejected #{@json['type']} activity #{@json['id']} from #{@account.uri}#{via}")
    nil
  end
end