diff options
-rw-r--r-- | app/controllers/api/v1/accounts/relationships_controller.rb | 5 | ||||
-rw-r--r-- | app/lib/feed_manager.rb | 128 | ||||
-rw-r--r-- | app/models/feed.rb | 2 | ||||
-rw-r--r-- | app/services/batched_remove_status_service.rb | 37 | ||||
-rw-r--r-- | app/services/precompute_feed_service.rb | 38 | ||||
-rw-r--r-- | app/services/remove_status_service.rb | 8 | ||||
-rw-r--r-- | db/migrate/20170920024819_status_ids_to_timestamp_ids.rb | 32 | ||||
-rw-r--r-- | db/migrate/20170920032311_fix_reblogs_in_feeds.rb | 63 | ||||
-rw-r--r-- | db/schema.rb | 2 | ||||
-rw-r--r-- | lib/mastodon/timestamp_ids.rb | 126 | ||||
-rw-r--r-- | lib/tasks/db.rake | 56 | ||||
-rw-r--r-- | spec/lib/feed_manager_spec.rb | 109 | ||||
-rw-r--r-- | spec/models/feed_spec.rb | 2 | ||||
-rw-r--r-- | spec/services/batched_remove_status_service_spec.rb | 3 | ||||
-rw-r--r-- | spec/services/precompute_feed_service_spec.rb | 2 |
15 files changed, 509 insertions, 104 deletions
diff --git a/app/controllers/api/v1/accounts/relationships_controller.rb b/app/controllers/api/v1/accounts/relationships_controller.rb index a88cf2021..91a942d75 100644 --- a/app/controllers/api/v1/accounts/relationships_controller.rb +++ b/app/controllers/api/v1/accounts/relationships_controller.rb @@ -7,7 +7,10 @@ class Api::V1::Accounts::RelationshipsController < Api::BaseController respond_to :json def index - @accounts = Account.where(id: account_ids).select('id') + accounts = Account.where(id: account_ids).select('id') + # .where doesn't guarantee that our results are in the same order + # we requested them, so return the "right" order to the requestor. + @accounts = accounts.index_by(&:id).values_at(*account_ids) render json: @accounts, each_serializer: REST::RelationshipSerializer, relationships: relationships end diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index b1ae11084..c509c5702 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -7,8 +7,13 @@ class FeedManager MAX_ITEMS = 400 - def key(type, id) - "feed:#{type}:#{id}" + # Must be <= MAX_ITEMS or the tracking sets will grow forever + REBLOG_FALLOFF = 40 + + def key(type, id, subtype = nil) + return "feed:#{type}:#{id}" unless subtype + + "feed:#{type}:#{id}:#{subtype}" end def filter?(timeline_type, status, receiver_id) @@ -22,23 +27,36 @@ class FeedManager end def push(timeline_type, account, status) - timeline_key = key(timeline_type, account.id) + return false unless add_to_feed(timeline_type, account, status) - if status.reblog? - # If the original status is within 40 statuses from top, do not re-insert it into the feed - rank = redis.zrevrank(timeline_key, status.reblog_of_id) - return if !rank.nil? && rank < 40 - redis.zadd(timeline_key, status.id, status.reblog_of_id) - else - redis.zadd(timeline_key, status.id, status.id) - trim(timeline_type, account.id) - end + trim(timeline_type, account.id) PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id) + + true + end + + def unpush(timeline_type, account, status) + return false unless remove_from_feed(timeline_type, account, status) + + payload = Oj.dump(event: :delete, payload: status.id.to_s) + Redis.current.publish("timeline:#{account.id}", payload) + + true end def trim(type, account_id) - redis.zremrangebyrank(key(type, account_id), '0', (-(FeedManager::MAX_ITEMS + 1)).to_s) + timeline_key = key(type, account_id) + reblog_key = key(type, account_id, 'reblogs') + # Remove any items past the MAX_ITEMS'th entry in our feed + redis.zremrangebyrank(timeline_key, '0', (-(FeedManager::MAX_ITEMS + 1)).to_s) + + # Get the score of the REBLOG_FALLOFF'th item in our feed, and stop + # tracking anything after it for deduplication purposes. + falloff_rank = FeedManager::REBLOG_FALLOFF - 1 + falloff_range = redis.zrevrange(timeline_key, falloff_rank, falloff_rank, with_scores: true) + falloff_score = falloff_range&.first&.last&.to_i || 0 + redis.zremrangebyscore(reblog_key, 0, falloff_score) end def push_update_required?(timeline_type, account_id) @@ -54,11 +72,9 @@ class FeedManager query = query.where('id > ?', oldest_home_score) end - redis.pipelined do - query.each do |status| - next if status.direct_visibility? || filter?(:home, status, into_account) - redis.zadd(timeline_key, status.id, status.id) - end + query.each do |status| + next if status.direct_visibility? || filter?(:home, status, into_account) + add_to_feed(:home, into_account, status) end trim(:home, into_account.id) @@ -69,11 +85,8 @@ class FeedManager oldest_home_score = redis.zrange(timeline_key, 0, 0, with_scores: true)&.first&.last&.to_i || 0 from_account.statuses.select('id').where('id > ?', oldest_home_score).reorder(nil).find_in_batches do |statuses| - redis.pipelined do - statuses.each do |status| - redis.zrem(timeline_key, status.id) - redis.zremrangebyscore(timeline_key, status.id, status.id) - end + statuses.each do |status| + unpush(:home, into_account, status) end end end @@ -81,9 +94,20 @@ class FeedManager def clear_from_timeline(account, target_account) timeline_key = key(:home, account.id) timeline_status_ids = redis.zrange(timeline_key, 0, -1) - target_status_ids = Status.where(id: timeline_status_ids, account: target_account).ids + target_statuses = Status.where(id: timeline_status_ids, account: target_account) - redis.zrem(timeline_key, target_status_ids) if target_status_ids.present? + target_statuses.each do |status| + unpush(:home, account, status) + end + end + + def populate_feed(account) + prepopulate_limit = FeedManager::MAX_ITEMS / 4 + statuses = Status.as_home_timeline(account).order(account_id: :desc).limit(prepopulate_limit) + statuses.reverse_each do |status| + next if filter_from_home?(status, account) + add_to_feed(:home, account, status) + end end private @@ -131,4 +155,58 @@ class FeedManager should_filter end + + # Adds a status to an account's feed, returning true if a status was + # added, and false if it was not added to the feed. Note that this is + # an internal helper: callers must call trim or push updates if + # either action is appropriate. + def add_to_feed(timeline_type, account, status) + timeline_key = key(timeline_type, account.id) + reblog_key = key(timeline_type, account.id, 'reblogs') + + if status.reblog? + # If the original status or a reblog of it is within + # REBLOG_FALLOFF statuses from the top, do not re-insert it into + # the feed + rank = redis.zrevrank(timeline_key, status.reblog_of_id) + return false if !rank.nil? && rank < FeedManager::REBLOG_FALLOFF + + reblog_rank = redis.zrevrank(reblog_key, status.reblog_of_id) + return false unless reblog_rank.nil? + + redis.zadd(timeline_key, status.id, status.id) + redis.zadd(reblog_key, status.id, status.reblog_of_id) + else + redis.zadd(timeline_key, status.id, status.id) + end + + true + end + + # Removes an individual status from a feed, correctly handling cases + # with reblogs, and returning true if a status was removed. As with + # `add_to_feed`, this does not trigger push updates, so callers must + # do so if appropriate. + def remove_from_feed(timeline_type, account, status) + timeline_key = key(timeline_type, account.id) + reblog_key = key(timeline_type, account.id, 'reblogs') + + if status.reblog? + # 1. If the reblogging status is not in the feed, stop. + status_rank = redis.zrevrank(timeline_key, status.id) + return false if status_rank.nil? + + # 2. Remove the reblogged status from the `:reblogs` zset. + redis.zrem(reblog_key, status.reblog_of_id) + + # 3. Add the reblogged status to the feed using the reblogging + # status' ID as its score, and the reblogged status' ID as its + # value. + redis.zadd(timeline_key, status.id, status.reblog_of_id) + + # 4. Remove the reblogging status from the feed (as normal) + end + + redis.zrem(timeline_key, status.id) + end end diff --git a/app/models/feed.rb b/app/models/feed.rb index beb4a8de3..5f7b7877a 100644 --- a/app/models/feed.rb +++ b/app/models/feed.rb @@ -19,7 +19,7 @@ class Feed def from_redis(limit, max_id, since_id) max_id = '+inf' if max_id.blank? since_id = '-inf' if since_id.blank? - unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:last).map(&:to_i) + unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:first).map(&:to_i) Status.where(id: unhydrated).cache_ids end diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index 2fd623922..5d83771c9 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -29,7 +29,7 @@ class BatchedRemoveStatusService < BaseService statuses.group_by(&:account_id).each do |_, account_statuses| account = account_statuses.first.account - unpush_from_home_timelines(account_statuses) + unpush_from_home_timelines(account, account_statuses) if account.local? batch_stream_entries(account, account_statuses) @@ -72,14 +72,15 @@ class BatchedRemoveStatusService < BaseService end end - def unpush_from_home_timelines(statuses) - account = statuses.first.account - recipients = account.followers.local.pluck(:id) + def unpush_from_home_timelines(account, statuses) + recipients = account.followers.local.to_a - recipients << account.id if account.local? + recipients << account if account.local? - recipients.each do |follower_id| - unpush(follower_id, statuses) + recipients.each do |follower| + statuses.each do |status| + FeedManager.instance.unpush(:home, follower, status) + end end end @@ -109,28 +110,6 @@ class BatchedRemoveStatusService < BaseService end end - def unpush(follower_id, statuses) - key = FeedManager.instance.key(:home, follower_id) - - originals = statuses.reject(&:reblog?) - reblogs = statuses.select(&:reblog?) - - # Quickly remove all originals - redis.pipelined do - originals.each do |status| - redis.zremrangebyscore(key, status.id, status.id) - redis.publish("timeline:#{follower_id}", @json_payloads[status.id]) - end - end - - # For reblogs, re-add original status to feed, unless the reblog - # was not in the feed in the first place - reblogs.each do |status| - redis.zadd(key, status.reblog_of_id, status.reblog_of_id) unless redis.zscore(key, status.reblog_of_id).nil? - redis.publish("timeline:#{follower_id}", @json_payloads[status.id]) - end - end - def redis Redis.current end diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb index 85635a008..36aabaa00 100644 --- a/app/services/precompute_feed_service.rb +++ b/app/services/precompute_feed_service.rb @@ -1,43 +1,7 @@ # frozen_string_literal: true class PrecomputeFeedService < BaseService - LIMIT = FeedManager::MAX_ITEMS / 4 - def call(account) - @account = account - populate_feed - end - - private - - attr_reader :account - - def populate_feed - pairs = statuses.reverse_each.lazy.reject(&method(:status_filtered?)).map(&method(:process_status)).to_a - - redis.pipelined do - redis.zadd(account_home_key, pairs) if pairs.any? - redis.del("account:#{@account.id}:regeneration") - end - end - - def process_status(status) - [status.id, status.reblog? ? status.reblog_of_id : status.id] - end - - def status_filtered?(status) - FeedManager.instance.filter?(:home, status, account.id) - end - - def account_home_key - FeedManager.instance.key(:home, account.id) - end - - def statuses - Status.as_home_timeline(account).order(account_id: :desc).limit(LIMIT) - end - - def redis - Redis.current + FeedManager.instance.populate_feed(account) end end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 14f24908c..96d9208cc 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -102,13 +102,7 @@ class RemoveStatusService < BaseService end def unpush(type, receiver, status) - if status.reblog? && !redis.zscore(FeedManager.instance.key(type, receiver.id), status.reblog_of_id).nil? - redis.zadd(FeedManager.instance.key(type, receiver.id), status.reblog_of_id, status.reblog_of_id) - else - redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id) - end - - Redis.current.publish("timeline:#{receiver.id}", @payload) + FeedManager.instance.unpush(type, receiver, status) end def remove_from_hashtags diff --git a/db/migrate/20170920024819_status_ids_to_timestamp_ids.rb b/db/migrate/20170920024819_status_ids_to_timestamp_ids.rb new file mode 100644 index 000000000..5d15817bd --- /dev/null +++ b/db/migrate/20170920024819_status_ids_to_timestamp_ids.rb @@ -0,0 +1,32 @@ +class StatusIdsToTimestampIds < ActiveRecord::Migration[5.1] + def up + # Prepare the function we will use to generate IDs. + Rake::Task['db:define_timestamp_id'].execute + + # Set up the statuses.id column to use our timestamp-based IDs. + ActiveRecord::Base.connection.execute(<<~SQL) + ALTER TABLE statuses + ALTER COLUMN id + SET DEFAULT timestamp_id('statuses') + SQL + + # Make sure we have a sequence to use. + Rake::Task['db:ensure_id_sequences_exist'].execute + end + + def down + # Revert the column to the old method of just using the sequence + # value for new IDs. Set the current ID sequence to the maximum + # existing ID, such that the next sequence will be one higher. + + # We lock the table during this so that the ID won't get clobbered, + # but ID is indexed, so this should be a fast operation. + ActiveRecord::Base.connection.execute(<<~SQL) + LOCK statuses; + SELECT setval('statuses_id_seq', (SELECT MAX(id) FROM statuses)); + ALTER TABLE statuses + ALTER COLUMN id + SET DEFAULT nextval('statuses_id_seq');" + SQL + end +end diff --git a/db/migrate/20170920032311_fix_reblogs_in_feeds.rb b/db/migrate/20170920032311_fix_reblogs_in_feeds.rb new file mode 100644 index 000000000..c813ecd46 --- /dev/null +++ b/db/migrate/20170920032311_fix_reblogs_in_feeds.rb @@ -0,0 +1,63 @@ +class FixReblogsInFeeds < ActiveRecord::Migration[5.1] + def up + redis = Redis.current + fm = FeedManager.instance + + # find_each is batched on the database side. + User.includes(:account).find_each do |user| + account = user.account + + # Old scheme: + # Each user's feed zset had a series of score:value entries, + # where "regular" statuses had the same score and value (their + # ID). Reblogs had a score of the reblogging status' ID, and a + # value of the reblogged status' ID. + + # New scheme: + # The feed contains only entries with the same score and value. + # Reblogs result in the reblogging status being added to the + # feed, with an entry in a reblog tracking zset (where the score + # is once again set to the reblogging status' ID, and the value + # is set to the reblogged status' ID). This is safe for Redis' + # float coersion because in this reblog tracking zset, we only + # need the rebloggging status' ID to be able to stop tracking + # entries after they have gotten too far down the feed, which + # does not require an exact value. + + # So, first, we iterate over the user's feed to find any reblogs. + timeline_key = fm.key(:home, account.id) + reblog_key = fm.key(:home, account.id, 'reblogs') + redis.zrange(timeline_key, 0, -1, with_scores: true).each do |entry| + next if entry[0] == entry[1] + + # The score and value don't match, so this is a reblog. + # (note that we're transitioning from IDs < 53 bits so we + # don't have to worry about the loss of precision) + + reblogged_id, reblogging_id = entry + + # Remove the old entry + redis.zrem(timeline_key, reblogged_id) + + # Add a new one for the reblogging status + redis.zadd(timeline_key, reblogging_id, reblogging_id) + + # Track the fact that this was a reblog + redis.zadd(reblog_key, reblogging_id, reblogged_id) + end + end + end + + def down + # We *deliberately* do nothing here. This means that reverting + # this and the associated changes to the FeedManager code could + # allow one superfluous reblog of any given status, but in the case + # where things have gone wrong and a revert is necessary, this + # appears preferable to requiring a database hit for every status + # in every users' feed simply to revert. + + # Note that this is operating under the assumption that entries + # with >53-bit IDs have already been entered. Otherwise, we could + # just use the data in Redis to reverse this transition. + end +end diff --git a/db/schema.rb b/db/schema.rb index 2cb105553..00cc24bae 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -321,7 +321,7 @@ ActiveRecord::Schema.define(version: 20170927215609) do t.index ["account_id", "status_id"], name: "index_status_pins_on_account_id_and_status_id", unique: true end - create_table "statuses", force: :cascade do |t| + create_table "statuses", id: :bigint, default: -> { "timestamp_id('statuses'::text)" }, force: :cascade do |t| t.string "uri" t.text "text", default: "", null: false t.datetime "created_at", null: false diff --git a/lib/mastodon/timestamp_ids.rb b/lib/mastodon/timestamp_ids.rb new file mode 100644 index 000000000..d49b5c1b5 --- /dev/null +++ b/lib/mastodon/timestamp_ids.rb @@ -0,0 +1,126 @@ +# frozen_string_literal: true + +module Mastodon + module TimestampIds + def self.define_timestamp_id + conn = ActiveRecord::Base.connection + + # Make sure we don't already have a `timestamp_id` function. + unless conn.execute(<<~SQL).values.first.first + SELECT EXISTS( + SELECT * FROM pg_proc WHERE proname = 'timestamp_id' + ); + SQL + # The function doesn't exist, so we'll define it. + conn.execute(<<~SQL) + CREATE OR REPLACE FUNCTION timestamp_id(table_name text) + RETURNS bigint AS + $$ + DECLARE + time_part bigint; + sequence_base bigint; + tail bigint; + BEGIN + -- Our ID will be composed of the following: + -- 6 bytes (48 bits) of millisecond-level timestamp + -- 2 bytes (16 bits) of sequence data + + -- The 'sequence data' is intended to be unique within a + -- given millisecond, yet obscure the 'serial number' of + -- this row. + + -- To do this, we hash the following data: + -- * Table name (if provided, skipped if not) + -- * Secret salt (should not be guessable) + -- * Timestamp (again, millisecond-level granularity) + + -- We then take the first two bytes of that value, and add + -- the lowest two bytes of the table ID sequence number + -- (`table_name`_id_seq). This means that even if we insert + -- two rows at the same millisecond, they will have + -- distinct 'sequence data' portions. + + -- If this happens, and an attacker can see both such IDs, + -- they can determine which of the two entries was inserted + -- first, but not the total number of entries in the table + -- (even mod 2**16). + + -- The table name is included in the hash to ensure that + -- different tables derive separate sequence bases so rows + -- inserted in the same millisecond in different tables do + -- not reveal the table ID sequence number for one another. + + -- The secret salt is included in the hash to ensure that + -- external users cannot derive the sequence base given the + -- timestamp and table name, which would allow them to + -- compute the table ID sequence number. + + time_part := ( + -- Get the time in milliseconds + ((date_part('epoch', now()) * 1000))::bigint + -- And shift it over two bytes + << 16); + + sequence_base := ( + 'x' || + -- Take the first two bytes (four hex characters) + substr( + -- Of the MD5 hash of the data we documented + md5(table_name || + '#{SecureRandom.hex(16)}' || + time_part::text + ), + 1, 4 + ) + -- And turn it into a bigint + )::bit(16)::bigint; + + -- Finally, add our sequence number to our base, and chop + -- it to the last two bytes + tail := ( + (sequence_base + nextval(table_name || '_id_seq')) + & 65535); + + -- Return the time part and the sequence part. OR appears + -- faster here than addition, but they're equivalent: + -- time_part has no trailing two bytes, and tail is only + -- the last two bytes. + RETURN time_part | tail; + END + $$ LANGUAGE plpgsql VOLATILE; + SQL + end + end + + def self.ensure_id_sequences_exist + conn = ActiveRecord::Base.connection + + # Find tables using timestamp IDs. + default_regex = /timestamp_id\('(?<seq_prefix>\w+)'/ + conn.tables.each do |table| + # We're only concerned with "id" columns. + next unless (id_col = conn.columns(table).find { |col| col.name == 'id' }) + + # And only those that are using timestamp_id. + next unless (data = default_regex.match(id_col.default_function)) + + seq_name = data[:seq_prefix] + '_id_seq' + # If we were on Postgres 9.5+, we could do CREATE SEQUENCE IF + # NOT EXISTS, but we can't depend on that. Instead, catch the + # possible exception and ignore it. + # Note that seq_name isn't a column name, but it's a + # relation, like a column, and follows the same quoting rules + # in Postgres. + conn.execute(<<~SQL) + DO $$ + BEGIN + CREATE SEQUENCE #{conn.quote_column_name(seq_name)}; + EXCEPTION WHEN duplicate_table THEN + -- Do nothing, we have the sequence already. + END + $$ LANGUAGE plpgsql; + SQL + end + end + end +end diff --git a/lib/tasks/db.rake b/lib/tasks/db.rake index 7a055bf25..66468d999 100644 --- a/lib/tasks/db.rake +++ b/lib/tasks/db.rake @@ -1,5 +1,36 @@ # frozen_string_literal: true +require Rails.root.join('lib', 'mastodon', 'timestamp_ids') + +def each_schema_load_environment + # If we're in development, also run this for the test environment. + # This is a somewhat hacky way to do this, so here's why: + # 1. We have to define this before we load the schema, or we won't + # have a timestamp_id function when we get to it in the schema. + # 2. db:setup calls db:schema:load_if_ruby, which calls + # db:schema:load, which we define above as having a prerequisite + # of this task. + # 3. db:schema:load ends up running + # ActiveRecord::Tasks::DatabaseTasks.load_schema_current, which + # calls a private method `each_current_configuration`, which + # explicitly also does the loading for the `test` environment + # if the current environment is `development`, so we end up + # needing to do the same, and we can't even use the same method + # to do it. + + if Rails.env == 'development' + test_conf = ActiveRecord::Base.configurations['test'] + if test_conf['database']&.present? + ActiveRecord::Base.establish_connection(:test) + yield + + ActiveRecord::Base.establish_connection(Rails.env.to_sym) + end + end + + yield +end + namespace :db do namespace :migrate do desc 'Setup the db or migrate depending on state of db' @@ -16,4 +47,29 @@ namespace :db do end end end + + # Before we load the schema, define the timestamp_id function. + # Idiomatically, we might do this in a migration, but then it + # wouldn't end up in schema.rb, so we'd need to figure out a way to + # get it in before doing db:setup as well. This is simpler, and + # ensures it's always in place. + Rake::Task['db:schema:load'].enhance ['db:define_timestamp_id'] + + # After we load the schema, make sure we have sequences for each + # table using timestamp IDs. + Rake::Task['db:schema:load'].enhance do + Rake::Task['db:ensure_id_sequences_exist'].invoke + end + + task :define_timestamp_id do + each_schema_load_environment do + Mastodon::TimestampIds.define_timestamp_id + end + end + + task :ensure_id_sequences_exist do + each_schema_load_environment do + Mastodon::TimestampIds.ensure_id_sequences_exist + end + end end diff --git a/spec/lib/feed_manager_spec.rb b/spec/lib/feed_manager_spec.rb index 22439cf35..923894ccb 100644 --- a/spec/lib/feed_manager_spec.rb +++ b/spec/lib/feed_manager_spec.rb @@ -1,6 +1,10 @@ require 'rails_helper' RSpec.describe FeedManager do + it 'tracks at least as many statuses as reblogs' do + expect(FeedManager::REBLOG_FALLOFF).to be <= FeedManager::MAX_ITEMS + end + describe '#key' do subject { FeedManager.instance.key(:home, 1) } @@ -150,5 +154,110 @@ RSpec.describe FeedManager do expect(Redis.current.zcard("feed:type:#{account.id}")).to eq FeedManager::MAX_ITEMS end + + it 'sends push updates for non-home timelines' do + account = Fabricate(:account) + status = Fabricate(:status) + allow(Redis.current).to receive_messages(publish: nil) + + FeedManager.instance.push('type', account, status) + + expect(Redis.current).to have_received(:publish).with("timeline:#{account.id}", any_args).at_least(:once) + end + + context 'reblogs' do + it 'saves reblogs of unseen statuses' do + account = Fabricate(:account) + reblogged = Fabricate(:status) + reblog = Fabricate(:status, reblog: reblogged) + + expect(FeedManager.instance.push('type', account, reblog)).to be true + end + + it 'does not save a new reblog of a recent status' do + account = Fabricate(:account) + reblogged = Fabricate(:status) + reblog = Fabricate(:status, reblog: reblogged) + + FeedManager.instance.push('type', account, reblogged) + + expect(FeedManager.instance.push('type', account, reblog)).to be false + end + + it 'saves a new reblog of an old status' do + account = Fabricate(:account) + reblogged = Fabricate(:status) + reblog = Fabricate(:status, reblog: reblogged) + + FeedManager.instance.push('type', account, reblogged) + + # Fill the feed with intervening statuses + FeedManager::REBLOG_FALLOFF.times do + FeedManager.instance.push('type', account, Fabricate(:status)) + end + + expect(FeedManager.instance.push('type', account, reblog)).to be true + end + + it 'does not save a new reblog of a recently-reblogged status' do + account = Fabricate(:account) + reblogged = Fabricate(:status) + reblogs = 2.times.map { Fabricate(:status, reblog: reblogged) } + + # The first reblog will be accepted + FeedManager.instance.push('type', account, reblogs.first) + + # The second reblog should be ignored + expect(FeedManager.instance.push('type', account, reblogs.last)).to be false + end + + it 'saves a new reblog of a long-ago-reblogged status' do + account = Fabricate(:account) + reblogged = Fabricate(:status) + reblogs = 2.times.map { Fabricate(:status, reblog: reblogged) } + + # The first reblog will be accepted + FeedManager.instance.push('type', account, reblogs.first) + + # Fill the feed with intervening statuses + FeedManager::REBLOG_FALLOFF.times do + FeedManager.instance.push('type', account, Fabricate(:status)) + end + + # The second reblog should also be accepted + expect(FeedManager.instance.push('type', account, reblogs.last)).to be true + end + end + end + + describe '#unpush' do + it 'leaves a reblogged status when deleting the reblog' do + account = Fabricate(:account) + reblogged = Fabricate(:status) + status = Fabricate(:status, reblog: reblogged) + + FeedManager.instance.push('type', account, status) + + # The reblogging status should show up under normal conditions. + expect(Redis.current.zrange("feed:type:#{account.id}", 0, -1)).to eq [status.id.to_s] + + FeedManager.instance.unpush('type', account, status) + + # Because we couldn't tell if the status showed up any other way, + # we had to stick the reblogged status in by itself. + expect(Redis.current.zrange("feed:type:#{account.id}", 0, -1)).to eq [reblogged.id.to_s] + end + + it 'sends push updates' do + account = Fabricate(:account) + status = Fabricate(:status) + FeedManager.instance.push('type', account, status) + + allow(Redis.current).to receive_messages(publish: nil) + FeedManager.instance.unpush('type', account, status) + + deletion = Oj.dump(event: :delete, payload: status.id.to_s) + expect(Redis.current).to have_received(:publish).with("timeline:#{account.id}", deletion) + end end end diff --git a/spec/models/feed_spec.rb b/spec/models/feed_spec.rb index 1c377c17f..5433f44bd 100644 --- a/spec/models/feed_spec.rb +++ b/spec/models/feed_spec.rb @@ -9,7 +9,7 @@ RSpec.describe Feed, type: :model do Fabricate(:status, account: account, id: 3) Fabricate(:status, account: account, id: 10) Redis.current.zadd(FeedManager.instance.key(:home, account.id), - [[4, 'deleted'], [3, 'val3'], [2, 'val2'], [1, 'val1']]) + [[4, 4], [3, 3], [2, 2], [1, 1]]) feed = Feed.new(:home, account) results = feed.get(3) diff --git a/spec/services/batched_remove_status_service_spec.rb b/spec/services/batched_remove_status_service_spec.rb index f5c9adfb5..c82c45e09 100644 --- a/spec/services/batched_remove_status_service_spec.rb +++ b/spec/services/batched_remove_status_service_spec.rb @@ -5,7 +5,7 @@ RSpec.describe BatchedRemoveStatusService do let!(:alice) { Fabricate(:account) } let!(:bob) { Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://example.com/salmon') } - let!(:jeff) { Fabricate(:account) } + let!(:jeff) { Fabricate(:user).account } let!(:hank) { Fabricate(:account, username: 'hank', protocol: :activitypub, domain: 'example.com', inbox_url: 'http://example.com/inbox') } let(:status1) { PostStatusService.new.call(alice, 'Hello @bob@example.com') } @@ -19,6 +19,7 @@ RSpec.describe BatchedRemoveStatusService do stub_request(:post, 'http://example.com/inbox').to_return(status: 200) Fabricate(:subscription, account: alice, callback_url: 'http://example.com/push', confirmed: true, expires_at: 30.days.from_now) + jeff.user.update(current_sign_in_at: Time.now) jeff.follow!(alice) hank.follow!(alice) diff --git a/spec/services/precompute_feed_service_spec.rb b/spec/services/precompute_feed_service_spec.rb index dbd08ac1b..d1ef6c184 100644 --- a/spec/services/precompute_feed_service_spec.rb +++ b/spec/services/precompute_feed_service_spec.rb @@ -16,7 +16,7 @@ RSpec.describe PrecomputeFeedService do subject.call(account) - expect(Redis.current.zscore(FeedManager.instance.key(:home, account.id), reblog.id)).to eq status.id + expect(Redis.current.zscore(FeedManager.instance.key(:home, account.id), reblog.id)).to eq status.id.to_f end it 'does not raise an error even if it could not find any status' do |