about summary refs log tree commit diff
diff options
context:
space:
mode:
authorThibaut Girka <thib@sitedethib.com>2019-06-25 22:56:32 +0200
committerThibG <thib@sitedethib.com>2019-06-27 16:44:12 +0200
commitca17bae904783dfb1f4899a533d28a79da0c6fe9 (patch)
treec38565923e7b92c2125452daf35fecf76588f81d
parent2f95adc06fbfae22e8f9b4df212938108f5e295c (diff)
Use a redis-cached feed for the DM timeline
-rw-r--r--app/controllers/api/v1/timelines/direct_controller.rb10
-rw-r--r--app/lib/feed_manager.rb40
-rw-r--r--app/models/direct_feed.rb31
-rw-r--r--app/services/batched_remove_status_service.rb4
-rw-r--r--app/services/fan_out_on_write_service.rb7
-rw-r--r--app/services/precompute_feed_service.rb1
-rw-r--r--app/services/remove_status_service.rb4
-rw-r--r--app/workers/feed_insert_worker.rb11
-rw-r--r--app/workers/scheduler/feed_cleanup_scheduler.rb5
9 files changed, 100 insertions, 13 deletions
diff --git a/app/controllers/api/v1/timelines/direct_controller.rb b/app/controllers/api/v1/timelines/direct_controller.rb
index d8a76d153..6e98e9cac 100644
--- a/app/controllers/api/v1/timelines/direct_controller.rb
+++ b/app/controllers/api/v1/timelines/direct_controller.rb
@@ -27,16 +27,18 @@ class Api::V1::Timelines::DirectController < Api::BaseController
   end
 
   def direct_timeline_statuses
-    # this query requires built in pagination.
-    Status.as_direct_timeline(
-      current_account,
+    account_direct_feed.get(
       limit_param(DEFAULT_STATUSES_LIMIT),
       params[:max_id],
       params[:since_id],
-      true # returns array of cache_ids object
+      params[:min_id]
     )
   end
 
+  def account_direct_feed
+    DirectFeed.new(current_account)
+  end
+
   def insert_pagination_headers
     set_pagination_headers(next_path, prev_path)
   end
diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb
index ddcbdf6da..59767cdfe 100644
--- a/app/lib/feed_manager.rb
+++ b/app/lib/feed_manager.rb
@@ -22,6 +22,8 @@ class FeedManager
       filter_from_home?(status, receiver_id)
     elsif timeline_type == :mentions
       filter_from_mentions?(status, receiver_id)
+    elsif timeline_type == :direct
+      filter_from_direct?(status, receiver_id)
     else
       false
     end
@@ -59,6 +61,18 @@ class FeedManager
     true
   end
 
+  def push_to_direct(account, status)
+    return false unless add_to_feed(:direct, account.id, status)
+    trim(:direct, account.id)
+    PushUpdateWorker.perform_async(account.id, status.id, "timeline:direct:#{account.id}")
+    true
+  end
+
+  def unpush_from_direct(account, status)
+    return false unless remove_from_feed(:direct, account.id, status)
+    redis.publish("timeline:direct:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s))
+  end
+
   def trim(type, account_id)
     timeline_key = key(type, account_id)
     reblog_key   = key(type, account_id, 'reblogs')
@@ -142,6 +156,27 @@ class FeedManager
     end
   end
 
+  def populate_direct_feed(account)
+    added  = 0
+    limit  = FeedManager::MAX_ITEMS / 2
+    max_id = nil
+
+    loop do
+      statuses = Status.as_direct_timeline(account, limit, max_id)
+
+      break if statuses.empty?
+
+      statuses.each do |status|
+        next if filter_from_direct?(status, account)
+        added += 1 if add_to_feed(:direct, account.id, status)
+      end
+
+      break unless added.zero?
+
+      max_id = statuses.last.id
+    end
+  end
+
   private
 
   def push_update_required?(timeline_id)
@@ -199,6 +234,11 @@ class FeedManager
     should_filter
   end
 
+  def filter_from_direct?(status, receiver_id)
+    return false if receiver_id == status.account_id
+    filter_from_mentions?(status, receiver_id)
+  end
+
   def phrase_filtered?(status, receiver_id, context)
     active_filters = Rails.cache.fetch("filters:#{receiver_id}") { CustomFilter.where(account_id: receiver_id).active_irreversible.to_a }.to_a
 
diff --git a/app/models/direct_feed.rb b/app/models/direct_feed.rb
new file mode 100644
index 000000000..c0b8a0a35
--- /dev/null
+++ b/app/models/direct_feed.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+class DirectFeed < Feed
+  include Redisable
+
+  def initialize(account)
+    @type    = :direct
+    @id      = account.id
+    @account = account
+  end
+
+  def get(limit, max_id = nil, since_id = nil, min_id = nil)
+    unless redis.exists("account:#{@account.id}:regeneration")
+      statuses = super
+      return statuses unless statuses.empty?
+    end
+    from_database(limit, max_id, since_id, min_id)
+  end
+
+  private
+
+  def from_database(limit, max_id, since_id, min_id)
+    loop do
+      statuses = Status.as_direct_timeline(@account, limit, max_id, since_id, min_id)
+      return statuses if statuses.empty?
+      max_id = statuses.last.id
+      statuses = statuses.reject { |status| FeedManager.instance.filter?(:direct, status, @account.id) }
+      return statuses unless statuses.empty?
+    end
+  end
+end
diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb
index 02f7076f7..2fe009c91 100644
--- a/app/services/batched_remove_status_service.rb
+++ b/app/services/batched_remove_status_service.rb
@@ -107,9 +107,9 @@ class BatchedRemoveStatusService < BaseService
     payload = @json_payloads[status.id]
     redis.pipelined do
       @mentions[status.id].each do |mention|
-        redis.publish("timeline:direct:#{mention.account.id}", payload) if mention.account.local?
+        FeedManager.instance.unpush_from_direct(mention.account, status) if mention.account.local?
       end
-      redis.publish("timeline:direct:#{status.account.id}", payload) if status.account.local?
+      FeedManager.instance.unpush_from_direct(status.account, status) if status.account.local?
     end
   end
 
diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb
index b66dc342e..cf433d8a6 100644
--- a/app/services/fan_out_on_write_service.rb
+++ b/app/services/fan_out_on_write_service.rb
@@ -37,6 +37,7 @@ class FanOutOnWriteService < BaseService
   def deliver_to_self(status)
     Rails.logger.debug "Delivering status #{status.id} to author"
     FeedManager.instance.push_to_home(status.account, status)
+    FeedManager.instance.push_to_direct(status.account, status) if status.direct_visibility?
   end
 
   def deliver_to_followers(status)
@@ -98,11 +99,9 @@ class FanOutOnWriteService < BaseService
   def deliver_to_direct_timelines(status)
     Rails.logger.debug "Delivering status #{status.id} to direct timelines"
 
-    status.mentions.includes(:account).each do |mention|
-      Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local?
+    FeedInsertWorker.push_bulk(status.mentions.includes(:account).map(&:account).select { |mentioned_account| mentioned_account.local? }) do |account|
+      [status.id, account.id, :direct]
     end
-
-    Redis.current.publish("timeline:direct:#{status.account.id}", @payload) if status.account.local?
   end
 
   def deliver_to_own_conversation(status)
diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb
index 076dedaca..029c2f6e5 100644
--- a/app/services/precompute_feed_service.rb
+++ b/app/services/precompute_feed_service.rb
@@ -3,6 +3,7 @@
 class PrecomputeFeedService < BaseService
   def call(account)
     FeedManager.instance.populate_feed(account)
+    FeedManager.instance.populate_direct_feed(account)
   ensure
     Redis.current.del("account:#{account.id}:regeneration")
   end
diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb
index 98972fc70..9d5d0fc14 100644
--- a/app/services/remove_status_service.rb
+++ b/app/services/remove_status_service.rb
@@ -48,6 +48,7 @@ class RemoveStatusService < BaseService
 
   def remove_from_self
     FeedManager.instance.unpush_from_home(@account, @status)
+    FeedManager.instance.unpush_from_direct(@account, @status) if @status.direct_visibility?
   end
 
   def remove_from_followers
@@ -159,9 +160,8 @@ class RemoveStatusService < BaseService
 
   def remove_from_direct
     @mentions.each do |mention|
-      Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local?
+      FeedManager.instance.unpush_from_direct(mention.account, @status) if mention.account.local?
     end
-    Redis.current.publish("timeline:direct:#{@account.id}", @payload) if @account.local?
   end
 
   def lock_options
diff --git a/app/workers/feed_insert_worker.rb b/app/workers/feed_insert_worker.rb
index 1ae3c877b..546f5c0c2 100644
--- a/app/workers/feed_insert_worker.rb
+++ b/app/workers/feed_insert_worker.rb
@@ -13,6 +13,8 @@ class FeedInsertWorker
     when :list
       @list     = List.find(id)
       @follower = @list.account
+    when :direct
+      @account  = Account.find(id)
     end
 
     check_and_insert
@@ -29,7 +31,12 @@ class FeedInsertWorker
   def feed_filtered?
     # Note: Lists are a variation of home, so the filtering rules
     # of home apply to both
-    FeedManager.instance.filter?(:home, @status, @follower.id)
+    case @type
+    when :home, :list
+      FeedManager.instance.filter?(:home, @status, @follower.id)
+    when :direct
+      FeedManager.instance.filter?(:direct, @status, @account.id)
+    end
   end
 
   def perform_push
@@ -38,6 +45,8 @@ class FeedInsertWorker
       FeedManager.instance.push_to_home(@follower, @status)
     when :list
       FeedManager.instance.push_to_list(@list, @status)
+    when :direct
+      FeedManager.instance.push_to_direct(@account, @status)
     end
   end
 end
diff --git a/app/workers/scheduler/feed_cleanup_scheduler.rb b/app/workers/scheduler/feed_cleanup_scheduler.rb
index bf5e20757..4933f1753 100644
--- a/app/workers/scheduler/feed_cleanup_scheduler.rb
+++ b/app/workers/scheduler/feed_cleanup_scheduler.rb
@@ -9,6 +9,7 @@ class Scheduler::FeedCleanupScheduler
   def perform
     clean_home_feeds!
     clean_list_feeds!
+    clean_direct_feeds!
   end
 
   private
@@ -21,6 +22,10 @@ class Scheduler::FeedCleanupScheduler
     clean_feeds!(inactive_list_ids, :list)
   end
 
+  def clean_direct_feeds!
+    clean_feeds!(inactive_account_ids, :direct)
+  end
+
   def clean_feeds!(ids, type)
     reblogged_id_sets = {}