性能优化与最佳实践
1. 连接管理
class SSEConnectionManager {
constructor() {
this.connections = new Map()
this.maxConnections = 1000
}
addConnection(id, response) {
if (this.connections.size >= this.maxConnections) {
this.cleanupStaleConnections()
}
this.connections.set(id, {
response,
lastPing: Date.now(),
created: Date.now(),
})
}
cleanupStaleConnections() {
const now = Date.now()
const staleThreshold = 5 * 60 * 1000
for (const [id, conn] of this.connections.entries()) {
if (now - conn.lastPing > staleThreshold) {
try {
conn.response.end()
} catch (e) {
console.log('清理连接时出错:', e.message)
}
this.connections.delete(id)
}
}
}
broadcast(message) {
const deadConnections = []
for (const [id, conn] of this.connections.entries()) {
try {
conn.response.write(`data: ${JSON.stringify(message)}\n\n`)
conn.lastPing = Date.now()
} catch (error) {
deadConnections.push(id)
}
}
deadConnections.forEach((id) => this.connections.delete(id))
}
}
2. 消息压缩
const zlib = require('zlib')
function compressMessage(data) {
const jsonString = JSON.stringify(data)
if (jsonString.length > 1024) {
return zlib.gzipSync(jsonString).toString('base64')
}
return jsonString
}
function sendCompressedMessage(res, data) {
const compressed = compressMessage(data)
const isCompressed = compressed !== JSON.stringify(data)
res.write(
`data: ${JSON.stringify({
compressed: isCompressed,
data: compressed,
})}\n\n`
)
}
3. 错误处理与监控
class SSEMonitor {
constructor() {
this.metrics = {
totalConnections: 0,
activeConnections: 0,
messagesSent: 0,
errors: 0,
reconnections: 0,
}
}
recordConnection() {
this.metrics.totalConnections++
this.metrics.activeConnections++
}
recordDisconnection() {
this.metrics.activeConnections = Math.max(
0,
this.metrics.activeConnections - 1
)
}
recordMessage() {
this.metrics.messagesSent++
}
recordError() {
this.metrics.errors++
}
recordReconnection() {
this.metrics.reconnections++
}
getMetrics() {
return {
...this.metrics,
timestamp: new Date().toISOString(),
}
}
}
const monitor = new SSEMonitor()
app.get('/events', (req, res) => {
monitor.recordConnection()
req.on('close', () => {
monitor.recordDisconnection()
})
req.on('error', () => {
monitor.recordError()
monitor.recordDisconnection()
})
})
app.get('/metrics', (req, res) => {
res.json(monitor.getMetrics())
})
安全考虑
1. 认证与授权
const jwt = require('jsonwebtoken')
function authenticateSSE(req, res, next) {
const token = req.query.token || req.headers.authorization?.split(' ')[1]
if (!token) {
return res.status(401).json({error: '缺少认证token'})
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET)
req.user = decoded
next()
} catch (error) {
return res.status(401).json({error: '无效的token'})
}
}
app.get('/secure-events', authenticateSSE, (req, res) => {
console.log(`用户 ${req.user.userId} 连接到安全事件流`)
})
2. 速率限制
const rateLimit = require('express-rate-limit')
const sseRateLimit = rateLimit({
windowMs: 15 * 60 * 1000,
max: 10,
message: '连接过于频繁,请稍后再试',
standardHeaders: true,
legacyHeaders: false,
})
app.get('/events', sseRateLimit, (req, res) => {
})
3. CORS 配置
const cors = require('cors')
const corsOptions = {
origin: process.env.ALLOWED_ORIGINS?.split(',') || ['http://localhost:3000'],
credentials: true,
optionsSuccessStatus: 200,
}
app.use(cors(corsOptions))
app.get('/events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Access-Control-Allow-Origin': req.headers.origin || '*',
'Access-Control-Allow-Credentials': 'true',
'Access-Control-Allow-Headers': 'Cache-Control, Last-Event-ID',
})
})
调试与测试
1. SSE 调试工具
function sseDebugger(req, res, next) {
const originalWrite = res.write
res.write = function (chunk, encoding) {
console.log('📤 发送SSE数据:', chunk.toString())
return originalWrite.call(this, chunk, encoding)
}
next()
}
app.use('/events', sseDebugger)
2. 客户端调试
class DebugSSEClient {
constructor(url) {
this.url = url
this.eventSource = null
this.stats = {
connected: false,
messagesReceived: 0,
errors: 0,
reconnects: 0,
lastMessage: null,
}
this.connect()
}
connect() {
console.log('🔗 连接SSE:', this.url)
this.eventSource = new EventSource(this.url)
this.eventSource.onopen = (event) => {
console.log('✅ SSE连接已建立', event)
this.stats.connected = true
this.logStats()
}
this.eventSource.onmessage = (event) => {
console.log('📨 收到消息:', event.data)
this.stats.messagesReceived++
this.stats.lastMessage = {
data: event.data,
timestamp: new Date().toISOString(),
lastEventId: event.lastEventId,
}
this.logStats()
}
this.eventSource.onerror = (event) => {
console.error('❌ SSE连接错误:', event)
this.stats.connected = false
this.stats.errors++
this.stats.reconnects++
this.logStats()
}
}
logStats() {
console.table(this.stats)
}
getStats() {
return this.stats
}
}
const debugClient = new DebugSSEClient('/events')
3. 单元测试示例
const request = require('supertest')
const app = require('../app')
describe('SSE端点测试', () => {
test('应该建立SSE连接', (done) => {
const req = request(app)
.get('/events')
.expect(200)
.expect('Content-Type', /text\/event-stream/)
let messageCount = 0
req.on('data', (chunk) => {
const data = chunk.toString()
if (data.includes('data:')) {
messageCount++
if (messageCount >= 2) {
req.destroy()
done()
}
}
})
req.on('error', done)
})
test('应该支持Last-Event-ID', (done) => {
request(app)
.get('/events')
.set('Last-Event-ID', '123')
.expect(200)
.end((err, res) => {
if (err) return done(err)
done()
})
})
})
生产环境部署
1. Nginx 反向代理配置
server {
listen 80;
server_name example.com;
location /events {
proxy_pass http://localhost:3000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection '';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_cache off;
proxy_buffering off;
proxy_read_timeout 24h;
proxy_send_timeout 24h;
}
}
2. Docker 配置
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
COPY . .
EXPOSE 3000
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1
CMD ["node", "server.js"]
3. 集群部署考虑
const redis = require('redis')
const client = redis.createClient()
client.subscribe('sse-broadcast')
client.on('message', (channel, message) => {
if (channel === 'sse-broadcast') {
const data = JSON.parse(message)
broadcastToLocalConnections(data)
}
})
function broadcastToCluster(message) {
client.publish('sse-broadcast', JSON.stringify(message))
}