From f29458da1d6b270d44dc0e6a5643a735ecb73aee Mon Sep 17 00:00:00 2001 From: Claire Date: Mon, 21 Mar 2022 19:08:29 +0100 Subject: Fix streaming server sometimes silently dropping subscriptions (#17841) --- streaming/index.js | 43 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) (limited to 'streaming') diff --git a/streaming/index.js b/streaming/index.js index 3db94b160..d6b445a91 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -167,6 +167,11 @@ const startWorker = async (workerId) => { const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; + /** + * @type {Object.>} + */ + const subs = {}; + const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); @@ -190,6 +195,22 @@ const startWorker = async (workerId) => { }; }; + /** + * @param {string} message + * @param {string} channel + */ + const onRedisMessage = (message, channel) => { + const callbacks = subs[channel]; + + log.silly(`New message on channel ${channel}`); + + if (!callbacks) { + return; + } + + callbacks.forEach(callback => callback(message)); + }; + /** * @param {string} channel * @param {function(string): void} callback @@ -197,17 +218,33 @@ const startWorker = async (workerId) => { const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); - redisSubscribeClient.subscribe(channel, callback); + subs[channel] = subs[channel] || []; + + if (subs[channel].length === 0) { + log.verbose(`Subscribe ${channel}`); + redisSubscribeClient.subscribe(channel, onRedisMessage); + } + + subs[channel].push(callback); }; /** * @param {string} channel - * @param {function(string): void} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); - redisSubscribeClient.unsubscribe(channel, callback); + if (!subs[channel]) { + return; + } + + subs[channel] = subs[channel].filter(item => item !== callback); + + if (subs[channel].length === 0) { + log.verbose(`Unsubscribe ${channel}`); + redisSubscribeClient.unsubscribe(channel); + delete subs[channel]; + } }; const FALSE_VALUES = [ -- cgit