diff options
author | kibigo! <marrus-sh@users.noreply.github.com> | 2017-10-11 10:43:10 -0700 |
---|---|---|
committer | kibigo! <marrus-sh@users.noreply.github.com> | 2017-10-11 10:43:10 -0700 |
commit | 8d6b9ba4946b5b159af0fbd130637a226a286796 (patch) | |
tree | 9def26711682d29338cfa1b081822029a01669eb /app/lib/feed_manager.rb | |
parent | f0a2a6c875e9294f0ea1d4c6bc90529e41a2dc37 (diff) | |
parent | 476e79b8e340c9103352a0799e102e4aca1a5593 (diff) |
Merge upstream 2.0ish #165
Diffstat (limited to 'app/lib/feed_manager.rb')
-rw-r--r-- | app/lib/feed_manager.rb | 130 |
1 files changed, 103 insertions, 27 deletions
diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 3b6796142..f6a694135 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -7,8 +7,13 @@ class FeedManager MAX_ITEMS = 400 - def key(type, id) - "feed:#{type}:#{id}" + # Must be <= MAX_ITEMS or the tracking sets will grow forever + REBLOG_FALLOFF = 40 + + def key(type, id, subtype = nil) + return "feed:#{type}:#{id}" unless subtype + + "feed:#{type}:#{id}:#{subtype}" end def filter?(timeline_type, status, receiver_id) @@ -22,23 +27,36 @@ class FeedManager end def push(timeline_type, account, status) - timeline_key = key(timeline_type, account.id) + return false unless add_to_feed(timeline_type, account, status) - if status.reblog? - # If the original status is within 40 statuses from top, do not re-insert it into the feed - rank = redis.zrevrank(timeline_key, status.reblog_of_id) - return if !rank.nil? && rank < 40 - redis.zadd(timeline_key, status.id, status.reblog_of_id) - else - redis.zadd(timeline_key, status.id, status.id) - trim(timeline_type, account.id) - end + trim(timeline_type, account.id) PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id) + + true + end + + def unpush(timeline_type, account, status) + return false unless remove_from_feed(timeline_type, account, status) + + payload = Oj.dump(event: :delete, payload: status.id.to_s) + Redis.current.publish("timeline:#{account.id}", payload) + + true end def trim(type, account_id) - redis.zremrangebyrank(key(type, account_id), '0', (-(FeedManager::MAX_ITEMS + 1)).to_s) + timeline_key = key(type, account_id) + reblog_key = key(type, account_id, 'reblogs') + # Remove any items past the MAX_ITEMS'th entry in our feed + redis.zremrangebyrank(timeline_key, '0', (-(FeedManager::MAX_ITEMS + 1)).to_s) + + # Get the score of the REBLOG_FALLOFF'th item in our feed, and stop + # tracking anything after it for deduplication purposes. + falloff_rank = FeedManager::REBLOG_FALLOFF - 1 + falloff_range = redis.zrevrange(timeline_key, falloff_rank, falloff_rank, with_scores: true) + falloff_score = falloff_range&.first&.last&.to_i || 0 + redis.zremrangebyscore(reblog_key, 0, falloff_score) end def push_update_required?(timeline_type, account_id) @@ -54,11 +72,9 @@ class FeedManager query = query.where('id > ?', oldest_home_score) end - redis.pipelined do - query.each do |status| - next if status.direct_visibility? || filter?(:home, status, into_account) - redis.zadd(timeline_key, status.id, status.id) - end + query.each do |status| + next if status.direct_visibility? || filter?(:home, status, into_account) + add_to_feed(:home, into_account, status) end trim(:home, into_account.id) @@ -68,22 +84,28 @@ class FeedManager timeline_key = key(:home, into_account.id) oldest_home_score = redis.zrange(timeline_key, 0, 0, with_scores: true)&.first&.last&.to_i || 0 - from_account.statuses.select('id').where('id > ?', oldest_home_score).reorder(nil).find_in_batches do |statuses| - redis.pipelined do - statuses.each do |status| - redis.zrem(timeline_key, status.id) - redis.zremrangebyscore(timeline_key, status.id, status.id) - end - end + from_account.statuses.select('id, reblog_of_id').where('id > ?', oldest_home_score).reorder(nil).find_each do |status| + unpush(:home, into_account, status) end end def clear_from_timeline(account, target_account) timeline_key = key(:home, account.id) timeline_status_ids = redis.zrange(timeline_key, 0, -1) - target_status_ids = Status.where(id: timeline_status_ids, account: target_account).ids + target_statuses = Status.where(id: timeline_status_ids, account: target_account) + + target_statuses.each do |status| + unpush(:home, account, status) + end + end - redis.zrem(timeline_key, target_status_ids) if target_status_ids.present? + def populate_feed(account) + prepopulate_limit = FeedManager::MAX_ITEMS / 4 + statuses = Status.as_home_timeline(account).order(account_id: :desc).limit(prepopulate_limit) + statuses.reverse_each do |status| + next if filter_from_home?(status, account) + add_to_feed(:home, account, status) + end end private @@ -137,4 +159,58 @@ class FeedManager should_filter end + + # Adds a status to an account's feed, returning true if a status was + # added, and false if it was not added to the feed. Note that this is + # an internal helper: callers must call trim or push updates if + # either action is appropriate. + def add_to_feed(timeline_type, account, status) + timeline_key = key(timeline_type, account.id) + reblog_key = key(timeline_type, account.id, 'reblogs') + + if status.reblog? + # If the original status or a reblog of it is within + # REBLOG_FALLOFF statuses from the top, do not re-insert it into + # the feed + rank = redis.zrevrank(timeline_key, status.reblog_of_id) + return false if !rank.nil? && rank < FeedManager::REBLOG_FALLOFF + + reblog_rank = redis.zrevrank(reblog_key, status.reblog_of_id) + return false unless reblog_rank.nil? + + redis.zadd(timeline_key, status.id, status.id) + redis.zadd(reblog_key, status.id, status.reblog_of_id) + else + redis.zadd(timeline_key, status.id, status.id) + end + + true + end + + # Removes an individual status from a feed, correctly handling cases + # with reblogs, and returning true if a status was removed. As with + # `add_to_feed`, this does not trigger push updates, so callers must + # do so if appropriate. + def remove_from_feed(timeline_type, account, status) + timeline_key = key(timeline_type, account.id) + reblog_key = key(timeline_type, account.id, 'reblogs') + + if status.reblog? + # 1. If the reblogging status is not in the feed, stop. + status_rank = redis.zrevrank(timeline_key, status.id) + return false if status_rank.nil? + + # 2. Remove the reblogged status from the `:reblogs` zset. + redis.zrem(reblog_key, status.reblog_of_id) + + # 3. Add the reblogged status to the feed using the reblogging + # status' ID as its score, and the reblogged status' ID as its + # value. + redis.zadd(timeline_key, status.id, status.reblog_of_id) + + # 4. Remove the reblogging status from the feed (as normal) + end + + redis.zrem(timeline_key, status.id) + end end |