about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2016-03-08 20:16:11 +0100
committerEugen Rochko <eugen@zeonfederated.com>2016-03-08 20:20:45 +0100
commit6c4c84b161947cb11ad0451a39e26b25be4c93d5 (patch)
treefa2a6f4aaff71fcf76c745a57cb7732102814871
parentfe57f6330f089d023f0fa4db7f7c8a51551d2ee9 (diff)
Distrubute statuses as a fan-out-on-write system, with optional precomputing
-rw-r--r--Gemfile1
-rw-r--r--Gemfile.lock2
-rw-r--r--app/controllers/api/statuses_controller.rb6
-rw-r--r--app/controllers/home_controller.rb3
-rw-r--r--app/models/feed.rb27
-rw-r--r--app/models/status.rb1
-rw-r--r--app/services/fan_out_on_write_service.rb46
-rw-r--r--app/services/precompute_feed_service.rb35
-rw-r--r--config/initializers/redis.rb1
9 files changed, 119 insertions, 3 deletions
diff --git a/Gemfile b/Gemfile
index 7b37ec29c..49ec1888e 100644
--- a/Gemfile
+++ b/Gemfile
@@ -30,6 +30,7 @@ gem 'rails_autolink'
 gem 'doorkeeper'
 gem 'rabl'
 gem 'oj'
+gem 'redis', '~>3.2'
 
 group :development, :test do
   gem 'rspec-rails'
diff --git a/Gemfile.lock b/Gemfile.lock
index a05fad7f0..6550ff101 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -208,6 +208,7 @@ GEM
     rake (10.5.0)
     rdoc (4.2.2)
       json (~> 1.4)
+    redis (3.2.2)
     ref (2.0.0)
     responders (2.1.1)
       railties (>= 4.2.0, < 5.1)
@@ -328,6 +329,7 @@ DEPENDENCIES
   rails (= 4.2.5.1)
   rails_12factor
   rails_autolink
+  redis (~> 3.2)
   rspec-rails
   rubocop
   sass-rails (~> 5.0)
diff --git a/app/controllers/api/statuses_controller.rb b/app/controllers/api/statuses_controller.rb
index 82334a32f..04128537a 100644
--- a/app/controllers/api/statuses_controller.rb
+++ b/app/controllers/api/statuses_controller.rb
@@ -22,10 +22,12 @@ class Api::StatusesController < ApiController
   end
 
   def home
-    @statuses = Status.where(account: [current_user.account] + current_user.account.following).order('created_at desc')
+    feed      = Feed.new(:home, current_user.account)
+    @statuses = feed.get(20, (params[:offset] || 0).to_i)
   end
 
   def mentions
-    @statuses = Status.where(id: Mention.where(account: current_user.account).pluck(:status_id)).order('created_at desc')
+    feed      = Feed.new(:mentions, current_user.account)
+    @statuses = feed.get(20, (params[:offset] || 0).to_i)
   end
 end
diff --git a/app/controllers/home_controller.rb b/app/controllers/home_controller.rb
index 3a3d0ade4..294749a22 100644
--- a/app/controllers/home_controller.rb
+++ b/app/controllers/home_controller.rb
@@ -2,6 +2,7 @@ class HomeController < ApplicationController
   before_action :authenticate_user!
 
   def index
-    @statuses = Status.where(account: ([current_user.account] + current_user.account.following)).where('reblog_of_id IS NULL OR account_id != ?', current_user.account.id).order('created_at desc')
+    feed      = Feed.new(:home, current_user.account)
+    @statuses = feed.get(20, (params[:offset] || 0).to_i)
   end
 end
diff --git a/app/models/feed.rb b/app/models/feed.rb
new file mode 100644
index 000000000..a063ad05b
--- /dev/null
+++ b/app/models/feed.rb
@@ -0,0 +1,27 @@
+class Feed
+  def initialize(type, account)
+    @type    = type
+    @account = account
+  end
+
+  def get(limit, offset = 0)
+    unhydrated = redis.zrevrange(key, offset, limit)
+    status_map = Hash.new
+
+    # If we're after most recent items and none are there, we need to precompute the feed
+    return PrecomputeFeedService.new.(@type, @account).take(limit) if unhydrated.empty? && offset == 0
+
+    Status.where(id: unhydrated).each { |status| status_map[status.id.to_s] = status }
+    return unhydrated.map { |id| status_map[id] }
+  end
+
+  private
+
+  def key
+    "feed:#{@type}:#{@account.id}"
+  end
+
+  def redis
+    $redis
+  end
+end
diff --git a/app/models/status.rb b/app/models/status.rb
index a346ac9b0..7e0c334ec 100644
--- a/app/models/status.rb
+++ b/app/models/status.rb
@@ -67,5 +67,6 @@ class Status < ActiveRecord::Base
 
   after_create do
     self.account.stream_entries.create!(activity: self)
+    FanOutOnWriteService.new.(self)
   end
 end
diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb
new file mode 100644
index 000000000..87a7c55ac
--- /dev/null
+++ b/app/services/fan_out_on_write_service.rb
@@ -0,0 +1,46 @@
+class FanOutOnWriteService < BaseService
+  MAX_FEED_SIZE = 800
+
+  # Push a status into home and mentions feeds
+  # @param [Status] status
+  def call(status)
+    replied_to_user = status.reply? ? status.thread.account : nil
+
+    # Deliver to local self
+    push(:home, status.account.id, status) if status.account.local?
+
+    # Deliver to local followers
+    status.account.followers.each do |follower|
+      next if (status.reply? && !follower.following?(replied_to_user)) || !follower.local?
+      push(:home, follower.id, status)
+    end
+
+    # Deliver to local mentioned
+    status.mentions.each do |mentioned_account|
+      next unless mentioned_account.local?
+      push(:mentions, mentioned_account.id, status)
+    end
+  end
+
+  private
+
+  def push(type, receiver_id, status)
+    redis.zadd(key(type, receiver_id), status.created_at.to_i, status.id)
+    trim(type, receiver_id)
+  end
+
+  def trim(type, receiver_id)
+    return unless redis.zcard(key(type, receiver_id)) > MAX_FEED_SIZE
+
+    last = redis.zrevrange(key(type, receiver_id), MAX_FEED_SIZE - 1, MAX_FEED_SIZE - 1)
+    redis.zremrangebyscore(key(type, receiver_id), '-inf', "(#{last.last}")
+  end
+
+  def key(type, id)
+    "feed:#{type}:#{id}"
+  end
+
+  def redis
+    $redis
+  end
+end
diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb
new file mode 100644
index 000000000..89b034404
--- /dev/null
+++ b/app/services/precompute_feed_service.rb
@@ -0,0 +1,35 @@
+class PrecomputeFeedService < BaseService
+  MAX_FEED_SIZE = 800
+
+  # Fill up a user's home/mentions feed from DB and return it
+  # @param [Symbol] type :home or :mentions
+  # @param [Account] account
+  # @return [Array]
+  def call(type, account)
+    statuses = send(type.to_s, account).order('created_at desc').limit(MAX_FEED_SIZE)
+    statuses.each { |status| push(type, account.id, status) }
+    statuses
+  end
+
+  private
+
+  def push(type, receiver_id, status)
+    redis.zadd(key(type, receiver_id), status.created_at.to_i, status.id)
+  end
+
+  def home(account)
+    Status.where(account: [account] + account.following)
+  end
+
+  def mentions(account)
+    Status.where(id: Mention.where(account: account).pluck(:status_id))
+  end
+
+  def key(type, id)
+    "feed:#{type}:#{id}"
+  end
+
+  def redis
+    $redis
+  end
+end
diff --git a/config/initializers/redis.rb b/config/initializers/redis.rb
new file mode 100644
index 000000000..14f3778c4
--- /dev/null
+++ b/config/initializers/redis.rb
@@ -0,0 +1 @@
+$redis = Redis.new(host: ENV['REDIS_HOST'] || 'localhost', port: ENV['REDIS_PORT'] || 6379)