From 1ee4a17f3792669d3f03ddcb060aa48b622eca61 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Thu, 2 Feb 2017 13:56:14 +0100 Subject: Add logging and filtering to the node.js streaming API --- streaming/index.js | 55 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 9 deletions(-) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index 70067d2f6..945e287f5 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -2,6 +2,7 @@ import dotenv from 'dotenv' import express from 'express' import redis from 'redis' import pg from 'pg' +import log from 'npmlog' dotenv.config() @@ -40,6 +41,7 @@ const authenticationMiddleware = (req, res, next) => { pgPool.connect((err, client, done) => { if (err) { + log.error(err) return next(err) } @@ -47,6 +49,7 @@ const authenticationMiddleware = (req, res, next) => { done() if (err) { + log.error(err) return next(err) } @@ -66,10 +69,12 @@ const authenticationMiddleware = (req, res, next) => { const errorMiddleware = (err, req, res, next) => { res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ error: `${err}` })) + res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occured' })) } -const streamFrom = (id, res) => { +const streamFrom = (id, req, res, needsFiltering = false) => { + log.verbose(`Starting stream from ${id} for ${req.accountId}`) + res.setHeader('Content-Type', 'text/event-stream') res.setHeader('Transfer-Encoding', 'chunked') @@ -78,11 +83,40 @@ const streamFrom = (id, res) => { redisClient.on('message', (channel, message) => { const { event, payload } = JSON.parse(message) - res.write(`event: ${event}\n`) - res.write(`data: ${payload}\n\n`) + if (needsFiltering) { + pgPool.connect((err, client, done) => { + if (err) { + log.error(err) + return + } + + const unpackedPayload = JSON.parse(payload) + const targetAccountIds = [unpackedPayload.account.id] + unpackedPayload.mentions.map(item => item.id) + (unpackedPayload.reblog ? unpackedPayload.reblog.account.id : []) + + client.query('SELECT target_account_id FROM blocks WHERE account_id = $1 AND target_account_id IN ($2)', [req.accountId, targetAccountIds], (err, result) => { + done() + + if (err) { + log.error(err) + return + } + + if (result.rows.length > 0) { + return + } + + res.write(`event: ${event}\n`) + res.write(`data: ${payload}\n\n`) + }) + }) + } else { + res.write(`event: ${event}\n`) + res.write(`data: ${payload}\n\n`) + } }) - setInterval(() => res.write('\n'), 15000) + // Heartbeat to keep connection alive + setInterval(() => res.write(':thump\n'), 15000) redisClient.subscribe(id) } @@ -90,8 +124,11 @@ const streamFrom = (id, res) => { app.use(authenticationMiddleware) app.use(errorMiddleware) -app.get('/api/v1/streaming/user', (req, res) => streamFrom(`timeline:${req.accountId}`, res)) -app.get('/api/v1/streaming/public', (_, res) => streamFrom('timeline:public', res)) -app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, res)) +app.get('/api/v1/streaming/user', (req, res) => streamFrom(`timeline:${req.accountId}`, req, res)) +app.get('/api/v1/streaming/public', (req, res) => streamFrom('timeline:public', req, res, true)) +app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, req, res, true)) + +log.level = 'verbose' +log.info(`Starting HTTP server on port ${process.env.PORT || 4000}`) -app.listen(4000) +app.listen(process.env.PORT || 4000) -- cgit