性能优化与最佳实践

1. 连接管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 连接池管理
class SSEConnectionManager {
constructor() {
this.connections = new Map()
this.maxConnections = 1000
}

addConnection(id, response) {
// 检查连接数限制
if (this.connections.size >= this.maxConnections) {
this.cleanupStaleConnections()
}

this.connections.set(id, {
response,
lastPing: Date.now(),
created: Date.now(),
})
}

cleanupStaleConnections() {
const now = Date.now()
const staleThreshold = 5 * 60 * 1000 // 5分钟

for (const [id, conn] of this.connections.entries()) {
if (now - conn.lastPing > staleThreshold) {
try {
conn.response.end()
} catch (e) {
console.log('清理连接时出错:', e.message)
}
this.connections.delete(id)
}
}
}

broadcast(message) {
const deadConnections = []

for (const [id, conn] of this.connections.entries()) {
try {
conn.response.write(`data: ${JSON.stringify(message)}\n\n`)
conn.lastPing = Date.now()
} catch (error) {
deadConnections.push(id)
}
}

// 清理失效连接
deadConnections.forEach((id) => this.connections.delete(id))
}
}

2. 消息压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const zlib = require('zlib')

function compressMessage(data) {
const jsonString = JSON.stringify(data)

// 对于大消息启用压缩
if (jsonString.length > 1024) {
return zlib.gzipSync(jsonString).toString('base64')
}

return jsonString
}

// 发送压缩消息
function sendCompressedMessage(res, data) {
const compressed = compressMessage(data)
const isCompressed = compressed !== JSON.stringify(data)

res.write(
`data: ${JSON.stringify({
compressed: isCompressed,
data: compressed,
})}\n\n`
)
}

3. 错误处理与监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class SSEMonitor {
constructor() {
this.metrics = {
totalConnections: 0,
activeConnections: 0,
messagesSent: 0,
errors: 0,
reconnections: 0,
}
}

recordConnection() {
this.metrics.totalConnections++
this.metrics.activeConnections++
}

recordDisconnection() {
this.metrics.activeConnections = Math.max(
0,
this.metrics.activeConnections - 1
)
}

recordMessage() {
this.metrics.messagesSent++
}

recordError() {
this.metrics.errors++
}

recordReconnection() {
this.metrics.reconnections++
}

getMetrics() {
return {
...this.metrics,
timestamp: new Date().toISOString(),
}
}
}

const monitor = new SSEMonitor()

// 在SSE端点中使用
app.get('/events', (req, res) => {
monitor.recordConnection()

// ... SSE逻辑

req.on('close', () => {
monitor.recordDisconnection()
})

req.on('error', () => {
monitor.recordError()
monitor.recordDisconnection()
})
})

// 监控端点
app.get('/metrics', (req, res) => {
res.json(monitor.getMetrics())
})

安全考虑

1. 认证与授权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const jwt = require('jsonwebtoken')

function authenticateSSE(req, res, next) {
const token = req.query.token || req.headers.authorization?.split(' ')[1]

if (!token) {
return res.status(401).json({error: '缺少认证token'})
}

try {
const decoded = jwt.verify(token, process.env.JWT_SECRET)
req.user = decoded
next()
} catch (error) {
return res.status(401).json({error: '无效的token'})
}
}

// 受保护的SSE端点
app.get('/secure-events', authenticateSSE, (req, res) => {
// ... SSE逻辑
console.log(`用户 ${req.user.userId} 连接到安全事件流`)
})

2. 速率限制

1
2
3
4
5
6
7
8
9
10
11
12
13
const rateLimit = require('express-rate-limit')

const sseRateLimit = rateLimit({
windowMs: 15 * 60 * 1000, // 15分钟
max: 10, // 每个IP最多10个连接
message: '连接过于频繁,请稍后再试',
standardHeaders: true,
legacyHeaders: false,
})

app.get('/events', sseRateLimit, (req, res) => {
// ... SSE逻辑
})

3. CORS 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const cors = require('cors')

const corsOptions = {
origin: process.env.ALLOWED_ORIGINS?.split(',') || ['http://localhost:3000'],
credentials: true,
optionsSuccessStatus: 200,
}

app.use(cors(corsOptions))

// 或者在SSE端点中手动设置
app.get('/events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Access-Control-Allow-Origin': req.headers.origin || '*',
'Access-Control-Allow-Credentials': 'true',
'Access-Control-Allow-Headers': 'Cache-Control, Last-Event-ID',
})

// ... 其他逻辑
})

调试与测试

1. SSE 调试工具

1
2
3
4
5
6
7
8
9
10
11
12
13
// 调试中间件
function sseDebugger(req, res, next) {
const originalWrite = res.write

res.write = function (chunk, encoding) {
console.log('📤 发送SSE数据:', chunk.toString())
return originalWrite.call(this, chunk, encoding)
}

next()
}

app.use('/events', sseDebugger)

2. 客户端调试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class DebugSSEClient {
constructor(url) {
this.url = url
this.eventSource = null
this.stats = {
connected: false,
messagesReceived: 0,
errors: 0,
reconnects: 0,
lastMessage: null,
}

this.connect()
}

connect() {
console.log('🔗 连接SSE:', this.url)
this.eventSource = new EventSource(this.url)

this.eventSource.onopen = (event) => {
console.log('✅ SSE连接已建立', event)
this.stats.connected = true
this.logStats()
}

this.eventSource.onmessage = (event) => {
console.log('📨 收到消息:', event.data)
this.stats.messagesReceived++
this.stats.lastMessage = {
data: event.data,
timestamp: new Date().toISOString(),
lastEventId: event.lastEventId,
}
this.logStats()
}

this.eventSource.onerror = (event) => {
console.error('❌ SSE连接错误:', event)
this.stats.connected = false
this.stats.errors++
this.stats.reconnects++
this.logStats()
}
}

logStats() {
console.table(this.stats)
}

getStats() {
return this.stats
}
}

// 使用
const debugClient = new DebugSSEClient('/events')

3. 单元测试示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
const request = require('supertest')
const app = require('../app') // 你的Express应用

describe('SSE端点测试', () => {
test('应该建立SSE连接', (done) => {
const req = request(app)
.get('/events')
.expect(200)
.expect('Content-Type', /text\/event-stream/)

let messageCount = 0

req.on('data', (chunk) => {
const data = chunk.toString()
if (data.includes('data:')) {
messageCount++
if (messageCount >= 2) {
req.destroy()
done()
}
}
})

req.on('error', done)
})

test('应该支持Last-Event-ID', (done) => {
request(app)
.get('/events')
.set('Last-Event-ID', '123')
.expect(200)
.end((err, res) => {
if (err) return done(err)
// 验证响应包含续传消息
done()
})
})
})

生产环境部署

1. Nginx 反向代理配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server {
listen 80;
server_name example.com;

location /events {
proxy_pass http://localhost:3000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection '';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;

# SSE特定配置
proxy_cache off;
proxy_buffering off;
proxy_read_timeout 24h;
proxy_send_timeout 24h;
}
}

2. Docker 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
FROM node:18-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --only=production

COPY . .

EXPOSE 3000

# 设置健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1

CMD ["node", "server.js"]

3. 集群部署考虑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 使用Redis进行跨实例消息广播
const redis = require('redis')
const client = redis.createClient()

// 订阅消息
client.subscribe('sse-broadcast')

client.on('message', (channel, message) => {
if (channel === 'sse-broadcast') {
const data = JSON.parse(message)
// 广播给当前实例的所有连接
broadcastToLocalConnections(data)
}
})

// 发布消息到所有实例
function broadcastToCluster(message) {
client.publish('sse-broadcast', JSON.stringify(message))
}