diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/index.js | 43 |
1 files changed, 40 insertions, 3 deletions
diff --git a/streaming/index.js b/streaming/index.js index 3fdc9615e..780c4015d 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -167,6 +167,11 @@ const startWorker = async (workerId) => { const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; + /** + * @type {Object.<string, Array.<function(string): void>>} + */ + const subs = {}; + const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); @@ -191,23 +196,55 @@ const startWorker = async (workerId) => { }; /** + * @param {string} message + * @param {string} channel + */ + const onRedisMessage = (message, channel) => { + const callbacks = subs[channel]; + + log.silly(`New message on channel ${channel}`); + + if (!callbacks) { + return; + } + + callbacks.forEach(callback => callback(message)); + }; + + /** * @param {string} channel * @param {function(string): void} callback */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); - redisSubscribeClient.subscribe(channel, callback); + subs[channel] = subs[channel] || []; + + if (subs[channel].length === 0) { + log.verbose(`Subscribe ${channel}`); + redisSubscribeClient.subscribe(channel, onRedisMessage); + } + + subs[channel].push(callback); }; /** * @param {string} channel - * @param {function(string): void} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); - redisSubscribeClient.unsubscribe(channel, callback); + if (!subs[channel]) { + return; + } + + subs[channel] = subs[channel].filter(item => item !== callback); + + if (subs[channel].length === 0) { + log.verbose(`Unsubscribe ${channel}`); + redisSubscribeClient.unsubscribe(channel); + delete subs[channel]; + } }; const FALSE_VALUES = [ |