 app/controllers/api/v1/accounts/relationships_controller.rb |   5
 app/lib/feed_manager.rb                                      | 128
 app/models/feed.rb                                           |   2
 app/services/batched_remove_status_service.rb                |  37
 app/services/precompute_feed_service.rb                      |  40
 app/services/remove_status_service.rb                        |   8
 db/migrate/20170920024819_status_ids_to_timestamp_ids.rb     |  32
 db/migrate/20170920032311_fix_reblogs_in_feeds.rb            |  63
 db/schema.rb                                                 |   2
 lib/mastodon/timestamp_ids.rb                                | 126
 lib/tasks/db.rake                                            |  56
 spec/lib/feed_manager_spec.rb                                | 109
 spec/models/feed_spec.rb                                     |   2
 spec/services/batched_remove_status_service_spec.rb          |   3
 spec/services/precompute_feed_service_spec.rb                |   2
 15 files changed, 511 insertions(+), 104 deletions(-)
diff --git a/app/controllers/api/v1/accounts/relationships_controller.rb b/app/controllers/api/v1/accounts/relationships_controller.rb
index a88cf2021..91a942d75 100644
--- a/app/controllers/api/v1/accounts/relationships_controller.rb
+++ b/app/controllers/api/v1/accounts/relationships_controller.rb
@@ -7,7 +7,10 @@ class Api::V1::Accounts::RelationshipsController < Api::BaseController
   respond_to :json
 
   def index
-    @accounts = Account.where(id: account_ids).select('id')
+    accounts = Account.where(id: account_ids).select('id')
+    # .where doesn't guarantee that our results are in the same order
+    # we requested them, so return the "right" order to the requestor.
+    @accounts = accounts.index_by(&:id).values_at(*account_ids)
     render json: @accounts, each_serializer: REST::RelationshipSerializer, relationships: relationships
   end
 
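The reordering idiom above deserves a quick illustration. A minimal sketch in
plain Ruby (a Struct stands in for Account; index_by comes from ActiveSupport):

    require 'active_support/core_ext/enumerable'

    Account = Struct.new(:id)
    account_ids = [1, 2, 3]                # the order the client requested
    accounts = [Account.new(3), Account.new(1), Account.new(2)] # DB order

    # index_by builds {3 => account, 1 => account, 2 => account};
    # values_at reads that hash back in the requested order.
    accounts.index_by(&:id).values_at(*account_ids).map(&:id)
    # => [1, 2, 3]

Note that Hash#values_at yields nil for ids that were not found, so this
assumes account_ids only contains ids that resolve to accounts.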
diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb
index b1ae11084..c509c5702 100644
--- a/app/lib/feed_manager.rb
+++ b/app/lib/feed_manager.rb
@@ -7,8 +7,13 @@ class FeedManager
 
   MAX_ITEMS = 400
 
-  def key(type, id)
-    "feed:#{type}:#{id}"
+  # Must be <= MAX_ITEMS or the tracking sets will grow forever
+  REBLOG_FALLOFF = 40
+
+  def key(type, id, subtype = nil)
+    return "feed:#{type}:#{id}" unless subtype
+
+    "feed:#{type}:#{id}:#{subtype}"
   end
 
   def filter?(timeline_type, status, receiver_id)
@@ -22,23 +27,36 @@ class FeedManager
   end
 
   def push(timeline_type, account, status)
-    timeline_key = key(timeline_type, account.id)
+    return false unless add_to_feed(timeline_type, account, status)
 
-    if status.reblog?
-      # If the original status is within 40 statuses from top, do not re-insert it into the feed
-      rank = redis.zrevrank(timeline_key, status.reblog_of_id)
-      return if !rank.nil? && rank < 40
-      redis.zadd(timeline_key, status.id, status.reblog_of_id)
-    else
-      redis.zadd(timeline_key, status.id, status.id)
-      trim(timeline_type, account.id)
-    end
+    trim(timeline_type, account.id)
 
     PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id)
+
+    true
+  end
+
+  def unpush(timeline_type, account, status)
+    return false unless remove_from_feed(timeline_type, account, status)
+
+    payload = Oj.dump(event: :delete, payload: status.id.to_s)
+    Redis.current.publish("timeline:#{account.id}", payload)
+
+    true
   end
 
   def trim(type, account_id)
-    redis.zremrangebyrank(key(type, account_id), '0', (-(FeedManager::MAX_ITEMS + 1)).to_s)
+    timeline_key = key(type, account_id)
+    reblog_key = key(type, account_id, 'reblogs')
+    # Remove any items past the MAX_ITEMS'th entry in our feed
+    redis.zremrangebyrank(timeline_key, '0', (-(FeedManager::MAX_ITEMS + 1)).to_s)
+
+    # Get the score of the REBLOG_FALLOFF'th item in our feed, and stop
+    # tracking anything after it for deduplication purposes.
+    falloff_rank = FeedManager::REBLOG_FALLOFF - 1
+    falloff_range = redis.zrevrange(timeline_key, falloff_rank, falloff_rank, with_scores: true)
+    falloff_score = falloff_range&.first&.last&.to_i || 0
+    redis.zremrangebyscore(reblog_key, 0, falloff_score)
   end
 
   def push_update_required?(timeline_type, account_id)
@@ -54,11 +72,9 @@ class FeedManager
       query = query.where('id > ?', oldest_home_score)
     end
 
-    redis.pipelined do
-      query.each do |status|
-        next if status.direct_visibility? || filter?(:home, status, into_account)
-        redis.zadd(timeline_key, status.id, status.id)
-      end
+    query.each do |status|
+      next if status.direct_visibility? || filter?(:home, status, into_account)
+      add_to_feed(:home, into_account, status)
     end
 
     trim(:home, into_account.id)
@@ -69,11 +85,8 @@ class FeedManager
     oldest_home_score = redis.zrange(timeline_key, 0, 0, with_scores: true)&.first&.last&.to_i || 0
 
     from_account.statuses.select('id').where('id > ?', oldest_home_score).reorder(nil).find_in_batches do |statuses|
-      redis.pipelined do
-        statuses.each do |status|
-          redis.zrem(timeline_key, status.id)
-          redis.zremrangebyscore(timeline_key, status.id, status.id)
-        end
+      statuses.each do |status|
+        unpush(:home, into_account, status)
       end
     end
   end
@@ -81,9 +94,20 @@ class FeedManager
   def clear_from_timeline(account, target_account)
     timeline_key = key(:home, account.id)
     timeline_status_ids = redis.zrange(timeline_key, 0, -1)
-    target_status_ids = Status.where(id: timeline_status_ids, account: target_account).ids
+    target_statuses = Status.where(id: timeline_status_ids, account: target_account)
 
-    redis.zrem(timeline_key, target_status_ids) if target_status_ids.present?
+    target_statuses.each do |status|
+      unpush(:home, account, status)
+    end
+  end
+
+  def populate_feed(account)
+    prepopulate_limit = FeedManager::MAX_ITEMS / 4
+    statuses = Status.as_home_timeline(account).order(account_id: :desc).limit(prepopulate_limit)
+    statuses.reverse_each do |status|
+      next if filter_from_home?(status, account)
+      add_to_feed(:home, account, status)
+    end
   end
 
   private
@@ -131,4 +155,58 @@ class FeedManager
 
     should_filter
   end
+
+  # Adds a status to an account's feed, returning true if a status was
+  # added, and false if it was not added to the feed. Note that this is
+  # an internal helper: callers must call trim or push updates if
+  # either action is appropriate.
+  def add_to_feed(timeline_type, account, status)
+    timeline_key = key(timeline_type, account.id)
+    reblog_key = key(timeline_type, account.id, 'reblogs')
+
+    if status.reblog?
+      # If the original status or a reblog of it is within
+      # REBLOG_FALLOFF statuses from the top, do not re-insert it into
+      # the feed
+      rank = redis.zrevrank(timeline_key, status.reblog_of_id)
+      return false if !rank.nil? && rank < FeedManager::REBLOG_FALLOFF
+
+      reblog_rank = redis.zrevrank(reblog_key, status.reblog_of_id)
+      return false unless reblog_rank.nil?
+
+      redis.zadd(timeline_key, status.id, status.id)
+      redis.zadd(reblog_key, status.id, status.reblog_of_id)
+    else
+      redis.zadd(timeline_key, status.id, status.id)
+    end
+
+    true
+  end
+
+  # Removes an individual status from a feed, correctly handling cases
+  # with reblogs, and returning true if a status was removed. As with
+  # `add_to_feed`, this does not trigger push updates, so callers must
+  # do so if appropriate.
+  def remove_from_feed(timeline_type, account, status)
+    timeline_key = key(timeline_type, account.id)
+    reblog_key = key(timeline_type, account.id, 'reblogs')
+
+    if status.reblog?
+      # 1. If the reblogging status is not in the feed, stop.
+      status_rank = redis.zrevrank(timeline_key, status.id)
+      return false if status_rank.nil?
+
+      # 2. Remove the reblogged status from the `:reblogs` zset.
+      redis.zrem(reblog_key, status.reblog_of_id)
+
+      # 3. Add the reblogged status to the feed using the reblogging
+      # status' ID as its score, and the reblogged status' ID as its
+      # value.
+      redis.zadd(timeline_key, status.id, status.reblog_of_id)
+
+      # 4. Remove the reblogging status from the feed (as normal)
+    end
+
+    redis.zrem(timeline_key, status.id)
+  end
 end
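To make the new two-zset bookkeeping concrete, here is a walk-through with
hypothetical ids (account 1; status 200 reblogs status 100, whose author
account 1 does not follow), using the same redis-rb calls as add_to_feed above:

    redis = Redis.current

    # The reblog arrives: only the reblogging id enters the feed, and
    # the reblogged id is tracked in the :reblogs zset, scored by the
    # reblogging id so trim can later expire it by score.
    redis.zadd('feed:home:1', 200, 200)
    redis.zadd('feed:home:1:reblogs', 200, 100)

    # Status 300 also reblogs 100: add_to_feed finds 100 in the
    # :reblogs zset and returns false, so the duplicate never lands.
    redis.zrevrank('feed:home:1:reblogs', 100) # => 0, i.e. still tracked

    # Once entry 200 falls REBLOG_FALLOFF places below the top of the
    # feed, trim's zremrangebyscore clears the tracking entry and a
    # fresh reblog of 100 may be inserted again.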
diff --git a/app/models/feed.rb b/app/models/feed.rb
index beb4a8de3..5f7b7877a 100644
--- a/app/models/feed.rb
+++ b/app/models/feed.rb
@@ -19,7 +19,7 @@ class Feed
   def from_redis(limit, max_id, since_id)
     max_id     = '+inf' if max_id.blank?
     since_id   = '-inf' if since_id.blank?
-    unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:last).map(&:to_i)
+    unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:first).map(&:to_i)
     Status.where(id: unhydrated).cache_ids
   end
 
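The one-word fix above hinges on the shape of redis-rb's return value: with
with_scores: true, each element is a [member, score] pair. A sketch with
hypothetical contents:

    # A feed holding ids 2 and 3 under the new scheme (member == score):
    Redis.current.zrevrangebyscore('feed:home:1', '+inf', '-inf',
                                   limit: [0, 2], with_scores: true)
    # => [["3", 3.0], ["2", 2.0]]

map(&:first) takes the string member, which is the exact status id; the old
map(&:last) took the float score, which stops being exact once ids outgrow a
double's 53-bit mantissa, as the timestamp ids introduced below do.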
diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb
index 2fd623922..5d83771c9 100644
--- a/app/services/batched_remove_status_service.rb
+++ b/app/services/batched_remove_status_service.rb
@@ -29,7 +29,7 @@ class BatchedRemoveStatusService < BaseService
     statuses.group_by(&:account_id).each do |_, account_statuses|
       account = account_statuses.first.account
 
-      unpush_from_home_timelines(account_statuses)
+      unpush_from_home_timelines(account, account_statuses)
 
       if account.local?
         batch_stream_entries(account, account_statuses)
@@ -72,14 +72,15 @@ class BatchedRemoveStatusService < BaseService
     end
   end
 
-  def unpush_from_home_timelines(statuses)
-    account    = statuses.first.account
-    recipients = account.followers.local.pluck(:id)
+  def unpush_from_home_timelines(account, statuses)
+    recipients = account.followers.local.to_a
 
-    recipients << account.id if account.local?
+    recipients << account if account.local?
 
-    recipients.each do |follower_id|
-      unpush(follower_id, statuses)
+    recipients.each do |follower|
+      statuses.each do |status|
+        FeedManager.instance.unpush(:home, follower, status)
+      end
     end
   end
 
@@ -109,28 +110,6 @@ class BatchedRemoveStatusService < BaseService
     end
   end
 
-  def unpush(follower_id, statuses)
-    key = FeedManager.instance.key(:home, follower_id)
-
-    originals = statuses.reject(&:reblog?)
-    reblogs   = statuses.select(&:reblog?)
-
-    # Quickly remove all originals
-    redis.pipelined do
-      originals.each do |status|
-        redis.zremrangebyscore(key, status.id, status.id)
-        redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
-      end
-    end
-
-    # For reblogs, re-add original status to feed, unless the reblog
-    # was not in the feed in the first place
-    reblogs.each do |status|
-      redis.zadd(key, status.reblog_of_id, status.reblog_of_id) unless redis.zscore(key, status.reblog_of_id).nil?
-      redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
-    end
-  end
-
   def redis
     Redis.current
   end
diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb
index 85635a008..36aabaa00 100644
--- a/app/services/precompute_feed_service.rb
+++ b/app/services/precompute_feed_service.rb
@@ -1,43 +1,9 @@
 # frozen_string_literal: true
 
 class PrecomputeFeedService < BaseService
-  LIMIT = FeedManager::MAX_ITEMS / 4
-
   def call(account)
-    @account = account
-    populate_feed
-  end
-
-  private
-
-  attr_reader :account
-
-  def populate_feed
-    pairs = statuses.reverse_each.lazy.reject(&method(:status_filtered?)).map(&method(:process_status)).to_a
-
-    redis.pipelined do
-      redis.zadd(account_home_key, pairs) if pairs.any?
-      redis.del("account:#{@account.id}:regeneration")
-    end
-  end
-
-  def process_status(status)
-    [status.id, status.reblog? ? status.reblog_of_id : status.id]
-  end
-
-  def status_filtered?(status)
-    FeedManager.instance.filter?(:home, status, account.id)
-  end
-
-  def account_home_key
-    FeedManager.instance.key(:home, account.id)
-  end
-
-  def statuses
-    Status.as_home_timeline(account).order(account_id: :desc).limit(LIMIT)
-  end
-
-  def redis
-    Redis.current
+    FeedManager.instance.populate_feed(account)
+  ensure
+    Redis.current.del("account:#{account.id}:regeneration")
   end
 end
diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb
index 14f24908c..96d9208cc 100644
--- a/app/services/remove_status_service.rb
+++ b/app/services/remove_status_service.rb
@@ -102,13 +102,7 @@ class RemoveStatusService < BaseService
   end
 
   def unpush(type, receiver, status)
-    if status.reblog? && !redis.zscore(FeedManager.instance.key(type, receiver.id), status.reblog_of_id).nil?
-      redis.zadd(FeedManager.instance.key(type, receiver.id), status.reblog_of_id, status.reblog_of_id)
-    else
-      redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id)
-    end
-
-    Redis.current.publish("timeline:#{receiver.id}", @payload)
+    FeedManager.instance.unpush(type, receiver, status)
   end
 
   def remove_from_hashtags
diff --git a/db/migrate/20170920024819_status_ids_to_timestamp_ids.rb b/db/migrate/20170920024819_status_ids_to_timestamp_ids.rb
new file mode 100644
index 000000000..5d15817bd
--- /dev/null
+++ b/db/migrate/20170920024819_status_ids_to_timestamp_ids.rb
@@ -0,0 +1,32 @@
+class StatusIdsToTimestampIds < ActiveRecord::Migration[5.1]
+  def up
+    # Prepare the function we will use to generate IDs.
+    Rake::Task['db:define_timestamp_id'].execute
+
+    # Set up the statuses.id column to use our timestamp-based IDs.
+    ActiveRecord::Base.connection.execute(<<~SQL)
+      ALTER TABLE statuses
+      ALTER COLUMN id
+      SET DEFAULT timestamp_id('statuses')
+    SQL
+
+    # Make sure we have a sequence to use.
+    Rake::Task['db:ensure_id_sequences_exist'].execute
+  end
+
+  def down
+    # Revert the column to the old method of just using the sequence
+    # value for new IDs. Set the current ID sequence to the maximum
+    # existing ID, such that the next sequence will be one higher.
+
+    # We lock the table during this so that the ID won't get clobbered,
+    # but ID is indexed, so this should be a fast operation.
+    ActiveRecord::Base.connection.execute(<<~SQL)
+      LOCK statuses;
+      SELECT setval('statuses_id_seq', (SELECT MAX(id) FROM statuses));
+      ALTER TABLE statuses
+        ALTER COLUMN id
+        SET DEFAULT nextval('statuses_id_seq');
+    SQL
+  end
+end
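Since these ids carry a millisecond-resolution timestamp in their upper 48
bits, the generation time can be read straight back out of an id. A sketch,
assuming the statuses table already uses timestamp ids:

    id = Status.last.id   # generated by timestamp_id('statuses')
    ms = id >> 16         # shift off the 16 low-order sequence bits
    Time.at(ms / 1000.0)  # the moment the id was minted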
diff --git a/db/migrate/20170920032311_fix_reblogs_in_feeds.rb b/db/migrate/20170920032311_fix_reblogs_in_feeds.rb
new file mode 100644
index 000000000..c813ecd46
--- /dev/null
+++ b/db/migrate/20170920032311_fix_reblogs_in_feeds.rb
@@ -0,0 +1,63 @@
+class FixReblogsInFeeds < ActiveRecord::Migration[5.1]
+  def up
+    redis = Redis.current
+    fm = FeedManager.instance
+
+    # find_each is batched on the database side.
+    User.includes(:account).find_each do |user|
+      account = user.account
+
+      # Old scheme:
+      # Each user's feed zset had a series of score:value entries,
+      # where "regular" statuses had the same score and value (their
+      # ID). Reblogs had a score of the reblogging status' ID, and a
+      # value of the reblogged status' ID.
+
+      # New scheme:
+      # The feed contains only entries with the same score and value.
+      # Reblogs result in the reblogging status being added to the
+      # feed, with an entry in a reblog tracking zset (where the score
+      # is once again set to the reblogging status' ID, and the value
+      # is set to the reblogged status' ID). This is safe for Redis'
+      # float coercion because in this reblog tracking zset, we only
+      # need the reblogging status' ID to be able to stop tracking
+      # entries after they have gotten too far down the feed, which
+      # does not require an exact value.
+
+      # So, first, we iterate over the user's feed to find any reblogs.
+      timeline_key = fm.key(:home, account.id)
+      reblog_key = fm.key(:home, account.id, 'reblogs')
+      redis.zrange(timeline_key, 0, -1, with_scores: true).each do |entry|
+        # Members come back as strings and scores as floats, so
+        # compare them as integers.
+        next if entry[0].to_i == entry[1].to_i
+
+        # The score and value don't match, so this is a reblog.
+        # (note that we're transitioning from IDs < 53 bits so we
+        # don't have to worry about the loss of precision)
+
+        reblogged_id, reblogging_id = entry[0].to_i, entry[1].to_i
+
+        # Remove the old entry
+        redis.zrem(timeline_key, reblogged_id)
+
+        # Add a new one for the reblogging status
+        redis.zadd(timeline_key, reblogging_id, reblogging_id)
+
+        # Track the fact that this was a reblog
+        redis.zadd(reblog_key, reblogging_id, reblogged_id)
+      end
+    end
+  end
+
+  def down
+    # We *deliberately* do nothing here. This means that reverting
+    # this and the associated changes to the FeedManager code could
+    # allow one superfluous reblog of any given status, but in the case
+    # where things have gone wrong and a revert is necessary, this
+    # appears preferable to requiring a database hit for every status
+    # in every user's feed simply to revert.
+
+    # Note that this is operating under the assumption that entries
+    # with >53-bit IDs have already been entered. Otherwise, we could
+    # just use the data in Redis to reverse this transition.
+  end
+end
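A before-and-after picture of a single migrated entry (hypothetical ids;
status 200 reblogged status 100 into account 1's feed):

    Old scheme:  feed:home:1          member "100", score 200
    New scheme:  feed:home:1          member "200", score 200
                 feed:home:1:reblogs  member "100", score 200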
diff --git a/db/schema.rb b/db/schema.rb
index 2cb105553..00cc24bae 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -321,7 +321,7 @@ ActiveRecord::Schema.define(version: 20170927215609) do
     t.index ["account_id", "status_id"], name: "index_status_pins_on_account_id_and_status_id", unique: true
   end
 
-  create_table "statuses", force: :cascade do |t|
+  create_table "statuses", id: :bigint, default: -> { "timestamp_id('statuses'::text)" }, force: :cascade do |t|
     t.string "uri"
     t.text "text", default: "", null: false
     t.datetime "created_at", null: false
diff --git a/lib/mastodon/timestamp_ids.rb b/lib/mastodon/timestamp_ids.rb
new file mode 100644
index 000000000..d49b5c1b5
--- /dev/null
+++ b/lib/mastodon/timestamp_ids.rb
@@ -0,0 +1,126 @@
+# frozen_string_literal: true
+
+module Mastodon
+  module TimestampIds
+    def self.define_timestamp_id
+      conn = ActiveRecord::Base.connection
+
+      # Make sure we don't already have a `timestamp_id` function.
+      unless conn.execute(<<~SQL).values.first.first
+        SELECT EXISTS(
+          SELECT * FROM pg_proc WHERE proname = 'timestamp_id'
+        );
+      SQL
+        # The function doesn't exist, so we'll define it.
+        conn.execute(<<~SQL)
+          CREATE OR REPLACE FUNCTION timestamp_id(table_name text)
+          RETURNS bigint AS
+          $$
+            DECLARE
+              time_part bigint;
+              sequence_base bigint;
+              tail bigint;
+            BEGIN
+              -- Our ID will be composed of the following:
+              -- 6 bytes (48 bits) of millisecond-level timestamp
+              -- 2 bytes (16 bits) of sequence data
+
+              -- The 'sequence data' is intended to be unique within a
+              -- given millisecond, yet obscure the 'serial number' of
+              -- this row.
+
+              -- To do this, we hash the following data:
+              -- * Table name (if provided, skipped if not)
+              -- * Secret salt (should not be guessable)
+              -- * Timestamp (again, millisecond-level granularity)
+
+              -- We then take the first two bytes of that value, and add
+              -- the lowest two bytes of the table ID sequence number
+              -- (`table_name`_id_seq). This means that even if we insert
+              -- two rows at the same millisecond, they will have
+              -- distinct 'sequence data' portions.
+
+              -- If this happens, and an attacker can see both such IDs,
+              -- they can determine which of the two entries was inserted
+              -- first, but not the total number of entries in the table
+              -- (even mod 2**16).
+
+              -- The table name is included in the hash to ensure that
+              -- different tables derive separate sequence bases so rows
+              -- inserted in the same millisecond in different tables do
+              -- not reveal the table ID sequence number for one another.
+
+              -- The secret salt is included in the hash to ensure that
+              -- external users cannot derive the sequence base given the
+              -- timestamp and table name, which would allow them to
+              -- compute the table ID sequence number.
+
+              time_part := (
+                -- Get the time in milliseconds
+                ((date_part('epoch', now()) * 1000))::bigint
+                -- And shift it over two bytes
+                << 16);
+
+              sequence_base := (
+                'x' ||
+                -- Take the first two bytes (four hex characters)
+                substr(
+                  -- Of the MD5 hash of the data we documented
+                  md5(table_name ||
+                    '#{SecureRandom.hex(16)}' ||
+                    time_part::text
+                  ),
+                  1, 4
+                )
+              -- And turn it into a bigint
+              )::bit(16)::bigint;
+
+              -- Finally, add our sequence number to our base, and chop
+              -- it to the last two bytes
+              tail := (
+                (sequence_base + nextval(table_name || '_id_seq'))
+                & 65535);
+
+              -- Return the time part and the sequence part. OR appears
+              -- faster here than addition, but they're equivalent:
+              -- time_part has no trailing two bytes, and tail is only
+              -- the last two bytes.
+              RETURN time_part | tail;
+            END
+          $$ LANGUAGE plpgsql VOLATILE;
+        SQL
+      end
+    end
+
+    def self.ensure_id_sequences_exist
+      conn = ActiveRecord::Base.connection
+
+      # Find tables using timestamp IDs.
+      default_regex = /timestamp_id\('(?<seq_prefix>\w+)'/
+      conn.tables.each do |table|
+        # We're only concerned with "id" columns.
+        next unless (id_col = conn.columns(table).find { |col| col.name == 'id' })
+
+        # And only those that are using timestamp_id.
+        next unless (data = default_regex.match(id_col.default_function))
+
+        seq_name = data[:seq_prefix] + '_id_seq'
+        # If we were on Postgres 9.5+, we could do CREATE SEQUENCE IF
+        # NOT EXISTS, but we can't depend on that. Instead, catch the
+        # possible exception and ignore it.
+        # Note that seq_name isn't a column name, but it's a
+        # relation, like a column, and follows the same quoting rules
+        # in Postgres.
+        conn.execute(<<~SQL)
+          DO $$
+            BEGIN
+              CREATE SEQUENCE #{conn.quote_column_name(seq_name)};
+            EXCEPTION WHEN duplicate_table THEN
+              NULL; -- Do nothing, we have the sequence already.
+            END
+          $$ LANGUAGE plpgsql;
+        SQL
+      end
+    end
+  end
+end
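For readers who find the PL/pgSQL above hard to scan, here is the same id
construction rendered in Ruby (a sketch only: the salt, the sequence value,
and the clock are stand-ins, and real inserts go through the SQL function,
never through Ruby):

    require 'digest/md5'

    def timestamp_id(table_name, salt, sequence_value, now_ms)
      time_part = now_ms << 16  # 48 bits of milliseconds, shifted up

      # First two bytes (four hex chars) of md5(table || salt || time).
      sequence_base = Digest::MD5.hexdigest(
        "#{table_name}#{salt}#{time_part}"
      )[0, 4].to_i(16)

      # Add the sequence number, keep only the low two bytes.
      tail = (sequence_base + sequence_value) & 0xFFFF

      # OR is safe here: the two bit ranges cannot overlap.
      time_part | tail
    end

    timestamp_id('statuses', 'not-the-real-salt', 42,
                 (Time.now.to_f * 1000).to_i)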
diff --git a/lib/tasks/db.rake b/lib/tasks/db.rake
index 7a055bf25..66468d999 100644
--- a/lib/tasks/db.rake
+++ b/lib/tasks/db.rake
@@ -1,5 +1,36 @@
 # frozen_string_literal: true
 
+require Rails.root.join('lib', 'mastodon', 'timestamp_ids')
+
+def each_schema_load_environment
+  # If we're in development, also run this for the test environment.
+  # This is a somewhat hacky way to do this, so here's why:
+  # 1. We have to define this before we load the schema, or we won't
+  #    have a timestamp_id function when we get to it in the schema.
+  # 2. db:setup calls db:schema:load_if_ruby, which calls
+  #    db:schema:load, which we define above as having a prerequisite
+  #    of this task.
+  # 3. db:schema:load ends up running
+  #    ActiveRecord::Tasks::DatabaseTasks.load_schema_current, which
+  #    calls a private method `each_current_configuration`, which
+  #    explicitly also does the loading for the `test` environment
+  #    if the current environment is `development`, so we end up
+  #    needing to do the same, and we can't even use the same method
+  #    to do it.
+
+  if Rails.env == 'development'
+    test_conf = ActiveRecord::Base.configurations['test']
+    if test_conf['database']&.present?
+      ActiveRecord::Base.establish_connection(:test)
+      yield
+
+      ActiveRecord::Base.establish_connection(Rails.env.to_sym)
+    end
+  end
+
+  yield
+end
+
 namespace :db do
   namespace :migrate do
     desc 'Setup the db or migrate depending on state of db'
@@ -16,4 +47,29 @@ namespace :db do
       end
     end
   end
+
+  # Before we load the schema, define the timestamp_id function.
+  # Idiomatically, we might do this in a migration, but then it
+  # wouldn't end up in schema.rb, so we'd need to figure out a way to
+  # get it in before doing db:setup as well. This is simpler, and
+  # ensures it's always in place.
+  Rake::Task['db:schema:load'].enhance ['db:define_timestamp_id']
+
+  # After we load the schema, make sure we have sequences for each
+  # table using timestamp IDs.
+  Rake::Task['db:schema:load'].enhance do
+    Rake::Task['db:ensure_id_sequences_exist'].invoke
+  end
+
+  task :define_timestamp_id do
+    each_schema_load_environment do
+      Mastodon::TimestampIds.define_timestamp_id
+    end
+  end
+
+  task :ensure_id_sequences_exist do
+    each_schema_load_environment do
+      Mastodon::TimestampIds.ensure_id_sequences_exist
+    end
+  end
 end
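The two enhance calls above lean on Rake semantics worth spelling out: handing
enhance an array prepends prerequisites, while handing it a block appends an
action that runs after the task's own body. A standalone sketch (in a
Rakefile):

    task :prepare do
      puts 'runs first (prerequisite)'
    end

    task :load_schema do
      puts 'original task body'
    end

    Rake::Task[:load_schema].enhance([:prepare])
    Rake::Task[:load_schema].enhance { puts 'runs last (appended action)' }

    # rake load_schema prints the three lines in exactly that order.

This is why db:define_timestamp_id runs before the schema is loaded, while
db:ensure_id_sequences_exist runs after it.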
diff --git a/spec/lib/feed_manager_spec.rb b/spec/lib/feed_manager_spec.rb
index 22439cf35..923894ccb 100644
--- a/spec/lib/feed_manager_spec.rb
+++ b/spec/lib/feed_manager_spec.rb
@@ -1,6 +1,10 @@
 require 'rails_helper'
 
 RSpec.describe FeedManager do
+  it 'tracks at least as many statuses as reblogs' do
+    expect(FeedManager::REBLOG_FALLOFF).to be <= FeedManager::MAX_ITEMS
+  end
+
   describe '#key' do
     subject { FeedManager.instance.key(:home, 1) }
 
@@ -150,5 +154,110 @@ RSpec.describe FeedManager do
 
       expect(Redis.current.zcard("feed:type:#{account.id}")).to eq FeedManager::MAX_ITEMS
     end
+
+    it 'sends push updates for non-home timelines' do
+      account = Fabricate(:account)
+      status = Fabricate(:status)
+      allow(Redis.current).to receive_messages(publish: nil)
+
+      FeedManager.instance.push('type', account, status)
+
+      expect(Redis.current).to have_received(:publish).with("timeline:#{account.id}", any_args).at_least(:once)
+    end
+
+    context 'reblogs' do
+      it 'saves reblogs of unseen statuses' do
+        account = Fabricate(:account)
+        reblogged = Fabricate(:status)
+        reblog = Fabricate(:status, reblog: reblogged)
+
+        expect(FeedManager.instance.push('type', account, reblog)).to be true
+      end
+
+      it 'does not save a new reblog of a recent status' do
+        account = Fabricate(:account)
+        reblogged = Fabricate(:status)
+        reblog = Fabricate(:status, reblog: reblogged)
+
+        FeedManager.instance.push('type', account, reblogged)
+
+        expect(FeedManager.instance.push('type', account, reblog)).to be false
+      end
+
+      it 'saves a new reblog of an old status' do
+        account = Fabricate(:account)
+        reblogged = Fabricate(:status)
+        reblog = Fabricate(:status, reblog: reblogged)
+
+        FeedManager.instance.push('type', account, reblogged)
+
+        # Fill the feed with intervening statuses
+        FeedManager::REBLOG_FALLOFF.times do
+          FeedManager.instance.push('type', account, Fabricate(:status))
+        end
+
+        expect(FeedManager.instance.push('type', account, reblog)).to be true
+      end
+
+      it 'does not save a new reblog of a recently-reblogged status' do
+        account = Fabricate(:account)
+        reblogged = Fabricate(:status)
+        reblogs = 2.times.map { Fabricate(:status, reblog: reblogged) }
+
+        # The first reblog will be accepted
+        FeedManager.instance.push('type', account, reblogs.first)
+
+        # The second reblog should be ignored
+        expect(FeedManager.instance.push('type', account, reblogs.last)).to be false
+      end
+
+      it 'saves a new reblog of a long-ago-reblogged status' do
+        account = Fabricate(:account)
+        reblogged = Fabricate(:status)
+        reblogs = 2.times.map { Fabricate(:status, reblog: reblogged) }
+
+        # The first reblog will be accepted
+        FeedManager.instance.push('type', account, reblogs.first)
+
+        # Fill the feed with intervening statuses
+        FeedManager::REBLOG_FALLOFF.times do
+          FeedManager.instance.push('type', account, Fabricate(:status))
+        end
+
+        # The second reblog should also be accepted
+        expect(FeedManager.instance.push('type', account, reblogs.last)).to be true
+      end
+    end
+  end
+
+  describe '#unpush' do
+    it 'leaves a reblogged status when deleting the reblog' do
+      account = Fabricate(:account)
+      reblogged = Fabricate(:status)
+      status = Fabricate(:status, reblog: reblogged)
+
+      FeedManager.instance.push('type', account, status)
+
+      # The reblogging status should show up under normal conditions.
+      expect(Redis.current.zrange("feed:type:#{account.id}", 0, -1)).to eq [status.id.to_s]
+
+      FeedManager.instance.unpush('type', account, status)
+
+      # Because we couldn't tell if the status showed up any other way,
+      # we had to stick the reblogged status in by itself.
+      expect(Redis.current.zrange("feed:type:#{account.id}", 0, -1)).to eq [reblogged.id.to_s]
+    end
+
+    it 'sends push updates' do
+      account = Fabricate(:account)
+      status = Fabricate(:status)
+      FeedManager.instance.push('type', account, status)
+
+      allow(Redis.current).to receive_messages(publish: nil)
+      FeedManager.instance.unpush('type', account, status)
+
+      deletion = Oj.dump(event: :delete, payload: status.id.to_s)
+      expect(Redis.current).to have_received(:publish).with("timeline:#{account.id}", deletion)
+    end
   end
 end
diff --git a/spec/models/feed_spec.rb b/spec/models/feed_spec.rb
index 1c377c17f..5433f44bd 100644
--- a/spec/models/feed_spec.rb
+++ b/spec/models/feed_spec.rb
@@ -9,7 +9,7 @@ RSpec.describe Feed, type: :model do
       Fabricate(:status, account: account, id: 3)
       Fabricate(:status, account: account, id: 10)
       Redis.current.zadd(FeedManager.instance.key(:home, account.id),
-                        [[4, 'deleted'], [3, 'val3'], [2, 'val2'], [1, 'val1']])
+                        [[4, 4], [3, 3], [2, 2], [1, 1]])
 
       feed = Feed.new(:home, account)
       results = feed.get(3)
diff --git a/spec/services/batched_remove_status_service_spec.rb b/spec/services/batched_remove_status_service_spec.rb
index f5c9adfb5..c82c45e09 100644
--- a/spec/services/batched_remove_status_service_spec.rb
+++ b/spec/services/batched_remove_status_service_spec.rb
@@ -5,7 +5,7 @@ RSpec.describe BatchedRemoveStatusService do
 
   let!(:alice)  { Fabricate(:account) }
   let!(:bob)    { Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://example.com/salmon') }
-  let!(:jeff)   { Fabricate(:account) }
+  let!(:jeff)   { Fabricate(:user).account }
   let!(:hank)   { Fabricate(:account, username: 'hank', protocol: :activitypub, domain: 'example.com', inbox_url: 'http://example.com/inbox') }
 
   let(:status1) { PostStatusService.new.call(alice, 'Hello @bob@example.com') }
@@ -19,6 +19,7 @@ RSpec.describe BatchedRemoveStatusService do
     stub_request(:post, 'http://example.com/inbox').to_return(status: 200)
 
     Fabricate(:subscription, account: alice, callback_url: 'http://example.com/push', confirmed: true, expires_at: 30.days.from_now)
+    jeff.user.update(current_sign_in_at: Time.now)
     jeff.follow!(alice)
     hank.follow!(alice)
 
diff --git a/spec/services/precompute_feed_service_spec.rb b/spec/services/precompute_feed_service_spec.rb
index dbd08ac1b..d1ef6c184 100644
--- a/spec/services/precompute_feed_service_spec.rb
+++ b/spec/services/precompute_feed_service_spec.rb
@@ -16,7 +16,7 @@ RSpec.describe PrecomputeFeedService do
 
       subject.call(account)
 
-      expect(Redis.current.zscore(FeedManager.instance.key(:home, account.id), reblog.id)).to eq status.id
+      expect(Redis.current.zscore(FeedManager.instance.key(:home, account.id), reblog.id)).to eq status.id.to_f
     end
 
     it 'does not raise an error even if it could not find any status' do