From 8ae52dc79235bc9ed6878bed857539d14e0b57da Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 11 Aug 2020 18:24:59 +0200 Subject: [Glitch] Add support for managing multiple stream subscriptions in a single connection Ported ef057584fd2714d94666f9ffef4aa89147eda72c to glitch-soc Signed-off-by: Thibaut Girka --- app/javascript/flavours/glitch/util/stream.js | 284 ++++++++++++++++++++------ 1 file changed, 219 insertions(+), 65 deletions(-) (limited to 'app/javascript/flavours/glitch/util/stream.js') diff --git a/app/javascript/flavours/glitch/util/stream.js b/app/javascript/flavours/glitch/util/stream.js index 0cb2b228f..640455b33 100644 --- a/app/javascript/flavours/glitch/util/stream.js +++ b/app/javascript/flavours/glitch/util/stream.js @@ -1,87 +1,236 @@ +// @ts-check + import WebSocketClient from '@gamestdio/websocket'; -const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max)); +/** + * @type {WebSocketClient | undefined} + */ +let sharedConnection; -const knownEventTypes = [ - 'update', - 'delete', - 'notification', - 'conversation', - 'filters_changed', -]; +/** + * @typedef Subscription + * @property {string} channelName + * @property {Object.} params + * @property {function(): void} onConnect + * @property {function(StreamEvent): void} onReceive + * @property {function(): void} onDisconnect + */ -export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { - return (dispatch, getState) => { - const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); - const accessToken = getState().getIn(['meta', 'access_token']); - const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); + /** + * @typedef StreamEvent + * @property {string} event + * @property {object} payload + */ - let polling = null; +/** + * @type {Array.} + */ +const subscriptions = []; - const setupPolling = () => { - pollingRefresh(dispatch, () => { - polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000)); - }); - }; +/** + * @type {Object.} + */ +const subscriptionCounters = {}; + +/** + * @param {Subscription} subscription + */ +const addSubscription = subscription => { + subscriptions.push(subscription); +}; + +/** + * @param {Subscription} subscription + */ +const removeSubscription = subscription => { + const index = subscriptions.indexOf(subscription); + + if (index !== -1) { + subscriptions.splice(index, 1); + } +}; + +/** + * @param {Subscription} subscription + */ +const subscribe = ({ channelName, params, onConnect }) => { + const key = channelNameWithInlineParams(channelName, params); + + subscriptionCounters[key] = subscriptionCounters[key] || 0; + + if (subscriptionCounters[key] === 0) { + sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params })); + } + + subscriptionCounters[key] += 1; + onConnect(); +}; + +/** + * @param {Subscription} subscription + */ +const unsubscribe = ({ channelName, params, onDisconnect }) => { + const key = channelNameWithInlineParams(channelName, params); - const clearPolling = () => { - if (polling) { - clearTimeout(polling); - polling = null; + subscriptionCounters[key] = subscriptionCounters[key] || 1; + + if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) { + sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params })); + } + + subscriptionCounters[key] -= 1; + onDisconnect(); +}; + +const sharedCallbacks = { + connected () { + subscriptions.forEach(subscription => subscribe(subscription)); + }, + + received (data) { + const { stream } = data; + + subscriptions.filter(({ channelName, params }) => { + const streamChannelName = stream[0]; + + if (stream.length === 1) { + return channelName === streamChannelName; } - }; - const subscription = getStream(streamingAPIBaseURL, accessToken, path, { - connected () { - if (pollingRefresh) { - clearPolling(); - } + const streamIdentifier = stream[1]; - onConnect(); - }, + if (['hashtag', 'hashtag:local'].includes(channelName)) { + return channelName === streamChannelName && params.tag === streamIdentifier; + } else if (channelName === 'list') { + return channelName === streamChannelName && params.list === streamIdentifier; + } - disconnected () { - if (pollingRefresh) { - polling = setTimeout(() => setupPolling(), randomIntUpTo(40000)); - } + return false; + }).forEach(subscription => { + subscription.onReceive(data); + }); + }, - onDisconnect(); + disconnected () { + subscriptions.forEach(({ onDisconnect }) => onDisconnect()); + }, + + reconnected () { + subscriptions.forEach(subscription => subscribe(subscription)); + }, +}; + +/** + * @param {string} channelName + * @param {Object.} params + * @return {string} + */ +const channelNameWithInlineParams = (channelName, params) => { + if (Object.keys(params).length === 0) { + return channelName; + } + + return `${channelName}&${Object.keys(params).map(key => `${key}=${params[key]}`).join('&')}`; +}; + +/** + * @param {string} channelName + * @param {Object.} params + * @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks + * @return {function(): void} + */ +export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => { + const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); + const accessToken = getState().getIn(['meta', 'access_token']); + const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState); + + // If we cannot use a websockets connection, we must fall back + // to using individual connections for each channel + if (!streamingAPIBaseURL.startsWith('ws')) { + const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), { + connected () { + onConnect(); }, received (data) { onReceive(data); }, - reconnected () { - if (pollingRefresh) { - clearPolling(); - pollingRefresh(dispatch); - } + disconnected () { + onDisconnect(); + }, + reconnected () { onConnect(); }, - }); - const disconnect = () => { - if (subscription) { - subscription.close(); - } - - clearPolling(); + return () => { + connection.close(); }; + } + + const subscription = { + channelName, + params, + onConnect, + onReceive, + onDisconnect, + }; + + addSubscription(subscription); + + // If a connection is open, we can execute the subscription right now. Otherwise, + // because we have already registered it, it will be executed on connect + + if (!sharedConnection) { + sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, '', sharedCallbacks)); + } else if (sharedConnection.readyState === WebSocketClient.OPEN) { + subscribe(subscription); + } - return disconnect; + return () => { + removeSubscription(subscription); + unsubscribe(subscription); }; -} +}; + +const KNOWN_EVENT_TYPES = [ + 'update', + 'delete', + 'notification', + 'conversation', + 'filters_changed', + 'encrypted_message', + 'announcement', + 'announcement.delete', + 'announcement.reaction', +]; + +/** + * @param {MessageEvent} e + * @param {function(StreamEvent): void} received + */ +const handleEventSourceMessage = (e, received) => { + received({ + event: e.type, + payload: e.data, + }); +}; +/** + * @param {string} streamingAPIBaseURL + * @param {string} accessToken + * @param {string} channelName + * @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks + * @return {WebSocketClient | EventSource} + */ +const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => { + const params = channelName.split('&'); -export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { - const params = stream.split('&'); - stream = params.shift(); + channelName = params.shift(); if (streamingAPIBaseURL.startsWith('ws')) { - params.unshift(`stream=${stream}`); const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); ws.onopen = connected; @@ -92,11 +241,19 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co return ws; } - stream = stream.replace(/:/g, '/'); + channelName = channelName.replace(/:/g, '/'); + + if (channelName.endsWith(':media')) { + channelName = channelName.replace('/media', ''); + params.push('only_media=true'); + } + params.push(`access_token=${accessToken}`); - const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`); + + const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${channelName}?${params.join('&')}`); let firstConnect = true; + es.onopen = () => { if (firstConnect) { firstConnect = false; @@ -105,15 +262,12 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co reconnected(); } }; - for (let type of knownEventTypes) { - es.addEventListener(type, (e) => { - received({ - event: e.type, - payload: e.data, - }); - }); - } - es.onerror = disconnected; + + KNOWN_EVENT_TYPES.forEach(type => { + es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */ (e), received)); + }); + + es.onerror = /** @type {function(): void} */ (disconnected); return es; }; -- cgit