about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2016-11-28 13:36:47 +0100
committerEugen Rochko <eugen@zeonfederated.com>2016-11-28 13:36:47 +0100
commit2d2c81765b0a68446a94a7fcba34758a3aa886c0 (patch)
tree0a0610b95195120bd334ad96f2f308a3bb30295a
parent26287b6e7dd68e03d6f2d6a7051bb9873e3f6c9a (diff)
Adding embedded PuSH server
-rw-r--r--app/controllers/api/push_controller.rb37
-rw-r--r--app/models/account.rb4
-rw-r--r--app/models/subscription.rb29
-rw-r--r--app/services/favourite_service.rb2
-rw-r--r--app/services/follow_service.rb3
-rw-r--r--app/services/post_status_service.rb3
-rw-r--r--app/services/pubsubhubbub/subscribe_service.rb13
-rw-r--r--app/services/pubsubhubbub/unsubscribe_service.rb15
-rw-r--r--app/services/reblog_service.rb2
-rw-r--r--app/services/remove_status_service.rb3
-rw-r--r--app/views/accounts/show.atom.ruby3
-rw-r--r--app/workers/pubsubhubbub/confirmation_worker.rb29
-rw-r--r--app/workers/pubsubhubbub/delivery_worker.rb28
-rw-r--r--app/workers/pubsubhubbub/distribution_worker.rb15
-rw-r--r--app/workers/thread_resolve_worker.rb8
-rw-r--r--config/routes.rb11
-rw-r--r--db/migrate/20161128103007_create_subscriptions.rb15
-rw-r--r--db/schema.rb26
-rw-r--r--spec/controllers/api/push_controller_spec.rb13
-rw-r--r--spec/fabricators/subscription_fabricator.rb6
-rw-r--r--spec/models/subscription_spec.rb5
21 files changed, 262 insertions, 8 deletions
diff --git a/app/controllers/api/push_controller.rb b/app/controllers/api/push_controller.rb
new file mode 100644
index 000000000..78d4e36e6
--- /dev/null
+++ b/app/controllers/api/push_controller.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+class Api::PushController < ApiController
+  def update
+    mode          = params['hub.mode']
+    topic         = params['hub.topic']
+    callback      = params['hub.callback']
+    lease_seconds = params['hub.lease_seconds']
+    secret        = params['hub.secret']
+
+    case mode
+    when 'subscribe'
+      response, status = Pubsubhubbub::SubscribeService.new.call(topic_to_account(topic), callback, secret, lease_seconds)
+    when 'unsubscribe'
+      response, status = Pubsubhubbub::UnsubscribeService.new.call(topic_to_account(topic), callback)
+    else
+      response = "Unknown mode: #{mode}"
+      status   = 422
+    end
+
+    render plain: response, status: status
+  end
+
+  private
+
+  def topic_to_account(topic_url)
+    return if topic_url.blank?
+
+    uri    = Addressable::URI.parse(topic_url)
+    params = Rails.application.routes.recognize_path(uri.path)
+    domain = uri.host + (uri.port ? ":#{uri.port}" : '')
+
+    return unless TagManager.instance.local_domain?(domain) && params[:controller] == 'accounts' && params[:action] == 'show' && params[:format] == 'atom'
+
+    Account.find_local(params[:username])
+  end
+end
diff --git a/app/models/account.rb b/app/models/account.rb
index 65fad2f47..f1a4d4b4f 100644
--- a/app/models/account.rb
+++ b/app/models/account.rb
@@ -44,8 +44,12 @@ class Account < ApplicationRecord
   has_many :block_relationships, class_name: 'Block', foreign_key: 'account_id', dependent: :destroy
   has_many :blocking, -> { order('blocks.id desc') }, through: :block_relationships, source: :target_account
 
+  # Media
   has_many :media_attachments, dependent: :destroy
 
+  # PuSH subscriptions
+  has_many :subscriptions, dependent: :destroy
+
   pg_search_scope :search_for, against: { username: 'A', domain: 'B' }, using: { tsearch: { prefix: true } }
 
   scope :remote, -> { where.not(domain: nil) }
diff --git a/app/models/subscription.rb b/app/models/subscription.rb
new file mode 100644
index 000000000..e968c6675
--- /dev/null
+++ b/app/models/subscription.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+class Subscription < ApplicationRecord
+  MIN_EXPIRATION = 3600 * 24
+  MAX_EXPIRATION = 3600 * 24 * 30
+
+  belongs_to :account
+
+  validates :callback_url, presence: true
+  validates :callback_url, uniqueness: { scope: :account_id }
+
+  scope :active, -> { where(confirmed: true).where('expires_at > ?', Time.now.utc) }
+
+  def lease_seconds=(str)
+    self.expires_at = Time.now.utc + [[MIN_EXPIRATION, str.to_i].max, MAX_EXPIRATION].min.seconds
+  end
+
+  def lease_seconds
+    (expires_at - Time.now.utc).to_i
+  end
+
+  before_validation :set_min_expiration
+
+  private
+
+  def set_min_expiration
+    self.lease_seconds = 0 unless expires_at
+  end
+end
diff --git a/app/services/favourite_service.rb b/app/services/favourite_service.rb
index 781b03b40..2f280e03f 100644
--- a/app/services/favourite_service.rb
+++ b/app/services/favourite_service.rb
@@ -7,7 +7,9 @@ class FavouriteService < BaseService
   # @return [Favourite]
   def call(account, status)
     favourite = Favourite.create!(account: account, status: status)
+
     HubPingWorker.perform_async(account.id)
+    Pubsubhubbub::DistributionWorker.perform_async(favourite.stream_entry.id)
 
     if status.local?
       NotifyService.new.call(status.account, favourite)
diff --git a/app/services/follow_service.rb b/app/services/follow_service.rb
index a57e1b28a..09fa295e3 100644
--- a/app/services/follow_service.rb
+++ b/app/services/follow_service.rb
@@ -19,7 +19,10 @@ class FollowService < BaseService
     end
 
     merge_into_timeline(target_account, source_account)
+
     HubPingWorker.perform_async(source_account.id)
+    Pubsubhubbub::DistributionWorker.perform_async(follow.stream_entry.id)
+
     follow
   end
 
diff --git a/app/services/post_status_service.rb b/app/services/post_status_service.rb
index 76366e984..979a157e9 100644
--- a/app/services/post_status_service.rb
+++ b/app/services/post_status_service.rb
@@ -14,8 +14,11 @@ class PostStatusService < BaseService
     attach_media(status, options[:media_ids])
     process_mentions_service.call(status)
     process_hashtags_service.call(status)
+
     DistributionWorker.perform_async(status.id)
     HubPingWorker.perform_async(account.id)
+    Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
+
     status
   end
 
diff --git a/app/services/pubsubhubbub/subscribe_service.rb b/app/services/pubsubhubbub/subscribe_service.rb
new file mode 100644
index 000000000..343376d77
--- /dev/null
+++ b/app/services/pubsubhubbub/subscribe_service.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+class Pubsubhubbub::SubscribeService < BaseService
+  def call(account, callback, secret, lease_seconds)
+    return ['Invalid topic URL', 422] if account.nil?
+    return ['Invalid callback URL', 422] unless !callback.blank? && callback =~ /\A#{URI.regexp(%w(http https))}\z/
+
+    subscription = Subscription.where(account: account, callback_url: callback).first_or_create!(account: account, callback_url: callback)
+    Pubsubhubbub::ConfirmationWorker.perform_async(subscription.id, 'subscribe', secret, lease_seconds)
+
+    ['', 202]
+  end
+end
diff --git a/app/services/pubsubhubbub/unsubscribe_service.rb b/app/services/pubsubhubbub/unsubscribe_service.rb
new file mode 100644
index 000000000..a2fdc548a
--- /dev/null
+++ b/app/services/pubsubhubbub/unsubscribe_service.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+class Pubsubhubbub::SubscribeService < BaseService
+  def call(account, callback)
+    return ['Invalid topic URL', 422] if account.nil?
+
+    subscription = Subscription.where(account: account, callback_url: callback)
+
+    unless subscription.nil?
+      Pubsubhubbub::ConfirmationWorker.perform_async(subscription.id, 'unsubscribe')
+    end
+
+    ['', 202]
+  end
+end
diff --git a/app/services/reblog_service.rb b/app/services/reblog_service.rb
index 6543d4ae7..39fdb4ea7 100644
--- a/app/services/reblog_service.rb
+++ b/app/services/reblog_service.rb
@@ -7,8 +7,10 @@ class ReblogService < BaseService
   # @return [Status]
   def call(account, reblogged_status)
     reblog = account.statuses.create!(reblog: reblogged_status, text: '')
+
     DistributionWorker.perform_async(reblog.id)
     HubPingWorker.perform_async(account.id)
+    Pubsubhubbub::DistributionWorker.perform_async(reblog.stream_entry.id)
 
     if reblogged_status.local?
       NotifyService.new.call(reblogged_status.account, reblog)
diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb
index 689abc97b..058fd3f18 100644
--- a/app/services/remove_status_service.rb
+++ b/app/services/remove_status_service.rb
@@ -10,6 +10,9 @@ class RemoveStatusService < BaseService
     remove_from_public(status)
 
     status.destroy!
+
+    HubPingWorker.perform_async(status.account.id)
+    Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
   end
 
   private
diff --git a/app/views/accounts/show.atom.ruby b/app/views/accounts/show.atom.ruby
index d7b2201d4..558c777f0 100644
--- a/app/views/accounts/show.atom.ruby
+++ b/app/views/accounts/show.atom.ruby
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
 Nokogiri::XML::Builder.new do |xml|
   feed(xml) do
     simple_id  xml, account_url(@account, format: 'atom')
@@ -12,6 +14,7 @@ Nokogiri::XML::Builder.new do |xml|
 
     link_alternate xml, TagManager.instance.url_for(@account)
     link_self      xml, account_url(@account, format: 'atom')
+    link_hub       xml, api_push_url
     link_hub       xml, Rails.configuration.x.hub_url
     link_salmon    xml, api_salmon_url(@account.id)
 
diff --git a/app/workers/pubsubhubbub/confirmation_worker.rb b/app/workers/pubsubhubbub/confirmation_worker.rb
new file mode 100644
index 000000000..6a036adbe
--- /dev/null
+++ b/app/workers/pubsubhubbub/confirmation_worker.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+class Pubsubhubbub::ConfirmationWorker
+  include Sidekiq::Worker
+  include RoutingHelper
+
+  def perform(subscription_id, mode, secret = nil, lease_seconds = nil)
+    subscription = Subscription.find(subscription_id)
+    challenge    = SecureRandom.hex
+
+    subscription.secret        = secret
+    subscription.lease_seconds = lease_seconds
+
+    response = HTTP.headers(user_agent: 'Mastodon/PubSubHubbub')
+                   .timeout(:per_operation, write: 20, connect: 20, read: 50)
+                   .get(subscription.callback_url, params: {
+                          'hub.topic' => account_url(subscription.account, format: :atom),
+                          'hub.mode'          => mode,
+                          'hub.challenge'     => challenge,
+                          'hub.lease_seconds' => subscription.lease_seconds,
+                        })
+
+    if mode == 'subscribe' && response.body.to_s == challenge
+      subscription.save!
+    elsif (mode == 'unsubscribe' && response.body.to_s == challenge) || !subscription.confirmed?
+      subscription.destroy!
+    end
+  end
+end
diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb
new file mode 100644
index 000000000..4d55798e8
--- /dev/null
+++ b/app/workers/pubsubhubbub/delivery_worker.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+class Pubsubhubbub::DeliveryWorker
+  include Sidekiq::Worker
+  include RoutingHelper
+
+  def perform(subscription_id, payload)
+    subscription = Subscription.find(subscription_id)
+    headers      = {}
+
+    headers['User-Agent']      = 'Mastodon/PubSubHubbub'
+    headers['Link']            = LinkHeader.new([[api_push_url, [%w(rel hub)]], [account_url(subscription.account, format: :atom), [%w(rel self)]]]).to_s
+    headers['X-Hub-Signature'] = signature(subscription.secret, payload) unless subscription.secret.blank?
+
+    response = HTTP.timeout(:per_operation, write: 50, connect: 20, read: 50)
+                   .headers(headers)
+                   .post(subscription.callback_url, body: payload)
+
+    raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300
+  end
+
+  private
+
+  def signature(secret, payload)
+    hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload)
+    "sha1=#{hmac}"
+  end
+end
diff --git a/app/workers/pubsubhubbub/distribution_worker.rb b/app/workers/pubsubhubbub/distribution_worker.rb
new file mode 100644
index 000000000..d8cec2ef2
--- /dev/null
+++ b/app/workers/pubsubhubbub/distribution_worker.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+class Pubsubhubbub::DistributionWorker
+  include Sidekiq::Worker
+
+  def perform(stream_entry_id)
+    stream_entry = StreamEntry.find(stream_entry_id)
+    account      = stream_entry.account
+    payload      = AccountsController.render(:show, assigns: { account: account, entries: [stream_entry] }, formats: [:atom])
+
+    Subscription.where(account: account).active.select('id').find_each do |subscription|
+      Pubsubhubbub::DeliveryWorker.perform_async(subscription.id, payload)
+    end
+  end
+end
diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb
index 700161989..84eae73be 100644
--- a/app/workers/thread_resolve_worker.rb
+++ b/app/workers/thread_resolve_worker.rb
@@ -7,9 +7,9 @@ class ThreadResolveWorker
     child_status  = Status.find(child_status_id)
     parent_status = FetchRemoteStatusService.new.call(parent_url)
 
-    unless parent_status.nil?
-      child_status.thread = parent_status
-      child_status.save!
-    end
+    return if parent_status.nil?
+
+    child_status.thread = parent_status
+    child_status.save!
   end
 end
diff --git a/config/routes.rb b/config/routes.rb
index e0c14b47a..5c6568298 100644
--- a/config/routes.rb
+++ b/config/routes.rb
@@ -1,7 +1,9 @@
+# frozen_string_literal: true
+
 require 'sidekiq/web'
 
 Rails.application.routes.draw do
-  mount ActionCable.server => '/cable'
+  mount ActionCable.server, at: 'cable'
 
   authenticate :user, lambda { |u| u.admin? } do
     mount Sidekiq::Web, at: 'sidekiq'
@@ -19,7 +21,7 @@ Rails.application.routes.draw do
     sessions:           'auth/sessions',
     registrations:      'auth/registrations',
     passwords:          'auth/passwords',
-    confirmations:      'auth/confirmations'
+    confirmations:      'auth/confirmations',
   }
 
   resources :accounts, path: 'users', only: [:show], param: :username do
@@ -43,10 +45,13 @@ Rails.application.routes.draw do
   resources :tags,  only: [:show]
 
   namespace :api do
-    # PubSubHubbub
+    # PubSubHubbub outgoing subscriptions
     resources :subscriptions, only: [:show]
     post '/subscriptions/:id', to: 'subscriptions#update'
 
+    # PubSubHubbub incoming subscriptions
+    post '/push', to: 'push#update', as: :push
+
     # Salmon
     post '/salmon/:id', to: 'salmon#update', as: :salmon
 
diff --git a/db/migrate/20161128103007_create_subscriptions.rb b/db/migrate/20161128103007_create_subscriptions.rb
new file mode 100644
index 000000000..46443680a
--- /dev/null
+++ b/db/migrate/20161128103007_create_subscriptions.rb
@@ -0,0 +1,15 @@
+class CreateSubscriptions < ActiveRecord::Migration[5.0]
+  def change
+    create_table :subscriptions do |t|
+      t.string :callback_url, null: false, default: ''
+      t.string :secret
+      t.datetime :expires_at, null: true, default: nil
+      t.boolean :confirmed, null: false, default: false
+      t.integer :account_id, null: false
+
+      t.timestamps
+    end
+
+    add_index :subscriptions, [:callback_url, :account_id], unique: true
+  end
+end
diff --git a/db/schema.rb b/db/schema.rb
index 356badf8e..2c0e6de5b 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -10,7 +10,7 @@
 #
 # It's strongly recommended that you check this file into your version control system.
 
-ActiveRecord::Schema.define(version: 20161123093447) do
+ActiveRecord::Schema.define(version: 20161128103007) do
 
   # These are extensions that must be enabled in order to support this database
   enable_extension "plpgsql"
@@ -143,6 +143,19 @@ ActiveRecord::Schema.define(version: 20161123093447) do
     t.index ["uid"], name: "index_oauth_applications_on_uid", unique: true, using: :btree
   end
 
+  create_table "pubsubhubbub_subscriptions", force: :cascade do |t|
+    t.string   "topic",      default: "",    null: false
+    t.string   "callback",   default: "",    null: false
+    t.string   "mode",       default: "",    null: false
+    t.string   "challenge",  default: "",    null: false
+    t.string   "secret"
+    t.boolean  "confirmed",  default: false, null: false
+    t.datetime "expires_at",                 null: false
+    t.datetime "created_at",                 null: false
+    t.datetime "updated_at",                 null: false
+    t.index ["topic", "callback"], name: "index_pubsubhubbub_subscriptions_on_topic_and_callback", unique: true, using: :btree
+  end
+
   create_table "settings", force: :cascade do |t|
     t.string   "var",         null: false
     t.text     "value"
@@ -185,6 +198,17 @@ ActiveRecord::Schema.define(version: 20161123093447) do
     t.index ["activity_id", "activity_type"], name: "index_stream_entries_on_activity_id_and_activity_type", using: :btree
   end
 
+  create_table "subscriptions", force: :cascade do |t|
+    t.string   "callback_url", default: "",    null: false
+    t.string   "secret"
+    t.datetime "expires_at"
+    t.boolean  "confirmed",    default: false, null: false
+    t.integer  "account_id",                   null: false
+    t.datetime "created_at",                   null: false
+    t.datetime "updated_at",                   null: false
+    t.index ["callback_url", "account_id"], name: "index_subscriptions_on_callback_url_and_account_id", unique: true, using: :btree
+  end
+
   create_table "tags", force: :cascade do |t|
     t.string   "name",       default: "", null: false
     t.datetime "created_at",              null: false
diff --git a/spec/controllers/api/push_controller_spec.rb b/spec/controllers/api/push_controller_spec.rb
new file mode 100644
index 000000000..e699006f7
--- /dev/null
+++ b/spec/controllers/api/push_controller_spec.rb
@@ -0,0 +1,13 @@
+require 'rails_helper'
+
+RSpec.describe Api::PushController, type: :controller do
+  describe 'POST #update' do
+    context 'with hub.mode=subscribe' do
+      pending
+    end
+
+    context 'with hub.mode=unsubscribe' do
+      pending
+    end
+  end
+end
diff --git a/spec/fabricators/subscription_fabricator.rb b/spec/fabricators/subscription_fabricator.rb
new file mode 100644
index 000000000..0c8290494
--- /dev/null
+++ b/spec/fabricators/subscription_fabricator.rb
@@ -0,0 +1,6 @@
+Fabricator(:subscription) do
+  callback_url "http://example.com/callback"
+  secret       "foobar"
+  expires_at   "2016-11-28 11:30:07"
+  confirmed    false
+end
diff --git a/spec/models/subscription_spec.rb b/spec/models/subscription_spec.rb
new file mode 100644
index 000000000..d40bf0b44
--- /dev/null
+++ b/spec/models/subscription_spec.rb
@@ -0,0 +1,5 @@
+require 'rails_helper'
+
+RSpec.describe Subscription, type: :model do
+  pending "add some examples to (or delete) #{__FILE__}"
+end