diff options
Diffstat (limited to 'app/javascript/flavours/glitch/util/stream.js')
-rw-r--r-- | app/javascript/flavours/glitch/util/stream.js | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/app/javascript/flavours/glitch/util/stream.js b/app/javascript/flavours/glitch/util/stream.js new file mode 100644 index 000000000..0cb2b228f --- /dev/null +++ b/app/javascript/flavours/glitch/util/stream.js @@ -0,0 +1,119 @@ +import WebSocketClient from '@gamestdio/websocket'; + +const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max)); + +const knownEventTypes = [ + 'update', + 'delete', + 'notification', + 'conversation', + 'filters_changed', +]; + +export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { + return (dispatch, getState) => { + const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); + const accessToken = getState().getIn(['meta', 'access_token']); + const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); + + let polling = null; + + const setupPolling = () => { + pollingRefresh(dispatch, () => { + polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000)); + }); + }; + + const clearPolling = () => { + if (polling) { + clearTimeout(polling); + polling = null; + } + }; + + const subscription = getStream(streamingAPIBaseURL, accessToken, path, { + connected () { + if (pollingRefresh) { + clearPolling(); + } + + onConnect(); + }, + + disconnected () { + if (pollingRefresh) { + polling = setTimeout(() => setupPolling(), randomIntUpTo(40000)); + } + + onDisconnect(); + }, + + received (data) { + onReceive(data); + }, + + reconnected () { + if (pollingRefresh) { + clearPolling(); + pollingRefresh(dispatch); + } + + onConnect(); + }, + + }); + + const disconnect = () => { + if (subscription) { + subscription.close(); + } + + clearPolling(); + }; + + return disconnect; + }; +} + + +export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { + const params = stream.split('&'); + stream = params.shift(); + + if (streamingAPIBaseURL.startsWith('ws')) { + params.unshift(`stream=${stream}`); + const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); + + ws.onopen = connected; + ws.onmessage = e => received(JSON.parse(e.data)); + ws.onclose = disconnected; + ws.onreconnect = reconnected; + + return ws; + } + + stream = stream.replace(/:/g, '/'); + params.push(`access_token=${accessToken}`); + const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`); + + let firstConnect = true; + es.onopen = () => { + if (firstConnect) { + firstConnect = false; + connected(); + } else { + reconnected(); + } + }; + for (let type of knownEventTypes) { + es.addEventListener(type, (e) => { + received({ + event: e.type, + payload: e.data, + }); + }); + } + es.onerror = disconnected; + + return es; +}; |