Donkeycar软件架构

Donkeycar非常简单;代码被组织成接受输入并返回输出的部分。这些部分被添加到一个车辆中。一旦启动车辆循环,部分会按顺序运行。这些部分通过读取和修改车辆内存来有效地进行通信。

"模板"是一个包含构建"车辆"和一个或多个"部分"的代码的Python文件。部分是一个Python类,用于包装车辆的功能组件。这些部分被添加到车辆中。部分可以从车辆的内存中获取值作为输入,并可以将值写入车辆的内存作为输出。当启动车辆循环时,部分按照它们被添加的顺序运行;从内存中获取输入并将结果输出到内存。这个循环会一直持续下去,直到车辆停止,然后所有的部分都会被关闭,模板退出。

模板

当您使用donkey createcar ...命令创建您的汽车应用程序时,根据文档中的创建Donkeycar应用程序部分的描述,实际上是将一些文件从donkeycar/templates文件夹复制到您的mycar文件夹中。我们需要讨论的两个文件是manage.pymyconfig.py

复制到mycar文件夹的文件是模板文件夹中一对模板文件的重命名版本。文件的选择基于您在createcar命令的--template参数中传递的模板名称;如果您没有传递任何内容,则默认为--template=complete。因此,donkey createcar --path=~/mycardonkey createcar --path=~/mycar --template=complete是相同的。在这种情况下,被重命名并复制到~/mycar/manage.py~/mycar/myconfig.py的文件分别是donkeycar/templates/complete.pydonkeycar/templates/cfg_complete.py。如果您通过传递--template=path_follow来创建路径跟随应用程序,则被复制的文件是donkeycar/templates/path_follow.pydonkeycar/templates/cfg_path_follow.py

实际上,donkeycar/template/cfg_xxxx.py的另一个副本会作为config.py复制到mycar文件夹中;这包含了默认配置,不应进行编辑。myconfig.py文件实际上是config.py的注释版本。要更改应用程序的配置(例如选择相机类型或驱动系统),请取消myconfig.py中您关心的部分的注释并进行编辑。

manage.py文件是真正运行您的汽车的地方。它组织成一个"车辆循环",以myconfig.py文件中的DRIVE_LOOP_HZ值指定的频率运行;这是车辆循环的"部分"更新的频率。Donkeycar的车辆循环是一个我们称之为"部分"的管道,它在我们称之为"内存"的哈希映射中获取和设置状态。

complete.py和path_follow.py模板相当复杂,因为它们非常可配置。然而,它们并没有任何特殊之处。您可以创建自己的模板来实现您想要的功能;或者您可以直接编写自己的manage.py,而无需创建或使用模板。以下是一个具有单个部分的车辆循环示例,该部分将接受一个数字,将其乘以一个随机数并返回结果。在车辆循环运行时,值将不断随机化。

import random

# the randomizer part
class RandPercent:
    def run(self, x):
        value = x * random.random()
        print(f"{x} -> {value}")
        return value

# create the vehicle and it's internal memory
V = dk.Vehicle()

# initialize the value in the vehicle's memory and give it a name
V.mem['var'] = 4

# add the part to read and write to the same value.
V.add(RandPercent(), inputs=['var'], outputs=['var'])

# start the vehicle loop running; quit after 5 loops
V.start(max_loops=5)

部件

部件是一个Python类,用于包装车辆的功能组件。

这些包括:

  • 传感器 - 摄像头、激光雷达、里程计、GPS等
  • 执行器 - 电机控制器
  • 驾驶员 - 车道检测器、行为克隆模型等
  • 控制器 - 基于Web或蓝牙的控制器
  • 存储器 - Tub,或者用于保存数据的方式

Tawn Kramer制作了一个视频(实际上是两个部分),介绍了如何创建一个部件。此外,还有一个在Arm AIoT会议上展示的演示视频,展示了如何创建OLED部件

每个部分都被构建,然后以其命名的输入和命名的输出以及可选的run_condition被添加到车辆循环中。车辆的部分(大部分情况下)按照它们被添加到车辆循环中的顺序执行。每次车辆循环运行时,部分的输入从车辆内存中读取,并传递给部分的run()方法,run()方法执行其工作,并将其返回值分配给输出值。如果存在run_condition,则只有在run_condition属性的值为True时才调用部分的run()方法;因此,如果run_condition属性的值为False,则该部分被“关闭”。

  • memory:车辆内存是一个命名值的哈希映射。它是车辆的“状态”。它包括用作输入、输出和条件的值。它由所有部分共享。
  • inputs:输入是传递给部分的run()方法的内存值;它们在将部分添加到车辆循环时声明。因此,对于aiLauncher的示例,当我们添加该部分时,我们包括参数inputs=['user/mode', 'pilot/throttle']。在调用run()方法之前,车辆循环查找输入值,然后将它们作为参数传递给部分的run()方法。因此,当调用aiLauncher部分的run()方法时,它将传递两个参数;第一个参数将是车辆内存中user/mode属性的值,第二个参数将是pilot/throttle属性的值。请注意,当添加部分时声明的输入数量必须与部分的run()方法中的参数数量相匹配,否则会导致运行时错误。
  • outputs:输出是由部分的run()方法返回的内存值;它们在将部分添加到车辆循环时声明。在调用部分的run()方法之后,返回值将分配给命名的输出属性。因此,对于aiLauncher的示例,当我们添加该部分时,我们包括参数outputs=['pilot/throttle']。当aiLauncher部分运行完成时,它将返回一个值,并将该值分配给车辆内存中的'pilot/throttle'属性。请注意,当添加部分时声明的输出数量必须与部分的run()方法中返回的值数量相匹配,否则会导致运行时错误。
  • run_conditionrun_condition是一个布尔型内存值,可用于决定是否调用部分的run()方法。如果条件为True,则调用部分的run()方法,否则不调用。这是一种打开和关闭部分的方法。例如,如果我们只希望在自动驾驶模式下运行aiLauncher,我们会维护一个命名的内存值,假设为'run_pilot',当以自动驾驶模式运行时为True,以用户(手动)模式运行时为False。然后,当将aiLauncher部分添加到车辆时,我们将run_condition='run_pilot'传递给V.add()方法。只有在命名的内存值'run_pilot'为True时,才会调用aiLauncher的run()方法。

您可以通过更改部件的输入属性的值来控制部件的操作方式。一个部件可以通过输出值(从而改变它们)来影响其他部件,这些值被其他部件用作输入或运行条件。

以下是添加一个部件的示例;AiLaunch部件在驾驶模式从手动驾驶切换到自动驾驶时覆盖油门;它用于在比赛开始时的短时间内提供高油门。在这种情况下,它没有明确的run_condition参数,因此默认为True。

    aiLauncher = AiLaunch(cfg.AI_LAUNCH_DURATION, cfg.AI_LAUNCH_THROTTLE, cfg.AI_LAUNCH_KEEP_ENABLED)
    V.add(aiLauncher,
          inputs=['user/mode', 'pilot/throttle'],
          outputs=['pilot/throttle'])

要实现这个“launch”,它需要知道当前的驾驶模式和当前的自动驾驶油门值;这些是它的输入。如果它不是在启动状态,则只是将油门值传递而不修改它,但当它处于启动状态时,它输出的油门值等于cfg.AI_LAUNCH_THROTTLE。因此,油门是它唯一的输出。该部件的run()方法必须按照正确的顺序接收这两个输入,并返回一个输出。您可以在部件的代码中看到这一点;

import time

class AiLaunch():
    '''
    This part will apply a large thrust on initial activation. This is to help
    in racing to start fast and then the ai will take over quickly when it's
    up to speed.
    '''

    def __init__(self, launch_duration=1.0, launch_throttle=1.0, keep_enabled=False):
        self.active = False
        self.enabled = False
        self.timer_start = None
        self.timer_duration = launch_duration
        self.launch_throttle = launch_throttle
        self.prev_mode = None
        self.trigger_on_switch = keep_enabled

    def enable_ai_launch(self):
        self.enabled = True
        print('AiLauncher is enabled.')

    def run(self, mode, ai_throttle):
        new_throttle = ai_throttle

        if mode != self.prev_mode:
            self.prev_mode = mode
            if mode == "local" and self.trigger_on_switch:
                self.enabled = True

        if mode == "local" and self.enabled:
            if not self.active:
                self.active = True
                self.timer_start = time.time()
            else:
                duration = time.time() - self.timer_start
                if duration > self.timer_duration:
                    self.active = False
                    self.enabled = False
        else:
            self.active = False

        if self.active:
            print('AiLauncher is active!!!')
            new_throttle = self.launch_throttle

        return new_throttle

在这个例子中,将配置值作为参数传递给部件的构造函数是很常见的做法。如果部件使用了一些硬件资源,例如摄像头或串口,那么它还应该有一个shutdown()函数,在停止Donkey车时正确释放这些资源。

正如我们所说的,每次运行车辆循环时都会调用部件的run()方法;输入值从车辆内存中读取,并作为参数传递给run()方法,该方法执行其工作,然后返回值被写入车辆内存作为输出。由于部件按照它们添加的顺序运行(大部分情况下),您可以看到您需要在任何需要该值作为输入的部件之前添加一个提供输出的部件。

线程化部件

线程化部件(Threaded Parts)是以其自己的线程运行,因此可以以自己的速率进行操作。线程化部件具有run_threaded()方法,而不是run()方法;输入值作为参数传递,返回值作为输出,与run()方法类似。与run()方法类似,run_threaded()方法在每次车辆循环运行时被调用一次。

因此,如果run_threaded()每次通过车辆循环时都会被调用,就像run()方法一样,并且它的输入和输出与非线程化部件一样组织,那么线程化部件和非线程化部件之间有什么区别呢?下面可以看到一个区别是,当添加线程化部件时,需要传递threaded=True。但是,线程化部件最重要的区别是它必须具有一个无参数的update()方法。当启动线程化部件时,将创建一个线程,并将部件的update()方法注册为在线程上执行的方法。update()方法将与车辆循环分开运行,并且它将以Python调度程序允许的速度运行;通常比车辆循环运行得快得多。在线程化部件中,update()方法不应该返回,直到该部件被告知shutdown();它应该运行一个循环,一遍又一遍地完成其工作,例如从像TFMini部件那样的设备读取数据。在线程化部件中,run_threaded()方法通常非常简单;它通常只是设置由update()方法使用的类属性,并返回由update()方法维护的类属性。

下面是向车辆循环添加线程化部件的示例。该部件通过串口与TF-Mini单波束激光雷达进行接口交互,并报告距离。该部件不接受输入参数,只输出距离值。请注意,参数inputs=[]实际上并不是必需的;这是输入的默认值,因此可以省略。

    if cfg.HAVE_TFMINI:
        from donkeycar.parts.tfmini import TFMini
        lidar = TFMini(port=cfg.TFMINI_SERIAL_PORT)
        V.add(lidar, inputs=[], outputs=['lidar/dist'], threaded=True)

以下是TFMini部件的代码:

class TFMini:
    """
    用于TFMini和TFMini-Plus距离传感器的类。
    请参考https://github.com/TFmini/TFmini-RaspberryPi上的接线和安装说明。

    返回以厘米为单位的距离。
    """

    def __init__(self, port="/dev/serial0", baudrate=115200, poll_delay=0.01, init_delay=0.1):
        self.ser = serial.Serial(port, baudrate)
        self.poll_delay = poll_delay

        self.dist = 0

        if not self.ser.is_open:
            self.ser.close() # 如果串口仍然打开,我们不希望再次打开它
            self.ser.open()

        self.logger = logging.getLogger(__name__)

        self.logger.info("初始化TFMini")
        time.sleep(init_delay)

    def update(self):
        while self.ser.is_open:
            self.poll()
            if self.poll_delay > 0:
                time.sleep(self.poll_delay)

    def poll(self):
        try:
            count = self.ser.in_waiting
            if count > 8:
                recv = self.ser.read(9)   
                self.ser.reset_input_buffer() 

                if recv[0] == 0x59 and recv[1] == 0x59:     
                    dist = recv[2] + recv[3] * 256
                    strength = recv[4] + recv[5] * 256

                    if strength > 0:
                        self.dist = dist

                    self.ser.reset_input_buffer()

        except Exception as e:
            self.logger.error(e)


    def run_threaded(self):
        return self.dist

    def run(self):
        self.poll()
        return self.dist

    def shutdown(self):
        self.ser.close()

注意:TFMini部件自己管理一个串口;建议使用SerialPort部件从串口读取面向行的数据,而不是在部件中管理串口。SerialPort部件可以处理串口的所有细节并输出结果数据;然后您的部件只需要将该数据作为输入并使用它。

在TFMini部件中,update()方法在串口保持打开状态时运行一个循环。串口在构造函数中打开,并在调用shutdown()方法时关闭。在线程化部件中,update()方法几乎就像一个无限循环,一遍又一遍地运行,只要Python给予它运行的时间。这部分代码可以比车辆循环运行得更快。

使用线程化部件的原因是,如果您的部件需要比车辆循环更快地运行,或者需要实时响应设备,那么update()方法中的循环将以Python解释器允许的速度运行,通常比车辆循环快得多。重要的是要理解,update()方法由部件的线程调用,但是run_threaded()方法由主车辆循环线程调用。这意味着这两个方法可能会在它们正在执行的过程中相互中断。

您应该使用适当的线程安全模式,如锁定,以确保数据更新、读取和其他关键代码段得到安全隔离和原子操作。在某些情况下,这需要使用锁来确保资源从线程安全地访问,或者多行代码的原子执行。值得记住的是,在Python中赋值是原子的(所以全局解释器锁定,GIL,有一个好处)。因此,虽然这个赋值操作不是原子的:

x = 12.34
y = 34.56
angle = 1.34

因为在这些赋值之间,您的代码可能会被中断。但这个赋值操作是原子的:

pose = (12.34, 34.56, 1.34)

因此,如果您有可能在线程中发生变化的聚合内部状态,那么将其放入元组中,您可以原子地读取和写入它,而无需使用锁。