about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js107
1 files changed, 60 insertions, 47 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 5145732e2..5c050fd2b 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -1,5 +1,5 @@
 import os from 'os';
-import cluster from 'cluster';
+import throng from 'throng';
 import dotenv from 'dotenv';
 import express from 'express';
 import http from 'http';
@@ -16,6 +16,8 @@ dotenv.config({
   path: env === 'production' ? '.env.production' : '.env',
 });
 
+log.level = process.env.LOG_LEVEL || 'verbose';
+
 const dbUrlToConfig = (dbUrl) => {
   if (!dbUrl) {
     return {};
@@ -65,24 +67,15 @@ const redisUrlToClient = (defaultConfig, redisUrl) => {
   }));
 };
 
-if (cluster.isMaster) {
-  // Cluster master
-  const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
-
-  const fork = () => {
-    const worker = cluster.fork();
+const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
 
-    worker.on('exit', (code, signal) => {
-      log.error(`Worker died with exit code ${code}, signal ${signal} received.`);
-      setTimeout(() => fork(), 0);
-    });
-  };
+const startMaster = () => {
+  log.info(`Starting streaming API server master with ${numWorkers} workers`);
+};
 
-  for (let i = 0; i < core; i++) fork();
+const startWorker = (workerId) => {
+  log.info(`Starting worker ${workerId}`);
 
-  log.info(`Starting streaming API server master with ${core} workers`);
-} else {
-  // Cluster worker
   const pgConfigs = {
     development: {
       database: 'mastodon_development',
@@ -130,6 +123,7 @@ if (cluster.isMaster) {
     if (!callbacks) {
       return;
     }
+
     callbacks.forEach(callback => callback(message));
   });
 
@@ -215,9 +209,9 @@ if (cluster.isMaster) {
   };
 
   const errorMiddleware = (err, req, res, next) => {
-    log.error(req.requestId, err);
+    log.error(req.requestId, err.toString());
     res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
-    res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' }));
+    res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
   };
 
   const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
@@ -249,8 +243,9 @@ if (cluster.isMaster) {
           const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)).concat(unpackedPayload.reblog ? [unpackedPayload.reblog.account.id] : []);
           const accountDomain    = unpackedPayload.account.acct.split('@')[1];
 
-          if (req.filteredLanguages.indexOf(unpackedPayload.language) !== -1) {
+          if (Array.isArray(req.filteredLanguages) && req.filteredLanguages.includes(unpackedPayload.language)) {
             log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
+            done();
             return;
           }
 
@@ -271,6 +266,7 @@ if (cluster.isMaster) {
 
             transmit();
           }).catch(err => {
+            done();
             log.error(err);
           });
         });
@@ -309,26 +305,13 @@ if (cluster.isMaster) {
   };
 
   // Setup stream output to WebSockets
-  const streamToWs = (req, ws) => {
-    const heartbeat = setInterval(() => {
-      // TODO: Can't add multiple listeners, due to the limitation of uws.
-      if (ws.readyState !== ws.OPEN) {
-        log.verbose(req.requestId, `Ending stream for ${req.accountId}`);
-        clearInterval(heartbeat);
-        return;
-      }
-
-      ws.ping();
-    }, 15000);
-
-    return (event, payload) => {
-      if (ws.readyState !== ws.OPEN) {
-        log.error(req.requestId, 'Tried writing to closed socket');
-        return;
-      }
+  const streamToWs = (req, ws) => (event, payload) => {
+    if (ws.readyState !== ws.OPEN) {
+      log.error(req.requestId, 'Tried writing to closed socket');
+      return;
+    }
 
-      ws.send(JSON.stringify({ event, payload }));
-    };
+    ws.send(JSON.stringify({ event, payload }));
   };
 
   // Setup stream end for WebSockets
@@ -372,6 +355,12 @@ if (cluster.isMaster) {
     const token    = location.query.access_token;
     const req      = { requestId: uuid.v4() };
 
+    ws.isAlive = true;
+
+    ws.on('pong', () => {
+      ws.isAlive = true;
+    });
+
     accountFromToken(token, req, err => {
       if (err) {
         log.error(req.requestId, err);
@@ -401,16 +390,40 @@ if (cluster.isMaster) {
     });
   });
 
+  const wsInterval = setInterval(() => {
+    wss.clients.forEach(ws => {
+      if (ws.isAlive === false) {
+        ws.terminate();
+        return;
+      }
+
+      ws.isAlive = false;
+      ws.ping('', false, true);
+    });
+  }, 30000);
+
   server.listen(process.env.PORT || 4000, () => {
-    log.level = process.env.LOG_LEVEL || 'verbose';
-    log.info(`Starting streaming API server worker on ${server.address().address}:${server.address().port}`);
+    log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
   });
 
-  process.on('SIGINT', exit);
-  process.on('SIGTERM', exit);
-  process.on('exit', exit);
-
-  function exit() {
+  const onExit = () => {
+    log.info(`Worker ${workerId} exiting, bye bye`);
     server.close();
-  }
-}
+  };
+
+  const onError = (err) => {
+    log.error(err);
+  };
+
+  process.on('SIGINT', onExit);
+  process.on('SIGTERM', onExit);
+  process.on('exit', onExit);
+  process.on('error', onError);
+};
+
+throng({
+  workers: numWorkers,
+  lifetime: Infinity,
+  start: startWorker,
+  master: startMaster,
+});