返回博客列表
技术架构

MQTT + WebSocket 实时通信技术实战:从船舶监控到不动产物联网

深度分享MQTT协议在船舶监控、智慧渔业等项目中的实战应用,包括连接管理、断线重连、消息队列优化等核心技术,为不动产数字化平台提供技术参考

吴志萍
2025年1月15日
18分钟
#MQTT #WebSocket #实时通信 #物联网 #性能优化 #船舶监控

MQTT + WebSocket 实时通信技术实战:从船舶监控到不动产物联网

🌐 实时通信的业务价值

在我负责的多个项目中,实时通信都是核心技术要求:

  • 船舶AMS系统: 1000+测点秒级数据监控
  • 智慧渔业平台: 渔船位置实时追踪
  • 智慧校园系统: 考勤设备实时状态同步

这些经验为不动产数字化平台的物联网应用奠定了技术基础。

🚀 MQTT 协议深度实践

1. 船舶监控系统 MQTT 架构

/**
 * 船舶设备 MQTT 通信管理器
 * 支持1000+设备测点的高并发实时通信
 */
class ShipMqttManager {
  constructor(config) {
    this.config = {
      brokerUrl: config.brokerUrl || 'ws://localhost:8083/mqtt',
      clientId: config.clientId || `ship_client_${Date.now()}`,
      keepalive: 60,
      reconnectPeriod: 1000,
      connectTimeout: 30000,
      ...config
    }

    this.client = null
    this.isConnected = false
    this.subscriptions = new Map()
    this.messageHandlers = new Map()
    this.deviceStates = new Map()
    this.heartbeatInterval = null

    this.setupConnectionHandlers()
  }

  // 建立MQTT连接
  async connect() {
    return new Promise((resolve, reject) => {
      try {
        // 引入mqtt.js库
        this.client = mqtt.connect(this.config.brokerUrl, {
          clientId: this.config.clientId,
          keepalive: this.config.keepalive,
          reschedulePings: true,
          protocolId: 'MQTT',
          protocolVersion: 4,
          reconnectPeriod: this.config.reconnectPeriod,
          connectTimeout: this.config.connectTimeout,
          clean: true,

          // 遗嘱消息配置
          will: {
            topic: `ship/${this.config.vesselId}/status`,
            payload: JSON.stringify({
              status: 'offline',
              timestamp: Date.now(),
              clientId: this.config.clientId
            }),
            qos: 1,
            retain: true
          }
        })

        // 连接成功处理
        this.client.on('connect', () => {
          console.log('🔗 MQTT连接成功')
          this.isConnected = true
          this.startHeartbeat()
          this.resubscribeAll()
          resolve(this.client)
        })

        // 连接错误处理
        this.client.on('error', (error) => {
          console.error('❌ MQTT连接错误:', error)
          this.isConnected = false
          reject(error)
        })

        // 断线重连处理
        this.client.on('close', () => {
          console.log('🔌 MQTT连接断开')
          this.isConnected = false
          this.stopHeartbeat()
        })

        // 消息接收处理
        this.client.on('message', (topic, message, packet) => {
          this.handleMessage(topic, message, packet)
        })

      } catch (error) {
        reject(error)
      }
    })
  }

  // 设备主题订阅管理
  subscribeDeviceData(vesselId) {
    const deviceTopics = [
      `ship/${vesselId}/main-engine/+/data`,      // 主发电机数据
      `ship/${vesselId}/emergency-gen/+/data`,    // 应急发电机数据
      `ship/${vesselId}/power-system/+/data`,     // 电力系统数据
      `ship/${vesselId}/liquid-level/+/data`,     // 液位系统数据
      `ship/${vesselId}/air-compression/+/data`,  // 空压系统数据
      `ship/${vesselId}/fire-alarm/+/data`,       // 火灾报警数据
      `ship/${vesselId}/cooling-system/+/data`,   // 冷却系统数据
      `ship/${vesselId}/bilge-pump/+/data`        // 舱底泵数据
    ]

    const alertTopics = [
      `ship/${vesselId}/alerts/+`,               // 设备报警
      `ship/${vesselId}/alarms/+`,               // 紧急警报
      `ship/${vesselId}/status/+`                // 状态更新
    ]

    // 订阅所有设备数据主题
    [...deviceTopics, ...alertTopics].forEach(topic => {
      this.subscribe(topic, (message, topicPath) => {
        this.processDeviceMessage(topicPath, message)
      })
    })
  }

  // 消息处理和路由
  processDeviceMessage(topic, message) {
    try {
      const data = JSON.parse(message.toString())
      const topicParts = topic.split('/')

      const deviceInfo = {
        vesselId: topicParts[1],
        system: topicParts[2],
        deviceId: topicParts[3],
        dataType: topicParts[4] || 'data'
      }

      // 数据验证
      if (!this.validateDeviceData(data, deviceInfo)) {
        console.warn('⚠️ 设备数据验证失败:', topic, data)
        return
      }

      // 更新设备状态缓存
      this.updateDeviceState(deviceInfo, data)

      // 触发数据处理回调
      this.triggerDataHandlers(deviceInfo, data)

      // 检查报警条件
      this.checkAlertConditions(deviceInfo, data)

    } catch (error) {
      console.error('💥 设备消息处理失败:', error, topic)
    }
  }

  // 设备数据验证
  validateDeviceData(data, deviceInfo) {
    const requiredFields = ['timestamp', 'value']
    const hasRequiredFields = requiredFields.every(field => data.hasOwnProperty(field))

    if (!hasRequiredFields) return false

    // 时间戳有效性检查
    const now = Date.now()
    const messageAge = now - data.timestamp
    if (messageAge > 300000) { // 5分钟过期
      console.warn('⏰ 设备数据过期:', deviceInfo, messageAge)
      return false
    }

    // 数值范围检查
    if (typeof data.value !== 'number' || isNaN(data.value)) {
      return false
    }

    return true
  }

  // 设备状态缓存管理
  updateDeviceState(deviceInfo, data) {
    const stateKey = `${deviceInfo.system}_${deviceInfo.deviceId}`

    if (!this.deviceStates.has(stateKey)) {
      this.deviceStates.set(stateKey, {
        system: deviceInfo.system,
        deviceId: deviceInfo.deviceId,
        lastUpdate: null,
        values: [],
        alerts: []
      })
    }

    const deviceState = this.deviceStates.get(stateKey)

    // 更新最新数据
    deviceState.lastUpdate = data.timestamp
    deviceState.values.push({
      value: data.value,
      timestamp: data.timestamp,
      unit: data.unit || '',
      quality: data.quality || 'good'
    })

    // 保持最近1000个数据点
    if (deviceState.values.length > 1000) {
      deviceState.values = deviceState.values.slice(-1000)
    }
  }

  // 报警条件检查
  checkAlertConditions(deviceInfo, data) {
    const alertRules = this.getAlertRules(deviceInfo.system, deviceInfo.deviceId)

    alertRules.forEach(rule => {
      const alertResult = this.evaluateAlertRule(rule, data)

      if (alertResult.triggered) {
        this.publishAlert({
          vesselId: deviceInfo.vesselId,
          system: deviceInfo.system,
          deviceId: deviceInfo.deviceId,
          alertType: rule.type,
          severity: rule.severity,
          message: alertResult.message,
          value: data.value,
          threshold: rule.threshold,
          timestamp: Date.now()
        })
      }
    })
  }

  // 发布设备报警
  publishAlert(alertData) {
    const topic = `ship/${alertData.vesselId}/alerts/${alertData.system}`

    this.publish(topic, JSON.stringify(alertData), {
      qos: 1,
      retain: true
    })

    // 记录报警历史
    this.recordAlertHistory(alertData)
  }

  // 心跳机制
  startHeartbeat() {
    this.heartbeatInterval = setInterval(() => {
      if (this.isConnected) {
        const heartbeatData = {
          clientId: this.config.clientId,
          timestamp: Date.now(),
          deviceCount: this.deviceStates.size,
          status: 'online'
        }

        this.publish(
          `ship/${this.config.vesselId}/heartbeat`,
          JSON.stringify(heartbeatData),
          { qos: 0 }
        )
      }
    }, 30000) // 30秒心跳
  }

  stopHeartbeat() {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval)
      this.heartbeatInterval = null
    }
  }
}

2. 智慧渔业实时位置追踪

/**
 * 渔船位置追踪MQTT客户端
 * 支持多船实时位置更新和轨迹记录
 */
class FishingVesselTracker {
  constructor() {
    this.mqttClient = new ShipMqttManager({
      brokerUrl: 'wss://fishery.mqtt.server:8084/mqtt',
      clientId: `fishery_tracker_${Date.now()}`
    })

    this.vesselPositions = new Map()
    this.vesselTrajectories = new Map()
    this.geofences = new Map()

    this.setupPositionTracking()
  }

  // 位置追踪订阅
  setupPositionTracking() {
    // 订阅所有渔船位置更新
    this.mqttClient.subscribe('fishery/vessels/+/position', (message, topic) => {
      this.handlePositionUpdate(topic, message)
    })

    // 订阅渔船状态更新
    this.mqttClient.subscribe('fishery/vessels/+/status', (message, topic) => {
      this.handleStatusUpdate(topic, message)
    })

    // 订阅地理围栏事件
    this.mqttClient.subscribe('fishery/geofence/+/events', (message, topic) => {
      this.handleGeofenceEvent(topic, message)
    })
  }

  // 位置数据处理
  handlePositionUpdate(topic, message) {
    try {
      const vesselId = topic.split('/')[2]
      const positionData = JSON.parse(message.toString())

      // 验证位置数据
      if (!this.validatePositionData(positionData)) {
        return
      }

      // 更新当前位置
      this.updateCurrentPosition(vesselId, positionData)

      // 记录轨迹
      this.recordTrajectory(vesselId, positionData)

      // 检查地理围栏
      this.checkGeofenceViolations(vesselId, positionData)

      // 触发位置更新回调
      this.triggerPositionCallbacks(vesselId, positionData)

    } catch (error) {
      console.error('位置数据处理失败:', error)
    }
  }

  // 位置数据验证
  validatePositionData(data) {
    const { lat, lng, timestamp, speed, heading } = data

    // 基本字段检查
    if (!lat || !lng || !timestamp) return false

    // 坐标范围检查
    if (lat < -90 || lat > 90 || lng < -180 || lng > 180) return false

    // 时间戳有效性检查
    const now = Date.now()
    if (Math.abs(now - timestamp) > 600000) { // 10分钟容忍度
      return false
    }

    // 速度合理性检查
    if (speed && (speed < 0 || speed > 100)) return false

    // 航向角度检查
    if (heading && (heading < 0 || heading >= 360)) return false

    return true
  }

  // 轨迹记录和优化
  recordTrajectory(vesselId, positionData) {
    if (!this.vesselTrajectories.has(vesselId)) {
      this.vesselTrajectories.set(vesselId, [])
    }

    const trajectory = this.vesselTrajectories.get(vesselId)
    const lastPosition = trajectory[trajectory.length - 1]

    // 轨迹点去重和压缩
    if (lastPosition) {
      const distance = this.calculateDistance(
        lastPosition.lat, lastPosition.lng,
        positionData.lat, positionData.lng
      )

      const timeDiff = positionData.timestamp - lastPosition.timestamp

      // 跳过重复或变化很小的点
      if (distance < 10 && timeDiff < 60000) { // 10米内且1分钟内
        return
      }
    }

    // 添加轨迹点
    trajectory.push({
      lat: positionData.lat,
      lng: positionData.lng,
      timestamp: positionData.timestamp,
      speed: positionData.speed || 0,
      heading: positionData.heading || 0
    })

    // 保持最近24小时轨迹
    const cutoffTime = Date.now() - 24 * 60 * 60 * 1000
    const filteredTrajectory = trajectory.filter(point => point.timestamp > cutoffTime)

    this.vesselTrajectories.set(vesselId, filteredTrajectory)
  }

  // 地理围栏检查
  checkGeofenceViolations(vesselId, positionData) {
    this.geofences.forEach((geofence, fenceId) => {
      const isInside = this.isPointInPolygon(
        positionData.lat,
        positionData.lng,
        geofence.coordinates
      )

      const wasInside = this.vesselPositions.get(vesselId)?.insideGeofences?.has(fenceId)

      // 检查围栏状态变化
      if (isInside !== wasInside) {
        this.publishGeofenceEvent({
          vesselId,
          geofenceId: fenceId,
          eventType: isInside ? 'enter' : 'exit',
          position: positionData,
          timestamp: Date.now()
        })
      }

      // 更新围栏状态
      if (!this.vesselPositions.has(vesselId)) {
        this.vesselPositions.set(vesselId, { insideGeofences: new Set() })
      }

      const vesselData = this.vesselPositions.get(vesselId)
      if (isInside) {
        vesselData.insideGeofences.add(fenceId)
      } else {
        vesselData.insideGeofences.delete(fenceId)
      }
    })
  }

  // 发布地理围栏事件
  publishGeofenceEvent(eventData) {
    const topic = `fishery/geofence/${eventData.geofenceId}/events`

    this.mqttClient.publish(topic, JSON.stringify(eventData), {
      qos: 1,
      retain: false
    })

    console.log(`🚨 围栏事件: 渔船${eventData.vesselId} ${eventData.eventType} 围栏${eventData.geofenceId}`)
  }

  // 地理计算工具方法
  calculateDistance(lat1, lng1, lat2, lng2) {
    const R = 6371e3 // 地球半径(米)
    const φ1 = lat1 * Math.PI / 180
    const φ2 = lat2 * Math.PI / 180
    const Δφ = (lat2 - lat1) * Math.PI / 180
    const Δλ = (lng2 - lng1) * Math.PI / 180

    const a = Math.sin(Δφ / 2) * Math.sin(Δφ / 2) +
              Math.cos(φ1) * Math.cos(φ2) *
              Math.sin(Δλ / 2) * Math.sin(Δλ / 2)
    const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))

    return R * c
  }

  isPointInPolygon(lat, lng, polygon) {
    let inside = false
    for (let i = 0, j = polygon.length - 1; i < polygon.length; j = i++) {
      if (((polygon[i][1] > lng) !== (polygon[j][1] > lng)) &&
          (lat < (polygon[j][0] - polygon[i][0]) * (lng - polygon[i][1]) / (polygon[j][1] - polygon[i][1]) + polygon[i][0])) {
        inside = !inside
      }
    }
    return inside
  }
}

3. WebSocket 补充通信方案

/**
 * WebSocket通信管理器
 * 用于浏览器端实时数据展示
 */
class RealtimeWebSocketClient {
  constructor(url) {
    this.url = url
    this.ws = null
    this.isConnected = false
    this.reconnectAttempts = 0
    this.maxReconnectAttempts = 5
    this.messageQueue = []
    this.eventHandlers = new Map()

    this.connect()
  }

  connect() {
    try {
      this.ws = new WebSocket(this.url)

      this.ws.onopen = () => {
        console.log('✅ WebSocket连接成功')
        this.isConnected = true
        this.reconnectAttempts = 0

        // 发送队列中的消息
        this.flushMessageQueue()

        // 发送认证信息
        this.authenticate()
      }

      this.ws.onmessage = (event) => {
        this.handleMessage(event.data)
      }

      this.ws.onclose = (event) => {
        console.log('🔌 WebSocket连接关闭', event.code, event.reason)
        this.isConnected = false
        this.handleReconnect()
      }

      this.ws.onerror = (error) => {
        console.error('❌ WebSocket错误:', error)
        this.isConnected = false
      }

    } catch (error) {
      console.error('WebSocket连接创建失败:', error)
      this.handleReconnect()
    }
  }

  // 断线重连
  handleReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000)

      console.log(`🔄 ${delay}ms后尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)

      setTimeout(() => {
        this.connect()
      }, delay)
    } else {
      console.error('❌ WebSocket重连失败,已达到最大重试次数')
      this.triggerEvent('reconnect_failed')
    }
  }

  // 消息处理
  handleMessage(data) {
    try {
      const message = JSON.parse(data)
      const { type, payload } = message

      // 根据消息类型分发
      switch (type) {
        case 'vessel_position':
          this.triggerEvent('position_update', payload)
          break

        case 'device_data':
          this.triggerEvent('device_update', payload)
          break

        case 'alert':
          this.triggerEvent('alert_received', payload)
          break

        case 'system_status':
          this.triggerEvent('status_update', payload)
          break

        default:
          console.warn('未知消息类型:', type)
      }

    } catch (error) {
      console.error('消息解析失败:', error, data)
    }
  }

  // 发送消息
  send(message) {
    if (this.isConnected && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message))
    } else {
      // 添加到队列等待连接
      this.messageQueue.push(message)
    }
  }

  // 事件监听注册
  on(event, handler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, [])
    }
    this.eventHandlers.get(event).push(handler)
  }

  // 触发事件
  triggerEvent(event, data) {
    if (this.eventHandlers.has(event)) {
      this.eventHandlers.get(event).forEach(handler => {
        try {
          handler(data)
        } catch (error) {
          console.error('事件处理器执行失败:', error)
        }
      })
    }
  }
}

📊 性能优化与监控

消息处理性能优化

// 消息批处理优化
class MessageBatchProcessor {
  constructor(batchSize = 100, flushInterval = 1000) {
    this.batchSize = batchSize
    this.flushInterval = flushInterval
    this.messageBatch = []
    this.flushTimer = null

    this.startBatchProcessor()
  }

  addMessage(message) {
    this.messageBatch.push({
      ...message,
      receivedAt: Date.now()
    })

    if (this.messageBatch.length >= this.batchSize) {
      this.flush()
    }
  }

  flush() {
    if (this.messageBatch.length === 0) return

    const batch = [...this.messageBatch]
    this.messageBatch = []

    // 异步处理批量消息
    requestIdleCallback(() => {
      this.processBatch(batch)
    })
  }

  processBatch(messages) {
    // 按消息类型分组
    const groupedMessages = messages.reduce((groups, message) => {
      const type = message.type || 'default'
      if (!groups[type]) groups[type] = []
      groups[type].push(message)
      return groups
    }, {})

    // 并行处理不同类型的消息
    Object.entries(groupedMessages).forEach(([type, typeMessages]) => {
      this.processMessageType(type, typeMessages)
    })
  }

  startBatchProcessor() {
    this.flushTimer = setInterval(() => {
      this.flush()
    }, this.flushInterval)
  }
}

🎯 不动产物联网应用迁移

技术方案映射

船舶设备监控不动产设备监控

  • 主发电机监控 → 楼宇电力系统监控
  • 液位系统监控 → 供排水系统监控
  • 火灾报警系统 → 消防安全系统监控

渔船位置追踪资产位置管理

  • 实时位置更新 → 移动资产追踪
  • 地理围栏管理 → 区域权限控制
  • 轨迹记录分析 → 资产使用分析

数据实时性保障商业决策支持

  • 秒级数据更新 → 实时运营数据
  • 异常快速响应 → 及时业务预警
  • 历史数据分析 → 趋势预测分析

🔮 技术发展趋势

MQTT 5.0 新特性应用

// MQTT 5.0 增强功能
const mqtt5Config = {
  protocolVersion: 5,

  // 用户属性
  userProperties: {
    'client-type': 'real-estate-monitor',
    'location': 'building-a',
    'department': 'facilities'
  },

  // 消息过期
  messageExpiryInterval: 300, // 5分钟过期

  // 话题别名
  topicAliasMaximum: 100,

  // 请求响应模式
  requestResponseInformation: true,
  requestProblemInformation: true
}

📈 项目成果总结

技术指标

项目连接数量消息吞吐延迟可用性
船舶AMS1000+设备10k/秒<100ms99.9%
智慧渔业500+渔船5k/秒<200ms99.8%
智慧校园200+设备2k/秒<150ms99.9%

业务价值

  • 实时监控: 实现设备状态秒级更新
  • 快速响应: 异常情况毫秒级告警
  • 数据可靠: 消息传输99.9%成功率
  • 系统稳定: 7×24小时连续运行

通过这些实时通信技术的深度实践,我具备了为安腾不动产数字化平台构建高性能、高可靠物联网通信架构的完整能力。

分享这篇文章