about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2017-05-28 16:25:26 +0200
committerGitHub <noreply@github.com>2017-05-28 16:25:26 +0200
commit425d02287a243797da2baf70c0bfd61341180956 (patch)
tree530e8925e808ad510a7b5cb3aa74e3001a698618
parent2e429c0c25c0f82abb6a6b348195cd541052397e (diff)
Improve streaming API cluster logging (#3370)
* Improve streaming API cluster logging

* Less verbose error middleware logging (stack trace useless there)

* Fix error logging

* Prevent potential issue

* Add missing "done()" in catch of Promise.all, websocket heartbeat re-implemented like in example

* I actually forgot a done(), the absolute madman
-rw-r--r--package.json1
-rw-r--r--streaming/index.js107
-rw-r--r--yarn.lock6
3 files changed, 67 insertions, 47 deletions
diff --git a/package.json b/package.json
index 4bf7431e3..84e96fca4 100644
--- a/package.json
+++ b/package.json
@@ -102,6 +102,7 @@
     "sass-loader": "^6.0.3",
     "stringz": "^0.1.2",
     "style-loader": "^0.16.1",
+    "throng": "^4.0.0",
     "uuid": "^3.0.1",
     "uws": "^0.14.5",
     "webpack": "^2.4.1",
diff --git a/streaming/index.js b/streaming/index.js
index 5145732e2..5c050fd2b 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -1,5 +1,5 @@
 import os from 'os';
-import cluster from 'cluster';
+import throng from 'throng';
 import dotenv from 'dotenv';
 import express from 'express';
 import http from 'http';
@@ -16,6 +16,8 @@ dotenv.config({
   path: env === 'production' ? '.env.production' : '.env',
 });
 
+log.level = process.env.LOG_LEVEL || 'verbose';
+
 const dbUrlToConfig = (dbUrl) => {
   if (!dbUrl) {
     return {};
@@ -65,24 +67,15 @@ const redisUrlToClient = (defaultConfig, redisUrl) => {
   }));
 };
 
-if (cluster.isMaster) {
-  // Cluster master
-  const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
-
-  const fork = () => {
-    const worker = cluster.fork();
+const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
 
-    worker.on('exit', (code, signal) => {
-      log.error(`Worker died with exit code ${code}, signal ${signal} received.`);
-      setTimeout(() => fork(), 0);
-    });
-  };
+const startMaster = () => {
+  log.info(`Starting streaming API server master with ${numWorkers} workers`);
+};
 
-  for (let i = 0; i < core; i++) fork();
+const startWorker = (workerId) => {
+  log.info(`Starting worker ${workerId}`);
 
-  log.info(`Starting streaming API server master with ${core} workers`);
-} else {
-  // Cluster worker
   const pgConfigs = {
     development: {
       database: 'mastodon_development',
@@ -130,6 +123,7 @@ if (cluster.isMaster) {
     if (!callbacks) {
       return;
     }
+
     callbacks.forEach(callback => callback(message));
   });
 
@@ -215,9 +209,9 @@ if (cluster.isMaster) {
   };
 
   const errorMiddleware = (err, req, res, next) => {
-    log.error(req.requestId, err);
+    log.error(req.requestId, err.toString());
     res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
-    res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' }));
+    res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
   };
 
   const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
@@ -249,8 +243,9 @@ if (cluster.isMaster) {
           const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)).concat(unpackedPayload.reblog ? [unpackedPayload.reblog.account.id] : []);
           const accountDomain    = unpackedPayload.account.acct.split('@')[1];
 
-          if (req.filteredLanguages.indexOf(unpackedPayload.language) !== -1) {
+          if (Array.isArray(req.filteredLanguages) && req.filteredLanguages.includes(unpackedPayload.language)) {
             log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
+            done();
             return;
           }
 
@@ -271,6 +266,7 @@ if (cluster.isMaster) {
 
             transmit();
           }).catch(err => {
+            done();
             log.error(err);
           });
         });
@@ -309,26 +305,13 @@ if (cluster.isMaster) {
   };
 
   // Setup stream output to WebSockets
-  const streamToWs = (req, ws) => {
-    const heartbeat = setInterval(() => {
-      // TODO: Can't add multiple listeners, due to the limitation of uws.
-      if (ws.readyState !== ws.OPEN) {
-        log.verbose(req.requestId, `Ending stream for ${req.accountId}`);
-        clearInterval(heartbeat);
-        return;
-      }
-
-      ws.ping();
-    }, 15000);
-
-    return (event, payload) => {
-      if (ws.readyState !== ws.OPEN) {
-        log.error(req.requestId, 'Tried writing to closed socket');
-        return;
-      }
+  const streamToWs = (req, ws) => (event, payload) => {
+    if (ws.readyState !== ws.OPEN) {
+      log.error(req.requestId, 'Tried writing to closed socket');
+      return;
+    }
 
-      ws.send(JSON.stringify({ event, payload }));
-    };
+    ws.send(JSON.stringify({ event, payload }));
   };
 
   // Setup stream end for WebSockets
@@ -372,6 +355,12 @@ if (cluster.isMaster) {
     const token    = location.query.access_token;
     const req      = { requestId: uuid.v4() };
 
+    ws.isAlive = true;
+
+    ws.on('pong', () => {
+      ws.isAlive = true;
+    });
+
     accountFromToken(token, req, err => {
       if (err) {
         log.error(req.requestId, err);
@@ -401,16 +390,40 @@ if (cluster.isMaster) {
     });
   });
 
+  const wsInterval = setInterval(() => {
+    wss.clients.forEach(ws => {
+      if (ws.isAlive === false) {
+        ws.terminate();
+        return;
+      }
+
+      ws.isAlive = false;
+      ws.ping('', false, true);
+    });
+  }, 30000);
+
   server.listen(process.env.PORT || 4000, () => {
-    log.level = process.env.LOG_LEVEL || 'verbose';
-    log.info(`Starting streaming API server worker on ${server.address().address}:${server.address().port}`);
+    log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
   });
 
-  process.on('SIGINT', exit);
-  process.on('SIGTERM', exit);
-  process.on('exit', exit);
-
-  function exit() {
+  const onExit = () => {
+    log.info(`Worker ${workerId} exiting, bye bye`);
     server.close();
-  }
-}
+  };
+
+  const onError = (err) => {
+    log.error(err);
+  };
+
+  process.on('SIGINT', onExit);
+  process.on('SIGTERM', onExit);
+  process.on('exit', onExit);
+  process.on('error', onError);
+};
+
+throng({
+  workers: numWorkers,
+  lifetime: Infinity,
+  start: startWorker,
+  master: startMaster,
+});
diff --git a/yarn.lock b/yarn.lock
index 5274f2160..c155f1a7e 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -6498,6 +6498,12 @@ text-table@~0.2.0:
   version "0.2.0"
   resolved "https://registry.yarnpkg.com/text-table/-/text-table-0.2.0.tgz#7f5ee823ae805207c00af2df4a84ec3fcfa570b4"
 
+throng@^4.0.0:
+  version "4.0.0"
+  resolved "https://registry.yarnpkg.com/throng/-/throng-4.0.0.tgz#983c6ba1993b58eae859998aa687ffe88df84c17"
+  dependencies:
+    lodash.defaults "^4.0.1"
+
 through@2, through@^2.3.6:
   version "2.3.8"
   resolved "https://registry.yarnpkg.com/through/-/through-2.3.8.tgz#0dd4c9ffaabc357960b1b724115d7e0e86a2e1f5"