about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js135
1 files changed, 61 insertions, 74 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 190f96b51..ff873cd04 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -63,20 +63,29 @@ const dbUrlToConfig = (dbUrl) => {
  * @param {Object.<string, any>} defaultConfig
  * @param {string} redisUrl
  */
-const redisUrlToClient = (defaultConfig, redisUrl) => {
+const redisUrlToClient = async (defaultConfig, redisUrl) => {
   const config = defaultConfig;
 
+  let client;
+
   if (!redisUrl) {
-    return redis.createClient(config);
+    client = redis.createClient(config);
+  } else if (redisUrl.startsWith('unix://')) {
+    client = redis.createClient(Object.assign(config, {
+      socket: {
+        path: redisUrl.slice(7),
+      },
+    }));
+  } else {
+    client = redis.createClient(Object.assign(config, {
+      url: redisUrl,
+    }));
   }
 
-  if (redisUrl.startsWith('unix://')) {
-    return redis.createClient(redisUrl.slice(7), config);
-  }
+  client.on('error', (err) => log.error('Redis Client Error!', err));
+  await client.connect();
 
-  return redis.createClient(Object.assign(config, {
-    url: redisUrl,
-  }));
+  return client;
 };
 
 const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
@@ -102,7 +111,7 @@ const startMaster = () => {
   log.warn(`Starting streaming API server master with ${numWorkers} workers`);
 };
 
-const startWorker = (workerId) => {
+const startWorker = async (workerId) => {
   log.warn(`Starting worker ${workerId}`);
 
   const pgConfigs = {
@@ -127,7 +136,7 @@ const startWorker = (workerId) => {
 
   if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
     pgConfigs.development.ssl = true;
-    pgConfigs.production.ssl  = true;
+    pgConfigs.production.ssl = true;
   }
 
   const app = express();
@@ -139,9 +148,11 @@ const startWorker = (workerId) => {
   const redisNamespace = process.env.REDIS_NAMESPACE || null;
 
   const redisParams = {
-    host:     process.env.REDIS_HOST     || '127.0.0.1',
-    port:     process.env.REDIS_PORT     || 6379,
-    db:       process.env.REDIS_DB       || 0,
+    socket: {
+      host: process.env.REDIS_HOST || '127.0.0.1',
+      port: process.env.REDIS_PORT || 6379,
+    },
+    database: process.env.REDIS_DB || 0,
     password: process.env.REDIS_PASSWORD || undefined,
   };
 
@@ -151,25 +162,8 @@ const startWorker = (workerId) => {
 
   const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
 
-  const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
-  const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
-
-  /**
-   * @type {Object.<string, Array.<function(string): void>>}
-   */
-  const subs = {};
-
-  redisSubscribeClient.on('message', (channel, message) => {
-    const callbacks = subs[channel];
-
-    log.silly(`New message on channel ${channel}`);
-
-    if (!callbacks) {
-      return;
-    }
-
-    callbacks.forEach(callback => callback(message));
-  });
+  const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
+  const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
 
   /**
    * @param {string[]} channels
@@ -197,34 +191,16 @@ const startWorker = (workerId) => {
    */
   const subscribe = (channel, callback) => {
     log.silly(`Adding listener for ${channel}`);
-    subs[channel] = subs[channel] || [];
-
-    if (subs[channel].length === 0) {
-      log.verbose(`Subscribe ${channel}`);
-      redisSubscribeClient.subscribe(channel);
-    }
 
-    subs[channel].push(callback);
+    redisSubscribeClient.subscribe(channel, callback);
   };
 
   /**
    * @param {string} channel
-   * @param {function(string): void} callback
    */
-  const unsubscribe = (channel, callback) => {
-    log.silly(`Removing listener for ${channel}`);
-
-    if (!subs[channel]) {
-      return;
-    }
+  const unsubscribe = (channel) => {
 
-    subs[channel] = subs[channel].filter(item => item !== callback);
-
-    if (subs[channel].length === 0) {
-      log.verbose(`Unsubscribe ${channel}`);
-      redisSubscribeClient.unsubscribe(channel);
-      delete subs[channel];
-    }
+    redisSubscribeClient.unsubscribe(channel);
   };
 
   const FALSE_VALUES = [
@@ -366,7 +342,7 @@ const startWorker = (workerId) => {
     const onlyMedia = isTruthy(query.only_media);
     const allowLocalOnly = isTruthy(query.allow_local_only);
 
-    switch(path) {
+    switch (path) {
     case '/api/v1/streaming/user':
       return 'user';
     case '/api/v1/streaming/user/notification':
@@ -497,7 +473,7 @@ const startWorker = (workerId) => {
 
     const listener = createSystemMessageListener(req, {
 
-      onKill () {
+      onKill() {
         res.end();
       },
 
@@ -549,7 +525,7 @@ const startWorker = (workerId) => {
   };
 
   /**
-   * @param {array}
+   * @param {array} arr
    * @param {number=} shift
    * @return {string}
    */
@@ -592,7 +568,7 @@ const startWorker = (workerId) => {
    * @return {function(string): void}
    */
   const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, allowLocalOnly = false) => {
-    const accountId  = req.accountId || req.remoteAddress;
+    const accountId = req.accountId || req.remoteAddress;
 
     log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
 
@@ -604,8 +580,8 @@ const startWorker = (workerId) => {
       const { event, payload, queued_at } = json;
 
       const transmit = () => {
-        const now            = new Date().getTime();
-        const delta          = now - queued_at;
+        const now = new Date().getTime();
+        const delta = now - queued_at;
         const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
 
         log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
@@ -625,9 +601,9 @@ const startWorker = (workerId) => {
         return;
       }
 
-      const unpackedPayload  = payload;
+      const unpackedPayload = payload;
       const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
-      const accountDomain    = unpackedPayload.account.acct.split('@')[1];
+      const accountDomain = unpackedPayload.account.acct.split('@')[1];
 
       if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
         log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
@@ -647,7 +623,15 @@ const startWorker = (workerId) => {
         }
 
         const queries = [
-          client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
+          client.query(`SELECT 1
+                        FROM blocks
+                        WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
+                           OR (account_id = $2 AND target_account_id = $1)
+                        UNION
+                        SELECT 1
+                        FROM mutes
+                        WHERE account_id = $1
+                          AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
         ];
 
         if (accountDomain) {
@@ -710,12 +694,12 @@ const startWorker = (workerId) => {
   /**
    * @param {any} req
    * @param {function(): void} [closeHandler]
-   * @return {function(string[], function(string): void)}
+   * @return {function(string[]): void}
    */
-  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
+  const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
     req.on('close', () => {
       ids.forEach(id => {
-        unsubscribe(id, listener);
+        unsubscribe(id);
       });
 
       if (closeHandler) {
@@ -762,7 +746,7 @@ const startWorker = (workerId) => {
   app.get('/api/v1/streaming/*', (req, res) => {
     channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
       const onSend = streamToHttp(req, res);
-      const onEnd  = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
+      const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
 
       streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.allowLocalOnly);
     }).catch(err => {
@@ -805,7 +789,7 @@ const startWorker = (workerId) => {
    * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
    */
   const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
-    switch(name) {
+    switch (name) {
     case 'user':
       resolve({
         channelIds: channelsForUserStream(req),
@@ -949,15 +933,17 @@ const startWorker = (workerId) => {
    * @param {StreamParams} params
    */
   const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
-    checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ channelIds, options }) => {
+    checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({
+      channelIds,
+      options,
+    }) => {
       if (subscriptions[channelIds.join(';')]) {
         return;
       }
 
-      const onSend        = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
+      const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
       const stopHeartbeat = subscriptionHeartbeat(channelIds);
-
-      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.allowLocalOnly);
+      const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.allowLocalOnly);
 
       subscriptions[channelIds.join(';')] = {
         listener,
@@ -1005,7 +991,7 @@ const startWorker = (workerId) => {
 
     const listener = createSystemMessageListener(request, {
 
-      onKill () {
+      onKill() {
         socket.close();
       },
 
@@ -1015,7 +1001,8 @@ const startWorker = (workerId) => {
 
     subscriptions[systemChannelId] = {
       listener,
-      stopHeartbeat: () => {},
+      stopHeartbeat: () => {
+      },
     };
   };
 
@@ -1034,7 +1021,7 @@ const startWorker = (workerId) => {
   wss.on('connection', (ws, req) => {
     const location = url.parse(req.url, true);
 
-    req.requestId     = uuid.v4();
+    req.requestId = uuid.v4();
     req.remoteAddress = ws._socket.remoteAddress;
 
     ws.isAlive = true;