# frozen_string_literal: true

require 'csv'

# Processes an uploaded import (follows, blocks, mutes, domain blocks or
# statuses) on behalf of the account that owns it.
class ImportService < BaseService
  include RoutingHelper
  include JsonLdHelper

  ROWS_PROCESSING_LIMIT = 20_000
  CONTENT_TYPES = %w(text/bbcode+markdown text/markdown text/bbcode text/html text/plain text/console).freeze
  VISIBILITIES = [:public, :unlisted, :private, :direct, :limited].freeze
  IMPORT_STATUS_ATTRIBUTES = [
    'id',
    'content_type',
    'spoiler_text',
    'text',
    'footer',
    'in_reply_to_id',
    'reply',
    'reblog_of_id',
    'created_at',
    'conversation_id',
    'sensitive',
    'language',
    'local_only',
    'hidden',
    'visibility',
  ].freeze

  # JSON values accepted as "true" for boolean status attributes.
  TRUTHY_VALUES = [true, 1, '1'].freeze

  # Dispatches to the importer matching +import.type+. Unknown types are ignored.
  #
  # @param import [Object] import record responding to #account, #type, #data and #overwrite?
  def call(import)
    @import  = import
    @account = @import.account

    case @import.type
    when 'following'
      import_follows!
    when 'blocking'
      import_blocks!
    when 'muting'
      import_mutes!
    when 'domain_blocking'
      import_domain_blocks!
    when 'statuses'
      import_statuses!
    end
  end

  private

  def import_follows!
    parse_import_data!(['Account address'])
    import_relationships!('follow', 'unfollow', @account.following, follow_limit, reblogs: 'Show boosts')
  end

  def import_blocks!
    parse_import_data!(['Account address'])
    import_relationships!('block', 'unblock', @account.blocking, ROWS_PROCESSING_LIMIT)
  end

  def import_mutes!
    parse_import_data!(['Account address'])
    import_relationships!('mute', 'unmute', @account.muting, ROWS_PROCESSING_LIMIT, notifications: 'Hide notifications')
  end

  # Blocks every domain listed in the CSV; in overwrite mode, domains that are
  # currently blocked but absent from the CSV are unblocked first.
  def import_domain_blocks!
    parse_import_data!(['#domain'])
    # Malformed CSV leaves @data nil; do nothing rather than crash or, in
    # overwrite mode, treat it as an empty list and unblock everything.
    return if @data.nil?

    # Rows lacking the column or holding only whitespace are skipped.
    items = @data.take(ROWS_PROCESSING_LIMIT).map { |row| row['#domain']&.strip }.reject(&:blank?).uniq

    if @import.overwrite?
      requested = items.each_with_object({}) { |domain, mapping| mapping[domain] = true }

      @account.domain_blocks.find_each do |domain_block|
        if requested[domain_block.domain]
          # Already blocked: nothing left to do for this domain.
          items.delete(domain_block.domain)
        else
          @account.unblock_domain!(domain_block.domain)
        end
      end
    end

    items.each { |domain| @account.block_domain!(domain) }

    AfterAccountDomainBlockWorker.push_bulk(items) do |domain|
      [@account.id, domain]
    end
  end

  # Imports statuses from either an ActivityPub outbox document (a Hash with a
  # non-empty "orderedItems" array) or a plain JSON array of status objects.
  def import_statuses!
    parse_import_data_json!
    return if @data.nil?

    activitypub = @data.is_a?(Hash) && @data['orderedItems'].is_a?(Array) && @data['orderedItems'].any?

    # Validate the payload shape BEFORE purging, so an unusable upload in
    # overwrite mode cannot wipe the account's statuses and import nothing.
    return unless activitypub || @data.is_a?(Array)

    purge_existing_statuses! if @import.overwrite?

    if activitypub
      import_activitypub
    else
      import_json_statuses
    end
  end

  # Removes the account's own (non-reblog) statuses in batches.
  def purge_existing_statuses!
    @account.statuses.without_reblogs.reorder(nil).find_in_batches do |statuses|
      BatchedRemoveStatusService.new.call(statuses)
    end
  end

  # Creates one status per entry of the JSON array. Entries that are invalid,
  # duplicates, or that fail validation are skipped (failures are logged).
  def import_json_statuses
    user = @account.user
    user.vars['_bangtags:disable'] = true
    user.save

    begin
      @data.each do |json|
        attributes = build_status_attributes(json)
        next if attributes.nil?

        ApplicationRecord.transaction do
          status = @account.statuses.create!(attributes)
          process_hashtags_service.call(status)
          process_mentions_service.call(status, skip_notify: true)
        end
      rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound, Mastodon::ValidationError => e
        Rails.logger.error "Error importing status (JSON): #{e}"
        nil
      end
    ensure
      # Always restore the flag, even when an unexpected error aborts the loop.
      user.vars.delete('_bangtags:disable')
      user.save
    end
  end

  # Normalises one imported status object into attributes for Status creation.
  #
  # @param json [Object] one element of the uploaded JSON array
  # @return [Hash{Symbol=>Object}, nil] attributes, or nil when the entry must be skipped
  def build_status_attributes(json)
    # skip if invalid status object
    return unless json.is_a?(Hash)

    json = json.slice(*IMPORT_STATUS_ATTRIBUTES).compact
    return if json.blank?

    # skip if missing reblog
    unless json['reblog_of_id'].nil?
      json['reblog_of_id'] = coerce_id(json['reblog_of_id'])
      return unless json['reblog_of_id'] && Status.where(id: json['reblog_of_id']).exists?
    end

    json['created_at'] = parse_created_at(json['created_at'])

    json['id'] = json['id'].blank? ? nil : coerce_id(json['id'])

    if json['id']
      # check for duplicate
      existing_status = Status.find_by(id: json['id'])

      unless existing_status.nil?
        # skip if duplicate (same id, created within a second of each other)
        return if (json['created_at'] - existing_status.created_at).abs < 1

        # else drop the conflicting id value
        json['id'] = nil
      end
    end

    # ensure correct values & value types
    json['content_type']    = 'text/plain' unless CONTENT_TYPES.include?(json['content_type'])
    json['spoiler_text']    = '' unless json['spoiler_text'].is_a?(String)
    json['text']            = '' unless json['text'].is_a?(String)
    json['footer']          = nil unless json['footer'].is_a?(String)
    json['reply']           = truthy?(json['reply'])
    json['in_reply_to_id']  = coerce_id(json['in_reply_to_id'])
    json['conversation_id'] = coerce_id(json['conversation_id'])
    json['sensitive']       = truthy?(json['sensitive'])
    json['language']        = resolve_language(json['language'])
    json['local_only']      = @account.user_always_local_only? || truthy?(json['local_only'])
    json['visibility']      = parse_visibility(json['visibility'])
    json['hidden']          = truthy?(json['hidden'])
    json['imported']        = true

    parent = json['in_reply_to_id'] && Status.find_by(id: json['in_reply_to_id'])

    if parent.nil? || parent.account_id != @account.id
      # nullify a missing reply (or a reply to another account's status)
      json['in_reply_to_id'] = nil
    else
      # The parent exists: drop the imported conversation id, presumably so the
      # conversation is derived from the parent at creation -- TODO confirm.
      json['conversation_id'] = nil
    end

    # drop a nonexistent conversation id
    if json['conversation_id'] && !Conversation.where(id: json['conversation_id']).exists?
      json['conversation_id'] = nil
    end

    json.compact.symbolize_keys
  end

  # @return [Boolean] whether +value+ is one of the accepted "true" encodings
  def truthy?(value)
    TRUTHY_VALUES.include?(value)
  end

  # Coerces a JSON value to a positive Integer id.
  #
  # @return [Integer, nil] nil for missing, zero, negative or non-coercible values
  def coerce_id(value)
    return nil unless value.respond_to?(:to_i)

    id = value.to_i
    id.positive? ? id : nil
  end

  # Parses an ISO 8601 timestamp; anything else (including an unparseable
  # string) falls back to the current time.
  def parse_created_at(value)
    return Time.now.utc unless value.is_a?(String)

    DateTime.iso8601(value).utc
  rescue ArgumentError
    Time.now.utc
  end

  # Resolves the status language to a two-letter code, falling back to the
  # account's default language and finally to 'en'.
  def resolve_language(value)
    value = 'en' unless value.is_a?(String) && value.length > 1
    ISO_639.find(value)&.alpha2.presence || @account.user_default_language&.presence || 'en'
  end

  # Maps an imported visibility to one of VISIBILITIES. Accepts a visibility
  # name ("private") or an index into VISIBILITIES (2 or "2").
  #
  # @return [Symbol] the visibility; :unlisted for unrecognised values
  def parse_visibility(value)
    if value.is_a?(String) || value.is_a?(Symbol)
      wanted = value.to_s.strip.downcase
      named  = VISIBILITIES.find { |visibility| visibility.to_s == wanted }
      return named unless named.nil?
    end

    index = case value
            when nil then 0 # a missing visibility has always meant index 0 (:public)
            when Numeric then value.to_i
            when String then value.match?(/\A\s*\d+\s*\z/) ? value.to_i : nil
            end

    # Negative indices must not wrap around to the end of the list.
    return :unlisted if index.nil? || index.negative?

    VISIBILITIES[index] || :unlisted
  end

  # Replays the Create/Announce activities of an ActivityPub outbox document
  # as activities of the importing account. Failures are logged per activity.
  def import_activitypub
    account_uri   = ActivityPub::TagManager.instance.uri_for(@account)
    followers_uri = account_followers_url(@account)

    @data['orderedItems'].each do |activity|
      next unless activity.is_a?(Hash)
      next if activity['object'].blank?
      next unless %w(Create Announce).include?(activity['type'])

      object = activity['object']

      activity['actor'] = account_uri
      activity['to']    = rewrite_audience(activity['to'], followers_uri, [account_uri])
      activity['cc']    = rewrite_audience(activity['cc'], followers_uri, [])

      case activity['type']
      when 'Announce'
        next unless object.is_a?(String)
      when 'Create'
        # Only Create/Announce reach this point, so the embedded object is
        # rewritten on 'Create' (a 'Note' activity type can never occur here).
        next unless object.is_a?(Hash)

        object['attributedTo'] = account_uri
        object['to']           = activity['to']
        object['cc']           = activity['cc']
        object['inReplyTo']    = nil
        object.delete('attachment')
      end

      handler = ActivityPub::Activity.factory(activity, @account, imported: true)
      handler&.perform
    rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound, Mastodon::ValidationError, HTTP::ConnectionError, HTTP::TimeoutError, OpenSSL::SSL::SSLError, Paperclip::Errors::NotIdentifiedByImageMagickError, Addressable::URI::InvalidURIError, Mastodon::HostValidationError, Mastodon::LengthValidationError => e
      Rails.logger.error "Error importing status (ActivityPub): #{e}"
      nil
    end
  end

  # De-duplicates an audience list and points any ".../followers" entry at the
  # importing account's followers collection.
  #
  # @return [Array] the rewritten list, or +fallback+ when +recipients+ is not an Array
  def rewrite_audience(recipients, followers_uri, fallback)
    return fallback unless recipients.is_a?(Array)

    recipients.uniq.map do |recipient|
      recipient.is_a?(String) && recipient.end_with?('/followers') ? followers_uri : recipient
    end
  end

  # Enqueues relationship jobs for each account address in the CSV. In
  # overwrite mode, relationships missing from the CSV are undone.
  #
  # @param action [String] worker action for listed accounts
  # @param undo_action [String] worker action for unlisted accounts (overwrite mode)
  # @param overwrite_scope [#find_each] the account's current relationships
  # @param limit [Integer] maximum number of CSV rows to process
  # @param extra_fields [Hash] option key => CSV header of extra per-row values
  def import_relationships!(action, undo_action, overwrite_scope, limit, extra_fields = {})
    # Malformed CSV leaves @data nil; do nothing rather than crash.
    return if @data.nil?

    items = @data.take(limit).map do |row|
      extra = Hash[extra_fields.map { |key, header| [key, row[header]&.strip] }]
      [row['Account address']&.strip, extra]
    end
    items.reject! { |(acct, _)| acct.blank? }

    if @import.overwrite?
      requested = items.each_with_object({}) { |(acct, extra), mapping| mapping[acct] = extra }
      handled   = {}

      overwrite_scope.find_each do |target_account|
        acct = target_account.acct

        if requested.key?(acct)
          handled[acct] = true
          Import::RelationshipWorker.perform_async(@account.id, acct, action, requested[acct])
        else
          Import::RelationshipWorker.perform_async(@account.id, acct, undo_action)
        end
      end

      # items holds [acct, extra] pairs, so they are filtered by acct here;
      # deleting by the bare acct string would never match a pair.
      items.reject! { |(acct, _)| handled[acct] }
    end

    Import::RelationshipWorker.push_bulk(items) do |acct, extra|
      [@account.id, acct, action, extra]
    end
  end

  # Parses the upload as CSV into @data (blank rows dropped), using
  # +default_headers+ when the file has no header row. Leaves @data nil when
  # the CSV is malformed; callers must check for that.
  def parse_import_data!(default_headers)
    data = CSV.parse(import_data, headers: true)
    data = CSV.parse(import_data, headers: default_headers) unless data.headers&.first&.strip&.include?(' ')
    @data = data.reject(&:blank?)
  rescue CSV::MalformedCSVError
    @data = nil
  end

  # Parses the upload as JSON into @data. Leaves @data nil on a parse error so
  # the caller's nil guard stops before any destructive overwrite step.
  def parse_import_data_json!
    @data = Oj.load(import_data, mode: :strict)
  rescue Oj::ParseError
    @data = nil
  end

  def import_data
    Paperclip.io_adapters.for(@import.data).read
  end

  def follow_limit
    FollowLimitValidator.limit_for_account(@account)
  end

  def process_mentions_service
    ProcessMentionsService.new
  end

  def process_hashtags_service
    ProcessHashtagsService.new
  end
end