diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/index.js | 80 |
1 files changed, 48 insertions, 32 deletions
diff --git a/streaming/index.js b/streaming/index.js index ac8e80fb9..9f8ab107e 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -283,6 +283,14 @@ const startWorker = (workerId) => { }; /** + * @param {any} req + * @param {string[]} necessaryScopes + * @return {boolean} + */ + const isInScope = (req, necessaryScopes) => + req.scopes.some(scope => necessaryScopes.includes(scope)); + + /** * @param {string} token * @param {any} req * @return {Promise.<void>} @@ -314,7 +322,6 @@ const startWorker = (workerId) => { req.scopes = result.rows[0].scopes.split(' '); req.accountId = result.rows[0].account_id; req.chosenLanguages = result.rows[0].chosen_languages; - req.allowNotifications = req.scopes.some(scope => ['read', 'read:notifications'].includes(scope)); req.deviceId = result.rows[0].device_id; resolve(); @@ -581,15 +588,13 @@ const startWorker = (workerId) => { * @param {function(string, string): void} output * @param {function(string[], function(string): void): void} attachCloseHandler * @param {boolean=} needsFiltering - * @param {boolean=} notificationOnly * @param {boolean=} allowLocalOnly * @return {function(string): void} */ - const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false, allowLocalOnly = false) => { + const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, allowLocalOnly = false) => { const accountId = req.accountId || req.remoteAddress; - const streamType = notificationOnly ? ' (notification)' : ''; - log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`); + log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); const listener = message => { const json = parseJSON(message); @@ -607,14 +612,6 @@ const startWorker = (workerId) => { output(event, encodedPayload); }; - if (notificationOnly && event !== 'notification') { - return; - } - - if (event === 'notification' && !req.allowNotifications) { - return; - } - // Only send local-only statuses to logged-in users if (event === 'update' && payload.local_only && !(req.accountId && allowLocalOnly)) { log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`); @@ -767,7 +764,7 @@ const startWorker = (workerId) => { const onSend = streamToHttp(req, res); const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); - streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly, options.allowLocalOnly); + streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.allowLocalOnly); }).catch(err => { log.verbose(req.requestId, 'Subscription error:', err.toString()); httpNotFound(res); @@ -785,86 +782,104 @@ const startWorker = (workerId) => { /** * @param {any} req + * @return {string[]} + */ + const channelsForUserStream = req => { + const arr = [`timeline:${req.accountId}`]; + + if (isInScope(req, ['crypto']) && req.deviceId) { + arr.push(`timeline:${req.accountId}:${req.deviceId}`); + } + + if (isInScope(req, ['read', 'read:notifications'])) { + arr.push(`timeline:${req.accountId}:notifications`); + } + + return arr; + }; + + /** + * @param {any} req * @param {string} name * @param {StreamParams} params - * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>} + * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>} */ const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { switch(name) { case 'user': resolve({ - channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + channelIds: channelsForUserStream(req), + options: { needsFiltering: false, allowLocalOnly: true }, }); break; case 'user:notification': resolve({ - channelIds: [`timeline:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: true, allowLocalOnly: true }, + channelIds: [`timeline:${req.accountId}:notifications`], + options: { needsFiltering: false, allowLocalOnly: true }, }); break; case 'public': resolve({ channelIds: ['timeline:public'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(params.allow_local_only) }, + options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) }, }); break; case 'public:allow_local_only': resolve({ channelIds: ['timeline:public'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); break; case 'public:local': resolve({ channelIds: ['timeline:public:local'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); break; case 'public:remote': resolve({ channelIds: ['timeline:public:remote'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false }, + options: { needsFiltering: true, allowLocalOnly: false }, }); break; case 'public:media': resolve({ channelIds: ['timeline:public:media'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(query.allow_local_only) }, + options: { needsFiltering: true, allowLocalOnly: isTruthy(query.allow_local_only) }, }); break; case 'public:allow_local_only:media': resolve({ channelIds: ['timeline:public:media'], - options: { needsFiltering: true, notificationsOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); break; case 'public:local:media': resolve({ channelIds: ['timeline:public:local:media'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); break; case 'public:remote:media': resolve({ channelIds: ['timeline:public:remote:media'], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false }, + options: { needsFiltering: true, allowLocalOnly: false }, }); break; case 'direct': resolve({ channelIds: [`timeline:direct:${req.accountId}`], - options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: false, allowLocalOnly: true }, }); break; @@ -874,7 +889,7 @@ const startWorker = (workerId) => { } else { resolve({ channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); } @@ -885,7 +900,7 @@ const startWorker = (workerId) => { } else { resolve({ channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`], - options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: true, allowLocalOnly: true }, }); } @@ -894,7 +909,7 @@ const startWorker = (workerId) => { authorizeListAccess(params.list, req).then(() => { resolve({ channelIds: [`timeline:list:${params.list}`], - options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true }, + options: { needsFiltering: false, allowLocalOnly: true }, }); }).catch(() => { reject('Not authorized to stream this list'); @@ -941,7 +956,8 @@ const startWorker = (workerId) => { const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params)); const stopHeartbeat = subscriptionHeartbeat(channelIds); - const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly, options.allowLocalOnly); + + const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.allowLocalOnly); subscriptions[channelIds.join(';')] = { listener, |