about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
authorClaire <claire.github-309c@sitedethib.com>2021-03-24 09:37:41 +0100
committerGitHub <noreply@github.com>2021-03-24 09:37:41 +0100
commit49814d579932496b52cd8f6a0194c21c21f83099 (patch)
treefd82dcd8897d97df06f49b81a16e863735351497 /streaming
parentc3aef491d66aec743a3a53e934a494f653745b61 (diff)
Switch from deprecated ClusterWS/cws to ws package (#15932)
* Switch from deprecated ClusterWS/cws to ws package

Fixes #15184

Co-authored-by: Edho Arief <me@nanaya.pro>

* Make bufferutil and utf-8-validate optional dependencies

Co-authored-by: Edho Arief <me@nanaya.pro>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js22
1 files changed, 19 insertions, 3 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 3279bd94e..c50d35583 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -9,9 +9,9 @@ const redis = require('redis');
 const pg = require('pg');
 const log = require('npmlog');
 const url = require('url');
-const { WebSocketServer } = require('@clusterws/cws');
 const uuid = require('uuid');
 const fs = require('fs');
+const WebSocket = require('ws');
 
 const env = process.env.NODE_ENV || 'development';
 const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';
@@ -766,7 +766,7 @@ const startWorker = (workerId) => {
     });
   });
 
-  const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient });
+  const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
 
   /**
    * @typedef StreamParams
@@ -999,6 +999,12 @@ const startWorker = (workerId) => {
     req.requestId     = uuid.v4();
     req.remoteAddress = ws._socket.remoteAddress;
 
+    ws.isAlive = true;
+
+    ws.on('pong', () => {
+      ws.isAlive = true;
+    });
+
     /**
      * @type {WebSocketSession}
      */
@@ -1048,7 +1054,17 @@ const startWorker = (workerId) => {
     }
   });
 
-  wss.startAutoPing(30000);
+  setInterval(() => {
+    wss.clients.forEach(ws => {
+      if (ws.isAlive === false) {
+        ws.terminate();
+        return;
+      }
+
+      ws.isAlive = false;
+      ws.ping('', false, true);
+    });
+  }, 30000);
 
   attachServerWithConfig(server, address => {
     log.info(`Worker ${workerId} now listening on ${address}`);