diff options
Diffstat (limited to 'app/javascript/flavours/glitch/util/stream.js')
-rw-r--r-- | app/javascript/flavours/glitch/util/stream.js | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/app/javascript/flavours/glitch/util/stream.js b/app/javascript/flavours/glitch/util/stream.js new file mode 100644 index 000000000..c4642344f --- /dev/null +++ b/app/javascript/flavours/glitch/util/stream.js @@ -0,0 +1,82 @@ +import WebSocketClient from 'websocket.js'; + +const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max)); + +export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { + return (dispatch, getState) => { + const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); + const accessToken = getState().getIn(['meta', 'access_token']); + const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); + + let polling = null; + + const setupPolling = () => { + pollingRefresh(dispatch, () => { + polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000)); + }); + }; + + const clearPolling = () => { + if (polling) { + clearTimeout(polling); + polling = null; + } + }; + + const subscription = getStream(streamingAPIBaseURL, accessToken, path, { + connected () { + if (pollingRefresh) { + clearPolling(); + } + + onConnect(); + }, + + disconnected () { + if (pollingRefresh) { + polling = setTimeout(() => setupPolling(), randomIntUpTo(40000)); + } + + onDisconnect(); + }, + + received (data) { + onReceive(data); + }, + + reconnected () { + if (pollingRefresh) { + clearPolling(); + pollingRefresh(dispatch); + } + + onConnect(); + }, + + }); + + const disconnect = () => { + if (subscription) { + subscription.close(); + } + + clearPolling(); + }; + + return disconnect; + }; +} + + +export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { + const params = [ `stream=${stream}` ]; + + const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); + + ws.onopen = connected; + ws.onmessage = e => received(JSON.parse(e.data)); + ws.onclose = disconnected; + ws.onreconnect = reconnected; + + return ws; +}; |