about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2018-10-20 02:25:25 +0200
committerGitHub <noreply@github.com>2018-10-20 02:25:25 +0200
commit369cc5f555821d823d4daf7aab3142cdac896a69 (patch)
tree5f5d4f4e5fe76fd6d82d81e83a8de82df757e765
parenteb1b9903a6f60d024d71bffd635e6fec7edc59a9 (diff)
Check if port/socket is available before forking in Streaming API (#9023)
Previously, the server would attempt taking port/socket in worker
process, and if it was taken, fail, which made the master process
create a new worker. This led to really high CPU usage if the
streaming API was started when the port or socket were not
available.

Now, before clustering (forking) into worker processes, a test
server is created and then removed to check if it can be done.
-rw-r--r--streaming/index.js64
1 files changed, 49 insertions, 15 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 3a01be66a..dd1a8d546 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -74,6 +74,7 @@ const startMaster = () => {
   if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
     log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
   }
+
   log.info(`Starting streaming API server master with ${numWorkers} workers`);
 };
 
@@ -616,16 +617,9 @@ const startWorker = (workerId) => {
     });
   }, 30000);
 
-  if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
-    server.listen(process.env.SOCKET || process.env.PORT, () => {
-      fs.chmodSync(server.address(), 0o666);
-      log.info(`Worker ${workerId} now listening on ${server.address()}`);
-    });
-  } else {
-    server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
-      log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
-    });
-  }
+  attachServerWithConfig(server, address => {
+    log.info(`Worker ${workerId} now listening on ${address}`);
+  });
 
   const onExit = () => {
     log.info(`Worker ${workerId} exiting, bye bye`);
@@ -645,9 +639,49 @@ const startWorker = (workerId) => {
   process.on('uncaughtException', onError);
 };
 
-throng({
-  workers: numWorkers,
-  lifetime: Infinity,
-  start: startWorker,
-  master: startMaster,
+const attachServerWithConfig = (server, onSuccess) => {
+  if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
+    server.listen(process.env.SOCKET || process.env.PORT, () => {
+      fs.chmodSync(server.address(), 0o666);
+
+      if (onSuccess) {
+        onSuccess(server.address());
+      }
+    });
+  } else {
+    server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
+      if (onSuccess) {
+        onSuccess(`${server.address().address}:${server.address().port}`);
+      }
+    });
+  }
+};
+
+const onPortAvailable = onSuccess => {
+  const testServer = http.createServer();
+
+  testServer.once('error', err => {
+    onSuccess(err);
+  });
+
+  testServer.once('listening', () => {
+    testServer.once('close', () => onSuccess());
+    testServer.close();
+  });
+
+  attachServerWithConfig(testServer);
+};
+
+onPortAvailable(err => {
+  if (err) {
+    log.error('Could not start server, the port or socket is in use');
+    return;
+  }
+
+  throng({
+    workers: numWorkers,
+    lifetime: Infinity,
+    start: startWorker,
+    master: startMaster,
+  });
 });