diff options
23 files changed, 268 insertions, 9 deletions
diff --git a/README.md b/README.md index 304e37935..00472a616 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,8 @@ The container has two volumes, for the assets and for user uploads. The default - `rake mastodon:media:clear` removes uploads that have not been attached to any status after a while, you would want to run this from a periodic cronjob - `rake mastodon:push:clear` unsubscribes from PuSH notifications for remote users that have no local followers. You may not want to actually do that, to keep a fuller footprint of the fediverse or in case your users will soon re-follow - `rake mastodon:push:refresh` re-subscribes PuSH for expiring remote users, this should be run periodically from a cronjob and quite often as the expiration time depends on the particular hub of the remote user -- `rake mastodon:feeds:clear` removes all timelines, which forces them to be re-built on the fly next time a user tries to fetch their home/mentions timeline. Only for troubleshooting +- `rake mastodon:feeds:clear_all` removes all timelines, which forces them to be re-built on the fly next time a user tries to fetch their home/mentions timeline. Only for troubleshooting +- `rake mastodon:feeds:clear` removes timelines of users who haven't signed in lately, which allows to save RAM and improve message distribution. This is required to be run periodically so that when they login again the regeneration process will trigger Running any of these tasks via docker-compose would look like this: @@ -114,6 +115,8 @@ Which will re-create the updated containers, leaving databases and data as is. D You can open issues for bugs you've found or features you think are missing. You can also submit pull requests to this repository. This section may be updated with more details in the future. +**IRC channel**: #mastodon on irc.freenode.net + ### Extra credits - The [Emoji One](https://github.com/Ranks/emojione) pack has been used for the emojis diff --git a/app/controllers/api/push_controller.rb b/app/controllers/api/push_controller.rb new file mode 100644 index 000000000..78d4e36e6 --- /dev/null +++ b/app/controllers/api/push_controller.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +class Api::PushController < ApiController + def update + mode = params['hub.mode'] + topic = params['hub.topic'] + callback = params['hub.callback'] + lease_seconds = params['hub.lease_seconds'] + secret = params['hub.secret'] + + case mode + when 'subscribe' + response, status = Pubsubhubbub::SubscribeService.new.call(topic_to_account(topic), callback, secret, lease_seconds) + when 'unsubscribe' + response, status = Pubsubhubbub::UnsubscribeService.new.call(topic_to_account(topic), callback) + else + response = "Unknown mode: #{mode}" + status = 422 + end + + render plain: response, status: status + end + + private + + def topic_to_account(topic_url) + return if topic_url.blank? + + uri = Addressable::URI.parse(topic_url) + params = Rails.application.routes.recognize_path(uri.path) + domain = uri.host + (uri.port ? ":#{uri.port}" : '') + + return unless TagManager.instance.local_domain?(domain) && params[:controller] == 'accounts' && params[:action] == 'show' && params[:format] == 'atom' + + Account.find_local(params[:username]) + end +end diff --git a/app/models/account.rb b/app/models/account.rb index 65fad2f47..f1a4d4b4f 100644 --- a/app/models/account.rb +++ b/app/models/account.rb @@ -44,8 +44,12 @@ class Account < ApplicationRecord has_many :block_relationships, class_name: 'Block', foreign_key: 'account_id', dependent: :destroy has_many :blocking, -> { order('blocks.id desc') }, through: :block_relationships, source: :target_account + # Media has_many :media_attachments, dependent: :destroy + # PuSH subscriptions + has_many :subscriptions, dependent: :destroy + pg_search_scope :search_for, against: { username: 'A', domain: 'B' }, using: { tsearch: { prefix: true } } scope :remote, -> { where.not(domain: nil) } diff --git a/app/models/media_attachment.rb b/app/models/media_attachment.rb index bfbf00d76..f1b9b8112 100644 --- a/app/models/media_attachment.rb +++ b/app/models/media_attachment.rb @@ -16,6 +16,8 @@ class MediaAttachment < ApplicationRecord validates :account, presence: true + default_scope { order('id asc') } + def local? remote_url.blank? end diff --git a/app/models/subscription.rb b/app/models/subscription.rb new file mode 100644 index 000000000..e968c6675 --- /dev/null +++ b/app/models/subscription.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +class Subscription < ApplicationRecord + MIN_EXPIRATION = 3600 * 24 + MAX_EXPIRATION = 3600 * 24 * 30 + + belongs_to :account + + validates :callback_url, presence: true + validates :callback_url, uniqueness: { scope: :account_id } + + scope :active, -> { where(confirmed: true).where('expires_at > ?', Time.now.utc) } + + def lease_seconds=(str) + self.expires_at = Time.now.utc + [[MIN_EXPIRATION, str.to_i].max, MAX_EXPIRATION].min.seconds + end + + def lease_seconds + (expires_at - Time.now.utc).to_i + end + + before_validation :set_min_expiration + + private + + def set_min_expiration + self.lease_seconds = 0 unless expires_at + end +end diff --git a/app/services/favourite_service.rb b/app/services/favourite_service.rb index 781b03b40..2f280e03f 100644 --- a/app/services/favourite_service.rb +++ b/app/services/favourite_service.rb @@ -7,7 +7,9 @@ class FavouriteService < BaseService # @return [Favourite] def call(account, status) favourite = Favourite.create!(account: account, status: status) + HubPingWorker.perform_async(account.id) + Pubsubhubbub::DistributionWorker.perform_async(favourite.stream_entry.id) if status.local? NotifyService.new.call(status.account, favourite) diff --git a/app/services/follow_service.rb b/app/services/follow_service.rb index a57e1b28a..09fa295e3 100644 --- a/app/services/follow_service.rb +++ b/app/services/follow_service.rb @@ -19,7 +19,10 @@ class FollowService < BaseService end merge_into_timeline(target_account, source_account) + HubPingWorker.perform_async(source_account.id) + Pubsubhubbub::DistributionWorker.perform_async(follow.stream_entry.id) + follow end diff --git a/app/services/post_status_service.rb b/app/services/post_status_service.rb index 76366e984..979a157e9 100644 --- a/app/services/post_status_service.rb +++ b/app/services/post_status_service.rb @@ -14,8 +14,11 @@ class PostStatusService < BaseService attach_media(status, options[:media_ids]) process_mentions_service.call(status) process_hashtags_service.call(status) + DistributionWorker.perform_async(status.id) HubPingWorker.perform_async(account.id) + Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id) + status end diff --git a/app/services/pubsubhubbub/subscribe_service.rb b/app/services/pubsubhubbub/subscribe_service.rb new file mode 100644 index 000000000..343376d77 --- /dev/null +++ b/app/services/pubsubhubbub/subscribe_service.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class Pubsubhubbub::SubscribeService < BaseService + def call(account, callback, secret, lease_seconds) + return ['Invalid topic URL', 422] if account.nil? + return ['Invalid callback URL', 422] unless !callback.blank? && callback =~ /\A#{URI.regexp(%w(http https))}\z/ + + subscription = Subscription.where(account: account, callback_url: callback).first_or_create!(account: account, callback_url: callback) + Pubsubhubbub::ConfirmationWorker.perform_async(subscription.id, 'subscribe', secret, lease_seconds) + + ['', 202] + end +end diff --git a/app/services/pubsubhubbub/unsubscribe_service.rb b/app/services/pubsubhubbub/unsubscribe_service.rb new file mode 100644 index 000000000..a2fdc548a --- /dev/null +++ b/app/services/pubsubhubbub/unsubscribe_service.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class Pubsubhubbub::SubscribeService < BaseService + def call(account, callback) + return ['Invalid topic URL', 422] if account.nil? + + subscription = Subscription.where(account: account, callback_url: callback) + + unless subscription.nil? + Pubsubhubbub::ConfirmationWorker.perform_async(subscription.id, 'unsubscribe') + end + + ['', 202] + end +end diff --git a/app/services/reblog_service.rb b/app/services/reblog_service.rb index 6543d4ae7..39fdb4ea7 100644 --- a/app/services/reblog_service.rb +++ b/app/services/reblog_service.rb @@ -7,8 +7,10 @@ class ReblogService < BaseService # @return [Status] def call(account, reblogged_status) reblog = account.statuses.create!(reblog: reblogged_status, text: '') + DistributionWorker.perform_async(reblog.id) HubPingWorker.perform_async(account.id) + Pubsubhubbub::DistributionWorker.perform_async(reblog.stream_entry.id) if reblogged_status.local? NotifyService.new.call(reblogged_status.account, reblog) diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 689abc97b..058fd3f18 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -10,6 +10,9 @@ class RemoveStatusService < BaseService remove_from_public(status) status.destroy! + + HubPingWorker.perform_async(status.account.id) + Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id) end private diff --git a/app/views/accounts/show.atom.ruby b/app/views/accounts/show.atom.ruby index d7b2201d4..558c777f0 100644 --- a/app/views/accounts/show.atom.ruby +++ b/app/views/accounts/show.atom.ruby @@ -1,3 +1,5 @@ +# frozen_string_literal: true + Nokogiri::XML::Builder.new do |xml| feed(xml) do simple_id xml, account_url(@account, format: 'atom') @@ -12,6 +14,7 @@ Nokogiri::XML::Builder.new do |xml| link_alternate xml, TagManager.instance.url_for(@account) link_self xml, account_url(@account, format: 'atom') + link_hub xml, api_push_url link_hub xml, Rails.configuration.x.hub_url link_salmon xml, api_salmon_url(@account.id) diff --git a/app/workers/pubsubhubbub/confirmation_worker.rb b/app/workers/pubsubhubbub/confirmation_worker.rb new file mode 100644 index 000000000..6a036adbe --- /dev/null +++ b/app/workers/pubsubhubbub/confirmation_worker.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +class Pubsubhubbub::ConfirmationWorker + include Sidekiq::Worker + include RoutingHelper + + def perform(subscription_id, mode, secret = nil, lease_seconds = nil) + subscription = Subscription.find(subscription_id) + challenge = SecureRandom.hex + + subscription.secret = secret + subscription.lease_seconds = lease_seconds + + response = HTTP.headers(user_agent: 'Mastodon/PubSubHubbub') + .timeout(:per_operation, write: 20, connect: 20, read: 50) + .get(subscription.callback_url, params: { + 'hub.topic' => account_url(subscription.account, format: :atom), + 'hub.mode' => mode, + 'hub.challenge' => challenge, + 'hub.lease_seconds' => subscription.lease_seconds, + }) + + if mode == 'subscribe' && response.body.to_s == challenge + subscription.save! + elsif (mode == 'unsubscribe' && response.body.to_s == challenge) || !subscription.confirmed? + subscription.destroy! + end + end +end diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb new file mode 100644 index 000000000..4d55798e8 --- /dev/null +++ b/app/workers/pubsubhubbub/delivery_worker.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +class Pubsubhubbub::DeliveryWorker + include Sidekiq::Worker + include RoutingHelper + + def perform(subscription_id, payload) + subscription = Subscription.find(subscription_id) + headers = {} + + headers['User-Agent'] = 'Mastodon/PubSubHubbub' + headers['Link'] = LinkHeader.new([[api_push_url, [%w(rel hub)]], [account_url(subscription.account, format: :atom), [%w(rel self)]]]).to_s + headers['X-Hub-Signature'] = signature(subscription.secret, payload) unless subscription.secret.blank? + + response = HTTP.timeout(:per_operation, write: 50, connect: 20, read: 50) + .headers(headers) + .post(subscription.callback_url, body: payload) + + raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300 + end + + private + + def signature(secret, payload) + hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload) + "sha1=#{hmac}" + end +end diff --git a/app/workers/pubsubhubbub/distribution_worker.rb b/app/workers/pubsubhubbub/distribution_worker.rb new file mode 100644 index 000000000..d8cec2ef2 --- /dev/null +++ b/app/workers/pubsubhubbub/distribution_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class Pubsubhubbub::DistributionWorker + include Sidekiq::Worker + + def perform(stream_entry_id) + stream_entry = StreamEntry.find(stream_entry_id) + account = stream_entry.account + payload = AccountsController.render(:show, assigns: { account: account, entries: [stream_entry] }, formats: [:atom]) + + Subscription.where(account: account).active.select('id').find_each do |subscription| + Pubsubhubbub::DeliveryWorker.perform_async(subscription.id, payload) + end + end +end diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb index 700161989..84eae73be 100644 --- a/app/workers/thread_resolve_worker.rb +++ b/app/workers/thread_resolve_worker.rb @@ -7,9 +7,9 @@ class ThreadResolveWorker child_status = Status.find(child_status_id) parent_status = FetchRemoteStatusService.new.call(parent_url) - unless parent_status.nil? - child_status.thread = parent_status - child_status.save! - end + return if parent_status.nil? + + child_status.thread = parent_status + child_status.save! end end diff --git a/config/routes.rb b/config/routes.rb index e0c14b47a..5c6568298 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -1,7 +1,9 @@ +# frozen_string_literal: true + require 'sidekiq/web' Rails.application.routes.draw do - mount ActionCable.server => '/cable' + mount ActionCable.server, at: 'cable' authenticate :user, lambda { |u| u.admin? } do mount Sidekiq::Web, at: 'sidekiq' @@ -19,7 +21,7 @@ Rails.application.routes.draw do sessions: 'auth/sessions', registrations: 'auth/registrations', passwords: 'auth/passwords', - confirmations: 'auth/confirmations' + confirmations: 'auth/confirmations', } resources :accounts, path: 'users', only: [:show], param: :username do @@ -43,10 +45,13 @@ Rails.application.routes.draw do resources :tags, only: [:show] namespace :api do - # PubSubHubbub + # PubSubHubbub outgoing subscriptions resources :subscriptions, only: [:show] post '/subscriptions/:id', to: 'subscriptions#update' + # PubSubHubbub incoming subscriptions + post '/push', to: 'push#update', as: :push + # Salmon post '/salmon/:id', to: 'salmon#update', as: :salmon diff --git a/db/migrate/20161128103007_create_subscriptions.rb b/db/migrate/20161128103007_create_subscriptions.rb new file mode 100644 index 000000000..46443680a --- /dev/null +++ b/db/migrate/20161128103007_create_subscriptions.rb @@ -0,0 +1,15 @@ +class CreateSubscriptions < ActiveRecord::Migration[5.0] + def change + create_table :subscriptions do |t| + t.string :callback_url, null: false, default: '' + t.string :secret + t.datetime :expires_at, null: true, default: nil + t.boolean :confirmed, null: false, default: false + t.integer :account_id, null: false + + t.timestamps + end + + add_index :subscriptions, [:callback_url, :account_id], unique: true + end +end diff --git a/db/schema.rb b/db/schema.rb index 356badf8e..2c0e6de5b 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 20161123093447) do +ActiveRecord::Schema.define(version: 20161128103007) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -143,6 +143,19 @@ ActiveRecord::Schema.define(version: 20161123093447) do t.index ["uid"], name: "index_oauth_applications_on_uid", unique: true, using: :btree end + create_table "pubsubhubbub_subscriptions", force: :cascade do |t| + t.string "topic", default: "", null: false + t.string "callback", default: "", null: false + t.string "mode", default: "", null: false + t.string "challenge", default: "", null: false + t.string "secret" + t.boolean "confirmed", default: false, null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["topic", "callback"], name: "index_pubsubhubbub_subscriptions_on_topic_and_callback", unique: true, using: :btree + end + create_table "settings", force: :cascade do |t| t.string "var", null: false t.text "value" @@ -185,6 +198,17 @@ ActiveRecord::Schema.define(version: 20161123093447) do t.index ["activity_id", "activity_type"], name: "index_stream_entries_on_activity_id_and_activity_type", using: :btree end + create_table "subscriptions", force: :cascade do |t| + t.string "callback_url", default: "", null: false + t.string "secret" + t.datetime "expires_at" + t.boolean "confirmed", default: false, null: false + t.integer "account_id", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["callback_url", "account_id"], name: "index_subscriptions_on_callback_url_and_account_id", unique: true, using: :btree + end + create_table "tags", force: :cascade do |t| t.string "name", default: "", null: false t.datetime "created_at", null: false diff --git a/spec/controllers/api/push_controller_spec.rb b/spec/controllers/api/push_controller_spec.rb new file mode 100644 index 000000000..e699006f7 --- /dev/null +++ b/spec/controllers/api/push_controller_spec.rb @@ -0,0 +1,13 @@ +require 'rails_helper' + +RSpec.describe Api::PushController, type: :controller do + describe 'POST #update' do + context 'with hub.mode=subscribe' do + pending + end + + context 'with hub.mode=unsubscribe' do + pending + end + end +end diff --git a/spec/fabricators/subscription_fabricator.rb b/spec/fabricators/subscription_fabricator.rb new file mode 100644 index 000000000..0c8290494 --- /dev/null +++ b/spec/fabricators/subscription_fabricator.rb @@ -0,0 +1,6 @@ +Fabricator(:subscription) do + callback_url "http://example.com/callback" + secret "foobar" + expires_at "2016-11-28 11:30:07" + confirmed false +end diff --git a/spec/models/subscription_spec.rb b/spec/models/subscription_spec.rb new file mode 100644 index 000000000..d40bf0b44 --- /dev/null +++ b/spec/models/subscription_spec.rb @@ -0,0 +1,5 @@ +require 'rails_helper' + +RSpec.describe Subscription, type: :model do + pending "add some examples to (or delete) #{__FILE__}" +end |