From cfea28216ffaec9c28ba2f57de868ada482c1779 Mon Sep 17 00:00:00 2001 From: nullkal Date: Tue, 12 Dec 2017 23:13:24 +0900 Subject: make it possible to stream public timelines without authorization (#5977) * make it possible to stream public timelines without authorization * Fix * Make eslint allow `value == null` * Remove redundant line * Improve style and revert .eslintrc.yml * Fix streamWsEnd * Show IP address instead of (anonymous user) * Add missing semicolon --- streaming/index.js | 97 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 27 deletions(-) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index c79a58671..31c597cf0 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -97,6 +97,8 @@ const startWorker = (workerId) => { }; const app = express(); + app.set('trusted proxy', process.env.TRUSTED_PROXY_IP || 'loopback,uniquelocal'); + const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL))); const server = http.createServer(app); const redisNamespace = process.env.REDIS_NAMESPACE || null; @@ -177,6 +179,12 @@ const startWorker = (workerId) => { next(); }; + const setRemoteAddress = (req, res, next) => { + req.remoteAddress = req.connection.remoteAddress; + + next(); + }; + const accountFromToken = (token, req, next) => { pgPool.connect((err, client, done) => { if (err) { @@ -208,17 +216,22 @@ const startWorker = (workerId) => { }); }; - const accountFromRequest = (req, next) => { + const accountFromRequest = (req, next, required = true) => { const authorization = req.headers.authorization; const location = url.parse(req.url, true); const accessToken = location.query.access_token; if (!authorization && !accessToken) { - const err = new Error('Missing access token'); - err.statusCode = 401; + if (required) { + const err = new Error('Missing access token'); + err.statusCode = 401; - next(err); - return; + next(err); + return; + } else { + next(); + return; + } } const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken; @@ -226,7 +239,17 @@ const startWorker = (workerId) => { accountFromToken(token, req, next); }; + const PUBLIC_STREAMS = [ + 'public', + 'public:local', + 'hashtag', + 'hashtag:local', + ]; + const wsVerifyClient = (info, cb) => { + const location = url.parse(info.req.url, true); + const authRequired = !PUBLIC_STREAMS.some(stream => stream === location.query.stream); + accountFromRequest(info.req, err => { if (!err) { cb(true, undefined, undefined); @@ -234,16 +257,24 @@ const startWorker = (workerId) => { log.error(info.req.requestId, err.toString()); cb(false, 401, 'Unauthorized'); } - }); + }, authRequired); }; + const PUBLIC_ENDPOINTS = [ + '/api/v1/streaming/public', + '/api/v1/streaming/public/local', + '/api/v1/streaming/hashtag', + '/api/v1/streaming/hashtag/local', + ]; + const authenticationMiddleware = (req, res, next) => { if (req.method === 'OPTIONS') { next(); return; } - accountFromRequest(req, next); + const authRequired = !PUBLIC_ENDPOINTS.some(endpoint => endpoint === req.path); + accountFromRequest(req, next, authRequired); }; const errorMiddleware = (err, req, res, {}) => { @@ -275,8 +306,10 @@ const startWorker = (workerId) => { }; const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => { + const accountId = req.accountId || req.remoteAddress; + const streamType = notificationOnly ? ' (notification)' : ''; - log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}${streamType}`); + log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`); const listener = message => { const { event, payload, queued_at } = JSON.parse(message); @@ -286,7 +319,7 @@ const startWorker = (workerId) => { const delta = now - queued_at; const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; - log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`); + log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`); output(event, encodedPayload); }; @@ -313,26 +346,30 @@ const startWorker = (workerId) => { return; } - const queries = [ - client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), - ]; + if (!req.accountId) { + const queries = [ + client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), + ]; - if (accountDomain) { - queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain])); - } + if (accountDomain) { + queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain])); + } - Promise.all(queries).then(values => { - done(); + Promise.all(queries).then(values => { + done(); - if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) { - return; - } + if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) { + return; + } + transmit(); + }).catch(err => { + done(); + log.error(err); + }); + } else { transmit(); - }).catch(err => { - done(); - log.error(err); - }); + } }); } else { transmit(); @@ -345,13 +382,15 @@ const startWorker = (workerId) => { // Setup stream output to HTTP const streamToHttp = (req, res) => { + const accountId = req.accountId || req.remoteAddress; + res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Transfer-Encoding', 'chunked'); const heartbeat = setInterval(() => res.write(':thump\n'), 15000); req.on('close', () => { - log.verbose(req.requestId, `Ending stream for ${req.accountId}`); + log.verbose(req.requestId, `Ending stream for ${accountId}`); clearInterval(heartbeat); }); @@ -383,8 +422,10 @@ const startWorker = (workerId) => { // Setup stream end for WebSockets const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => { + const accountId = req.accountId || req.remoteAddress; + ws.on('close', () => { - log.verbose(req.requestId, `Ending stream for ${req.accountId}`); + log.verbose(req.requestId, `Ending stream for ${accountId}`); unsubscribe(id, listener); if (closeHandler) { closeHandler(); @@ -392,7 +433,7 @@ const startWorker = (workerId) => { }); ws.on('error', () => { - log.verbose(req.requestId, `Ending stream for ${req.accountId}`); + log.verbose(req.requestId, `Ending stream for ${accountId}`); unsubscribe(id, listener); if (closeHandler) { closeHandler(); @@ -401,6 +442,7 @@ const startWorker = (workerId) => { }; app.use(setRequestId); + app.use(setRemoteAddress); app.use(allowCrossDomain); app.use(authenticationMiddleware); app.use(errorMiddleware); @@ -451,6 +493,7 @@ const startWorker = (workerId) => { const req = ws.upgradeReq; const location = url.parse(req.url, true); req.requestId = uuid.v4(); + req.remoteAddress = ws._socket.remoteAddress; ws.isAlive = true; -- cgit From c986218c3a98564e38d68689150b33a6aa6c4b3a Mon Sep 17 00:00:00 2001 From: erin Date: Tue, 12 Dec 2017 13:19:33 -0600 Subject: Improve error handling in streaming/index.js (#5968) On an unhandled worker exception, we should log the exception and exit with nonzero status, instead of letting workers silently fail and restarting them in an endless loop. Note: we previously tried to handle the `'error'` signal. That's not a signal Node fires; my patch traps `'uncaughtException'`, which is what the code was _trying_ to do. --- streaming/index.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index 31c597cf0..198eac1ae 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -563,12 +563,14 @@ const startWorker = (workerId) => { const onError = (err) => { log.error(err); + server.close(); + process.exit(0); }; process.on('SIGINT', onExit); process.on('SIGTERM', onExit); process.on('exit', onExit); - process.on('error', onError); + process.on('uncaughtException', onError); }; throng({ -- cgit From 90e7da16a013e7d9024adbbf484123959f83ef01 Mon Sep 17 00:00:00 2001 From: nullkal Date: Wed, 13 Dec 2017 21:42:16 +0900 Subject: Fix the condition in streaming listener (#6008) --- streaming/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index 198eac1ae..303e37ab4 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -346,7 +346,7 @@ const startWorker = (workerId) => { return; } - if (!req.accountId) { + if (req.accountId) { const queries = [ client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)), ]; -- cgit From ccf4f170de86f24d5a243c411b807f1c394723cf Mon Sep 17 00:00:00 2001 From: nullkal Date: Wed, 13 Dec 2017 22:27:36 +0900 Subject: Make sure call `done();` in the listener of public timeline for anonymous connection (#6009) --- streaming/index.js | 1 + 1 file changed, 1 insertion(+) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index 303e37ab4..a42a91242 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -368,6 +368,7 @@ const startWorker = (workerId) => { log.error(err); }); } else { + done(); transmit(); } }); -- cgit