diff options
author | Claire <claire.github-309c@sitedethib.com> | 2022-03-22 17:00:38 +0100 |
---|---|---|
committer | Claire <claire.github-309c@sitedethib.com> | 2022-03-22 17:00:38 +0100 |
commit | 59864ff495cb607b4e2632b8b8ca183c1e602626 (patch) | |
tree | 7dfc6dc1fff3d8864497d0ed32aff5862b5e47e9 /streaming | |
parent | 9ff119eecd1079e52a8a41d7b8d61520c4303c2f (diff) | |
parent | 67d550830b3b1a9a2b65b1ab98ea4bcd491666c5 (diff) |
Merge branch 'main' into glitch-soc/merge-upstream
Conflicts: - `.github/dependabot.yml`: Upstream modified it, but we deleted it in glitch-soc. Keep it deleted.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/index.js | 43 |
1 files changed, 40 insertions, 3 deletions
diff --git a/streaming/index.js b/streaming/index.js index 3fdc9615e..780c4015d 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -167,6 +167,11 @@ const startWorker = async (workerId) => { const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; + /** + * @type {Object.<string, Array.<function(string): void>>} + */ + const subs = {}; + const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL); @@ -191,23 +196,55 @@ const startWorker = async (workerId) => { }; /** + * @param {string} message + * @param {string} channel + */ + const onRedisMessage = (message, channel) => { + const callbacks = subs[channel]; + + log.silly(`New message on channel ${channel}`); + + if (!callbacks) { + return; + } + + callbacks.forEach(callback => callback(message)); + }; + + /** * @param {string} channel * @param {function(string): void} callback */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); - redisSubscribeClient.subscribe(channel, callback); + subs[channel] = subs[channel] || []; + + if (subs[channel].length === 0) { + log.verbose(`Subscribe ${channel}`); + redisSubscribeClient.subscribe(channel, onRedisMessage); + } + + subs[channel].push(callback); }; /** * @param {string} channel - * @param {function(string): void} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); - redisSubscribeClient.unsubscribe(channel, callback); + if (!subs[channel]) { + return; + } + + subs[channel] = subs[channel].filter(item => item !== callback); + + if (subs[channel].length === 0) { + log.verbose(`Unsubscribe ${channel}`); + redisSubscribeClient.unsubscribe(channel); + delete subs[channel]; + } }; const FALSE_VALUES = [ |