微信小程序直播 数据同步与转存

本教程所用项目框架为eggjs 官方文档, 数据库使用 mysql 和 redis 缓存数据库
本教程参照文档: 小程序直播 | 微信开放文档

【获取直播房间列表】接口规则:该接口仅供商家后台调用,调用限额 500 次/天,建议开发者自己做缓存(此接口与下面【获取回放视频】接口共用 500 次/天限制,请合理分配调用频次)

根据以上接口规则, 我们接下来需要做的是:

  1. 通过调用该接口将直播间列表数据全部保存到数据库
  2. 前端小程序调用自己的后台接口拉取数据库中的数据

整体项目分为5个部分:

1. 添加插件配置以及数据模型定义
2. 定时获取 access_token 并存储
3. 定义路由
4. 定义service服务层
5. 定义controller控制层

以下是具体实现流程:

一. 添加插件配置以及定义数据模型

  1. 安装并配置 egg-redis 插件
  • 安装
npm i egg-redis --save
  • 在 config/plugin.js 中引入 egg-redis 插件
exports.redis = {
  enable: true,
  package: 'egg-redis',
};
  • 在 config/config.default.js 中编写 redis 配置
config.redis = {
    client: {
      port: 6379,
      host: 'xx.xx.xx.xx',
      password: 'pwdxxxxxx',
      db: 0,
    },
  };

2. 安装并配置 egg-sequelize 插件

  • 安装
npm install --save egg-sequelize mysql2
  • 在 config/plugin.js 中引入 egg-sequelize 插件
exports.sequelize = {
  enable: true,
  package: 'egg-sequelize',
};
  • 在 config/config.default.js 中编写 sequelize 配置
config.sequelize = {
    dialect: 'mysql',
    database: 'live_data',
    host: 'xx.xx.xx.xx',
    port: '3306',
    username: 'user',
    password: 'pwdxxxxxxx',
    timezone: '+08:00', // 由于orm用的UTC时间,这里必须加上东八区,否则取出来的时间相差8小时
    define: { // model的全局配置
      timestamps: true, // 添加create,update,delete时间戳
      paranoid: false, // 添加软删除
      freezeTableName: true, // 防止修改表名为复数
      underscored: false, // 防止驼峰式字段被默认转为下划线
    },
    dialectOptions: {
      charset: 'utf8mb4',
      typeCast(field, next) {
        // for reading from database
        if (field.type === 'DATETIME') {
          return field.string();
        }
        return next();
      },
    },
  };

3. 定义直播数据模型

'use strict';

module.exports = app => {
  const { INTEGER, STRING, TEXT } = app.Sequelize;
  const Wxlive = app.model.define('wxlive', {
    id: { type: INTEGER, primaryKey: true, autoIncrement: true },
    roomid: { type: INTEGER(11), comment: '直播间id' },
    name: { type: STRING(512), comment: '标题' },
    cover_img: { type: STRING(512), comment: '封面' },
    live_status: { type: INTEGER(11), comment: '直播状态' },
    start_time: { type: INTEGER(11), comment: '开始时间' },
    end_time: { type: INTEGER(11), comment: '结束时间' },
    anchor_name: { type: STRING(255), comment: '主播' },
    anchor_img: { type: STRING(512), comment: '主播头像' },
    goods: { type: TEXT, comment: '商品' },
    live_replay: { type: TEXT, comment: '回放内容' },
    is_top: { type: INTEGER(10), defaultValue: 0, comment: '置顶' },
  });
  return Wxlive;
};


二. 定时获取 access_token 并存储

  1. 在 service 文件夹下, 新建 wx.js 文件, 定义读取和加载 access_token 的方法
/**
   * 读出redis中的accesstoken
   */
  async getAccessToken() {
    const { app } = this;
    let accessToken = await app.redis.get('_accesstoken');
    if (!accessToken) {
      accessToken = await this.fetchAccessToken();
    }
    return accessToken;
  }

  /**
   * 网络获取accesstoken
   */
  async fetchAccessToken() {
    const { ctx, app } = this;
    const appId = 'xxxxxx';
    const appSecret = 'xxxxxx';
    const url = `https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=${appId}&secret=${appSecret}`;
    const res = await ctx.curl(url, { dataType: 'json' });
    if (res.status === 200 && res.data.access_token) {
      await app.redis.set('_accesstoken', res.data.access_token);
      return res.data.access_token;
    }
    return '';
  }


2. 在 schedule 文件夹下,新建 wx_task.js 文件,用于定时每7000秒获取一次access_token

'use strict';

const Subscription = require('egg').Subscription;

class GetAccessToken extends Subscription {
  static get schedule() {
    return {
      immediate: true,
      interval: 7000 * 1000,
      type: 'all', // 指定所有的 worker 都需要执行
    };
  }
  async subscribe() {
    const { ctx, service, app } = this;
    ctx.logger.info('【项目运行环境】:' + app.config.env);
    const env = app.config.env;
    if (env !== 'local') {
      await service.wx.fetchAccessToken();
    }
  }
}
module.exports = GetAccessToken;

三. 定义路由

'use strict';

module.exports = app => {
  const { router, controller } = app;
  router.get('/live/syncRoomList', controller.live.syncRoomList); // 同步直播间列表
  router.get('/live/syncLiveReplay', controller.live.syncLiveReplay); // 同步直播回放
  router.get('/live/getRoomList', controller.live.getRoomList); // 前端获取直播间列表
};

四. 定义service服务层

  1. 定义一个用来记录当天请求次数的方法
/**
   * 记录当天的请求次数, type: 1:直播列表 2:回放视频
   */
  async syncReqNum(type) {
    const { ctx, app } = this;
    const date = ctx.helper.getYMD();
    const numKey = `${type === 1 ? '_inc_live_roominfo_reqnum_' : '_inc_live_replay_reqnum_'}${date}`;
    const roomReqNum = await app.redis.get(numKey);
    const num = roomReqNum ? parseInt(roomReqNum) + 1 : 1;
    await app.redis.set(numKey, num + '');
  }

2. 查询本地数据库,获取直播房间列表

/**
   * 获取直播房间列表
   */
  async getRoomList(limit, offset) {
    const { app } = this;
    const options = {
      offset,
      limit,
      order: [[ 'start_time', 'desc' ], [ 'is_top', 'desc' ]],
      where: {},
    };
    return app.model.Wxlive.findAndCountAll(options);
  }

3. 传递房间ID,获取对应的回放源数据

/**
   * 同步回放源视频
   */
  async syncLiveReplay(roomId) {
    const { ctx, app, service } = this;
    const accessToken = await service.wx.getAccessToken();
    if (!accessToken) {
      return '';
    }
    const url = `http://api.weixin.qq.com/wxa/business/getliveinfo?access_token=${accessToken}`;
    const params = { start: 0, limit: 100, room_id: roomId, action: 'get_replay' };
    const res = await ctx.curl(url, { method: 'POST', contentType: 'json', data: params, dataType: 'json' });
    ctx.logger.info('【获取回放源视频】 ==> ' + JSON.stringify(res.data));
    await this.syncReqNum(2); // 记录当天的请求次数
    if (res.data.errcode === 0 && res.data.errmsg === 'ok') {
      const liveReplay = JSON.stringify(res.data.live_replay); // 一场直播可能会产生多个视频片段,此处是列表的字符串
      const model = await app.model.Wxlive.findOne({ where: { roomid: roomId } });
      if (model) {
        await model.update({ live_replay: liveReplay });
      }
      return liveReplay;
    }
    return ''; // 代表未创建直播房间
  }


4. 循环加载直播房间列表

跟小程序直播后台直播记录保持一致, 有则更新, 无则删除

/**
   * 同步直播房间列表
   */
  async syncRoomList() {
    const { ctx, app, service } = this;
    const accessToken = await service.wx.getAccessToken();
    if (!accessToken) {
      return { code: 0, msg: '同步直播间列表失败: errcode=40001, errmsg=直播间列表为空' };
    }
    let page = 1;
    const pageSize = 50;
    const roomIds = []; // 直播间ID列表
    const url = `http://api.weixin.qq.com/wxa/business/getliveinfo?access_token=${accessToken}`;
    while (true) {
      const params = { start: (page - 1) * pageSize, limit: pageSize };
      const res = await ctx.curl(url, { method: 'POST', contentType: 'json', data: params, dataType: 'json' });
      await this.syncReqNum(1); // 记录当天的请求次数
      const errcode = res.data.errcode;
      if (errcode !== 0) {
        if (errcode === 1) {
          return { code: 0, msg: `同步直播间列表失败: errcode=${errcode}, errmsg=直播间列表为空` };
        } else if (errcode === 48001) {
          return { code: 0, msg: `同步直播间列表失败: errcode=${errcode}, errmsg=小程序没有直播权限` };
        }
        return { code: 0, msg: `同步直播间列表失败: errcode=${errcode}, errmsg=${res.data.errmsg}` };
      }
      const roomList = res.data.room_info;
      for (const room of roomList) {
        const roomId = room.roomid;
        roomIds.push(roomId); // 添加到直播间ID列表
        const wxlive = await app.model.Wxlive.findOne({ where: { roomid: roomId } });
        const updateData = { name: room.name, cover_img: room.cover_img, live_status: room.live_status, start_time: room.start_time, end_time: room.end_time, anchor_name: room.anchor_name, anchor_img: room.anchor_img, goods: JSON.stringify(room.goods) };
        if (!wxlive) {
          // 不存在,创建一个
          const insertData = updateData;
          insertData.roomid = roomId;
          await app.model.Wxlive.create(insertData);
          if (room.live_status === 103) {
            // 直播已结束, 需要获取回放源视频保存到数据库
            await this.syncLiveReplay(roomId);
          }
          continue;
        }
        // 数据库中存在, 判断已结束的直播是否有回放地址
        if (wxlive.live_status === 103 && !wxlive.live_replay) {
          // 不存在回放地址, 则需要获取回放源视频保存到数据库
          await this.syncLiveReplay(roomId);
        }
        await wxlive.update(updateData); // 更新数据库中的直播数据
      }
      if (res.data.total < page * pageSize) {
        // 总数小于 页码*页长, 跳出循环
        break;
      }
      page++; // 查询下一页
    }
    // 当所有直播列表都遍历完后, 删除已经不存在的直播数据
    const arr = await app.model.Wxlive.findAll({ where: { roomid: { [app.Sequelize.Op.notIn]: roomIds } } });
    if (arr && arr.length > 0) {
      for (const item of arr) {
        await item.destroy();
      }
    }
    return { code: 1, msg: '同步直播间列表成功' };
  }


五. 定义controller控制层

'use strict';

const Controller = require('egg').Controller;

class LiveController extends Controller {
  /**
   * 同步直播房间列表
   */
  async syncRoomList() {
    const { ctx, service } = this;
    ctx.body = await service.live.syncRoomList();
  }
  /**
   * 同步回放源视频
   */
  async syncLiveReplay() {
    const { ctx, service } = this;
    const roomId = parseInt(ctx.query.roomId || '0');
    const liveReplay = await service.live.syncLiveReplay(roomId);
    ctx.body = { code: 1, msg: 'success', data: liveReplay };
  }
  /**
   * 获取直播房间列表
   */
  async getRoomList() {
    const { ctx, service } = this;
    const limit = ctx.helper.toInt(ctx.query.pageSize || 10);
    const pageNum = ctx.helper.toInt(ctx.query.pageNum || 1);
    const offset = (pageNum - 1) * limit;
    const res = await service.live.getRoomList(limit, offset);
    const liveList = res.rows;
    if (res.count > 0) {
      for (const item of liveList) {
        if (item.goods && ctx.helper.isJSON(item.goods)) {
          item.goods = JSON.parse(item.goods);
        } else {
          item.goods = [];
        }
        if (item.live_replay && ctx.helper.isJSON(item.live_replay)) {
          item.live_replay = JSON.parse(item.live_replay);
        } else {
          item.live_replay = [];
        }
        if (!item.anchor_img) {
          item.anchor_img = 'https://www.xxx.com/app-logo.png';
        }
      }
    }
    ctx.body = { total_count: res.count, page_size: limit, cur_page: pageNum, data: liveList };
  }
}

module.exports = LiveController;