From bc4fa6b198557a7f3989eb0865e2c77ac7451d29 Mon Sep 17 00:00:00 2001 From: kibigo! Date: Sun, 3 Dec 2017 23:26:40 -0800 Subject: Rename themes -> flavours ? ? --- app/javascript/flavours/glitch/util/stream.js | 73 +++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 app/javascript/flavours/glitch/util/stream.js (limited to 'app/javascript/flavours/glitch/util/stream.js') diff --git a/app/javascript/flavours/glitch/util/stream.js b/app/javascript/flavours/glitch/util/stream.js new file mode 100644 index 000000000..36c68ffc5 --- /dev/null +++ b/app/javascript/flavours/glitch/util/stream.js @@ -0,0 +1,73 @@ +import WebSocketClient from 'websocket.js'; + +export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { + return (dispatch, getState) => { + const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); + const accessToken = getState().getIn(['meta', 'access_token']); + const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); + let polling = null; + + const setupPolling = () => { + polling = setInterval(() => { + pollingRefresh(dispatch); + }, 20000); + }; + + const clearPolling = () => { + if (polling) { + clearInterval(polling); + polling = null; + } + }; + + const subscription = getStream(streamingAPIBaseURL, accessToken, path, { + connected () { + if (pollingRefresh) { + clearPolling(); + } + onConnect(); + }, + + disconnected () { + if (pollingRefresh) { + setupPolling(); + } + onDisconnect(); + }, + + received (data) { + onReceive(data); + }, + + reconnected () { + if (pollingRefresh) { + clearPolling(); + pollingRefresh(dispatch); + } + onConnect(); + }, + + }); + + const disconnect = () => { + if (subscription) { + subscription.close(); + } + clearPolling(); + }; + + return disconnect; + }; +} + + +export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { + const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?access_token=${accessToken}&stream=${stream}`); + + ws.onopen = connected; + ws.onmessage = e => received(JSON.parse(e.data)); + ws.onclose = disconnected; + ws.onreconnect = reconnected; + + return ws; +}; -- cgit