From 2e112e240666b62b8c3d4fa201fb24b841f6c92b Mon Sep 17 00:00:00 2001 From: Yamagishi Kazutoshi Date: Sun, 21 May 2017 00:31:47 +0900 Subject: Improve eslint rules (#3147) * Add semi to ESLint rules * Add padded-blocks to ESLint rules * Add comma-dangle to ESLint rules * add config/webpack and storyboard * add streaming/ * yarn test:lint -- --fix --- streaming/index.js | 356 ++++++++++++++++++++++++++--------------------------- 1 file changed, 178 insertions(+), 178 deletions(-) (limited to 'streaming/index.js') diff --git a/streaming/index.js b/streaming/index.js index 7f30b4e88..cd1963121 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -1,56 +1,56 @@ import os from 'os'; import cluster from 'cluster'; -import dotenv from 'dotenv' -import express from 'express' -import http from 'http' -import redis from 'redis' -import pg from 'pg' -import log from 'npmlog' -import url from 'url' -import WebSocket from 'uws' -import uuid from 'uuid' - -const env = process.env.NODE_ENV || 'development' +import dotenv from 'dotenv'; +import express from 'express'; +import http from 'http'; +import redis from 'redis'; +import pg from 'pg'; +import log from 'npmlog'; +import url from 'url'; +import WebSocket from 'uws'; +import uuid from 'uuid'; + +const env = process.env.NODE_ENV || 'development'; dotenv.config({ - path: env === 'production' ? '.env.production' : '.env' -}) + path: env === 'production' ? '.env.production' : '.env', +}); const dbUrlToConfig = (dbUrl) => { if (!dbUrl) { - return {} + return {}; } - const params = url.parse(dbUrl) - const config = {} + const params = url.parse(dbUrl); + const config = {}; if (params.auth) { - [config.user, config.password] = params.auth.split(':') + [config.user, config.password] = params.auth.split(':'); } if (params.hostname) { - config.host = params.hostname + config.host = params.hostname; } if (params.port) { - config.port = params.port + config.port = params.port; } if (params.pathname) { - config.database = params.pathname.split('/')[1] + config.database = params.pathname.split('/')[1]; } - const ssl = params.query && params.query.ssl + const ssl = params.query && params.query.ssl; if (ssl) { - config.ssl = ssl === 'true' || ssl === '1' + config.ssl = ssl === 'true' || ssl === '1'; } - return config -} + return config; +}; if (cluster.isMaster) { // Cluster master - const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)) + const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); const fork = () => { const worker = cluster.fork(); @@ -63,14 +63,14 @@ if (cluster.isMaster) { for (let i = 0; i < core; i++) fork(); - log.info(`Starting streaming API server master with ${core} workers`) + log.info(`Starting streaming API server master with ${core} workers`); } else { // Cluster worker const pgConfigs = { development: { database: 'mastodon_development', host: '/var/run/postgresql', - max: 10 + max: 10, }, production: { @@ -79,315 +79,315 @@ if (cluster.isMaster) { database: process.env.DB_NAME || 'mastodon_production', host: process.env.DB_HOST || 'localhost', port: process.env.DB_PORT || 5432, - max: 10 - } - } + max: 10, + }, + }; - const app = express() - const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL))) - const server = http.createServer(app) - const wss = new WebSocket.Server({ server }) - const redisNamespace = process.env.REDIS_NAMESPACE || null + const app = express(); + const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL))); + const server = http.createServer(app); + const wss = new WebSocket.Server({ server }); + const redisNamespace = process.env.REDIS_NAMESPACE || null; const redisParams = { host: process.env.REDIS_HOST || '127.0.0.1', port: process.env.REDIS_PORT || 6379, db: process.env.REDIS_DB || 0, password: process.env.REDIS_PASSWORD, - url: process.env.REDIS_URL || null - } + url: process.env.REDIS_URL || null, + }; if (redisNamespace) { - redisParams.namespace = redisNamespace + redisParams.namespace = redisNamespace; } - const redisPrefix = redisNamespace ? `${redisNamespace}:` : '' + const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; - const redisClient = redis.createClient(redisParams) + const redisClient = redis.createClient(redisParams); - const subs = {} + const subs = {}; redisClient.on('pmessage', (_, channel, message) => { - const callbacks = subs[channel] + const callbacks = subs[channel]; - log.silly(`New message on channel ${channel}`) + log.silly(`New message on channel ${channel}`); if (!callbacks) { - return + return; } - callbacks.forEach(callback => callback(message)) - }) + callbacks.forEach(callback => callback(message)); + }); - redisClient.psubscribe(`${redisPrefix}timeline:*`) + redisClient.psubscribe(`${redisPrefix}timeline:*`); const subscribe = (channel, callback) => { - log.silly(`Adding listener for ${channel}`) - subs[channel] = subs[channel] || [] - subs[channel].push(callback) - } + log.silly(`Adding listener for ${channel}`); + subs[channel] = subs[channel] || []; + subs[channel].push(callback); + }; const unsubscribe = (channel, callback) => { - log.silly(`Removing listener for ${channel}`) - subs[channel] = subs[channel].filter(item => item !== callback) - } + log.silly(`Removing listener for ${channel}`); + subs[channel] = subs[channel].filter(item => item !== callback); + }; const allowCrossDomain = (req, res, next) => { - res.header('Access-Control-Allow-Origin', '*') - res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control') - res.header('Access-Control-Allow-Methods', 'GET, OPTIONS') + res.header('Access-Control-Allow-Origin', '*'); + res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'); + res.header('Access-Control-Allow-Methods', 'GET, OPTIONS'); - next() - } + next(); + }; const setRequestId = (req, res, next) => { - req.requestId = uuid.v4() - res.header('X-Request-Id', req.requestId) + req.requestId = uuid.v4(); + res.header('X-Request-Id', req.requestId); - next() - } + next(); + }; const accountFromToken = (token, req, next) => { pgPool.connect((err, client, done) => { if (err) { - next(err) - return + next(err); + return; } client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 LIMIT 1', [token], (err, result) => { - done() + done(); if (err) { - next(err) - return + next(err); + return; } if (result.rows.length === 0) { - err = new Error('Invalid access token') - err.statusCode = 401 + err = new Error('Invalid access token'); + err.statusCode = 401; - next(err) - return + next(err); + return; } - req.accountId = result.rows[0].account_id + req.accountId = result.rows[0].account_id; - next() - }) - }) - } + next(); + }); + }); + }; const authenticationMiddleware = (req, res, next) => { if (req.method === 'OPTIONS') { - next() - return + next(); + return; } - const authorization = req.get('Authorization') + const authorization = req.get('Authorization'); if (!authorization) { - const err = new Error('Missing access token') - err.statusCode = 401 + const err = new Error('Missing access token'); + err.statusCode = 401; - next(err) - return + next(err); + return; } - const token = authorization.replace(/^Bearer /, '') + const token = authorization.replace(/^Bearer /, ''); - accountFromToken(token, req, next) - } + accountFromToken(token, req, next); + }; const errorMiddleware = (err, req, res, next) => { - log.error(req.requestId, err) - res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })) - } + log.error(req.requestId, err); + res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })); + }; const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => { - log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`) + log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`); const listener = message => { - const { event, payload, queued_at } = JSON.parse(message) + const { event, payload, queued_at } = JSON.parse(message); const transmit = () => { - const now = new Date().getTime() + const now = new Date().getTime(); const delta = now - queued_at; - log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) - output(event, payload) - } + log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`); + output(event, payload); + }; // Only messages that may require filtering are statuses, since notifications // are already personalized and deletes do not matter if (needsFiltering && event === 'update') { pgPool.connect((err, client, done) => { if (err) { - log.error(err) - return + log.error(err); + return; } - const unpackedPayload = JSON.parse(payload) - const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)).concat(unpackedPayload.reblog ? [unpackedPayload.reblog.account.id] : []) - const accountDomain = unpackedPayload.account.acct.split('@')[1] + const unpackedPayload = JSON.parse(payload); + const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)).concat(unpackedPayload.reblog ? [unpackedPayload.reblog.account.id] : []); + const accountDomain = unpackedPayload.account.acct.split('@')[1]; const queries = [ client.query(`SELECT 1 FROM blocks WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 1)}) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 1)})`, [req.accountId].concat(targetAccountIds)), - ] + ]; if (accountDomain) { - queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain])) + queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain])); } Promise.all(queries).then(values => { - done() + done(); if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) { - return + return; } - transmit() + transmit(); }).catch(err => { - log.error(err) - }) - }) + log.error(err); + }); + }); } else { - transmit() + transmit(); } - } + }; - subscribe(`${redisPrefix}${id}`, listener) - attachCloseHandler(`${redisPrefix}${id}`, listener) - } + subscribe(`${redisPrefix}${id}`, listener); + attachCloseHandler(`${redisPrefix}${id}`, listener); + }; // Setup stream output to HTTP const streamToHttp = (req, res) => { - res.setHeader('Content-Type', 'text/event-stream') - res.setHeader('Transfer-Encoding', 'chunked') + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Transfer-Encoding', 'chunked'); - const heartbeat = setInterval(() => res.write(':thump\n'), 15000) + const heartbeat = setInterval(() => res.write(':thump\n'), 15000); req.on('close', () => { - log.verbose(req.requestId, `Ending stream for ${req.accountId}`) - clearInterval(heartbeat) - }) + log.verbose(req.requestId, `Ending stream for ${req.accountId}`); + clearInterval(heartbeat); + }); return (event, payload) => { - res.write(`event: ${event}\n`) - res.write(`data: ${payload}\n\n`) - } - } + res.write(`event: ${event}\n`); + res.write(`data: ${payload}\n\n`); + }; + }; // Setup stream end for HTTP const streamHttpEnd = req => (id, listener) => { req.on('close', () => { - unsubscribe(id, listener) - }) - } + unsubscribe(id, listener); + }); + }; // Setup stream output to WebSockets const streamToWs = (req, ws) => { const heartbeat = setInterval(() => { // TODO: Can't add multiple listeners, due to the limitation of uws. if (ws.readyState !== ws.OPEN) { - log.verbose(req.requestId, `Ending stream for ${req.accountId}`) - clearInterval(heartbeat) - return + log.verbose(req.requestId, `Ending stream for ${req.accountId}`); + clearInterval(heartbeat); + return; } - ws.ping() - }, 15000) + ws.ping(); + }, 15000); return (event, payload) => { if (ws.readyState !== ws.OPEN) { - log.error(req.requestId, 'Tried writing to closed socket') - return + log.error(req.requestId, 'Tried writing to closed socket'); + return; } - ws.send(JSON.stringify({ event, payload })) - } - } + ws.send(JSON.stringify({ event, payload })); + }; + }; // Setup stream end for WebSockets const streamWsEnd = ws => (id, listener) => { ws.on('close', () => { - unsubscribe(id, listener) - }) + unsubscribe(id, listener); + }); ws.on('error', e => { - unsubscribe(id, listener) - }) - } + unsubscribe(id, listener); + }); + }; - app.use(setRequestId) - app.use(allowCrossDomain) - app.use(authenticationMiddleware) - app.use(errorMiddleware) + app.use(setRequestId); + app.use(allowCrossDomain); + app.use(authenticationMiddleware); + app.use(errorMiddleware); app.get('/api/v1/streaming/user', (req, res) => { - streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req)) - }) + streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req)); + }); app.get('/api/v1/streaming/public', (req, res) => { - streamFrom('timeline:public', req, streamToHttp(req, res), streamHttpEnd(req), true) - }) + streamFrom('timeline:public', req, streamToHttp(req, res), streamHttpEnd(req), true); + }); app.get('/api/v1/streaming/public/local', (req, res) => { - streamFrom('timeline:public:local', req, streamToHttp(req, res), streamHttpEnd(req), true) - }) + streamFrom('timeline:public:local', req, streamToHttp(req, res), streamHttpEnd(req), true); + }); app.get('/api/v1/streaming/hashtag', (req, res) => { - streamFrom(`timeline:hashtag:${req.query.tag}`, req, streamToHttp(req, res), streamHttpEnd(req), true) - }) + streamFrom(`timeline:hashtag:${req.query.tag}`, req, streamToHttp(req, res), streamHttpEnd(req), true); + }); app.get('/api/v1/streaming/hashtag/local', (req, res) => { - streamFrom(`timeline:hashtag:${req.query.tag}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true) - }) + streamFrom(`timeline:hashtag:${req.query.tag}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true); + }); wss.on('connection', ws => { - const location = url.parse(ws.upgradeReq.url, true) - const token = location.query.access_token - const req = { requestId: uuid.v4() } + const location = url.parse(ws.upgradeReq.url, true); + const token = location.query.access_token; + const req = { requestId: uuid.v4() }; accountFromToken(token, req, err => { if (err) { - log.error(req.requestId, err) - ws.close() - return + log.error(req.requestId, err); + ws.close(); + return; } switch(location.query.stream) { case 'user': - streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(ws)) + streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(ws)); break; case 'public': - streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(ws), true) + streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(ws), true); break; case 'public:local': - streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(ws), true) + streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(ws), true); break; case 'hashtag': - streamFrom(`timeline:hashtag:${location.query.tag}`, req, streamToWs(req, ws), streamWsEnd(ws), true) + streamFrom(`timeline:hashtag:${location.query.tag}`, req, streamToWs(req, ws), streamWsEnd(ws), true); break; case 'hashtag:local': - streamFrom(`timeline:hashtag:${location.query.tag}:local`, req, streamToWs(req, ws), streamWsEnd(ws), true) + streamFrom(`timeline:hashtag:${location.query.tag}:local`, req, streamToWs(req, ws), streamWsEnd(ws), true); break; default: - ws.close() + ws.close(); } - }) - }) + }); + }); server.listen(process.env.PORT || 4000, () => { - log.level = process.env.LOG_LEVEL || 'verbose' - log.info(`Starting streaming API server worker on ${server.address().address}:${server.address().port}`) - }) + log.level = process.env.LOG_LEVEL || 'verbose'; + log.info(`Starting streaming API server worker on ${server.address().address}:${server.address().port}`); + }); - process.on('SIGINT', exit) - process.on('SIGTERM', exit) - process.on('exit', exit) + process.on('SIGINT', exit); + process.on('SIGTERM', exit); + process.on('exit', exit); function exit() { - server.close() + server.close(); } } -- cgit