diff options
Diffstat (limited to 'app')
-rw-r--r-- | app/javascript/mastodon/features/status/index.js | 68 | ||||
-rw-r--r-- | app/javascript/mastodon/reducers/compose.js | 8 | ||||
-rw-r--r-- | app/lib/connection_pool/shared_connection_pool.rb | 63 | ||||
-rw-r--r-- | app/lib/connection_pool/shared_timed_stack.rb | 95 | ||||
-rw-r--r-- | app/lib/request.rb | 70 | ||||
-rw-r--r-- | app/lib/request_pool.rb | 114 | ||||
-rw-r--r-- | app/lib/sidekiq_error_handler.rb | 8 | ||||
-rw-r--r-- | app/models/concerns/attachmentable.rb | 4 | ||||
-rw-r--r-- | app/models/custom_filter.rb | 7 | ||||
-rw-r--r-- | app/services/activitypub/process_account_service.rb | 4 | ||||
-rw-r--r-- | app/services/resolve_account_service.rb | 2 | ||||
-rw-r--r-- | app/workers/activitypub/delivery_worker.rb | 17 |
12 files changed, 405 insertions, 55 deletions
diff --git a/app/javascript/mastodon/features/status/index.js b/app/javascript/mastodon/features/status/index.js index 981eb9d58..0422111ae 100644 --- a/app/javascript/mastodon/features/status/index.js +++ b/app/javascript/mastodon/features/status/index.js @@ -4,6 +4,7 @@ import { connect } from 'react-redux'; import PropTypes from 'prop-types'; import classNames from 'classnames'; import ImmutablePropTypes from 'react-immutable-proptypes'; +import { createSelector } from 'reselect'; import { fetchStatus } from '../../actions/statuses'; import MissingIndicator from '../../components/missing_indicator'; import DetailedStatus from './components/detailed_status'; @@ -63,39 +64,58 @@ const messages = defineMessages({ const makeMapStateToProps = () => { const getStatus = makeGetStatus(); - const mapStateToProps = (state, props) => { - const status = getStatus(state, { id: props.params.statusId }); + const getAncestorsIds = createSelector([ + (_, { id }) => id, + state => state.getIn(['contexts', 'inReplyTos']), + ], (statusId, inReplyTos) => { let ancestorsIds = Immutable.List(); + ancestorsIds = ancestorsIds.withMutations(mutable => { + let id = statusId; + + while (id) { + mutable.unshift(id); + id = inReplyTos.get(id); + } + }); + + return ancestorsIds; + }); + + const getDescendantsIds = createSelector([ + (_, { id }) => id, + state => state.getIn(['contexts', 'replies']), + ], (statusId, contextReplies) => { let descendantsIds = Immutable.List(); + descendantsIds = descendantsIds.withMutations(mutable => { + const ids = [statusId]; - if (status) { - ancestorsIds = ancestorsIds.withMutations(mutable => { - let id = status.get('in_reply_to_id'); + while (ids.length > 0) { + let id = ids.shift(); + const replies = contextReplies.get(id); - while (id) { - mutable.unshift(id); - id = state.getIn(['contexts', 'inReplyTos', id]); + if (statusId !== id) { + mutable.push(id); } - }); - descendantsIds = descendantsIds.withMutations(mutable => { - const ids = [status.get('id')]; + if (replies) { + replies.reverse().forEach(reply => { + ids.unshift(reply); + }); + } + } + }); - while (ids.length > 0) { - let id = ids.shift(); - const replies = state.getIn(['contexts', 'replies', id]); + return descendantsIds; + }); - if (status.get('id') !== id) { - mutable.push(id); - } + const mapStateToProps = (state, props) => { + const status = getStatus(state, { id: props.params.statusId }); + let ancestorsIds = Immutable.List(); + let descendantsIds = Immutable.List(); - if (replies) { - replies.reverse().forEach(reply => { - ids.unshift(reply); - }); - } - } - }); + if (status) { + ancestorsIds = getAncestorsIds(state, { id: status.get('in_reply_to_id') }); + descendantsIds = getDescendantsIds(state, { id: status.get('id') }); } return { diff --git a/app/javascript/mastodon/reducers/compose.js b/app/javascript/mastodon/reducers/compose.js index 8cdd29bfe..fae7522b2 100644 --- a/app/javascript/mastodon/reducers/compose.js +++ b/app/javascript/mastodon/reducers/compose.js @@ -195,6 +195,12 @@ const expandMentions = status => { return fragment.innerHTML; }; +const expiresInFromExpiresAt = expires_at => { + if (!expires_at) return 24 * 3600; + const delta = (new Date(expires_at).getTime() - Date.now()) / 1000; + return [300, 1800, 3600, 21600, 86400, 259200, 604800].find(expires_in => expires_in >= delta) || 24 * 3600; +}; + export default function compose(state = initialState, action) { switch(action.type) { case STORE_HYDRATE: @@ -353,7 +359,7 @@ export default function compose(state = initialState, action) { map.set('poll', ImmutableMap({ options: action.status.getIn(['poll', 'options']).map(x => x.get('title')), multiple: action.status.getIn(['poll', 'multiple']), - expires_in: 24 * 3600, + expires_in: expiresInFromExpiresAt(action.status.getIn(['poll', 'expires_at'])), })); } }); diff --git a/app/lib/connection_pool/shared_connection_pool.rb b/app/lib/connection_pool/shared_connection_pool.rb new file mode 100644 index 000000000..2865a4108 --- /dev/null +++ b/app/lib/connection_pool/shared_connection_pool.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require 'connection_pool' +require_relative './shared_timed_stack' + +class ConnectionPool::SharedConnectionPool < ConnectionPool + def initialize(options = {}, &block) + super(options, &block) + + @available = ConnectionPool::SharedTimedStack.new(@size, &block) + end + + delegate :size, :flush, to: :@available + + def with(preferred_tag, options = {}) + Thread.handle_interrupt(Exception => :never) do + conn = checkout(preferred_tag, options) + + begin + Thread.handle_interrupt(Exception => :immediate) do + yield conn + end + ensure + checkin(preferred_tag) + end + end + end + + def checkout(preferred_tag, options = {}) + if ::Thread.current[key(preferred_tag)] + ::Thread.current[key_count(preferred_tag)] += 1 + ::Thread.current[key(preferred_tag)] + else + ::Thread.current[key_count(preferred_tag)] = 1 + ::Thread.current[key(preferred_tag)] = @available.pop(preferred_tag, options[:timeout] || @timeout) + end + end + + def checkin(preferred_tag) + if ::Thread.current[key(preferred_tag)] + if ::Thread.current[key_count(preferred_tag)] == 1 + @available.push(::Thread.current[key(preferred_tag)]) + ::Thread.current[key(preferred_tag)] = nil + else + ::Thread.current[key_count(preferred_tag)] -= 1 + end + else + raise ConnectionPool::Error, 'no connections are checked out' + end + + nil + end + + private + + def key(tag) + :"#{@key}-#{tag}" + end + + def key_count(tag) + :"#{@key_count}-#{tag}" + end +end diff --git a/app/lib/connection_pool/shared_timed_stack.rb b/app/lib/connection_pool/shared_timed_stack.rb new file mode 100644 index 000000000..14a5285c4 --- /dev/null +++ b/app/lib/connection_pool/shared_timed_stack.rb @@ -0,0 +1,95 @@ +# frozen_string_literal: true + +class ConnectionPool::SharedTimedStack + def initialize(max = 0, &block) + @create_block = block + @max = max + @created = 0 + @queue = [] + @tagged_queue = Hash.new { |hash, key| hash[key] = [] } + @mutex = Mutex.new + @resource = ConditionVariable.new + end + + def push(connection) + @mutex.synchronize do + store_connection(connection) + @resource.broadcast + end + end + + alias << push + + def pop(preferred_tag, timeout = 5.0) + deadline = current_time + timeout + + @mutex.synchronize do + loop do + return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty? + + connection = try_create(preferred_tag) + return connection if connection + + to_wait = deadline - current_time + raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 + + @resource.wait(@mutex, to_wait) + end + end + end + + def empty? + size.zero? + end + + def size + @mutex.synchronize do + @queue.size + end + end + + def flush + @mutex.synchronize do + @queue.delete_if do |connection| + delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME) + + if delete + @tagged_queue[connection.site].delete(connection) + connection.close + @created -= 1 + end + + delete + end + end + end + + private + + def try_create(preferred_tag) + if @created == @max && !@queue.empty? + throw_away_connection = @queue.pop + @tagged_queue[throw_away_connection.site].delete(throw_away_connection) + @create_block.call(preferred_tag) + elsif @created != @max + connection = @create_block.call(preferred_tag) + @created += 1 + connection + end + end + + def fetch_preferred_connection(preferred_tag) + connection = @tagged_queue[preferred_tag].pop + @queue.delete(connection) + connection + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def store_connection(connection) + @tagged_queue[connection.site].push(connection) + @queue.push(connection) + end +end diff --git a/app/lib/request.rb b/app/lib/request.rb index e555ae6a1..af49d6c77 100644 --- a/app/lib/request.rb +++ b/app/lib/request.rb @@ -17,15 +17,21 @@ end class Request REQUEST_TARGET = '(request-target)' + # We enforce a 5s timeout on DNS resolving, 5s timeout on socket opening + # and 5s timeout on the TLS handshake, meaning the worst case should take + # about 15s in total + TIMEOUT = { connect: 5, read: 10, write: 10 }.freeze + include RoutingHelper def initialize(verb, url, **options) raise ArgumentError if url.blank? - @verb = verb - @url = Addressable::URI.parse(url).normalize - @options = options.merge(use_proxy? ? Rails.configuration.x.http_client_proxy : { socket_class: Socket }) - @headers = {} + @verb = verb + @url = Addressable::URI.parse(url).normalize + @http_client = options.delete(:http_client) + @options = options.merge(use_proxy? ? Rails.configuration.x.http_client_proxy : { socket_class: Socket }) + @headers = {} raise Mastodon::HostValidationError, 'Instance does not support hidden service connections' if block_hidden_service? @@ -50,15 +56,24 @@ class Request def perform begin - response = http_client.headers(headers).public_send(@verb, @url.to_s, @options) + response = http_client.public_send(@verb, @url.to_s, @options.merge(headers: headers)) rescue => e raise e.class, "#{e.message} on #{@url}", e.backtrace[0] end begin - yield response.extend(ClientLimit) if block_given? + response = response.extend(ClientLimit) + + # If we are using a persistent connection, we have to + # read every response to be able to move forward at all. + # However, simply calling #to_s or #flush may not be safe, + # as the response body, if malicious, could be too big + # for our memory. So we use the #body_with_limit method + response.body_with_limit if http_client.persistent? + + yield response if block_given? ensure - http_client.close + http_client.close unless http_client.persistent? end end @@ -76,6 +91,10 @@ class Request %w(http https).include?(parsed_url.scheme) && parsed_url.host.present? end + + def http_client + HTTP.use(:auto_inflate).timeout(:per_operation, TIMEOUT.dup).follow(max_hops: 2) + end end private @@ -116,16 +135,8 @@ class Request end end - def timeout - # We enforce a 1s timeout on DNS resolving, 10s timeout on socket opening - # and 5s timeout on the TLS handshake, meaning the worst case should take - # about 16s in total - - { connect: 5, read: 10, write: 10 } - end - def http_client - @http_client ||= HTTP.use(:auto_inflate).timeout(:per_operation, timeout).follow(max_hops: 2) + @http_client ||= Request.http_client end def use_proxy? @@ -169,20 +180,41 @@ class Request return super(host, *args) if thru_hidden_service?(host) outer_e = nil + port = args.first Resolv::DNS.open do |dns| dns.timeouts = 5 addresses = dns.getaddresses(host).take(2) - time_slot = 10.0 / addresses.size addresses.each do |address| begin raise Mastodon::HostValidationError if PrivateAddressCheck.private_address?(IPAddr.new(address.to_s)) - ::Timeout.timeout(time_slot, HTTP::TimeoutError) do - return super(address.to_s, *args) + sock = ::Socket.new(::Socket::AF_INET, ::Socket::SOCK_STREAM, 0) + sockaddr = ::Socket.pack_sockaddr_in(port, address.to_s) + + sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1) + + begin + sock.connect_nonblock(sockaddr) + rescue IO::WaitWritable + if IO.select(nil, [sock], nil, Request::TIMEOUT[:connect]) + begin + sock.connect_nonblock(sockaddr) + rescue Errno::EISCONN + # Yippee! + rescue + sock.close + raise + end + else + sock.close + raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect]} seconds" + end end + + return sock rescue => e outer_e = e end diff --git a/app/lib/request_pool.rb b/app/lib/request_pool.rb new file mode 100644 index 000000000..e5899a79a --- /dev/null +++ b/app/lib/request_pool.rb @@ -0,0 +1,114 @@ +# frozen_string_literal: true + +require_relative './connection_pool/shared_connection_pool' + +class RequestPool + def self.current + @current ||= RequestPool.new + end + + class Reaper + attr_reader :pool, :frequency + + def initialize(pool, frequency) + @pool = pool + @frequency = frequency + end + + def run + return unless frequency&.positive? + + Thread.new(frequency, pool) do |t, p| + loop do + sleep t + p.flush + end + end + end + end + + MAX_IDLE_TIME = 30 + WAIT_TIMEOUT = 5 + MAX_POOL_SIZE = ENV.fetch('MAX_REQUEST_POOL_SIZE', 512).to_i + + class Connection + attr_reader :site, :last_used_at, :created_at, :in_use, :dead, :fresh + + def initialize(site) + @site = site + @http_client = http_client + @last_used_at = nil + @created_at = current_time + @dead = false + @fresh = true + end + + def use + @last_used_at = current_time + @in_use = true + + retries = 0 + + begin + yield @http_client + rescue HTTP::ConnectionError + # It's possible the connection was closed, so let's + # try re-opening it once + + close + + if @fresh || retries.positive? + raise + else + @http_client = http_client + retries += 1 + retry + end + rescue StandardError + # If this connection raises errors of any kind, it's + # better if it gets reaped as soon as possible + + close + @dead = true + raise + end + ensure + @fresh = false + @in_use = false + end + + def seconds_idle + current_time - (@last_used_at || @created_at) + end + + def close + @http_client.close + end + + private + + def http_client + Request.http_client.persistent(@site, timeout: MAX_IDLE_TIME) + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + end + + def initialize + @pool = ConnectionPool::SharedConnectionPool.new(size: MAX_POOL_SIZE, timeout: WAIT_TIMEOUT) { |site| Connection.new(site) } + @reaper = Reaper.new(self, 30) + @reaper.run + end + + def with(site, &block) + @pool.with(site) do |connection| + ActiveSupport::Notifications.instrument('with.request_pool', miss: connection.fresh, host: connection.site) do + connection.use(&block) + end + end + end + + delegate :size, :flush, to: :@pool +end diff --git a/app/lib/sidekiq_error_handler.rb b/app/lib/sidekiq_error_handler.rb index 23785cf05..8eb6b942d 100644 --- a/app/lib/sidekiq_error_handler.rb +++ b/app/lib/sidekiq_error_handler.rb @@ -3,9 +3,11 @@ class SidekiqErrorHandler def call(*) yield - rescue Mastodon::HostValidationError => e - Rails.logger.error "#{e.class}: #{e.message}" - Rails.logger.error e.backtrace.join("\n") + rescue Mastodon::HostValidationError # Do not retry + ensure + socket = Thread.current[:statsd_socket] + socket&.close + Thread.current[:statsd_socket] = nil end end diff --git a/app/models/concerns/attachmentable.rb b/app/models/concerns/attachmentable.rb index 24f5968de..7c78bb456 100644 --- a/app/models/concerns/attachmentable.rb +++ b/app/models/concerns/attachmentable.rb @@ -60,7 +60,9 @@ module Attachmentable end def calculated_content_type(attachment) - Paperclip.run('file', '-b --mime :file', file: attachment.queued_for_write[:original].path).split(/[:;\s]+/).first.chomp + content_type = Paperclip.run('file', '-b --mime :file', file: attachment.queued_for_write[:original].path).split(/[:;\s]+/).first.chomp + content_type = 'video/mp4' if content_type == 'video/x-m4v' + content_type rescue Terrapin::CommandLineError '' end diff --git a/app/models/custom_filter.rb b/app/models/custom_filter.rb index 342207a55..382562fb8 100644 --- a/app/models/custom_filter.rb +++ b/app/models/custom_filter.rb @@ -35,6 +35,13 @@ class CustomFilter < ApplicationRecord before_validation :clean_up_contexts after_commit :remove_cache + def expires_in + return @expires_in if defined?(@expires_in) + return nil if expires_at.nil? + + [30.minutes, 1.hour, 6.hours, 12.hours, 1.day, 1.week].find { |expires_in| expires_in.from_now >= expires_at } + end + private def clean_up_contexts diff --git a/app/services/activitypub/process_account_service.rb b/app/services/activitypub/process_account_service.rb index 05c017bdf..3857e7c16 100644 --- a/app/services/activitypub/process_account_service.rb +++ b/app/services/activitypub/process_account_service.rb @@ -15,6 +15,8 @@ class ActivityPub::ProcessAccountService < BaseService @domain = domain @collections = {} + return if auto_suspend? + RedisLock.acquire(lock_options) do |lock| if lock.acquired? @account = Account.find_remote(@username, @domain) @@ -55,7 +57,7 @@ class ActivityPub::ProcessAccountService < BaseService @account.domain = @domain @account.private_key = nil @account.suspended_at = domain_block.created_at if auto_suspend? - @account.silenced_at = domain_block.created_at if auto_silence? + @account.silenced_at = domain_block.created_at if auto_silence? end def update_account diff --git a/app/services/resolve_account_service.rb b/app/services/resolve_account_service.rb index 57c9ccfe1..e557706da 100644 --- a/app/services/resolve_account_service.rb +++ b/app/services/resolve_account_service.rb @@ -48,7 +48,7 @@ class ResolveAccountService < BaseService return end - return if links_missing? + return if links_missing? || auto_suspend? return Account.find_local(@username) if TagManager.instance.local_domain?(@domain) RedisLock.acquire(lock_options) do |lock| diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index 5e4c391f0..818fd8f5d 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -17,6 +17,7 @@ class ActivityPub::DeliveryWorker @json = json @source_account = Account.find(source_account_id) @inbox_url = inbox_url + @host = Addressable::URI.parse(inbox_url).normalized_site perform_request @@ -28,16 +29,18 @@ class ActivityPub::DeliveryWorker private - def build_request - request = Request.new(:post, @inbox_url, body: @json) + def build_request(http_client) + request = Request.new(:post, @inbox_url, body: @json, http_client: http_client) request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with]) request.add_headers(HEADERS) end def perform_request light = Stoplight(@inbox_url) do - build_request.perform do |response| - raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response) + request_pool.with(@host) do |http_client| + build_request(http_client).perform do |response| + raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response) + end end end @@ -51,10 +54,14 @@ class ActivityPub::DeliveryWorker end def response_error_unsalvageable?(response) - (400...500).cover?(response.code) && ![401, 408, 429].include?(response.code) + response.code == 501 || ((400...500).cover?(response.code) && ![401, 408, 429].include?(response.code)) end def failure_tracker @failure_tracker ||= DeliveryFailureTracker.new(@inbox_url) end + + def request_pool + RequestPool.current + end end |