diff options
-rw-r--r-- | package.json | 1 | ||||
-rw-r--r-- | streaming/index.js | 107 | ||||
-rw-r--r-- | yarn.lock | 6 |
3 files changed, 67 insertions, 47 deletions
diff --git a/package.json b/package.json index 4bf7431e3..84e96fca4 100644 --- a/package.json +++ b/package.json @@ -102,6 +102,7 @@ "sass-loader": "^6.0.3", "stringz": "^0.1.2", "style-loader": "^0.16.1", + "throng": "^4.0.0", "uuid": "^3.0.1", "uws": "^0.14.5", "webpack": "^2.4.1", diff --git a/streaming/index.js b/streaming/index.js index 5145732e2..5c050fd2b 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -1,5 +1,5 @@ import os from 'os'; -import cluster from 'cluster'; +import throng from 'throng'; import dotenv from 'dotenv'; import express from 'express'; import http from 'http'; @@ -16,6 +16,8 @@ dotenv.config({ path: env === 'production' ? '.env.production' : '.env', }); +log.level = process.env.LOG_LEVEL || 'verbose'; + const dbUrlToConfig = (dbUrl) => { if (!dbUrl) { return {}; @@ -65,24 +67,15 @@ const redisUrlToClient = (defaultConfig, redisUrl) => { })); }; -if (cluster.isMaster) { - // Cluster master - const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); - - const fork = () => { - const worker = cluster.fork(); +const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); - worker.on('exit', (code, signal) => { - log.error(`Worker died with exit code ${code}, signal ${signal} received.`); - setTimeout(() => fork(), 0); - }); - }; +const startMaster = () => { + log.info(`Starting streaming API server master with ${numWorkers} workers`); +}; - for (let i = 0; i < core; i++) fork(); +const startWorker = (workerId) => { + log.info(`Starting worker ${workerId}`); - log.info(`Starting streaming API server master with ${core} workers`); -} else { - // Cluster worker const pgConfigs = { development: { database: 'mastodon_development', @@ -130,6 +123,7 @@ if (cluster.isMaster) { if (!callbacks) { return; } + callbacks.forEach(callback => callback(message)); }); @@ -215,9 +209,9 @@ if (cluster.isMaster) { }; const errorMiddleware = (err, req, res, next) => { - log.error(req.requestId, err); + log.error(req.requestId, err.toString()); res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })); + res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' })); }; const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); @@ -249,8 +243,9 @@ if (cluster.isMaster) { const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)).concat(unpackedPayload.reblog ? [unpackedPayload.reblog.account.id] : []); const accountDomain = unpackedPayload.account.acct.split('@')[1]; - if (req.filteredLanguages.indexOf(unpackedPayload.language) !== -1) { + if (Array.isArray(req.filteredLanguages) && req.filteredLanguages.includes(unpackedPayload.language)) { log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`); + done(); return; } @@ -271,6 +266,7 @@ if (cluster.isMaster) { transmit(); }).catch(err => { + done(); log.error(err); }); }); @@ -309,26 +305,13 @@ if (cluster.isMaster) { }; // Setup stream output to WebSockets - const streamToWs = (req, ws) => { - const heartbeat = setInterval(() => { - // TODO: Can't add multiple listeners, due to the limitation of uws. - if (ws.readyState !== ws.OPEN) { - log.verbose(req.requestId, `Ending stream for ${req.accountId}`); - clearInterval(heartbeat); - return; - } - - ws.ping(); - }, 15000); - - return (event, payload) => { - if (ws.readyState !== ws.OPEN) { - log.error(req.requestId, 'Tried writing to closed socket'); - return; - } + const streamToWs = (req, ws) => (event, payload) => { + if (ws.readyState !== ws.OPEN) { + log.error(req.requestId, 'Tried writing to closed socket'); + return; + } - ws.send(JSON.stringify({ event, payload })); - }; + ws.send(JSON.stringify({ event, payload })); }; // Setup stream end for WebSockets @@ -372,6 +355,12 @@ if (cluster.isMaster) { const token = location.query.access_token; const req = { requestId: uuid.v4() }; + ws.isAlive = true; + + ws.on('pong', () => { + ws.isAlive = true; + }); + accountFromToken(token, req, err => { if (err) { log.error(req.requestId, err); @@ -401,16 +390,40 @@ if (cluster.isMaster) { }); }); + const wsInterval = setInterval(() => { + wss.clients.forEach(ws => { + if (ws.isAlive === false) { + ws.terminate(); + return; + } + + ws.isAlive = false; + ws.ping('', false, true); + }); + }, 30000); + server.listen(process.env.PORT || 4000, () => { - log.level = process.env.LOG_LEVEL || 'verbose'; - log.info(`Starting streaming API server worker on ${server.address().address}:${server.address().port}`); + log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`); }); - process.on('SIGINT', exit); - process.on('SIGTERM', exit); - process.on('exit', exit); - - function exit() { + const onExit = () => { + log.info(`Worker ${workerId} exiting, bye bye`); server.close(); - } -} + }; + + const onError = (err) => { + log.error(err); + }; + + process.on('SIGINT', onExit); + process.on('SIGTERM', onExit); + process.on('exit', onExit); + process.on('error', onError); +}; + +throng({ + workers: numWorkers, + lifetime: Infinity, + start: startWorker, + master: startMaster, +}); diff --git a/yarn.lock b/yarn.lock index 5274f2160..c155f1a7e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6498,6 +6498,12 @@ text-table@~0.2.0: version "0.2.0" resolved "https://registry.yarnpkg.com/text-table/-/text-table-0.2.0.tgz#7f5ee823ae805207c00af2df4a84ec3fcfa570b4" +throng@^4.0.0: + version "4.0.0" + resolved "https://registry.yarnpkg.com/throng/-/throng-4.0.0.tgz#983c6ba1993b58eae859998aa687ffe88df84c17" + dependencies: + lodash.defaults "^4.0.1" + through@2, through@^2.3.6: version "2.3.8" resolved "https://registry.yarnpkg.com/through/-/through-2.3.8.tgz#0dd4c9ffaabc357960b1b724115d7e0e86a2e1f5" |