about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
authorClaire <claire.github-309c@sitedethib.com>2022-03-21 19:08:29 +0100
committerGitHub <noreply@github.com>2022-03-21 19:08:29 +0100
commitf29458da1d6b270d44dc0e6a5643a735ecb73aee (patch)
tree037199bfd79a73ec35c01377e0c8967d23ceb0ee /streaming
parent7eb2e791eed587d9eaa7dee3480f03f7db5a6425 (diff)
Fix streaming server sometimes silently dropping subscriptions (#17841)
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js43
1 files changed, 40 insertions, 3 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 3db94b160..d6b445a91 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -167,6 +167,11 @@ const startWorker = async (workerId) => {
 
   const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
 
+  /**
+   * @type {Object.<string, Array.<function(string): void>>}
+   */
+  const subs = {};
+
   const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
   const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
 
@@ -191,23 +196,55 @@ const startWorker = async (workerId) => {
   };
 
   /**
+   * @param {string} message
+   * @param {string} channel
+   */
+  const onRedisMessage = (message, channel) => {
+    const callbacks = subs[channel];
+
+    log.silly(`New message on channel ${channel}`);
+
+    if (!callbacks) {
+      return;
+    }
+
+    callbacks.forEach(callback => callback(message));
+  };
+
+  /**
    * @param {string} channel
    * @param {function(string): void} callback
    */
   const subscribe = (channel, callback) => {
     log.silly(`Adding listener for ${channel}`);
 
-    redisSubscribeClient.subscribe(channel, callback);
+    subs[channel] = subs[channel] || [];
+
+    if (subs[channel].length === 0) {
+      log.verbose(`Subscribe ${channel}`);
+      redisSubscribeClient.subscribe(channel, onRedisMessage);
+    }
+
+    subs[channel].push(callback);
   };
 
   /**
    * @param {string} channel
-   * @param {function(string): void} callback
    */
   const unsubscribe = (channel, callback) => {
     log.silly(`Removing listener for ${channel}`);
 
-    redisSubscribeClient.unsubscribe(channel, callback);
+    if (!subs[channel]) {
+      return;
+    }
+
+    subs[channel] = subs[channel].filter(item => item !== callback);
+
+    if (subs[channel].length === 0) {
+      log.verbose(`Unsubscribe ${channel}`);
+      redisSubscribeClient.unsubscribe(channel);
+      delete subs[channel];
+    }
   };
 
   const FALSE_VALUES = [