From 18e7ef6edabf85020f04fe6582ad5cdaae253d8a Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Thu, 11 Oct 2018 19:24:43 +0200 Subject: Add check for missing tag param in streaming API (#8955) * Add check for missing tag param in streaming API Fixes error: ``` TypeError: Cannot read property 'toLowerCase' of undefined at app.get (.../streaming/index.js:493:50) ``` * Fix code style issues --- streaming/index.js | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index debf7c8bf..3a01be66a 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -449,6 +449,11 @@ const startWorker = (workerId) => { }); }; + const httpNotFound = res => { + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Not found' })); + }; + app.use(setRequestId); app.use(setRemoteAddress); app.use(allowCrossDomain); @@ -490,11 +495,25 @@ const startWorker = (workerId) => { }); app.get('/api/v1/streaming/hashtag', (req, res) => { - streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true); + const { tag } = req.query; + + if (!tag || tag.length === 0) { + httpNotFound(res); + return; + } + + streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true); }); app.get('/api/v1/streaming/hashtag/local', (req, res) => { - streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true); + const { tag } = req.query; + + if (!tag || tag.length === 0) { + httpNotFound(res); + return; + } + + streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true); }); app.get('/api/v1/streaming/list', (req, res) => { @@ -502,8 +521,7 @@ const startWorker = (workerId) => { authorizeListAccess(listId, req, authorized => { if (!authorized) { - res.writeHead(404, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Not found' })); + httpNotFound(res); return; } @@ -553,9 +571,19 @@ const startWorker = (workerId) => { streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true); break; case 'hashtag': + if (!location.query.tag || location.query.tag.length === 0) { + ws.close(); + return; + } + streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true); break; case 'hashtag:local': + if (!location.query.tag || location.query.tag.length === 0) { + ws.close(); + return; + } + streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true); break; case 'list': -- cgit From 369cc5f555821d823d4daf7aab3142cdac896a69 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Sat, 20 Oct 2018 02:25:25 +0200 Subject: Check if port/socket is available before forking in Streaming API (#9023) Previously, the server would attempt taking port/socket in worker process, and if it was taken, fail, which made the master process create a new worker. This led to really high CPU usage if the streaming API was started when the port or socket were not available. Now, before clustering (forking) into worker processes, a test server is created and then removed to check if it can be done. --- streaming/index.js | 64 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 15 deletions(-) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index 3a01be66a..dd1a8d546 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -74,6 +74,7 @@ const startMaster = () => { if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) { log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.'); } + log.info(`Starting streaming API server master with ${numWorkers} workers`); }; @@ -616,16 +617,9 @@ const startWorker = (workerId) => { }); }, 30000); - if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) { - server.listen(process.env.SOCKET || process.env.PORT, () => { - fs.chmodSync(server.address(), 0o666); - log.info(`Worker ${workerId} now listening on ${server.address()}`); - }); - } else { - server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => { - log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`); - }); - } + attachServerWithConfig(server, address => { + log.info(`Worker ${workerId} now listening on ${address}`); + }); const onExit = () => { log.info(`Worker ${workerId} exiting, bye bye`); @@ -645,9 +639,49 @@ const startWorker = (workerId) => { process.on('uncaughtException', onError); }; -throng({ - workers: numWorkers, - lifetime: Infinity, - start: startWorker, - master: startMaster, +const attachServerWithConfig = (server, onSuccess) => { + if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) { + server.listen(process.env.SOCKET || process.env.PORT, () => { + fs.chmodSync(server.address(), 0o666); + + if (onSuccess) { + onSuccess(server.address()); + } + }); + } else { + server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => { + if (onSuccess) { + onSuccess(`${server.address().address}:${server.address().port}`); + } + }); + } +}; + +const onPortAvailable = onSuccess => { + const testServer = http.createServer(); + + testServer.once('error', err => { + onSuccess(err); + }); + + testServer.once('listening', () => { + testServer.once('close', () => onSuccess()); + testServer.close(); + }); + + attachServerWithConfig(testServer); +}; + +onPortAvailable(err => { + if (err) { + log.error('Could not start server, the port or socket is in use'); + return; + } + + throng({ + workers: numWorkers, + lifetime: Infinity, + start: startWorker, + master: startMaster, + }); }); -- cgit From 8d70d3de3842cd079484b8e683516ff291de7e24 Mon Sep 17 00:00:00 2001 From: Gomasy Date: Sun, 21 Oct 2018 23:41:33 +0900 Subject: Fix crash when using UNIX socket (#9036) --- streaming/index.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index dd1a8d546..b4d09d0ad 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -642,9 +642,8 @@ const startWorker = (workerId) => { const attachServerWithConfig = (server, onSuccess) => { if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) { server.listen(process.env.SOCKET || process.env.PORT, () => { - fs.chmodSync(server.address(), 0o666); - if (onSuccess) { + fs.chmodSync(server.address(), 0o666); onSuccess(server.address()); } }); -- cgit