如何构建自己的模型


注意: 这需要版本 >= 4.1.X


概述

您可能希望编写自己的模型:

  • 如果您发现Donkey附带的模型不够用,并且想要尝试自己的模型基础架构
  • 如果您想要为模型添加更多的输入数据,因为您的汽车有更多的传感器

构造函数

模型位于 donkeycar/parts/keras.py 中。您自己的模型需要继承自 KerasPilot 并初始化您的模型:

class KerasSensors(KerasPilot):
    def __init__(self, input_shape=(120, 160, 3), num_sensors=2):
        super().__init__()
        self.num_sensors = num_sensors
        self.model = self.create_model(input_shape)

在这里,您需要在成员函数 create_model() 中实现 Keras 模型。 模型需要有标记的输入和输出张量。这些对于训练是必需的。

训练接口

为了使您的模型正常工作,需要以下函数:

def compile(self):
    self.model.compile(optimizer=self.optimizer, metrics=['accuracy'],
                       loss={'angle_out': 'categorical_crossentropy',
                             'throttle_out': 'categorical_crossentropy'},
                       loss_weights={'angle_out': 0.5, 'throttle_out': 0.5})

compile 函数告诉 Keras 如何定义训练的损失函数。我们以 KerasCategorical 模型为例。这里的损失函数明确使用了模型的输出张量(angle_out, throttle_out)。

def x_transform(self, record: TubRecord):
    img_arr = record.image(cached=True)
    return img_arr

在这个函数中,您定义了如何从记录的数据中提取输入数据。这些数据通常在机器学习框架中称为 X。 我们在基类中展示了适用于只有图像作为输入的所有模型的实现。

如果模型只有一个输入,该函数返回一个数据项。如果模型使用了更多的输入数据,则需要返回一个元组。

注意:如果您的模型有更多的输入,元组中的图像需要放在第一个位置。

def y_transform(self, record: TubRecord):
    angle: float = record.underlying['user/angle']
    throttle: float = record.underlying['user/throttle']
    return angle, throttle

在这个函数中,您指定了如何从记录的数据中提取 y 值(即目标值)。

def x_translate(self, x: XY) -> Dict[str, Union[float, np.ndarray]]:
    return {'img_in': x}

在这里,我们需要将上面提取的 X 值传递给 tf.data。注意,如果模型有多个输入变量,tf.data 需要一个字典作为输入。 为了保持一致性,我们选择在只有一个参数的情况下也使用字典。上面我们展示了基类中适用于只有图像作为输入的所有模型的实现。 如果您的模型只使用图像作为输入数据,则不需要重写 x_transformx_translate

注意:字典的键必须与模型中的输入层的名称相匹配。

def y_translate(self, y: XY) -> Dict[str, Union[float, np.ndarray]]:
    if isinstance(y, tuple):
        angle, throttle = y
        return {'angle_out': angle, 'throttle_out': throttle}
    else:
        raise TypeError('Expected tuple')

类似于上面的函数,这提供了将 y 数据转换为 tf.data 所需的字典的方法。这个示例展示了 KerasLinear 的实现方式。

注意:字典的键必须与模型中的输出层的名称相匹配。

def output_shapes(self):
    # need to cut off None from [None, 120, 160, 3] tensor shape
    img_shape = self.get_input_shape()[1:]
    shapes = ({'img_in': tf.TensorShape(img_shape)},
              {'angle_out': tf.TensorShape([15]),
               'throttle_out': tf.TensorShape([20])})
    return shapes

这个函数返回一个包含两个字典的元组,告诉 TensorFlow 模型中使用的形状。这里展示了 KerasCategorical 模型的示例。

注意 1:与上面一样,这两个字典的键必须与模型中的输入输出层的名称相匹配。

注意 2:当模型返回标量数字时,相应的类型必须为 tf.TensorShape([])

零件接口

在汽车应用中,模型通过 run() 函数调用。在基类中已经提供了该函数,在其中进行了输入图像的归一化处理。而派生类需要实现 inference() 函数,该函数在归一化后的数据上进行操作。如果您还有其他需要进行归一化处理的数据,可能还需要重写 run() 函数。

def inference(self, img_arr, other_arr):
    img_arr = img_arr.reshape((1,) + img_arr.shape)
    outputs = self.model.predict(img_arr)
    steering = outputs[0]
    throttle = outputs[1]
    return steering[0][0], throttle[0][0]

这里展示了线性模型的实现。请注意,输入张量的形状总是在第一个位置包含批次维度,因此输入图像的形状从 (120, 160, 3) 调整为 (1, 120, 160, 3)

注意:如果您在 other_arr 变量中传递了另一个数组,则需要进行类似的重新调整形状的操作。

示例

让我们构建一个基于标准线性模型的新的donkey模型,但在输入数据和网络设计方面有以下更改:

  1. 该模型接收一个额外的输入数据向量,表示连接到汽车前部的距离传感器的一组值。

  2. 该模型添加了几个前馈层,将视觉系统的CNN层与距离传感器数据相结合。

使用Keras构建模型

下面是示例模型的代码(使用Keras):

class KerasSensors(KerasPilot):
    def __init__(self, input_shape=(120, 160, 3), num_sensors=2):
        super().__init__()
        self.num_sensors = num_sensors
        self.model = self.create_model(input_shape)

    def create_model(self, input_shape):
        drop = 0.2
        img_in = Input(shape=input_shape, name='img_in')
        x = core_cnn_layers(img_in, drop)
        x = Dense(100, activation='relu', name='dense_1')(x)
        x = Dropout(drop)(x)
        x = Dense(50, activation='relu', name='dense_2')(x)
        x = Dropout(drop)(x)
        # 到这里,这是标准线性模型,现在我们添加传感器数据
        sensor_in = Input(shape=(self.num_sensors, ), name='sensor_in')
        y = sensor_in
        z = concatenate([x, y])
        # 在这里我们添加两个额外的密集层
        z = Dense(50, activation='relu', name='dense_3')(z)
        z = Dropout(drop)(z)
        z = Dense(50, activation='relu', name='dense_4')(z)
        z = Dropout(drop)(z)
        # 角度和油门的两个输出
        outputs = [
            Dense(1, activation='linear', name='n_outputs' + str(i))(z)
            for i in range(2)]

        # 模型需要在这里指定额外的输入
        model = Model(inputs=[img_in, sensor_in], outputs=outputs)
        return model

    def compile(self):
        self.model.compile(optimizer=self.optimizer, loss='mse')

    def inference(self, img_arr, other_arr):
        img_arr = img_arr.reshape((1,) + img_arr.shape)
        sens_arr = other_arr.reshape((1,) + other_arr.shape)
        outputs = self.model.predict([img_arr, sens_arr])
        steering = outputs[0]
        throttle = outputs[1]
        return steering[0][0], throttle[0][0]

    def x_transform(self, record: TubRecord) -> XY:
        img_arr = super().x_transform(record)
        # 为简单起见,假设这里的传感器数据已经归一化
        sensor_arr = np.array(record.underlying['sensor'])
        # 需要先返回图像数据
        return img_arr, sensor_arr

    def x_translate(self, x: XY) -> Dict[str, Union[float, np.ndarray]]:
        assert isinstance(x, tuple), '需要输入元组'
        # 键是模型的输入层名称
        return {'img_in': x[0], 'sensor_in': x[1]}

    def y_transform(self, record: TubRecord):
        angle: float = record.underlying['user/angle']
        throttle: float = record.underlying['user/throttle']
        return angle, throttle

    def y_translate(self, y: XY) -> Dict[str, Union[float, np.ndarray]]:
        if isinstance(y, tuple):
            angle, throttle = y
            # 键是模型的输出层名称
            return {'n_outputs0': angle, 'n_outputs1': throttle}
        else:
            raise TypeError('期望元组')

    def output_shapes(self):
        # 需要从[None, 120, 160, 3]张量形状中去掉None
        img_shape = self.get_input_shape()[1:]
        # 键需要与模型的输入/输出层匹配
        shapes = ({'img_in': tf.TensorShape(img_shape),
                   'sensor_in': tf.TensorShape([self.num_sensors])},
                  {'n_outputs0': tf.TensorShape([]),
                   'n_outputs1': tf.TensorShape([])})
        return shapes

我们可以继承自KerasLinear,该类已经提供了y_transform()y_translate()compile()的实现。然而,为了明确表示一般情况,我们在这里实现了所有的函数。该模型要求传感器数据是一个带有键"sensor"的TubRecord中的数组。

创建一个Tub

由于我们没有带有传感器数据的Tub,让我们创建一个带有虚假传感器条目的Tub:

import os
import tarfile
import numpy as np
from donkeycar.parts.tub_v2 import Tub
from donkeycar.pipeline.types import TubRecord
from donkeycar.config import load_config


if __name__ == '__main__':
    # 将路径更改为您的car app路径
    my_car = os.path.expanduser('~/mycar')
    cfg = load_config(os.path.join(my_car, 'config.py'))
    # 将路径更改为donkey项目路径
    tar = tarfile.open(os.path.expanduser(
        '~/Python/donkeycar/donkeycar/tests/tub/tub.tar.gz'))
    tub_parent = os.path.join(my_car, 'data2/')
    tar.extractall(tub_parent)
    tub_path = os.path.join(tub_parent, 'tub')
    tub1 = Tub(tub_path)
    tub2 = Tub(os.path.join(my_car, 'data2/tub_sensor'),
               inputs=['cam/image_array', 'user/angle', 'user/throttle',
                       'sensor'],
               types=['image_array', 'float', 'float', 'list'])

    for record in tub1:
        t_record = TubRecord(config=cfg,
                             base_path=tub1.base_path,
                             underlying=record)
        img_arr = t_record.image(cached=False)
        record['sensor'] = list(np.random.uniform(size=2))
        record['cam/image_array'] = img_arr
        tub2.write_record(record)

使模型可用

由于我们还没有动态工厂,所以我们需要将新模型添加到donkeycar/utils.py模块中的get_model_by_type()函数中:

...
elif model_type == 'sensor':
    kl = KerasSensors(input_shape=input_shape)
...

开始训练

现在,在您的car app文件夹中,以下命令应该可以工作: donkey train --tub data2/tub_sensor --model models/pilot.h5 --type sensor 由于数据中的随机值,模型不会很快收敛,这里的目标是使其在框架中正常工作。