Nestjs 微服务实战 - 动态微服务创建链接

2024-01-08 14:54:52

所有的微服务都需要做服务治理

服务治理包括(配置中心、服务发现、注册服务等等),常见的包括 Java 的 Nacos,这里不关注与服务治理,只说明,如何用 nest 网关,并且在网关层动态实现微服务注入

nestjs 官网的案例明显是偏向于手动注册微服务的,例如:

/** Model */
@Module({
  imports: [
 	/** Model 中使用并注册 */ 
    ClientsModule.register([
      { name: 'MATH_SERVICE', transport: Transport.TCP },
    ]),
  ]
  ...
});

/** service / controller 使用 */
constructor(
  /** 通过 @Inject 装饰器只能挨个进行倒入,有多少服务倒入多少 */
  @Inject('MATH_SERVICE') private client: ClientProxy,
) {}

以上属于官网列出的例子,对于真正的微服务开发肯定是不够用的,因为服务并没有根据配置中心的配置进行动态变更倒入,这种情况下,假如某个服务很占用 CPU、内存导致荡机了,也无法进行动态扩展,只能去重启服务手动更改配置,而我需要的是完全动态,每个服务部署都独立部署出来,对于高CPU的服务进行动态扩展服务器,进行自动切换 ip 端口线路。

首先所有的服务都不要在 Model 中进行注册,建议全丢在 service / controller 中使用,我这里以 controller 为例:

import { All, Controller, Req, Res } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';
import { createProxyServer } from 'http-proxy';

/** 动态数组,这里后面要通过定时任务去配置中心拉取 */
const serviceList = [
  {
    name: 'user-center',
    host: '127.0.0.1',
    port: 65531,
  },
];

@Controller()
export class AppController {
  /** 首先所有路由不管任何请求方式全部代理到这个方法上 */
  @All('*')
  async root(@Req() req, @Res() res): Promise<any> {
    /** 获取所有可获取的参数 */
    const { method, originalUrl, query, body, headers } = req;

    /** 根据 url 字符串切割一下获取,主要获取第一级和第二级路由 */
    const list = originalUrl.split('?')[0].split('/').filter((item: string) => item !== '');

    /** 所有,开头为 /api 的参数,并且路由大于并且等于 2 级以上的,基本都可以调用微服务 */
    if (list.length >= 2 && list[0] === 'api') {
    
      /** 微服务真正的地址,是三级路径 */
      const path = `/${list.slice(2).join('/')}`;

      /** 服务名称是二级目录 */
      const service = list[1];

      /** 查找服务名称是否存在 */
      const server = serviceList.find((item) => item.name === service);

      /** 服务存在 */
      if (server) {
      
        /** 错误执行 */
        const errorNext = (err: string) => {
          /** 地址错误 */
          if (err === 'There is no matching message handler defined in the remote service.') {
            /** 手动关闭服务链接 */
            this[service].close();
            /** 清楚链接,下次链接如果服务可用就触发重新链接 */
            this[service] = undefined;
            
            res.status(404);
            res.send({
              code: -1,
              msg: '地址错误',
            });
            
          } else {
            console.log(`其他错误: ${err}`);
          }
        };
        /** 当前 class 中找不到服务,需要创建服务 */
        if (!this[service]) {
          /** 动态创建服务链接 */
          this[service] = ClientProxyFactory.create({
            transport: Transport.TCP,
            options: {
              host: server.host,
              port: server.port,
            },
          });
          try {
            /** 链接服务 */
            await this[service].connect();
            /** 发送消息*/
            const data = this[service].send(
              /** 服务的接口地址是 { method = 请求方法, path = 等于相对路径 }  */
              { method, path },
              /** 把所有 http 的参数全部传递过去,让那边服务自己判断处理 */
              { query, body, headers },
            );
            /** 接受流消息 */
            data.subscribe({
              /** 成功执行 next */
              next: (_res_: any) => {
                res.status(200);
                res.send(_res_);
              },
              /** 失败吧错误信息丢个 errorNext */
              error: (err: any) => errorNext(err),
            });
          } /** 服务存在,但是可能荡机了,又或者服务正在重启过程中 */ catch (err) {
            /** 手动关闭服务链接 */
            this[service].close();
            /** 清楚链接,下次链接如果服务可用就触发重新链接 */
            this[service] = undefined;
            res.status(503);
            res.send({
              code: -1,
              msg: '当前服务正在维护中',
            });
          }
        } /** 服务已存在 */ else {
          /** 发送消息*/
          const data = this[service].send(
            /** 服务的接口地址是 { method = 请求方法, path = 等于相对路径 }  */
            { method, path },
            /** 把所有 http 的参数全部传递过去,让那边服务自己判断处理 */
            { query, body, headers },
          );
          /** 接受流消息 */
          data.subscribe({
            /** 成功执行 next */
            next: (_res_: any) => {
              res.status(200);
              res.send(_res_);
            },
            /** 失败吧错误信息丢个 errorNext */
            error: (err: any) => errorNext(err),
          });
        }
      } /** 未找到服务 */ else {
        res.status(404);
        res.send({
          code: -1,
          msg: '未找到对应的服务',
        });
      }
    } /** 反向代理到 admin 地址 */ else if (list.length >= 2 && list[0] === 'admin') {
      const proxy = createProxyServer({
        target: 'http://127.0.0.1:4000',
        changeOrigin: true,
      });
      proxy.on('error', () => {
        res.status(503);
        res.send({
          code: -1,
          msg: '当前服务正在维护中',
        });
      });
      proxy.web(req, res);
    } /** 反向代理到 nuxt ssr 前端页面地址 */ else {
      const proxy = createProxyServer({
        target: 'http://127.0.0.1:5000',
        changeOrigin: true,
      });
      proxy.on('error', () => {
        res.status(503);
        res.send({
          code: -1,
          msg: '当前服务正在维护中',
        });
      });
      proxy.web(req, res);
    }
  }
}

以上是我的核心逻辑部分,主要逻辑应该是一下步骤,

/** 1、创建服务 */
const proxy = ClientProxyFactory.create({
  transport: Transport.TCP,
  options: {
    host: '127.0.0.1',
    port: 65531,
  },
});

/** 2、服务连接 */
await proxy.connect();

/** 3、发送消息 */
const data = this[service].send(
  消息接口,
  附加数据,
);

/** 4、接受流消息 */
data.subscribe({
  /** 成功消息 */
  next: (_res_: any) => {
    res.status(200);
    res.send(_res_);
  },
  /** 排错 */
  error: (err: any) => {},
});

文章来源:https://blog.csdn.net/weixin_43704471/article/details/135403752
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。