From ef057584fd2714d94666f9ffef4aa89147eda72c Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 11 Aug 2020 18:24:59 +0200 Subject: Add support for managing multiple stream subscriptions in a single connection (#14524) --- app/javascript/mastodon/actions/streaming.js | 99 +++++++++- app/javascript/mastodon/stream.js | 284 +++++++++++++++++++++------ 2 files changed, 308 insertions(+), 75 deletions(-) (limited to 'app/javascript') diff --git a/app/javascript/mastodon/actions/streaming.js b/app/javascript/mastodon/actions/streaming.js index d998fcac4..beb5c6a4a 100644 --- a/app/javascript/mastodon/actions/streaming.js +++ b/app/javascript/mastodon/actions/streaming.js @@ -1,3 +1,5 @@ +// @ts-check + import { connectStream } from '../stream'; import { updateTimeline, @@ -19,24 +21,59 @@ import { getLocale } from '../locales'; const { messages } = getLocale(); -export function connectTimelineStream (timelineId, path, pollingRefresh = null, accept = null) { +/** + * @param {number} max + * @return {number} + */ +const randomUpTo = max => + Math.floor(Math.random() * Math.floor(max)); - return connectStream (path, pollingRefresh, (dispatch, getState) => { +/** + * @param {string} timelineId + * @param {string} channelName + * @param {Object.} params + * @param {Object} options + * @param {function(Function, Function): void} [options.fallback] + * @param {function(object): boolean} [options.accept] + * @return {function(): void} + */ +export const connectTimelineStream = (timelineId, channelName, params = {}, options = {}) => + connectStream(channelName, params, (dispatch, getState) => { const locale = getState().getIn(['meta', 'locale']); + let pollingId; + + /** + * @param {function(Function, Function): void} fallback + */ + const useFallback = fallback => { + fallback(dispatch, () => { + pollingId = setTimeout(() => useFallback(fallback), 20000 + randomUpTo(20000)); + }); + }; + return { onConnect() { dispatch(connectTimeline(timelineId)); + + if (pollingId) { + clearTimeout(pollingId); + pollingId = null; + } }, onDisconnect() { dispatch(disconnectTimeline(timelineId)); + + if (options.fallback) { + pollingId = setTimeout(() => useFallback(options.fallback), randomUpTo(40000)); + } }, onReceive (data) { switch(data.event) { case 'update': - dispatch(updateTimeline(timelineId, JSON.parse(data.payload), accept)); + dispatch(updateTimeline(timelineId, JSON.parse(data.payload), options.accept)); break; case 'delete': dispatch(deleteFromTimelines(data.payload)); @@ -63,17 +100,59 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null, }, }; }); -} +/** + * @param {Function} dispatch + * @param {function(): void} done + */ const refreshHomeTimelineAndNotification = (dispatch, done) => { dispatch(expandHomeTimeline({}, () => dispatch(expandNotifications({}, () => dispatch(fetchAnnouncements(done)))))); }; -export const connectUserStream = () => connectTimelineStream('home', 'user', refreshHomeTimelineAndNotification); -export const connectCommunityStream = ({ onlyMedia } = {}) => connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`); -export const connectPublicStream = ({ onlyMedia, onlyRemote } = {}) => connectTimelineStream(`public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`); -export const connectHashtagStream = (id, tag, local, accept) => connectTimelineStream(`hashtag:${id}${local ? ':local' : ''}`, `hashtag${local ? ':local' : ''}&tag=${tag}`, null, accept); -export const connectDirectStream = () => connectTimelineStream('direct', 'direct'); -export const connectListStream = id => connectTimelineStream(`list:${id}`, `list&list=${id}`); +/** + * @return {function(): void} + */ +export const connectUserStream = () => + connectTimelineStream('home', 'user', {}, { fallback: refreshHomeTimelineAndNotification }); + +/** + * @param {Object} options + * @param {boolean} [options.onlyMedia] + * @return {function(): void} + */ +export const connectCommunityStream = ({ onlyMedia } = {}) => + connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`); + +/** + * @param {Object} options + * @param {boolean} [options.onlyMedia] + * @param {boolean} [options.onlyRemote] + * @return {function(): void} + */ +export const connectPublicStream = ({ onlyMedia, onlyRemote } = {}) => + connectTimelineStream(`public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : ''}${onlyMedia ? ':media' : ''}`); + +/** + * @param {string} columnId + * @param {string} tagName + * @param {boolean} onlyLocal + * @param {function(object): boolean} accept + * @return {function(): void} + */ +export const connectHashtagStream = (columnId, tagName, onlyLocal, accept) => + connectTimelineStream(`hashtag:${columnId}${onlyLocal ? ':local' : ''}`, `hashtag${onlyLocal ? ':local' : ''}`, { tag: tagName }, { accept }); + +/** + * @return {function(): void} + */ +export const connectDirectStream = () => + connectTimelineStream('direct', 'direct'); + +/** + * @param {string} listId + * @return {function(): void} + */ +export const connectListStream = listId => + connectTimelineStream(`list:${listId}`, 'list', { list: listId }); diff --git a/app/javascript/mastodon/stream.js b/app/javascript/mastodon/stream.js index 0cb2b228f..640455b33 100644 --- a/app/javascript/mastodon/stream.js +++ b/app/javascript/mastodon/stream.js @@ -1,87 +1,236 @@ +// @ts-check + import WebSocketClient from '@gamestdio/websocket'; -const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max)); +/** + * @type {WebSocketClient | undefined} + */ +let sharedConnection; -const knownEventTypes = [ - 'update', - 'delete', - 'notification', - 'conversation', - 'filters_changed', -]; +/** + * @typedef Subscription + * @property {string} channelName + * @property {Object.} params + * @property {function(): void} onConnect + * @property {function(StreamEvent): void} onReceive + * @property {function(): void} onDisconnect + */ -export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { - return (dispatch, getState) => { - const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); - const accessToken = getState().getIn(['meta', 'access_token']); - const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); + /** + * @typedef StreamEvent + * @property {string} event + * @property {object} payload + */ - let polling = null; +/** + * @type {Array.} + */ +const subscriptions = []; - const setupPolling = () => { - pollingRefresh(dispatch, () => { - polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000)); - }); - }; +/** + * @type {Object.} + */ +const subscriptionCounters = {}; + +/** + * @param {Subscription} subscription + */ +const addSubscription = subscription => { + subscriptions.push(subscription); +}; + +/** + * @param {Subscription} subscription + */ +const removeSubscription = subscription => { + const index = subscriptions.indexOf(subscription); + + if (index !== -1) { + subscriptions.splice(index, 1); + } +}; + +/** + * @param {Subscription} subscription + */ +const subscribe = ({ channelName, params, onConnect }) => { + const key = channelNameWithInlineParams(channelName, params); + + subscriptionCounters[key] = subscriptionCounters[key] || 0; + + if (subscriptionCounters[key] === 0) { + sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params })); + } + + subscriptionCounters[key] += 1; + onConnect(); +}; + +/** + * @param {Subscription} subscription + */ +const unsubscribe = ({ channelName, params, onDisconnect }) => { + const key = channelNameWithInlineParams(channelName, params); - const clearPolling = () => { - if (polling) { - clearTimeout(polling); - polling = null; + subscriptionCounters[key] = subscriptionCounters[key] || 1; + + if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) { + sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params })); + } + + subscriptionCounters[key] -= 1; + onDisconnect(); +}; + +const sharedCallbacks = { + connected () { + subscriptions.forEach(subscription => subscribe(subscription)); + }, + + received (data) { + const { stream } = data; + + subscriptions.filter(({ channelName, params }) => { + const streamChannelName = stream[0]; + + if (stream.length === 1) { + return channelName === streamChannelName; } - }; - const subscription = getStream(streamingAPIBaseURL, accessToken, path, { - connected () { - if (pollingRefresh) { - clearPolling(); - } + const streamIdentifier = stream[1]; - onConnect(); - }, + if (['hashtag', 'hashtag:local'].includes(channelName)) { + return channelName === streamChannelName && params.tag === streamIdentifier; + } else if (channelName === 'list') { + return channelName === streamChannelName && params.list === streamIdentifier; + } - disconnected () { - if (pollingRefresh) { - polling = setTimeout(() => setupPolling(), randomIntUpTo(40000)); - } + return false; + }).forEach(subscription => { + subscription.onReceive(data); + }); + }, - onDisconnect(); + disconnected () { + subscriptions.forEach(({ onDisconnect }) => onDisconnect()); + }, + + reconnected () { + subscriptions.forEach(subscription => subscribe(subscription)); + }, +}; + +/** + * @param {string} channelName + * @param {Object.} params + * @return {string} + */ +const channelNameWithInlineParams = (channelName, params) => { + if (Object.keys(params).length === 0) { + return channelName; + } + + return `${channelName}&${Object.keys(params).map(key => `${key}=${params[key]}`).join('&')}`; +}; + +/** + * @param {string} channelName + * @param {Object.} params + * @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks + * @return {function(): void} + */ +export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => { + const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); + const accessToken = getState().getIn(['meta', 'access_token']); + const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState); + + // If we cannot use a websockets connection, we must fall back + // to using individual connections for each channel + if (!streamingAPIBaseURL.startsWith('ws')) { + const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), { + connected () { + onConnect(); }, received (data) { onReceive(data); }, - reconnected () { - if (pollingRefresh) { - clearPolling(); - pollingRefresh(dispatch); - } + disconnected () { + onDisconnect(); + }, + reconnected () { onConnect(); }, - }); - const disconnect = () => { - if (subscription) { - subscription.close(); - } - - clearPolling(); + return () => { + connection.close(); }; + } + + const subscription = { + channelName, + params, + onConnect, + onReceive, + onDisconnect, + }; + + addSubscription(subscription); + + // If a connection is open, we can execute the subscription right now. Otherwise, + // because we have already registered it, it will be executed on connect + + if (!sharedConnection) { + sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, '', sharedCallbacks)); + } else if (sharedConnection.readyState === WebSocketClient.OPEN) { + subscribe(subscription); + } - return disconnect; + return () => { + removeSubscription(subscription); + unsubscribe(subscription); }; -} +}; + +const KNOWN_EVENT_TYPES = [ + 'update', + 'delete', + 'notification', + 'conversation', + 'filters_changed', + 'encrypted_message', + 'announcement', + 'announcement.delete', + 'announcement.reaction', +]; + +/** + * @param {MessageEvent} e + * @param {function(StreamEvent): void} received + */ +const handleEventSourceMessage = (e, received) => { + received({ + event: e.type, + payload: e.data, + }); +}; +/** + * @param {string} streamingAPIBaseURL + * @param {string} accessToken + * @param {string} channelName + * @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks + * @return {WebSocketClient | EventSource} + */ +const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => { + const params = channelName.split('&'); -export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { - const params = stream.split('&'); - stream = params.shift(); + channelName = params.shift(); if (streamingAPIBaseURL.startsWith('ws')) { - params.unshift(`stream=${stream}`); const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); ws.onopen = connected; @@ -92,11 +241,19 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co return ws; } - stream = stream.replace(/:/g, '/'); + channelName = channelName.replace(/:/g, '/'); + + if (channelName.endsWith(':media')) { + channelName = channelName.replace('/media', ''); + params.push('only_media=true'); + } + params.push(`access_token=${accessToken}`); - const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`); + + const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${channelName}?${params.join('&')}`); let firstConnect = true; + es.onopen = () => { if (firstConnect) { firstConnect = false; @@ -105,15 +262,12 @@ export default function getStream(streamingAPIBaseURL, accessToken, stream, { co reconnected(); } }; - for (let type of knownEventTypes) { - es.addEventListener(type, (e) => { - received({ - event: e.type, - payload: e.data, - }); - }); - } - es.onerror = disconnected; + + KNOWN_EVENT_TYPES.forEach(type => { + es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */ (e), received)); + }); + + es.onerror = /** @type {function(): void} */ (disconnected); return es; }; -- cgit