diff options
Diffstat (limited to 'streaming/index.js')
-rw-r--r-- | streaming/index.js | 209 |
1 files changed, 107 insertions, 102 deletions
diff --git a/streaming/index.js b/streaming/index.js index ac8e80fb9..2dbb546c0 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -63,20 +63,29 @@ const dbUrlToConfig = (dbUrl) => { * @param {Object.<string, any>} defaultConfig * @param {string} redisUrl */ -const redisUrlToClient = (defaultConfig, redisUrl) => { +const redisUrlToClient = async (defaultConfig, redisUrl) => { const config = defaultConfig; + let client; + if (!redisUrl) { - return redis.createClient(config); + client = redis.createClient(config); + } else if (redisUrl.startsWith('unix://')) { + client = redis.createClient(Object.assign(config, { + socket: { + path: redisUrl.slice(7), + }, + })); + } else { + client = redis.createClient(Object.assign(config, { + url: redisUrl, + })); } - if (redisUrl.startsWith('unix://')) { - return redis.createClient(redisUrl.slice(7), config); - } + client.on('error', (err) => log.error('Redis Client Error!', err)); + await client.connect(); - return redis.createClient(Object.assign(config, { - url: redisUrl, - })); + return client; }; const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); @@ -102,7 +111,7 @@ const startMaster = () => { log.warn(`Starting streaming API server master with ${numWorkers} workers`); }; -const startWorker = (workerId) => { +const startWorker = async (workerId) => { log.warn(`Starting worker ${workerId}`); const pgConfigs = { @@ -127,7 +136,7 @@ const startWorker = (workerId) => { if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') { pgConfigs.development.ssl = true; - pgConfigs.production.ssl = true; + pgConfigs.production.ssl = true; } const app = express(); @@ -139,9 +148,11 @@ const startWorker = (workerId) => { const redisNamespace = process.env.REDIS_NAMESPACE || null; const redisParams = { - host: process.env.REDIS_HOST || '127.0.0.1', - port: process.env.REDIS_PORT || 6379, - db: process.env.REDIS_DB || 0, + socket: { + host: process.env.REDIS_HOST || '127.0.0.1', + port: process.env.REDIS_PORT || 6379, + }, + database: process.env.REDIS_DB || 0, password: process.env.REDIS_PASSWORD || undefined, }; @@ -151,25 +162,8 @@ const startWorker = (workerId) => { const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; - const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL); - const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL); - - /** - * @type {Object.<string, Array.<function(string): void>>} - */ - const subs = {}; - - redisSubscribeClient.on('message', (channel, message) => { - const callbacks = subs[channel]; - - log.silly(`New message on channel ${channel}`); - - if (!callbacks) { - return; - } - - callbacks.forEach(callback => callback(message)); - }); + const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); + const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); /** * @param {string[]} channels @@ -197,14 +191,8 @@ const startWorker = (workerId) => { */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); - subs[channel] = subs[channel] || []; - if (subs[channel].length === 0) { - log.verbose(`Subscribe ${channel}`); - redisSubscribeClient.subscribe(channel); - } - - subs[channel].push(callback); + redisSubscribeClient.subscribe(channel, callback); }; /** @@ -214,17 +202,7 @@ const startWorker = (workerId) => { const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); - if (!subs[channel]) { - return; - } - - subs[channel] = subs[channel].filter(item => item !== callback); - - if (subs[channel].length === 0) { - log.verbose(`Unsubscribe ${channel}`); - redisSubscribeClient.unsubscribe(channel); - delete subs[channel]; - } + redisSubscribeClient.unsubscribe(channel, callback); }; const FALSE_VALUES = [ @@ -283,6 +261,14 @@ const startWorker = (workerId) => { }; /** + * @param {any} req + * @param {string[]} necessaryScopes + * @return {boolean} + */ + const isInScope = (req, necessaryScopes) => + req.scopes.some(scope => necessaryScopes.includes(scope)); + + /** * @param {string} token * @param {any} req * @return {Promise.<void>} @@ -314,7 +300,6 @@ const startWorker = (workerId) => { req.scopes = result.rows[0].scopes.split(' '); req.accountId = result.rows[0].account_id; req.chosenLanguages = result.rows[0].chosen_languages; - req.allowNotifications = req.scopes.some(scope => ['read', 'read:notifications'].includes(scope)); req.deviceId = result.rows[0].device_id; resolve(); @@ -359,7 +344,7 @@ const startWorker = (workerId) => { const onlyMedia = isTruthy(query.only_media); const allowLocalOnly = isTruthy(query.allow_local_only); - switch(path) { + switch (path) { case '/api/v1/streaming/user': return 'user'; case '/api/v1/streaming/user/notification': @@ -424,7 +409,7 @@ const startWorker = (workerId) => { requiredScopes.push('read:statuses'); } - if (requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) { + if (req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) { resolve(); return; } @@ -490,7 +475,7 @@ const startWorker = (workerId) => { const listener = createSystemMessageListener(req, { - onKill () { + onKill() { res.end(); }, @@ -542,7 +527,7 @@ const startWorker = (workerId) => { }; /** - * @param {array} + * @param {array} arr * @param {number=} shift * @return {string} */ @@ -581,15 +566,13 @@ const startWorker = (workerId) => { * @param {function(string, string): void} output * @param {function(string[], function(string): void): void} attachCloseHandler * @param {boolean=} needsFiltering - * @param {boolean=} notificationOnly * @param {boolean=} allowLocalOnly * @return {function(string): void} */ - const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false, allowLocalOnly = false) => { - const accountId = req.accountId || req.remoteAddress; - const streamType = notificationOnly ? ' (notification)' : ''; + const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, allowLocalOnly = false) => { + const accountId = req.accountId || req.remoteAddress; - log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`); + log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); const listener = message => { const json = parseJSON(message); @@ -599,22 +582,14 @@ const startWorker = (workerId) => { const { event, payload, queued_at } = json; const transmit = () => { - const now = new Date().getTime(); - const delta = now - queued_at; + const now = new Date().getTime(); + const delta = now - queued_at; const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`); output(event, encodedPayload); }; - if (notificationOnly && event !== 'notification') { - return; - } - - if (event === 'notification' && !req.allowNotifications) { - return; - } - // Only send local-only statuses to logged-in users if (event === 'update' && payload.local_only && !(req.accountId && allowLocalOnly)) { log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`); @@ -628,9 +603,9 @@ const startWorker = (workerId) => { return; } - const unpackedPayload = payload; + const unpackedPayload = payload; const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)); - const accountDomain = unpackedPayload.account.acct.split('@')[1]; + const accountDomain = unpackedPayload.account.acct.split('@')[1]; if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) { log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`); @@ -650,7 +625,15 @@ const startWorker = (workerId) => { } const queries = [ - client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), + client.query(`SELECT 1 + FROM blocks + WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) + OR (account_id = $2 AND target_account_id = $1) + UNION + SELECT 1 + FROM mutes + WHERE account_id = $1 + AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), ]; if (accountDomain) { @@ -713,12 +696,12 @@ const startWorker = (workerId) => { /** * @param {any} req * @param {function(): void} [closeHandler] - * @return {function(string[], function(string): void)} + * @return {function(string[]): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { + const streamHttpEnd = (req, closeHandler = undefined) => (ids) => { req.on('close', () => { ids.forEach(id => { - unsubscribe(id, listener); + unsubscribe(id); }); if (closeHandler) { @@ -765,9 +748,9 @@ const startWorker = (workerId) => { app.get('/api/v1/streaming/*', (req, res) => { channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => { const onSend = streamToHttp(req, res); - const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); + const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); - streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly, options.allowLocalOnly); + streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.allowLocalOnly); }).catch(err => { log.verbose(req.requestId, 'Subscription error:', err.toString()); httpNotFound(res); @@ -785,86 +768,104 @@ const startWorker = (workerId) => { /** * @param {any} req + * @return {string[]} + */ + const channelsForUserStream = req => { + const arr = [`timeline:${req.accountId}`]; + + if (isInScope(req, ['crypto']) && req.deviceId) { + arr.push(`timeline:${req.accountId}:${req.deviceId}`); + } + + if (isInScope(req, ['read', 'read:notifications'])) { + arr.push(`timeline:${req.accountId}:notifications`); + } + + return arr; + }; + + /** + * @param {any} req * @param {string} name * @param {StreamParams} params - * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>} + * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>} */ const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { - switch(name) { + switch (name) { case 'user': resolve({ - channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + channelIds: channelsForUserStream(req), + options: { needsFiltering: false, allowLocalOnly: true }, }); break; case 'user:notification': resolve({ - channelIds: [`timeline:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: true, allowLocalOnly: true }, + channelIds: [`timeline:${req.accountId}:notifications`], + options: { needsFiltering: false, allowLocalOnly: true }, }); break; case 'public': resolve({ channelIds: ['timeline:public'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(params.allow_local_only) }, + options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) }, }); break; case 'public:allow_local_only': resolve({ channelIds: ['timeline:public'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); break; case 'public:local': resolve({ channelIds: ['timeline:public:local'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); break; case 'public:remote': resolve({ channelIds: ['timeline:public:remote'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false }, + options: { needsFiltering: true, allowLocalOnly: false }, }); break; case 'public:media': resolve({ channelIds: ['timeline:public:media'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(query.allow_local_only) }, + options: { needsFiltering: true, allowLocalOnly: isTruthy(query.allow_local_only) }, }); break; case 'public:allow_local_only:media': resolve({ channelIds: ['timeline:public:media'], - options: { needsFiltering: true, notificationsOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); break; case 'public:local:media': resolve({ channelIds: ['timeline:public:local:media'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); break; case 'public:remote:media': resolve({ channelIds: ['timeline:public:remote:media'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false }, + options: { needsFiltering: true, allowLocalOnly: false }, }); break; case 'direct': resolve({ channelIds: [`timeline:direct:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: false, allowLocalOnly: true }, }); break; @@ -874,7 +875,7 @@ const startWorker = (workerId) => { } else { resolve({ channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); } @@ -885,7 +886,7 @@ const startWorker = (workerId) => { } else { resolve({ channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); } @@ -894,7 +895,7 @@ const startWorker = (workerId) => { authorizeListAccess(params.list, req).then(() => { resolve({ channelIds: [`timeline:list:${params.list}`], - options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: false, allowLocalOnly: true }, }); }).catch(() => { reject('Not authorized to stream this list'); @@ -934,14 +935,17 @@ const startWorker = (workerId) => { * @param {StreamParams} params */ const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => - checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options }) => { + checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ + channelIds, + options, + }) => { if (subscriptions[channelIds.join(';')]) { return; } - const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params)); + const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params)); const stopHeartbeat = subscriptionHeartbeat(channelIds); - const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly, options.allowLocalOnly); + const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.allowLocalOnly); subscriptions[channelIds.join(';')] = { listener, @@ -989,7 +993,7 @@ const startWorker = (workerId) => { const listener = createSystemMessageListener(request, { - onKill () { + onKill() { socket.close(); }, @@ -999,7 +1003,8 @@ const startWorker = (workerId) => { subscriptions[systemChannelId] = { listener, - stopHeartbeat: () => {}, + stopHeartbeat: () => { + }, }; }; @@ -1018,7 +1023,7 @@ const startWorker = (workerId) => { wss.on('connection', (ws, req) => { const location = url.parse(req.url, true); - req.requestId = uuid.v4(); + req.requestId = uuid.v4(); req.remoteAddress = ws._socket.remoteAddress; ws.isAlive = true; |