about summary refs log tree commit diff
path: root/streaming/index.js
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/index.js')
-rw-r--r--streaming/index.js209
1 files changed, 107 insertions, 102 deletions
diff --git a/streaming/index.js b/streaming/index.js
index ac8e80fb9..2dbb546c0 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -63,20 +63,29 @@ const dbUrlToConfig = (dbUrl) => {
  * @param {Object.<string, any>} defaultConfig
  * @param {string} redisUrl
  */
-const redisUrlToClient = (defaultConfig, redisUrl) => {
+const redisUrlToClient = async (defaultConfig, redisUrl) => {
   const config = defaultConfig;
 
+  let client;
+
   if (!redisUrl) {
-    return redis.createClient(config);
+    client = redis.createClient(config);
+  } else if (redisUrl.startsWith('unix://')) {
+    client = redis.createClient(Object.assign(config, {
+      socket: {
+        path: redisUrl.slice(7),
+      },
+    }));
+  } else {
+    client = redis.createClient(Object.assign(config, {
+      url: redisUrl,
+    }));
   }
 
-  if (redisUrl.startsWith('unix://')) {
-    return redis.createClient(redisUrl.slice(7), config);
-  }
+  client.on('error', (err) => log.error('Redis Client Error!', err));
+  await client.connect();
 
-  return redis.createClient(Object.assign(config, {
-    url: redisUrl,
-  }));
+  return client;
 };
 
 const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
@@ -102,7 +111,7 @@ const startMaster = () => {
   log.warn(`Starting streaming API server master with ${numWorkers} workers`);
 };
 
-const startWorker = (workerId) => {
+const startWorker = async (workerId) => {
   log.warn(`Starting worker ${workerId}`);
 
   const pgConfigs = {
@@ -127,7 +136,7 @@ const startWorker = (workerId) => {
 
   if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
     pgConfigs.development.ssl = true;
-    pgConfigs.production.ssl  = true;
+    pgConfigs.production.ssl = true;
   }
 
   const app = express();
@@ -139,9 +148,11 @@ const startWorker = (workerId) => {
   const redisNamespace = process.env.REDIS_NAMESPACE || null;
 
   const redisParams = {
-    host:     process.env.REDIS_HOST     || '127.0.0.1',
-    port:     process.env.REDIS_PORT     || 6379,
-    db:       process.env.REDIS_DB       || 0,
+    socket: {
+      host: process.env.REDIS_HOST || '127.0.0.1',
+      port: process.env.REDIS_PORT || 6379,
+    },
+    database: process.env.REDIS_DB || 0,
     password: process.env.REDIS_PASSWORD || undefined,
   };
 
@@ -151,25 +162,8 @@ const startWorker = (workerId) => {
 
   const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
 
-  const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
-  const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
-
-  /**
-   * @type {Object.<string, Array.<function(string): void>>}
-   */
-  const subs = {};
-
-  redisSubscribeClient.on('message', (channel, message) => {
-    const callbacks = subs[channel];
-
-    log.silly(`New message on channel ${channel}`);
-
-    if (!callbacks) {
-      return;
-    }
-
-    callbacks.forEach(callback => callback(message));
-  });
+  const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
+  const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
 
   /**
    * @param {string[]} channels
@@ -197,14 +191,8 @@ const startWorker = (workerId) => {
    */
   const subscribe = (channel, callback) => {
     log.silly(`Adding listener for ${channel}`);
-    subs[channel] = subs[channel] || [];
 
-    if (subs[channel].length === 0) {
-      log.verbose(`Subscribe ${channel}`);
-      redisSubscribeClient.subscribe(channel);
-    }
-
-    subs[channel].push(callback);
+    redisSubscribeClient.subscribe(channel, callback);
   };
 
   /**
@@ -214,17 +202,7 @@ const startWorker = (workerId) => {
   const unsubscribe = (channel, callback) => {
     log.silly(`Removing listener for ${channel}`);
 
-    if (!subs[channel]) {
-      return;
-    }
-
-    subs[channel] = subs[channel].filter(item => item !== callback);
-
-    if (subs[channel].length === 0) {
-      log.verbose(`Unsubscribe ${channel}`);
-      redisSubscribeClient.unsubscribe(channel);
-      delete subs[channel];
-    }
+    redisSubscribeClient.unsubscribe(channel, callback);
   };
 
   const FALSE_VALUES = [
@@ -283,6 +261,14 @@ const startWorker = (workerId) => {
   };
 
   /**
+   * @param {any} req
+   * @param {string[]} necessaryScopes
+   * @return {boolean}
+   */
+  const isInScope = (req, necessaryScopes) =>
+    req.scopes.some(scope => necessaryScopes.includes(scope));
+
+  /**
    * @param {string} token
    * @param {any} req
    * @return {Promise.<void>}
@@ -314,7 +300,6 @@ const startWorker = (workerId) => {
         req.scopes = result.rows[0].scopes.split(' ');
         req.accountId = result.rows[0].account_id;
         req.chosenLanguages = result.rows[0].chosen_languages;
-        req.allowNotifications = req.scopes.some(scope => ['read', 'read:notifications'].includes(scope));
         req.deviceId = result.rows[0].device_id;
 
         resolve();
@@ -359,7 +344,7 @@ const startWorker = (workerId) => {
     const onlyMedia = isTruthy(query.only_media);
     const allowLocalOnly = isTruthy(query.allow_local_only);
 
-    switch(path) {
+    switch (path) {
     case '/api/v1/streaming/user':
       return 'user';
     case '/api/v1/streaming/user/notification':
@@ -424,7 +409,7 @@ const startWorker = (workerId) => {
       requiredScopes.push('read:statuses');
     }
 
-    if (requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) {
+    if (req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) {
       resolve();
       return;
     }
@@ -490,7 +475,7 @@ const startWorker = (workerId) => {
 
     const listener = createSystemMessageListener(req, {
 
-      onKill () {
+      onKill() {
         res.end();
       },
 
@@ -542,7 +527,7 @@ const startWorker = (workerId) => {
   };
 
   /**
-   * @param {array}
+   * @param {array} arr
    * @param {number=} shift
    * @return {string}
    */
@@ -581,15 +566,13 @@ const startWorker = (workerId) => {
    * @param {function(string, string): void} output
    * @param {function(string[], function(string): void): void} attachCloseHandler
    * @param {boolean=} needsFiltering
-   * @param {boolean=} notificationOnly
    * @param {boolean=} allowLocalOnly
    * @return {function(string): void}
    */
-  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false, allowLocalOnly = false) => {
-    const accountId  = req.accountId || req.remoteAddress;
-    const streamType = notificationOnly ? ' (notification)' : '';
+  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, allowLocalOnly = false) => {
+    const accountId = req.accountId || req.remoteAddress;
 
-    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
+    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
 
     const listener = message => {
       const json = parseJSON(message);
@@ -599,22 +582,14 @@ const startWorker = (workerId) => {
       const { event, payload, queued_at } = json;
 
       const transmit = () => {
-        const now            = new Date().getTime();
-        const delta          = now - queued_at;
+        const now = new Date().getTime();
+        const delta = now - queued_at;
         const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
 
         log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
         output(event, encodedPayload);
       };
 
-      if (notificationOnly && event !== 'notification') {
-        return;
-      }
-
-      if (event === 'notification' && !req.allowNotifications) {
-        return;
-      }
-
       // Only send local-only statuses to logged-in users
       if (event === 'update' && payload.local_only && !(req.accountId && allowLocalOnly)) {
         log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`);
@@ -628,9 +603,9 @@ const startWorker = (workerId) => {
         return;
       }
 
-      const unpackedPayload  = payload;
+      const unpackedPayload = payload;
       const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
-      const accountDomain    = unpackedPayload.account.acct.split('@')[1];
+      const accountDomain = unpackedPayload.account.acct.split('@')[1];
 
       if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
         log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
@@ -650,7 +625,15 @@ const startWorker = (workerId) => {
         }
 
         const queries = [
-          client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
+          client.query(`SELECT 1
+                        FROM blocks
+                        WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
+                           OR (account_id = $2 AND target_account_id = $1)
+                        UNION
+                        SELECT 1
+                        FROM mutes
+                        WHERE account_id = $1
+                          AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
         ];
 
         if (accountDomain) {
@@ -713,12 +696,12 @@ const startWorker = (workerId) => {
   /**
    * @param {any} req
    * @param {function(): void} [closeHandler]
-   * @return {function(string[], function(string): void)}
+   * @return {function(string[]): void}
    */
-  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
+  const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
     req.on('close', () => {
       ids.forEach(id => {
-        unsubscribe(id, listener);
+        unsubscribe(id);
       });
 
       if (closeHandler) {
@@ -765,9 +748,9 @@ const startWorker = (workerId) => {
   app.get('/api/v1/streaming/*', (req, res) => {
     channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
       const onSend = streamToHttp(req, res);
-      const onEnd  = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
+      const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
 
-      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly, options.allowLocalOnly);
+      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.allowLocalOnly);
     }).catch(err => {
       log.verbose(req.requestId, 'Subscription error:', err.toString());
       httpNotFound(res);
@@ -785,86 +768,104 @@ const startWorker = (workerId) => {
 
   /**
    * @param {any} req
+   * @return {string[]}
+   */
+  const channelsForUserStream = req => {
+    const arr = [`timeline:${req.accountId}`];
+
+    if (isInScope(req, ['crypto']) && req.deviceId) {
+      arr.push(`timeline:${req.accountId}:${req.deviceId}`);
+    }
+
+    if (isInScope(req, ['read', 'read:notifications'])) {
+      arr.push(`timeline:${req.accountId}:notifications`);
+    }
+
+    return arr;
+  };
+
+  /**
+   * @param {any} req
    * @param {string} name
    * @param {StreamParams} params
-   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>}
+   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
    */
   const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
-    switch(name) {
+    switch (name) {
     case 'user':
       resolve({
-        channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
+        channelIds: channelsForUserStream(req),
+        options: { needsFiltering: false, allowLocalOnly: true },
       });
 
       break;
     case 'user:notification':
       resolve({
-        channelIds: [`timeline:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: true, allowLocalOnly: true },
+        channelIds: [`timeline:${req.accountId}:notifications`],
+        options: { needsFiltering: false, allowLocalOnly: true },
       });
 
       break;
     case 'public':
       resolve({
         channelIds: ['timeline:public'],
-        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(params.allow_local_only) },
+        options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) },
       });
 
       break;
     case 'public:allow_local_only':
       resolve({
         channelIds: ['timeline:public'],
-        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+        options: { needsFiltering: true, allowLocalOnly: true },
       });
 
       break;
     case 'public:local':
       resolve({
         channelIds: ['timeline:public:local'],
-        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+        options: { needsFiltering: true, allowLocalOnly: true },
       });
 
       break;
     case 'public:remote':
       resolve({
         channelIds: ['timeline:public:remote'],
-        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false },
+        options: { needsFiltering: true, allowLocalOnly: false },
       });
 
       break;
     case 'public:media':
       resolve({
         channelIds: ['timeline:public:media'],
-        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(query.allow_local_only) },
+        options: { needsFiltering: true, allowLocalOnly: isTruthy(query.allow_local_only) },
       });
 
       break;
     case 'public:allow_local_only:media':
       resolve({
         channelIds: ['timeline:public:media'],
-        options: { needsFiltering: true, notificationsOnly: false, allowLocalOnly: true },
+        options: { needsFiltering: true, allowLocalOnly: true },
       });
 
       break;
     case 'public:local:media':
       resolve({
         channelIds: ['timeline:public:local:media'],
-        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+        options: { needsFiltering: true, allowLocalOnly: true },
       });
 
       break;
     case 'public:remote:media':
       resolve({
         channelIds: ['timeline:public:remote:media'],
-        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false },
+        options: { needsFiltering: true, allowLocalOnly: false },
       });
 
       break;
     case 'direct':
       resolve({
         channelIds: [`timeline:direct:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
+        options: { needsFiltering: false, allowLocalOnly: true },
       });
 
       break;
@@ -874,7 +875,7 @@ const startWorker = (workerId) => {
       } else {
         resolve({
           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`],
-          options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+          options: { needsFiltering: true, allowLocalOnly: true },
         });
       }
 
@@ -885,7 +886,7 @@ const startWorker = (workerId) => {
       } else {
         resolve({
           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`],
-          options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
+          options: { needsFiltering: true, allowLocalOnly: true },
         });
       }
 
@@ -894,7 +895,7 @@ const startWorker = (workerId) => {
       authorizeListAccess(params.list, req).then(() => {
         resolve({
           channelIds: [`timeline:list:${params.list}`],
-          options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
+          options: { needsFiltering: false, allowLocalOnly: true },
         });
       }).catch(() => {
         reject('Not authorized to stream this list');
@@ -934,14 +935,17 @@ const startWorker = (workerId) => {
    * @param {StreamParams} params
    */
   const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
-    checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options }) => {
+    checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({
+      channelIds,
+      options,
+    }) => {
       if (subscriptions[channelIds.join(';')]) {
         return;
       }
 
-      const onSend        = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
+      const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
       const stopHeartbeat = subscriptionHeartbeat(channelIds);
-      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly, options.allowLocalOnly);
+      const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.allowLocalOnly);
 
       subscriptions[channelIds.join(';')] = {
         listener,
@@ -989,7 +993,7 @@ const startWorker = (workerId) => {
 
     const listener = createSystemMessageListener(request, {
 
-      onKill () {
+      onKill() {
         socket.close();
       },
 
@@ -999,7 +1003,8 @@ const startWorker = (workerId) => {
 
     subscriptions[systemChannelId] = {
       listener,
-      stopHeartbeat: () => {},
+      stopHeartbeat: () => {
+      },
     };
   };
 
@@ -1018,7 +1023,7 @@ const startWorker = (workerId) => {
   wss.on('connection', (ws, req) => {
     const location = url.parse(req.url, true);
 
-    req.requestId     = uuid.v4();
+    req.requestId = uuid.v4();
     req.remoteAddress = ws._socket.remoteAddress;
 
     ws.isAlive = true;