neutron服务启动源码分析(四)

2024-01-07 21:51:47
本次了解一下neutron openvswitch agent的启动流程

neutron openvswitch agent的启动命令

#/var/lib/kolla/venv/bin/python /var/lib/kolla/venv/bin/neutron-openvswitch-agent --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugins/ml2/ml2_conf.ini
  1. neutron/cmd/eventlet/plugins/ovs_neutron_agent.py
import neutron.plugins.ml2.drivers.openvswitch.agent.main as agent_main


def main():
    """Entry point; simply delegates to ``agent_main.main()``."""
    agent_main.main()
  1. neutron/plugins/ml2/drivers/openvswitch/agent/main.py

这里初始化了配置,并根据 OVS.of_interface 配置项 import 对应的 module(本文以 native 为例)

# Import the 'OVS' option group from the agent's common.config module so
# that it is available on cfg.CONF.
cfg.CONF.import_group('OVS', 'neutron.plugins.ml2.drivers.openvswitch.agent.'
                      'common.config')


# Maps an OpenFlow driver name to the dotted path of the module that
# provides that driver's init_config()/main() entry points.
_main_modules = {
    'ovs-ofctl': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
                 'ovs_ofctl.main',
    'native': 'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
                 'native.main',
}


def main():
    """Bootstrap the OVS agent through the configured OpenFlow driver.

    Parses the command-line arguments into the global configuration, loads
    the driver module selected by ``OVS.of_interface``, lets it initialise
    its own configuration, sets up logging and profiling, and finally hands
    control to the driver's ``main()``.

    :raises KeyError: if ``OVS.of_interface`` names a driver that is not
        present in ``_main_modules``.
    """
    common_config.init(sys.argv[1:])
    # Resolve the driver name to its module path and import it in one step.
    of_driver = importutils.import_module(
        _main_modules[cfg.CONF.OVS.of_interface])
    of_driver.init_config()
    # Logging is configured only after the driver has initialised its config.
    common_config.setup_logging()
    profiler.setup("neutron-ovs-agent", cfg.CONF.host)
    of_driver.main()
  1. neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/main.py

通过 AppManager.run_apps 加载 ovs_oskenapp 模块,进而调用 OVSNeutronAgentOSKenApp 的 start 方法

def main():
    """Run the os-ken application manager with the OVS agent app module."""
    # Dotted path of the module that defines the agent's os-ken application.
    oskenapp_module = ('neutron.plugins.ml2.drivers.openvswitch.agent.'
                       'openflow.native.ovs_oskenapp')
    app_manager.AppManager.run_apps([oskenapp_module])
  1. neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/ovs_oskenapp.py
def agent_main_wrapper(bridge_classes):
    """Run the agent's main function and always shut down the AppManager.

    :param bridge_classes: passed through unchanged to ``ovs_agent.main()``.
    :raises Exception: any exception raised by ``ovs_agent.main()`` is
        logged and then re-raised.
    """
    try:
        ovs_agent.main(bridge_classes)
    except Exception:
        # Log the failure, then let the original exception propagate.
        with excutils.save_and_reraise_exception():
            LOG.exception("Agent main thread died of an exception")
    finally:
        # The following call terminates os-ken's AppManager.run_apps(),
        # which is needed for clean shutdown of an agent process.
        # The close() call must be called in another thread, otherwise
        # it suicides and ends prematurely.
        hub.spawn(app_manager.AppManager.get_instance().close)


class OVSNeutronAgentOSKenApp(app_manager.OSKenApp):
    """os-ken application that hosts the OVS agent main loop."""

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def start(self):
        """Start the os-ken event loop, then spawn the agent main thread.

        :returns: whatever ``hub.spawn()`` returns for the thread running
            ``agent_main_wrapper``.
        """
        # Start os-ken event loop thread
        super(OVSNeutronAgentOSKenApp, self).start()

        # Bind this app instance to every bridge class so that bridges
        # created later receive it as their ``os_ken_app`` argument.
        bridge_types = (
            ('br_int', br_int.OVSIntegrationBridge),
            ('br_phys', br_phys.OVSPhysicalBridge),
            ('br_tun', br_tun.OVSTunnelBridge),
        )
        bridge_classes = {
            key: functools.partial(bridge_cls, os_ken_app=self)
            for key, bridge_cls in bridge_types
        }

        # Start agent main loop thread
        return hub.spawn(agent_main_wrapper, bridge_classes, raise_error=True)
  1. neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py

校验 tunnel 类型和 local ip,创建 OVSNeutronAgent 实例,然后进入 daemon_loop

def main(bridge_classes):
    """Register options, build the OVS agent and run its daemon loop.

    :param bridge_classes: a dict for bridge classes, handed to the
        ``OVSNeutronAgent`` constructor.

    Exits the process with status 1 if constructing the agent (or the
    subsequent init notification) raises ``RuntimeError`` or ``ValueError``.
    """
    prepare_xen_compute()
    ovs_capabilities.register()
    ext_manager.register_opts(cfg.CONF)
    agent_config.setup_privsep()
    service_conf.register_service_opts(service_conf.RPC_EXTRA_OPTS, cfg.CONF)

    ext_mgr = ext_manager.L2AgentExtensionsManager(cfg.CONF)

    # now that all extensions registered their options, we can log them
    n_utils.log_opt_values(LOG)

    validate_tunnel_config(cfg.CONF.AGENT.tunnel_types, cfg.CONF.OVS.local_ip)

    try:
        agent = OVSNeutronAgent(bridge_classes, ext_mgr, cfg.CONF)
        capabilities.notify_init_event(n_const.AGENT_TYPE_OVS, agent)
    except (RuntimeError, ValueError) as e:
        LOG.error("%s Agent terminated!", e)
        sys.exit(1)
    # Only reached when the agent was constructed successfully.
    agent.daemon_loop()
  1. neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py

初始化neutron agent, 创建br-int 交换机,启用tunnel则创建br-tun交换机以及流表, agent侧开启rpc,如果启用dvr则下发dvr流表,初始化firewall (目前是 基于iptables 的 neutron.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver)

class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
                      dvr_rpc.DVRAgentRpcCallbackMixin):
   def __init__(self, bridge_classes, ext_manager, conf=None):
        '''Constructor.

        Sets up the integration bridge, the physical bridges, optionally
        the tunnel bridge, RPC, the DVR agent, agent extensions, the
        security group agent and the periodic state report, in that order.

        :param bridge_classes: a dict for bridge classes.
        :param ext_manager: agent extensions manager; ``initialize()`` and
                            ``names()`` are called on it here.
        :param conf: an instance of ConfigOpts
        '''
        super(OVSNeutronAgent, self).__init__()
        # Fall back to the global configuration when none is supplied.
        self.conf = conf or cfg.CONF
        self.ovs = ovs_lib.BaseOVS()
        self.ext_manager = ext_manager
        agent_conf = self.conf.AGENT
        ovs_conf = self.conf.OVS

        self.fullsync = False
        # init bridge classes with configured datapath type.
        self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
            functools.partial(bridge_classes[b],
                              datapath_type=ovs_conf.datapath_type)
            for b in ('br_int', 'br_phys', 'br_tun'))

        self.use_veth_interconnection = ovs_conf.use_veth_interconnection
        self.veth_mtu = agent_conf.veth_mtu
        # Pool of local VLAN tags, MIN_VLAN_TAG..MAX_VLAN_TAG inclusive.
        self.available_local_vlans = set(six.moves.range(
            n_const.MIN_VLAN_TAG, n_const.MAX_VLAN_TAG + 1))
        self.tunnel_types = agent_conf.tunnel_types or []
        self.l2_pop = agent_conf.l2_population
        # TODO(ethuleau): Change ARP responder so it's not dependent on the
        #                 ML2 l2 population mechanism driver.
        self.enable_distributed_routing = agent_conf.enable_distributed_routing
        # The ARP responder is only effective when l2 population is on too.
        self.arp_responder_enabled = agent_conf.arp_responder and self.l2_pop

        host = self.conf.host
        self.agent_id = 'ovs-agent-%s' % host

        # Tunneling is enabled as soon as at least one tunnel type is set.
        self.enable_tunneling = bool(self.tunnel_types)

        # Validate agent configurations
        self._check_agent_configurations()

        # Keep track of int_br's device count for use by _report_state()
        self.int_br_device_count = 0

        self.int_br = self.br_int_cls(ovs_conf.integration_bridge)
        self.setup_integration_br()
        # Stores port update notifications for processing in main rpc loop
        self.updated_ports = set()
        # Stores port delete notifications
        self.deleted_ports = set()
        # Stores the port IDs whose binding has been deactivated
        self.deactivated_bindings = set()
        # Stores the port IDs whose binding has been activated
        self.activated_bindings = set()

        self.network_ports = collections.defaultdict(set)
        # keeps association between ports and ofports to detect ofport change
        self.vifname_to_ofport_map = {}
        # Stores newly created bridges
        self.added_bridges = list()
        self.bridge_mappings = self._parse_bridge_mappings(
            ovs_conf.bridge_mappings)
        self.rp_bandwidths = place_utils.parse_rp_bandwidths(
            ovs_conf.resource_provider_bandwidths)

        # Validate the configured bandwidths against the mapped bridges.
        br_set = set(six.itervalues(self.bridge_mappings))
        n_utils.validate_rp_bandwidth(self.rp_bandwidths,
                                      br_set)
        self.rp_inventory_defaults = place_utils.parse_rp_inventory_defaults(
            ovs_conf.resource_provider_inventory_defaults)
        self.rp_hypervisors = utils.default_rp_hypervisors(
            ovs_conf.resource_provider_hypervisors,
            {k: [v] for k, v in self.bridge_mappings.items()}
        )

        self.setup_physical_bridges(self.bridge_mappings)
        self.vlan_manager = vlanmanager.LocalVlanManager()

        self._reset_tunnel_ofports()

        self.polling_interval = agent_conf.polling_interval
        self.minimize_polling = agent_conf.minimize_polling
        self.ovsdb_monitor_respawn_interval = (
            agent_conf.ovsdb_monitor_respawn_interval or
            constants.DEFAULT_OVSDBMON_RESPAWN)
        self.local_ip = ovs_conf.local_ip
        self.tunnel_count = 0
        self.vxlan_udp_port = agent_conf.vxlan_udp_port
        self.dont_fragment = agent_conf.dont_fragment
        self.tunnel_csum = agent_conf.tunnel_csum
        # 'inherit' when dscp_inherit is set; otherwise the configured dscp
        # shifted left by two bits (presumably because DSCP occupies the
        # upper six bits of the TOS byte), or None when no dscp is set.
        self.tos = ('inherit'
                    if agent_conf.dscp_inherit
                    else (int(agent_conf.dscp) << 2
                          if agent_conf.dscp
                          else None))
        self.tun_br = None
        self.patch_int_ofport = constants.OFPORT_INVALID
        self.patch_tun_ofport = constants.OFPORT_INVALID
        if self.enable_tunneling:
            # The patch_int_ofport and patch_tun_ofport are updated
            # here inside the call to setup_tunnel_br()
            self.setup_tunnel_br(ovs_conf.tunnel_bridge)
            self.setup_tunnel_br_flows()

        # VIP handling stays off unless enabled at the end of __init__.
        self.enable_vip = False
        self.groups = None
        self.setup_rpc()

        # The DVR agent is always constructed; its flows are installed only
        # when distributed routing is enabled.
        self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
            self.context,
            self.dvr_plugin_rpc,
            self.int_br,
            self.tun_br,
            self.bridge_mappings,
            self.phys_brs,
            self.int_ofports,
            self.phys_ofports,
            self.patch_int_ofport,
            self.patch_tun_ofport,
            host,
            self.enable_tunneling,
            self.enable_distributed_routing,
            self.arp_responder_enabled)

        if self.enable_distributed_routing:
            self.dvr_agent.setup_dvr_flows(self.bridge_mappings)

        # Collect additional bridges to monitor
        self.ancillary_brs = self.setup_ancillary_bridges(
            ovs_conf.integration_bridge, ovs_conf.tunnel_bridge)

        agent_api = ovs_ext_api.OVSAgentExtensionAPI(self.int_br,
                                                     self.tun_br,
                                                     self.phys_brs)
        self.ext_manager.initialize(
            self.connection, constants.EXTENSION_DRIVER_TYPE, agent_api)

        # In order to keep existed device's local vlan unchanged,
        # restore local vlan mapping at start
        self._restore_local_vlan_map()

        # Security group agent support
        self.sg_agent = agent_sg_rpc.SecurityGroupAgentRpc(
            self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
            integration_bridge=self.int_br)
        self.sg_plugin_rpc.register_legacy_sg_notification_callbacks(
            self.sg_agent)

        self.sg_agent.init_ovs_dvr_firewall(self.dvr_agent)

        # we default to False to provide backward compat with out of tree
        # firewall drivers that expect the logic that existed on the Neutron
        # server which only enabled hybrid plugging based on the use of the
        # hybrid driver.
        hybrid_plug = getattr(self.sg_agent.firewall,
                              'OVS_HYBRID_PLUG_REQUIRED', False)
        # The agent handles ARP spoofing itself only when the firewall
        # driver does not already provide that protection.
        self.prevent_arp_spoofing = (
            not self.sg_agent.firewall.provides_arp_spoofing_protection)

        self.failed_report_state = False
        # TODO(mangelajo): optimize resource_versions to only report
        #                  versions about resources which are common,
        #                  or which are used by specific extensions.
        self.agent_state = {
            'binary': 'neutron-openvswitch-agent',
            'host': host,
            'topic': n_const.L2_AGENT_TOPIC,
            'configurations': {'bridge_mappings': self.bridge_mappings,
                               c_const.RP_BANDWIDTHS: self.rp_bandwidths,
                               c_const.RP_INVENTORY_DEFAULTS:
                                   self.rp_inventory_defaults,
                               'resource_provider_hypervisors':
                               self.rp_hypervisors,
                               'integration_bridge':
                               ovs_conf.integration_bridge,
                               'tunnel_types': self.tunnel_types,
                               'tunneling_ip': self.local_ip,
                               'l2_population': self.l2_pop,
                               'arp_responder_enabled':
                               self.arp_responder_enabled,
                               'enable_distributed_routing':
                               self.enable_distributed_routing,
                               'log_agent_heartbeats':
                               agent_conf.log_agent_heartbeats,
                               'extensions': self.ext_manager.names(),
                               'datapath_type': ovs_conf.datapath_type,
                               'ovs_capabilities': self.ovs.capabilities,
                               'vhostuser_socket_dir':
                               ovs_conf.vhostuser_socket_dir,
                               portbindings.OVS_HYBRID_PLUG: hybrid_plug},
            'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
            'agent_type': agent_conf.agent_type,
            'start_flag': True}

        # Periodic state report is started only for a non-zero interval.
        report_interval = agent_conf.report_interval
        if report_interval:
            heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            heartbeat.start(interval=report_interval)
        # Initialize iteration counter
        self.iter_num = 0
        self.run_daemon_loop = True

        self.catch_sigterm = False
        self.catch_sighup = False

        # The initialization is complete; we can start receiving messages
        self.connection.consume_in_threads()

        self.quitting_rpc_timeout = agent_conf.quitting_rpc_timeout

        self.install_ingress_direct_goto_flows()

        self._register_rpc_consumers()
        self.idc_lb_vxlan_remote_ips = ovs_conf.idc_lb_vxlan_remote_ips
        self.to_idc_lb_hash_fields = ovs_conf.to_idc_lb_hash_fields
        # VIP support needs the ARP responder, tunneling and at least one
        # configured remote VXLAN IP.
        if self.arp_responder_enabled and self.enable_tunneling and self.idc_lb_vxlan_remote_ips:
            self.enable_vip = True
            # key is net_uuid, value is list of network's vip port
            self.network_vip = dict()
            # key is vip port_id, value is dict of
            # {mac:xxx, net_uuid:xxx, vlan: xxx, fixed_ips:xxx}
            self.vip_detail = dict()
            # key is vxlan remote ip, value is ovs ofport
            self.vip_vxlan_ofport = dict()
            # Counts the remote IPs for which a tunnel port name was obtained.
            bucket_num = 0
            # idc_lb_vxlan_remote_ips is a comma-separated string of IPs.
            for ip in self.idc_lb_vxlan_remote_ips.split(','):
                port_name = self.get_tunnel_name(
                    n_const.TYPE_VXLAN, self.local_ip, ip)
                if port_name is not None:
                    ofport = self._setup_tunnel_port(self.tun_br,
                                                     port_name,
                                                     ip,
                                                     n_const.TYPE_VXLAN)
                    self.vip_vxlan_ofport[ip] = ofport
                    # NOTE(review): presumably turns on BFD for the new
                    # tunnel port - confirm against set_port_bfd().
                    self.tun_br.set_port_bfd(port_name, True, self.local_ip, ip)
                    bucket_num = bucket_num + 1
            self.groups = self.tun_br.dump_groups(bucket_num)
            LOG.debug("group table is %s", self.groups)
  1. neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
 def daemon_loop(self):
        """Install signal handlers, start bridge monitoring, run rpc_loop."""
        # Start everything.
        LOG.info("Agent initialized successfully, now running... ")
        signal.signal(signal.SIGTERM, self._handle_sigterm)
        # Only register a SIGHUP handler where the signal module defines it.
        if hasattr(signal, 'SIGHUP'):
            signal.signal(signal.SIGHUP, self._handle_sighup)
        br_names = [br.br_name for br in self.phys_brs.values()]

        # Monitor the physical bridges by name.
        self.ovs.ovsdb.idl_monitor.start_bridge_monitor(br_names)
        # The polling manager is used as a context manager around rpc_loop.
        with polling.get_polling_manager(
                self.minimize_polling,
                self.ovsdb_monitor_respawn_interval) as pm:
            self.rpc_loop(polling_manager=pm)
  1. neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py

重点关注 tunnel_sync 和 process_network_ports,其中 process_network_ports 主要获取本机相关 devices。只要 self._check_and_handle_signal() 返回真,while 循环就会一直执行,持续检查 bridge、tunnel、port 等的变更

def rpc_loop(self, polling_manager):
        """Main processing loop of the agent.

        Runs for as long as ``self._check_and_handle_signal()`` returns a
        true value.  Each iteration checks the OVS status, reconfigures
        re-created physical bridges, syncs tunnels when required and, if
        anything changed or must be retried, processes port information.
        When port processing raises, the pending port updates and
        activated bindings are put back and a resync is scheduled for the
        next iteration.

        :param polling_manager: polling manager passed on to the OVS
            restart handler, the update check and port processing;
            ``polling_completed()`` is called on it after each successful
            processing pass.
        """
        idl_monitor = self.ovs.ovsdb.idl_monitor
        sync = False
        ports = set()
        updated_ports_copy = set()
        activated_bindings_copy = set()
        ancillary_ports = set()
        # Tunnels are synced on the first iteration when tunneling is on.
        tunnel_sync = True
        ovs_restarted = False
        bridges_recreated = False
        consecutive_resyncs = 0
        need_clean_stale_flow = True
        ports_not_ready_yet = set()
        failed_devices = {'added': set(), 'removed': set()}
        failed_ancillary_devices = {'added': set(), 'removed': set()}
        failed_devices_retries_map = {}
        while self._check_and_handle_signal():
            if self.fullsync:
                LOG.info("rpc_loop doing a full sync.")
                sync = True
                self.fullsync = False
            port_info = {}
            ancillary_port_info = {}
            start = time.time()
            LOG.info("Agent rpc_loop - iteration:%d started",
                     self.iter_num)
            ovs_status = self.check_ovs_status()
            if ovs_status == constants.OVS_RESTARTED:
                self._handle_ovs_restart(polling_manager)
                tunnel_sync = self.enable_tunneling or tunnel_sync
                # setup port forward flows
                self.setup_pf_fip_flows()
                # setup snat ip flows
                self.setup_snat_ip_flows()
            elif ovs_status == constants.OVS_DEAD:
                # Agent doesn't apply any operations when ovs is dead, to
                # prevent unexpected failure or crash. Sleep and continue
                # loop in which ovs status will be checked periodically.
                port_stats = self.get_port_stats({}, {})
                self.loop_count_and_wait(start, port_stats)
                continue
            # Check if any physical bridge wasn't recreated recently
            added_bridges = idl_monitor.bridges_added + self.added_bridges
            bridges_recreated |= self._reconfigure_physical_bridges(
                added_bridges)
            if bridges_recreated:
                # In case when any bridge was "re-created", we need to ensure
                # that there is no any stale flows in bridges left
                need_clean_stale_flow = True
            sync |= bridges_recreated
            # Notify the plugin of tunnel IP
            if self.enable_tunneling and tunnel_sync:
                try:
                    tunnel_sync = self.tunnel_sync()
                except Exception:
                    LOG.exception("Error while configuring tunnel endpoints")
                    # Retry the tunnel sync on the next iteration.
                    tunnel_sync = True
            # Sticky until a processing pass completes without exception.
            ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
            devices_need_retry = (any(failed_devices.values()) or
                                  any(failed_ancillary_devices.values()) or
                                  ports_not_ready_yet)
            if (self._agent_has_updates(polling_manager) or sync or
                    devices_need_retry or ovs_restarted):
                try:
                    LOG.info("Agent rpc_loop - iteration:%(iter_num)d - "
                             "starting polling. Elapsed:%(elapsed).3f",
                             {'iter_num': self.iter_num,
                              'elapsed': time.time() - start})
                    # Save updated ports dict to perform rollback in
                    # case resync would be needed, and then clear
                    # self.updated_ports. As the greenthread should not yield
                    # between these two statements, this will be thread-safe
                    updated_ports_copy = self.updated_ports
                    self.updated_ports = set()
                    activated_bindings_copy = self.activated_bindings
                    self.activated_bindings = set()
                    (port_info, ancillary_port_info, consecutive_resyncs,
                     ports_not_ready_yet) = (self.process_port_info(
                            start, polling_manager, sync, ovs_restarted,
                            ports, ancillary_ports, updated_ports_copy,
                            consecutive_resyncs, ports_not_ready_yet,
                            failed_devices, failed_ancillary_devices))
                    sync = False
                    self.process_deleted_ports(port_info)
                    self.process_deactivated_bindings(port_info)
                    self.process_activated_bindings(port_info,
                                                    activated_bindings_copy)
                    ofport_changed_ports = self.update_stale_ofport_rules()
                    if ofport_changed_ports:
                        port_info.setdefault('updated', set()).update(
                            ofport_changed_ports)
                    LOG.info("Agent rpc_loop - iteration:%(iter_num)d - "
                             "port information retrieved. "
                             "Elapsed:%(elapsed).3f",
                             {'iter_num': self.iter_num,
                              'elapsed': time.time() - start})
                    # Secure and wire/unwire VIFs and update their status
                    # on Neutron server
                    if (self._port_info_has_changes(port_info) or
                            self.sg_agent.firewall_refresh_needed() or
                            ovs_restarted or bridges_recreated):
                        LOG.debug("Starting to process devices in:%s",
                                  port_info)
                        provisioning_needed = (
                                ovs_restarted or bridges_recreated)
                        failed_devices = self.process_network_ports(
                            port_info, provisioning_needed)
                        if need_clean_stale_flow:
                            self.cleanup_stale_flows()
                            need_clean_stale_flow = False
                        LOG.info("Agent rpc_loop - iteration:%(iter_num)d - "
                                 "ports processed. Elapsed:%(elapsed).3f",
                                 {'iter_num': self.iter_num,
                                  'elapsed': time.time() - start})

                    # Remember the current ports for the next iteration.
                    ports = port_info['current']

                    if self.ancillary_brs:
                        failed_ancillary_devices = (
                            self.process_ancillary_network_ports(
                                ancillary_port_info))
                        LOG.info("Agent rpc_loop - iteration: "
                                 "%(iter_num)d - ancillary ports "
                                 "processed. Elapsed:%(elapsed).3f",
                                 {'iter_num': self.iter_num,
                                  'elapsed': time.time() - start})
                        ancillary_ports = ancillary_port_info['current']

                    polling_manager.polling_completed()
                    failed_devices_retries_map = (
                        self.update_retries_map_and_remove_devs_not_to_retry(
                            failed_devices, failed_ancillary_devices,
                            failed_devices_retries_map))
                    # Keep this flag in the last line of "try" block,
                    # so we can be sure that no other Exception occurred.
                    ovs_restarted = False
                    bridges_recreated = False
                    self._dispose_local_vlan_hints()
                except Exception:
                    LOG.exception("Error while processing VIF ports")
                    # Put the ports back in self.updated_port
                    self.updated_ports |= updated_ports_copy
                    self.activated_bindings |= activated_bindings_copy
                    sync = True
            port_stats = self.get_port_stats(port_info, ancillary_port_info)
            self.loop_count_and_wait(start, port_stats)

文章来源:https://blog.csdn.net/u010494323/article/details/135411078
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。