about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2021-09-26 13:23:28 +0200
committerGitHub <noreply@github.com>2021-09-26 13:23:28 +0200
commita0d4129893c797f78d28ba9df5d35646f7bb0d80 (patch)
treebe6b5c2dbb4e648ede66e96eda2adcab2bac33ee /streaming
parent52e5c07948c4c91b73062846e1f19ea278ec0e24 (diff)
Refactor notifications to go through a separate stream in streaming API (#16765)
Eliminate need to have custom notifications filtering logic in the
streaming API code by publishing notifications into a separate stream
and then simply using the multi-stream capability to subscribe to that
stream when necessary
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js75
1 files changed, 45 insertions, 30 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 7bb645a13..67cd48b43 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -283,6 +283,14 @@ const startWorker = (workerId) => {
   };
 
   /**
+   * @param {any} req
+   * @param {string[]} necessaryScopes
+   * @return {boolean}
+   */
+  const isInScope = (req, necessaryScopes) =>
+    req.scopes.some(scope => necessaryScopes.includes(scope));
+
+  /**
    * @param {string} token
    * @param {any} req
    * @return {Promise.<void>}
@@ -314,7 +322,6 @@ const startWorker = (workerId) => {
         req.scopes = result.rows[0].scopes.split(' ');
         req.accountId = result.rows[0].account_id;
         req.chosenLanguages = result.rows[0].chosen_languages;
-        req.allowNotifications = req.scopes.some(scope => ['read', 'read:notifications'].includes(scope));
         req.deviceId = result.rows[0].device_id;
 
         resolve();
@@ -580,14 +587,12 @@ const startWorker = (workerId) => {
    * @param {function(string, string): void} output
    * @param {function(string[], function(string): void): void} attachCloseHandler
    * @param {boolean=} needsFiltering
-   * @param {boolean=} notificationOnly
    * @return {function(string): void}
    */
-  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
+  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
     const accountId  = req.accountId || req.remoteAddress;
-    const streamType = notificationOnly ? ' (notification)' : '';
 
-    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
+    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
 
     const listener = message => {
       const json = parseJSON(message);
@@ -605,14 +610,6 @@ const startWorker = (workerId) => {
         output(event, encodedPayload);
       };
 
-      if (notificationOnly && event !== 'notification') {
-        return;
-      }
-
-      if (event === 'notification' && !req.allowNotifications) {
-        return;
-      }
-
       // Only messages that may require filtering are statuses, since notifications
       // are already personalized and deletes do not matter
       if (!needsFiltering || event !== 'update') {
@@ -759,7 +756,7 @@ const startWorker = (workerId) => {
       const onSend = streamToHttp(req, res);
       const onEnd  = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
 
-      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly);
+      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
     }).catch(err => {
       log.verbose(req.requestId, 'Subscription error:', err.toString());
       httpNotFound(res);
@@ -777,72 +774,90 @@ const startWorker = (workerId) => {
 
   /**
    * @param {any} req
+   * @return {string[]}
+   */
+  const channelsForUserStream = req => {
+    const arr = [`timeline:${req.accountId}`];
+
+    if (isInScope(req, ['crypto']) && req.deviceId) {
+      arr.push(`timeline:${req.accountId}:${req.deviceId}`);
+    }
+
+    if (isInScope(req, ['read', 'read:notifications'])) {
+      arr.push(`timeline:${req.accountId}:notifications`);
+    }
+
+    return arr;
+  };
+
+  /**
+   * @param {any} req
    * @param {string} name
    * @param {StreamParams} params
-   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>}
+   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
    */
   const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
     switch(name) {
     case 'user':
       resolve({
-        channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: false },
+        channelIds: channelsForUserStream(req),
+        options: { needsFiltering: false },
       });
 
       break;
     case 'user:notification':
       resolve({
-        channelIds: [`timeline:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: true },
+        channelIds: [`timeline:${req.accountId}:notifications`],
+        options: { needsFiltering: false },
       });
 
       break;
     case 'public':
       resolve({
         channelIds: ['timeline:public'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true },
       });
 
       break;
     case 'public:local':
       resolve({
         channelIds: ['timeline:public:local'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true },
       });
 
       break;
     case 'public:remote':
       resolve({
         channelIds: ['timeline:public:remote'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true },
       });
 
       break;
     case 'public:media':
       resolve({
         channelIds: ['timeline:public:media'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true },
       });
 
       break;
     case 'public:local:media':
       resolve({
         channelIds: ['timeline:public:local:media'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true },
       });
 
       break;
     case 'public:remote:media':
       resolve({
         channelIds: ['timeline:public:remote:media'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true },
       });
 
       break;
     case 'direct':
       resolve({
         channelIds: [`timeline:direct:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: false },
+        options: { needsFiltering: false },
       });
 
       break;
@@ -852,7 +867,7 @@ const startWorker = (workerId) => {
       } else {
         resolve({
           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`],
-          options: { needsFiltering: true, notificationOnly: false },
+          options: { needsFiltering: true },
         });
       }
 
@@ -863,7 +878,7 @@ const startWorker = (workerId) => {
       } else {
         resolve({
           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`],
-          options: { needsFiltering: true, notificationOnly: false },
+          options: { needsFiltering: true },
         });
       }
 
@@ -872,7 +887,7 @@ const startWorker = (workerId) => {
       authorizeListAccess(params.list, req).then(() => {
         resolve({
           channelIds: [`timeline:list:${params.list}`],
-          options: { needsFiltering: false, notificationOnly: false },
+          options: { needsFiltering: false },
         });
       }).catch(() => {
         reject('Not authorized to stream this list');
@@ -919,7 +934,7 @@ const startWorker = (workerId) => {
 
       const onSend        = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
       const stopHeartbeat = subscriptionHeartbeat(channelIds);
-      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly);
+      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
 
       subscriptions[channelIds.join(';')] = {
         listener,