about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js99
1 files changed, 80 insertions, 19 deletions
diff --git a/streaming/index.js b/streaming/index.js
index debf7c8bf..b4d09d0ad 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -74,6 +74,7 @@ const startMaster = () => {
   if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
     log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
   }
+
   log.info(`Starting streaming API server master with ${numWorkers} workers`);
 };
 
@@ -449,6 +450,11 @@ const startWorker = (workerId) => {
     });
   };
 
+  const httpNotFound = res => {
+    res.writeHead(404, { 'Content-Type': 'application/json' });
+    res.end(JSON.stringify({ error: 'Not found' }));
+  };
+
   app.use(setRequestId);
   app.use(setRemoteAddress);
   app.use(allowCrossDomain);
@@ -490,11 +496,25 @@ const startWorker = (workerId) => {
   });
 
   app.get('/api/v1/streaming/hashtag', (req, res) => {
-    streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
+    const { tag } = req.query;
+
+    if (!tag || tag.length === 0) {
+      httpNotFound(res);
+      return;
+    }
+
+    streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
   });
 
   app.get('/api/v1/streaming/hashtag/local', (req, res) => {
-    streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
+    const { tag } = req.query;
+
+    if (!tag || tag.length === 0) {
+      httpNotFound(res);
+      return;
+    }
+
+    streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
   });
 
   app.get('/api/v1/streaming/list', (req, res) => {
@@ -502,8 +522,7 @@ const startWorker = (workerId) => {
 
     authorizeListAccess(listId, req, authorized => {
       if (!authorized) {
-        res.writeHead(404, { 'Content-Type': 'application/json' });
-        res.end(JSON.stringify({ error: 'Not found' }));
+        httpNotFound(res);
         return;
       }
 
@@ -553,9 +572,19 @@ const startWorker = (workerId) => {
       streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
       break;
     case 'hashtag':
+      if (!location.query.tag || location.query.tag.length === 0) {
+        ws.close();
+        return;
+      }
+
       streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
       break;
     case 'hashtag:local':
+      if (!location.query.tag || location.query.tag.length === 0) {
+        ws.close();
+        return;
+      }
+
       streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
       break;
     case 'list':
@@ -588,16 +617,9 @@ const startWorker = (workerId) => {
     });
   }, 30000);
 
-  if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
-    server.listen(process.env.SOCKET || process.env.PORT, () => {
-      fs.chmodSync(server.address(), 0o666);
-      log.info(`Worker ${workerId} now listening on ${server.address()}`);
-    });
-  } else {
-    server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
-      log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
-    });
-  }
+  attachServerWithConfig(server, address => {
+    log.info(`Worker ${workerId} now listening on ${address}`);
+  });
 
   const onExit = () => {
     log.info(`Worker ${workerId} exiting, bye bye`);
@@ -617,9 +639,48 @@ const startWorker = (workerId) => {
   process.on('uncaughtException', onError);
 };
 
-throng({
-  workers: numWorkers,
-  lifetime: Infinity,
-  start: startWorker,
-  master: startMaster,
+const attachServerWithConfig = (server, onSuccess) => {
+  if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
+    server.listen(process.env.SOCKET || process.env.PORT, () => {
+      if (onSuccess) {
+        fs.chmodSync(server.address(), 0o666);
+        onSuccess(server.address());
+      }
+    });
+  } else {
+    server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
+      if (onSuccess) {
+        onSuccess(`${server.address().address}:${server.address().port}`);
+      }
+    });
+  }
+};
+
+const onPortAvailable = onSuccess => {
+  const testServer = http.createServer();
+
+  testServer.once('error', err => {
+    onSuccess(err);
+  });
+
+  testServer.once('listening', () => {
+    testServer.once('close', () => onSuccess());
+    testServer.close();
+  });
+
+  attachServerWithConfig(testServer);
+};
+
+onPortAvailable(err => {
+  if (err) {
+    log.error('Could not start server, the port or socket is in use');
+    return;
+  }
+
+  throng({
+    workers: numWorkers,
+    lifetime: Infinity,
+    start: startWorker,
+    master: startMaster,
+  });
 });