about summary refs log tree commit diff
path: root/app/models/trends
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2022-02-25 00:34:14 +0100
committerGitHub <noreply@github.com>2022-02-25 00:34:14 +0100
commit27965ce5edff20db2de1dd233c88f8393bb0da0b (patch)
tree6714a950c1b9facc8c7bd1907e81e777257e5538 /app/models/trends
parenta29a982eaa0536a741b43ffb3397c74e3abe7196 (diff)
Add trending statuses (#17431)
* Add trending statuses

* Fix dangling items with stale scores in localized sets

* Various fixes and improvements

- Change approve_all/reject_all to approve_accounts/reject_accounts
- Change Trends::Query methods to not mutate the original query
- Change Trends::Query#skip to offset
- Change follow recommendations to be refreshed in a transaction

* Add tests for trending statuses filtering behaviour

* Fix not applying filtering scope in controller
Diffstat (limited to 'app/models/trends')
-rw-r--r--app/models/trends/base.rb20
-rw-r--r--app/models/trends/links.rb52
-rw-r--r--app/models/trends/preview_card_batch.rb65
-rw-r--r--app/models/trends/preview_card_filter.rb46
-rw-r--r--app/models/trends/preview_card_provider_batch.rb33
-rw-r--r--app/models/trends/preview_card_provider_filter.rb49
-rw-r--r--app/models/trends/query.rb106
-rw-r--r--app/models/trends/status_batch.rb65
-rw-r--r--app/models/trends/status_filter.rb46
-rw-r--r--app/models/trends/statuses.rb142
-rw-r--r--app/models/trends/tag_batch.rb37
-rw-r--r--app/models/trends/tag_filter.rb60
-rw-r--r--app/models/trends/tags.rb36
13 files changed, 704 insertions, 53 deletions
diff --git a/app/models/trends/base.rb b/app/models/trends/base.rb
index b767dcb1a..7ed13228d 100644
--- a/app/models/trends/base.rb
+++ b/app/models/trends/base.rb
@@ -2,6 +2,7 @@
 
 class Trends::Base
   include Redisable
+  include LanguagesHelper
 
   class_attribute :default_options
 
@@ -32,8 +33,8 @@ class Trends::Base
     raise NotImplementedError
   end
 
-  def get(*)
-    raise NotImplementedError
+  def query
+    Trends::Query.new(key_prefix, klass)
   end
 
   def score(id)
@@ -72,6 +73,21 @@ class Trends::Base
     redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0
   end
 
+  # @param [Integer] id
+  # @param [Float] score
+  # @param [Hash<String, Boolean>] subsets
+  def add_to_and_remove_from_subsets(id, score, subsets = {})
+    subsets.each_key do |subset|
+      key = [key_prefix, subset].compact.join(':')
+
+      if score.positive? && subsets[subset]
+        redis.zadd(key, score, id)
+      else
+        redis.zrem(key, id)
+      end
+    end
+  end
+
   private
 
   def used_key(at_time)
diff --git a/app/models/trends/links.rb b/app/models/trends/links.rb
index a0d65138b..62308e706 100644
--- a/app/models/trends/links.rb
+++ b/app/models/trends/links.rb
@@ -4,8 +4,8 @@ class Trends::Links < Trends::Base
   PREFIX = 'trending_links'
 
   self.default_options = {
-    threshold: 15,
-    review_threshold: 10,
+    threshold: 5,
+    review_threshold: 3,
     max_score_cooldown: 2.days.freeze,
     max_score_halflife: 8.hours.freeze,
   }
@@ -27,12 +27,6 @@ class Trends::Links < Trends::Base
     record_used_id(preview_card.id, at_time)
   end
 
-  def get(allowed, limit)
-    preview_card_ids = currently_trending_ids(allowed, limit)
-    preview_cards = PreviewCard.where(id: preview_card_ids).index_by(&:id)
-    preview_card_ids.map { |id| preview_cards[id] }.compact
-  end
-
   def refresh(at_time = Time.now.utc)
     preview_cards = PreviewCard.where(id: (recently_used_ids(at_time) + currently_trending_ids(false, -1)).uniq)
     calculate_scores(preview_cards, at_time)
@@ -42,7 +36,7 @@ class Trends::Links < Trends::Base
   def request_review
     preview_cards = PreviewCard.where(id: currently_trending_ids(false, -1))
 
-    preview_cards_requiring_review = preview_cards.filter_map do |preview_card|
+    preview_cards.filter_map do |preview_card|
       next unless would_be_trending?(preview_card.id) && !preview_card.trendable? && preview_card.requires_review_notification?
 
       if preview_card.provider.nil?
@@ -53,12 +47,6 @@ class Trends::Links < Trends::Base
 
       preview_card
     end
-
-    return if preview_cards_requiring_review.empty?
-
-    User.staff.includes(:account).find_each do |user|
-      AdminMailer.new_trending_links(user.account, preview_cards_requiring_review).deliver_later! if user.allows_trending_tag_emails?
-    end
   end
 
   protected
@@ -67,6 +55,10 @@ class Trends::Links < Trends::Base
     PREFIX
   end
 
+  def klass
+    PreviewCard
+  end
+
   private
 
   def calculate_scores(preview_cards, at_time)
@@ -96,17 +88,27 @@ class Trends::Links < Trends::Base
 
       decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
 
-      if decaying_score.zero?
-        redis.zrem("#{PREFIX}:all", preview_card.id)
-        redis.zrem("#{PREFIX}:allowed", preview_card.id)
-      else
-        redis.zadd("#{PREFIX}:all", decaying_score, preview_card.id)
+      add_to_and_remove_from_subsets(preview_card.id, decaying_score, {
+        all: true,
+        allowed: preview_card.trendable?,
+      })
 
-        if preview_card.trendable?
-          redis.zadd("#{PREFIX}:allowed", decaying_score, preview_card.id)
-        else
-          redis.zrem("#{PREFIX}:allowed", preview_card.id)
-        end
+      next unless valid_locale?(preview_card.language)
+
+      add_to_and_remove_from_subsets(preview_card.id, decaying_score, {
+        "all:#{preview_card.language}" => true,
+        "allowed:#{preview_card.language}" => preview_card.trendable?,
+      })
+    end
+
+    # Clean up localized sets by calculating the intersection with the main
+    # set. We do this instead of just deleting the localized sets to avoid
+    # having moments where the API returns empty results
+
+    redis.pipelined do
+      Trends.available_locales.each do |locale|
+        redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
+        redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
       end
     end
   end
diff --git a/app/models/trends/preview_card_batch.rb b/app/models/trends/preview_card_batch.rb
new file mode 100644
index 000000000..b1d682910
--- /dev/null
+++ b/app/models/trends/preview_card_batch.rb
@@ -0,0 +1,65 @@
+# frozen_string_literal: true
+
+class Trends::PreviewCardBatch
+  include ActiveModel::Model
+  include Authorization
+
+  attr_accessor :preview_card_ids, :action, :current_account, :precision
+
+  def save
+    case action
+    when 'approve'
+      approve!
+    when 'approve_providers'
+      approve_providers!
+    when 'reject'
+      reject!
+    when 'reject_providers'
+      reject_providers!
+    end
+  end
+
+  private
+
+  def preview_cards
+    @preview_cards ||= PreviewCard.where(id: preview_card_ids)
+  end
+
+  def preview_card_providers
+    @preview_card_providers ||= preview_cards.map(&:domain).uniq.map { |domain| PreviewCardProvider.matching_domain(domain) || PreviewCardProvider.new(domain: domain) }
+  end
+
+  def approve!
+    preview_cards.each { |preview_card| authorize(preview_card, :review?) }
+    preview_cards.update_all(trendable: true)
+  end
+
+  def approve_providers!
+    preview_card_providers.each do |provider|
+      authorize(provider, :review?)
+      provider.update(trendable: true, reviewed_at: action_time)
+    end
+
+    # Reset any individual overrides
+    preview_cards.update_all(trendable: nil)
+  end
+
+  def reject!
+    preview_cards.each { |preview_card| authorize(preview_card, :review?) }
+    preview_cards.update_all(trendable: false)
+  end
+
+  def reject_providers!
+    preview_card_providers.each do |provider|
+      authorize(provider, :review?)
+      provider.update(trendable: false, reviewed_at: action_time)
+    end
+
+    # Reset any individual overrides
+    preview_cards.update_all(trendable: nil)
+  end
+
+  def action_time
+    @action_time ||= Time.now.utc
+  end
+end
diff --git a/app/models/trends/preview_card_filter.rb b/app/models/trends/preview_card_filter.rb
new file mode 100644
index 000000000..25add58c8
--- /dev/null
+++ b/app/models/trends/preview_card_filter.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+class Trends::PreviewCardFilter
+  KEYS = %i(
+    trending
+    locale
+  ).freeze
+
+  attr_reader :params
+
+  def initialize(params)
+    @params = params
+  end
+
+  def results
+    scope = PreviewCard.unscoped
+
+    params.each do |key, value|
+      next if %w(page locale).include?(key.to_s)
+
+      scope.merge!(scope_for(key, value.to_s.strip)) if value.present?
+    end
+
+    scope
+  end
+
+  private
+
+  def scope_for(key, value)
+    case key.to_s
+    when 'trending'
+      trending_scope(value)
+    else
+      raise "Unknown filter: #{key}"
+    end
+  end
+
+  def trending_scope(value)
+    scope = Trends.links.query
+
+    scope = scope.in_locale(@params[:locale].to_s) if @params[:locale].present?
+    scope = scope.allowed if value == 'allowed'
+
+    scope.to_arel
+  end
+end
diff --git a/app/models/trends/preview_card_provider_batch.rb b/app/models/trends/preview_card_provider_batch.rb
new file mode 100644
index 000000000..062720c81
--- /dev/null
+++ b/app/models/trends/preview_card_provider_batch.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+class Trends::PreviewCardProviderBatch
+  include ActiveModel::Model
+  include Authorization
+
+  attr_accessor :preview_card_provider_ids, :action, :current_account
+
+  def save
+    case action
+    when 'approve'
+      approve!
+    when 'reject'
+      reject!
+    end
+  end
+
+  private
+
+  def preview_card_providers
+    PreviewCardProvider.where(id: preview_card_provider_ids)
+  end
+
+  def approve!
+    preview_card_providers.each { |provider| authorize(provider, :review?) }
+    preview_card_providers.update_all(trendable: true, reviewed_at: Time.now.utc)
+  end
+
+  def reject!
+    preview_card_providers.each { |provider| authorize(provider, :review?) }
+    preview_card_providers.update_all(trendable: false, reviewed_at: Time.now.utc)
+  end
+end
diff --git a/app/models/trends/preview_card_provider_filter.rb b/app/models/trends/preview_card_provider_filter.rb
new file mode 100644
index 000000000..abfdd07e8
--- /dev/null
+++ b/app/models/trends/preview_card_provider_filter.rb
@@ -0,0 +1,49 @@
+# frozen_string_literal: true
+
+class Trends::PreviewCardProviderFilter
+  KEYS = %i(
+    status
+  ).freeze
+
+  attr_reader :params
+
+  def initialize(params)
+    @params = params
+  end
+
+  def results
+    scope = PreviewCardProvider.unscoped
+
+    params.each do |key, value|
+      next if key.to_s == 'page'
+
+      scope.merge!(scope_for(key, value.to_s.strip)) if value.present?
+    end
+
+    scope.order(domain: :asc)
+  end
+
+  private
+
+  def scope_for(key, value)
+    case key.to_s
+    when 'status'
+      status_scope(value)
+    else
+      raise "Unknown filter: #{key}"
+    end
+  end
+
+  def status_scope(value)
+    case value.to_s
+    when 'approved'
+      PreviewCardProvider.trendable
+    when 'rejected'
+      PreviewCardProvider.not_trendable
+    when 'pending_review'
+      PreviewCardProvider.pending_review
+    else
+      raise "Unknown status: #{value}"
+    end
+  end
+end
diff --git a/app/models/trends/query.rb b/app/models/trends/query.rb
new file mode 100644
index 000000000..64a4c0c1f
--- /dev/null
+++ b/app/models/trends/query.rb
@@ -0,0 +1,106 @@
+# frozen_string_literal: true
+
+class Trends::Query
+  include Redisable
+  include Enumerable
+
+  attr_reader :prefix, :klass, :loaded
+
+  alias loaded? loaded
+
+  def initialize(prefix, klass)
+    @prefix  = prefix
+    @klass   = klass
+    @records = []
+    @loaded  = false
+    @allowed = false
+    @limit   = -1
+    @offset  = 0
+  end
+
+  def allowed!
+    @allowed = true
+    self
+  end
+
+  def allowed
+    clone.allowed!
+  end
+
+  def in_locale!(value)
+    @locale = value
+    self
+  end
+
+  def in_locale(value)
+    clone.in_locale!(value)
+  end
+
+  def offset!(value)
+    @offset = value
+    self
+  end
+
+  def offset(value)
+    clone.offset!(value)
+  end
+
+  def limit!(value)
+    @limit = value
+    self
+  end
+
+  def limit(value)
+    clone.limit!(value)
+  end
+
+  def records
+    load
+    @records
+  end
+
+  delegate :each, :empty?, :first, :last, to: :records
+
+  def to_ary
+    records.dup
+  end
+
+  alias to_a to_ary
+
+  def to_arel
+    tmp_ids = ids
+
+    if tmp_ids.empty?
+      klass.none
+    else
+      klass.joins("join unnest(array[#{tmp_ids.join(',')}]) with ordinality as x (id, ordering) on #{klass.table_name}.id = x.id").reorder('x.ordering')
+    end
+  end
+
+  private
+
+  def key
+    [@prefix, @allowed ? 'allowed' : 'all', @locale].compact.join(':')
+  end
+
+  def load
+    unless loaded?
+      @records = perform_queries
+      @loaded  = true
+    end
+
+    self
+  end
+
+  def ids
+    redis.zrevrange(key, @offset, @limit.positive? ? @limit - 1 : @limit).map(&:to_i)
+  end
+
+  def perform_queries
+    apply_scopes(to_arel).to_a
+  end
+
+  def apply_scopes(scope)
+    scope
+  end
+end
diff --git a/app/models/trends/status_batch.rb b/app/models/trends/status_batch.rb
new file mode 100644
index 000000000..78d93bed4
--- /dev/null
+++ b/app/models/trends/status_batch.rb
@@ -0,0 +1,65 @@
+# frozen_string_literal: true
+
+class Trends::StatusBatch
+  include ActiveModel::Model
+  include Authorization
+
+  attr_accessor :status_ids, :action, :current_account
+
+  def save
+    case action
+    when 'approve'
+      approve!
+    when 'approve_accounts'
+      approve_accounts!
+    when 'reject'
+      reject!
+    when 'reject_accounts'
+      reject_accounts!
+    end
+  end
+
+  private
+
+  def statuses
+    @statuses ||= Status.where(id: status_ids)
+  end
+
+  def status_accounts
+    @status_accounts ||= Account.where(id: statuses.map(&:account_id).uniq)
+  end
+
+  def approve!
+    statuses.each { |status| authorize(status, :review?) }
+    statuses.update_all(trendable: true)
+  end
+
+  def approve_accounts!
+    status_accounts.each do |account|
+      authorize(account, :review?)
+      account.update(trendable: true, reviewed_at: action_time)
+    end
+
+    # Reset any individual overrides
+    statuses.update_all(trendable: nil)
+  end
+
+  def reject!
+    statuses.each { |status| authorize(status, :review?) }
+    statuses.update_all(trendable: false)
+  end
+
+  def reject_accounts!
+    status_accounts.each do |account|
+      authorize(account, :review?)
+      account.update(trendable: false, reviewed_at: action_time)
+    end
+
+    # Reset any individual overrides
+    statuses.update_all(trendable: nil)
+  end
+
+  def action_time
+    @action_time ||= Time.now.utc
+  end
+end
diff --git a/app/models/trends/status_filter.rb b/app/models/trends/status_filter.rb
new file mode 100644
index 000000000..7c453e339
--- /dev/null
+++ b/app/models/trends/status_filter.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+class Trends::StatusFilter
+  KEYS = %i(
+    trending
+    locale
+  ).freeze
+
+  attr_reader :params
+
+  def initialize(params)
+    @params = params
+  end
+
+  def results
+    scope = Status.unscoped.kept
+
+    params.each do |key, value|
+      next if %w(page locale).include?(key.to_s)
+
+      scope.merge!(scope_for(key, value.to_s.strip)) if value.present?
+    end
+
+    scope
+  end
+
+  private
+
+  def scope_for(key, value)
+    case key.to_s
+    when 'trending'
+      trending_scope(value)
+    else
+      raise "Unknown filter: #{key}"
+    end
+  end
+
+  def trending_scope(value)
+    scope = Trends.statuses.query
+
+    scope = scope.in_locale(@params[:locale].to_s) if @params[:locale].present?
+    scope = scope.allowed if value == 'allowed'
+
+    scope.to_arel
+  end
+end
diff --git a/app/models/trends/statuses.rb b/app/models/trends/statuses.rb
new file mode 100644
index 000000000..e785413ec
--- /dev/null
+++ b/app/models/trends/statuses.rb
@@ -0,0 +1,142 @@
+# frozen_string_literal: true
+
+class Trends::Statuses < Trends::Base
+  PREFIX = 'trending_statuses'
+
+  self.default_options = {
+    threshold: 5,
+    review_threshold: 3,
+    score_halflife: 2.hours.freeze,
+  }
+
+  class Query < Trends::Query
+    def filtered_for!(account)
+      @account = account
+      self
+    end
+
+    def filtered_for(account)
+      clone.filtered_for!(account)
+    end
+
+    private
+
+    def apply_scopes(scope)
+      scope.includes(:account)
+    end
+
+    def perform_queries
+      return super if @account.nil?
+
+      statuses        = super
+      account_ids     = statuses.map(&:account_id)
+      account_domains = statuses.map(&:account_domain)
+
+      preloaded_relations = {
+        blocking: Account.blocking_map(account_ids, @account.id),
+        blocked_by: Account.blocked_by_map(account_ids, @account.id),
+        muting: Account.muting_map(account_ids, @account.id),
+        following: Account.following_map(account_ids, @account.id),
+        domain_blocking_by_domain: Account.domain_blocking_map_by_domain(account_domains, @account.id),
+      }
+
+      statuses.reject { |status| StatusFilter.new(status, @account, preloaded_relations).filtered? }
+    end
+  end
+
+  def register(status, at_time = Time.now.utc)
+    add(status.proper, status.account_id, at_time) if eligible?(status)
+  end
+
+  def add(status, _account_id, at_time = Time.now.utc)
+    # We rely on the total reblogs and favourites count, so we
+    # don't record which account did the what and when here
+
+    record_used_id(status.id, at_time)
+  end
+
+  def query
+    Query.new(key_prefix, klass)
+  end
+
+  def refresh(at_time = Time.now.utc)
+    statuses = Status.where(id: (recently_used_ids(at_time) + currently_trending_ids(false, -1)).uniq).includes(:account, :media_attachments)
+    calculate_scores(statuses, at_time)
+    trim_older_items
+  end
+
+  def request_review
+    statuses = Status.where(id: currently_trending_ids(false, -1)).includes(:account)
+
+    statuses.filter_map do |status|
+      next unless would_be_trending?(status.id) && !status.trendable? && status.requires_review_notification?
+
+      status.account.touch(:requested_review_at)
+      status
+    end
+  end
+
+  protected
+
+  def key_prefix
+    PREFIX
+  end
+
+  def klass
+    Status
+  end
+
+  private
+
+  def eligible?(status)
+    original_status = status.proper
+
+    original_status.public_visibility? &&
+      original_status.account.discoverable? && !original_status.account.silenced? &&
+      original_status.spoiler_text.blank? && !original_status.sensitive? && !original_status.reply?
+  end
+
+  def calculate_scores(statuses, at_time)
+    redis.pipelined do
+      statuses.each do |status|
+        expected  = 1.0
+        observed  = (status.reblogs_count + status.favourites_count).to_f
+
+        score = begin
+          if expected > observed || observed < options[:threshold]
+            0
+          else
+            ((observed - expected)**2) / expected
+          end
+        end
+
+        decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f))
+
+        add_to_and_remove_from_subsets(status.id, decaying_score, {
+          all: true,
+          allowed: status.trendable? && status.account.discoverable?,
+        })
+
+        next unless valid_locale?(status.language)
+
+        add_to_and_remove_from_subsets(status.id, decaying_score, {
+          "all:#{status.language}" => true,
+          "allowed:#{status.language}" => status.trendable? && status.account.discoverable?,
+        })
+      end
+
+      # Clean up localized sets by calculating the intersection with the main
+      # set. We do this instead of just deleting the localized sets to avoid
+      # having moments where the API returns empty results
+
+      Trends.available_locales.each do |locale|
+        redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
+        redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
+      end
+    end
+  end
+
+  def would_be_trending?(id)
+    score(id) > score_at_rank(options[:review_threshold] - 1)
+  end
+end
diff --git a/app/models/trends/tag_batch.rb b/app/models/trends/tag_batch.rb
new file mode 100644
index 000000000..16ee08c06
--- /dev/null
+++ b/app/models/trends/tag_batch.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+class Trends::TagBatch
+  include ActiveModel::Model
+  include Authorization
+
+  attr_accessor :tag_ids, :action, :current_account
+
+  def save
+    case action
+    when 'approve'
+      approve!
+    when 'reject'
+      reject!
+    end
+  end
+
+  private
+
+  def tags
+    Tag.where(id: tag_ids)
+  end
+
+  def approve!
+    tags.each { |tag| authorize(tag, :review?) }
+    tags.update_all(trendable: true, reviewed_at: action_time)
+  end
+
+  def reject!
+    tags.each { |tag| authorize(tag, :review?) }
+    tags.update_all(trendable: false, reviewed_at: action_time)
+  end
+
+  def action_time
+    @action_time ||= Time.now.utc
+  end
+end
diff --git a/app/models/trends/tag_filter.rb b/app/models/trends/tag_filter.rb
new file mode 100644
index 000000000..3b142efc4
--- /dev/null
+++ b/app/models/trends/tag_filter.rb
@@ -0,0 +1,60 @@
+# frozen_string_literal: true
+
+class Trends::TagFilter
+  KEYS = %i(
+    trending
+    status
+  ).freeze
+
+  attr_reader :params
+
+  def initialize(params)
+    @params = params
+  end
+
+  def results
+    scope = begin
+      if params[:status] == 'pending_review'
+        Tag.unscoped
+      else
+        trending_scope
+      end
+    end
+
+    params.each do |key, value|
+      next if key.to_s == 'page'
+
+      scope.merge!(scope_for(key, value.to_s.strip)) if value.present?
+    end
+
+    scope
+  end
+
+  private
+
+  def scope_for(key, value)
+    case key.to_s
+    when 'status'
+      status_scope(value)
+    else
+      raise "Unknown filter: #{key}"
+    end
+  end
+
+  def trending_scope
+    Trends.tags.query.to_arel
+  end
+
+  def status_scope(value)
+    case value.to_s
+    when 'approved'
+      Tag.trendable
+    when 'rejected'
+      Tag.not_trendable
+    when 'pending_review'
+      Tag.pending_review
+    else
+      raise "Unknown status: #{value}"
+    end
+  end
+end
diff --git a/app/models/trends/tags.rb b/app/models/trends/tags.rb
index 2ea4550df..3caa58815 100644
--- a/app/models/trends/tags.rb
+++ b/app/models/trends/tags.rb
@@ -5,7 +5,7 @@ class Trends::Tags < Trends::Base
 
   self.default_options = {
     threshold: 5,
-    review_threshold: 10,
+    review_threshold: 3,
     max_score_cooldown: 2.days.freeze,
     max_score_halflife: 4.hours.freeze,
   }
@@ -29,27 +29,15 @@ class Trends::Tags < Trends::Base
     trim_older_items
   end
 
-  def get(allowed, limit)
-    tag_ids = currently_trending_ids(allowed, limit)
-    tags = Tag.where(id: tag_ids).index_by(&:id)
-    tag_ids.map { |id| tags[id] }.compact
-  end
-
   def request_review
     tags = Tag.where(id: currently_trending_ids(false, -1))
 
-    tags_requiring_review = tags.filter_map do |tag|
+    tags.filter_map do |tag|
       next unless would_be_trending?(tag.id) && !tag.trendable? && tag.requires_review_notification?
 
       tag.touch(:requested_review_at)
       tag
     end
-
-    return if tags_requiring_review.empty?
-
-    User.staff.includes(:account).find_each do |user|
-      AdminMailer.new_trending_tags(user.account, tags_requiring_review).deliver_later! if user.allows_trending_tag_emails?
-    end
   end
 
   protected
@@ -58,6 +46,10 @@ class Trends::Tags < Trends::Base
     PREFIX
   end
 
+  def klass
+    Tag
+  end
+
   private
 
   def calculate_scores(tags, at_time)
@@ -87,18 +79,10 @@ class Trends::Tags < Trends::Base
 
       decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
 
-      if decaying_score.zero?
-        redis.zrem("#{PREFIX}:all", tag.id)
-        redis.zrem("#{PREFIX}:allowed", tag.id)
-      else
-        redis.zadd("#{PREFIX}:all", decaying_score, tag.id)
-
-        if tag.trendable?
-          redis.zadd("#{PREFIX}:allowed", decaying_score, tag.id)
-        else
-          redis.zrem("#{PREFIX}:allowed", tag.id)
-        end
-      end
+      add_to_and_remove_from_subsets(tag.id, decaying_score, {
+        all: true,
+        allowed: tag.trendable?,
+      })
     end
   end