From 8ae52dc79235bc9ed6878bed857539d14e0b57da Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 11 Aug 2020 18:24:59 +0200 Subject: [Glitch] Add support for managing multiple stream subscriptions in a single connection Ported ef057584fd2714d94666f9ffef4aa89147eda72c to glitch-soc Signed-off-by: Thibaut Girka --- .../flavours/glitch/actions/streaming.js | 100 ++++++++++++++++++--- 1 file changed, 90 insertions(+), 10 deletions(-) (limited to 'app/javascript/flavours/glitch/actions') diff --git a/app/javascript/flavours/glitch/actions/streaming.js b/app/javascript/flavours/glitch/actions/streaming.js index 0253c24b2..35db5dcc9 100644 --- a/app/javascript/flavours/glitch/actions/streaming.js +++ b/app/javascript/flavours/glitch/actions/streaming.js @@ -1,3 +1,5 @@ +// @ts-check + import { connectStream } from 'flavours/glitch/util/stream'; import { updateTimeline, @@ -19,24 +21,59 @@ import { getLocale } from 'mastodon/locales'; const { messages } = getLocale(); -export function connectTimelineStream (timelineId, path, pollingRefresh = null, accept = null) { +/** + * @param {number} max + * @return {number} + */ +const randomUpTo = max => + Math.floor(Math.random() * Math.floor(max)); - return connectStream (path, pollingRefresh, (dispatch, getState) => { +/** + * @param {string} timelineId + * @param {string} channelName + * @param {Object.} params + * @param {Object} options + * @param {function(Function, Function): void} [options.fallback] + * @param {function(object): boolean} [options.accept] + * @return {function(): void} + */ +export const connectTimelineStream = (timelineId, channelName, params = {}, options = {}) => + connectStream(channelName, params, (dispatch, getState) => { const locale = getState().getIn(['meta', 'locale']); + let pollingId; + + /** + * @param {function(Function, Function): void} fallback + */ + const useFallback = fallback => { + fallback(dispatch, () => { + pollingId = setTimeout(() => useFallback(fallback), 20000 + randomUpTo(20000)); + }); + }; + return { onConnect() { dispatch(connectTimeline(timelineId)); + + if (pollingId) { + clearTimeout(pollingId); + pollingId = null; + } }, onDisconnect() { dispatch(disconnectTimeline(timelineId)); + + if (options.fallback) { + pollingId = setTimeout(() => useFallback(options.fallback), randomUpTo(40000)); + } }, onReceive (data) { switch(data.event) { case 'update': - dispatch(updateTimeline(timelineId, JSON.parse(data.payload), accept)); + dispatch(updateTimeline(timelineId, JSON.parse(data.payload), options.accept)); break; case 'delete': dispatch(deleteFromTimelines(data.payload)); @@ -63,17 +100,60 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null, }, }; }); -} +/** + * @param {Function} dispatch + * @param {function(): void} done + */ const refreshHomeTimelineAndNotification = (dispatch, done) => { dispatch(expandHomeTimeline({}, () => dispatch(expandNotifications({}, () => dispatch(fetchAnnouncements(done)))))); }; -export const connectUserStream = () => connectTimelineStream('home', 'user', refreshHomeTimelineAndNotification); -export const connectCommunityStream = ({ onlyMedia } = {}) => connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`); -export const connectPublicStream = ({ onlyMedia, onlyRemote, allowLocalOnly } = {}) => connectTimelineStream(`public${onlyRemote ? ':remote' : (allowLocalOnly ? ':allow_local_only' : '')}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : (allowLocalOnly ? ':allow_local_only' : '')}${onlyMedia ? ':media' : ''}`); -export const connectHashtagStream = (id, tag, local, accept) => connectTimelineStream(`hashtag:${id}${local ? ':local' : ''}`, `hashtag${local ? ':local' : ''}&tag=${tag}`, null, accept); -export const connectDirectStream = () => connectTimelineStream('direct', 'direct'); -export const connectListStream = id => connectTimelineStream(`list:${id}`, `list&list=${id}`); +/** + * @return {function(): void} + */ +export const connectUserStream = () => + connectTimelineStream('home', 'user', {}, { fallback: refreshHomeTimelineAndNotification }); + +/** + * @param {Object} options + * @param {boolean} [options.onlyMedia] + * @return {function(): void} + */ +export const connectCommunityStream = ({ onlyMedia } = {}) => + connectTimelineStream(`community${onlyMedia ? ':media' : ''}`, `public:local${onlyMedia ? ':media' : ''}`); + +/** + * @param {Object} options + * @param {boolean} [options.onlyMedia] + * @param {boolean} [options.onlyRemote] + * @param {boolean} [options.allowLocalOnly] + * @return {function(): void} + */ +export const connectPublicStream = ({ onlyMedia, onlyRemote, allowLocalOnly } = {}) => + connectTimelineStream(`public${onlyRemote ? ':remote' : (allowLocalOnly ? ':allow_local_only' : '')}${onlyMedia ? ':media' : ''}`, `public${onlyRemote ? ':remote' : (allowLocalOnly ? ':allow_local_only' : '')}${onlyMedia ? ':media' : ''}`); + +/** + * @param {string} columnId + * @param {string} tagName + * @param {boolean} onlyLocal + * @param {function(object): boolean} accept + * @return {function(): void} + */ +export const connectHashtagStream = (columnId, tagName, onlyLocal, accept) => + connectTimelineStream(`hashtag:${columnId}${onlyLocal ? ':local' : ''}`, `hashtag${onlyLocal ? ':local' : ''}`, { tag: tagName }, { accept }); + +/** + * @return {function(): void} + */ +export const connectDirectStream = () => + connectTimelineStream('direct', 'direct'); + +/** + * @param {string} listId + * @return {function(): void} + */ +export const connectListStream = listId => + connectTimelineStream(`list:${listId}`, 'list', { list: listId }); -- cgit