Timeline

2026-06-22

init


linux 5.10.256

rpmsg structure

The rpmsg stack has three layers, which sit on top of the virtio bus / vring:

┌─────────────────────────────────────────────────────────────┐
│ Business layer: rpmsg_driver                                │ ← where business logic is written
│ (rpmsg_tty / rpmsg_chrdev / custom)                         │
├─────────────────────────────────────────────────────────────┤
│ rpmsg bus / core: rpmsg_core.c                              │ ← matches devices and drivers
│ rpmsg_device / rpmsg_endpoint                               │
├─────────────────────────────────────────────────────────────┤
│ Transport backend: virtio_rpmsg_bus.c                       │ ← the code this post focuses on
│ (virtio_driver; moves messages, creates channels)           │
├─────────────────────────────────────────────────────────────┤
│ virtio bus / vring                                          │
└─────────────────────────────────────────────────────────────┘

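virtio_rpmsg_bus.c sits in the lower-middle layer. Its responsibilities are:

  1. Take over the virtio rpmsg device
  2. Send and receive messages over the vrings
  3. Create/destroy rpmsg_device instances according to the name service

It produces rpmsg_device instances but does not consume them; the consumers are rpmsg_driver instances.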

module_init/module_exit

static struct virtio_device_id id_table[] = {
{ VIRTIO_ID_RPMSG, VIRTIO_DEV_ANY_ID },
{ 0 },
};

static unsigned int features[] = {
VIRTIO_RPMSG_F_NS,
};

static struct virtio_driver virtio_ipc_driver = {
.feature_table = features,
.feature_table_size = ARRAY_SIZE(features),
.driver.name = KBUILD_MODNAME,
.driver.owner = THIS_MODULE,
.id_table = id_table,
.probe = rpmsg_probe,
.remove = rpmsg_remove,
};

static int __init rpmsg_init(void)
{
int ret;

ret = register_virtio_driver(&virtio_ipc_driver);
if (ret)
pr_err("failed to register virtio driver: %d\n", ret);

return ret;
}
subsys_initcall(rpmsg_init);

static void __exit rpmsg_fini(void)
{
unregister_virtio_driver(&virtio_ipc_driver);
}
module_exit(rpmsg_fini);

MODULE_DEVICE_TABLE(virtio, id_table);
MODULE_DESCRIPTION("Virtio-based remote processor messaging bus");
MODULE_LICENSE("GPL v2");

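virtio_rpmsg_bus.c is the virtio-based rpmsg bus driver in the Linux kernel. Its job is:

to let the Linux host core and a remote processor exchange rpmsg messages through the virtio vring mechanism, and to expose remote services as Linux rpmsg_device instances that upper-layer rpmsg drivers can bind to.
This file is the virtio transport implementation of the rpmsg bus. It does not care about specific business protocols such as audio, sensors, TEE, or MCU control; it only delivers messages to the matching endpoint.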

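Basic communication model

rpmsg

The communication units of rpmsg are:

  • rpmsg_device: the rpmsg channel device; one communication channel, the "device"
  • rpmsg_driver: the driver that handles the business logic of that channel
  • rpmsg_endpoint: the actual send/receive endpoint on the channel (address + callback)

The relationship among the three:

rpmsg_driver  <--- match --->  rpmsg_device
                                    |
                                    | holds
                                    v
                               rpmsg_endpoint

rpmsg_device is the "device managed by a driver" in the middle. When an rpmsg_device is registered, the rpmsg bus runs its match step against the registered drivers; on a successful match, the probe function of the matching rpmsg_driver is called.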

struct rpmsg_device

struct rpmsg_device
├── u32 src                               # local address
├── u32 dst                               # remote address
├── bool announce                         # whether this channel must be announced to the remote side
├── struct device dev                     # embedded struct device, hooks into the Linux device model
├── struct rpmsg_device_id id             # channel name and other id info, used to match a driver
├── const char *driver_override           # forces a specific driver
├── const struct rpmsg_device_ops *ops    # operations of the rpmsg_device
└── struct rpmsg_endpoint *ept
    ├── u32 addr
    ├── void *priv
    ├── rpmsg_rx_cb_t cb
    ├── struct mutex cb_lock
    ├── struct kref refcount
    ├── struct rpmsg_device *rpdev
    └── const struct rpmsg_endpoint_ops *ops

struct rpmsg_device is defined as follows:

/**
* rpmsg_device - device that belong to the rpmsg bus
* @dev: the device struct
* @id: device id (used to match between rpmsg drivers and devices)
* @driver_override: driver name to force a match; do not set directly,
* because core frees it; use driver_set_override() to
* set or clear it.
* @src: local address
* @dst: destination address
* @ept: the rpmsg endpoint of this channel
* @announce: if set, rpmsg will announce the creation/removal of this channel
*/
struct rpmsg_device {
struct device dev;
struct rpmsg_device_id id;
const char *driver_override;
u32 src;
u32 dst;
struct rpmsg_endpoint *ept;
bool announce;

const struct rpmsg_device_ops *ops;
};

struct rpmsg_device_ops is the operations table of an rpmsg_device:

/**
* struct rpmsg_device_ops - indirection table for the rpmsg_device operations
* @create_ept: create backend-specific endpoint, required
* @announce_create: announce presence of new channel, optional
* @announce_destroy: announce destruction of channel, optional
*
* Indirection table for the operations that a rpmsg backend should implement.
* @announce_create and @announce_destroy are optional as the backend might
* advertise new channels implicitly by creating the endpoints.
*/
struct rpmsg_device_ops {
struct rpmsg_endpoint *(*create_ept)(struct rpmsg_device *rpdev,
rpmsg_rx_cb_t cb, void *priv,
struct rpmsg_channel_info chinfo);

int (*announce_create)(struct rpmsg_device *ept);
int (*announce_destroy)(struct rpmsg_device *ept);
};

An rpmsg_device is equivalent to one logical channel, for example:

  • channel name: "rpmsg-demo"
  • src: the local endpoint address
  • dst: the remote endpoint address

In virtio_rpmsg_bus.c, a channel is created like this (in rpmsg_create_channel()):

rpdev->src = chinfo->src;
rpdev->dst = chinfo->dst;
rpdev->ops = &virtio_rpmsg_ops;
strncpy(rpdev->id.name, chinfo->name, RPMSG_NAME_SIZE);

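In other words: when the remote side announces a service "rpmsg-demo", Linux creates an rpmsg_device to represent that channel. Alternatively, the local side can create the service "rpmsg-demo" itself, that is, create an rpmsg_device for the local channel; after it is announced, the remote side creates a matching "rpmsg-demo" channel through the name service. The src and dst of the local and remote rpmsg_device are mirrored (the local src is the remote dst and vice versa), and the two sides communicate over this channel.

The rpmsg channel descriptor that virtio_rpmsg_bus.c actually uses is:

/**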
* struct virtio_rpmsg_channel - rpmsg channel descriptor
* @rpdev: the rpmsg channel device
* @vrp: the virtio remote processor device this channel belongs to
*
* This structure stores the channel that links the rpmsg device to the virtio
* remote processor device.
*/
struct virtio_rpmsg_channel {
struct rpmsg_device rpdev;

struct virtproc_info *vrp;
};

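That is, a channel is an rpmsg_device plus a pointer to the core data structure, struct virtproc_info *vrp, which is the key data structure of the virtio implementation of the rpmsg bus.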
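Because rpdev is embedded in struct virtio_rpmsg_channel rather than pointed to, the enclosing channel (and through it vrp) can be recovered from a plain rpmsg_device pointer. In the kernel this is done by to_virtio_rpmsg_channel() via container_of(). The following is a minimal user-space sketch of that idea; the `_model` types, their fields, and the helper names are simplified stand-ins assumed for illustration, not the kernel definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the kernel structures (illustrative only). */
struct virtproc_info_model { int id; };
struct rpmsg_device_model { unsigned int src, dst; };

struct virtio_rpmsg_channel_model {
	struct rpmsg_device_model rpdev;   /* embedded, not a pointer */
	struct virtproc_info_model *vrp;
};

/* User-space equivalent of the kernel's container_of() macro. */
#define container_of_model(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* Same idea as to_virtio_rpmsg_channel(): rpdev -> enclosing channel. */
static struct virtio_rpmsg_channel_model *
to_channel_model(struct rpmsg_device_model *rpdev)
{
	assert(rpdev != NULL);
	return container_of_model(rpdev, struct virtio_rpmsg_channel_model, rpdev);
}
```

This is why the send path can start from an rpmsg_device and still reach the per-device state in vrp without any lookup table.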

struct rpmsg_endpoint

An rpmsg_device is like a phone line plus an extension-number service, while an rpmsg_endpoint is the person who actually answers the call.

/**
* struct rpmsg_endpoint - binds a local rpmsg address to its user
* @rpdev: rpmsg channel device
* @refcount: when this drops to zero, the ept is deallocated
* @cb: rx callback handler
* @cb_lock: must be taken before accessing/changing @cb
* @addr: local rpmsg address
* @priv: private data for the driver's use
*
* In essence, an rpmsg endpoint represents a listener on the rpmsg bus, as
* it binds an rpmsg address with an rx callback handler.
*
* Simple rpmsg drivers shouldn't use this struct directly, because
* things just work: every rpmsg driver provides an rx callback upon
* registering to the bus, and that callback is then bound to its rpmsg
* address when the driver is probed. When relevant inbound messages arrive
* (i.e. messages which their dst address equals to the src address of
* the rpmsg channel), the driver's handler is invoked to process it.
*
* More complicated drivers though, that do need to allocate additional rpmsg
* addresses, and bind them to different rx callbacks, must explicitly
* create additional endpoints by themselves (see rpmsg_create_ept()).
*/
struct rpmsg_endpoint {
struct rpmsg_device *rpdev;
struct kref refcount;
rpmsg_rx_cb_t cb;
struct mutex cb_lock;
u32 addr;
void *priv;

const struct rpmsg_endpoint_ops *ops;
};

The rpmsg_endpoint logic lives mainly in rpmsg_core.c. Its two most important members are rpmsg_rx_cb_t cb and const struct rpmsg_endpoint_ops *ops:

typedef int (*rpmsg_rx_cb_t)(struct rpmsg_device *, void *, int, void *, u32);

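This is the callback invoked when the endpoint receives a message. Its parameters are, in order: the rpmsg_device, the payload pointer, the payload length, the endpoint's priv pointer, and the sender's source address.

/**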
* struct rpmsg_endpoint_ops - indirection table for rpmsg_endpoint operations
* @destroy_ept: see @rpmsg_destroy_ept(), required
* @send: see @rpmsg_send(), required
* @sendto: see @rpmsg_sendto(), optional
* @send_offchannel: see @rpmsg_send_offchannel(), optional
* @trysend: see @rpmsg_trysend(), required
* @trysendto: see @rpmsg_trysendto(), optional
* @trysend_offchannel: see @rpmsg_trysend_offchannel(), optional
* @poll: see @rpmsg_poll(), optional
*
* Indirection table for the operations that a rpmsg backend should implement.
* In addition to @destroy_ept, the backend must at least implement @send and
* @trysend, while the variants sending data off-channel are optional.
*/
struct rpmsg_endpoint_ops {
void (*destroy_ept)(struct rpmsg_endpoint *ept);

int (*send)(struct rpmsg_endpoint *ept, void *data, int len);
int (*sendto)(struct rpmsg_endpoint *ept, void *data, int len, u32 dst);
int (*send_offchannel)(struct rpmsg_endpoint *ept, u32 src, u32 dst,
void *data, int len);

int (*trysend)(struct rpmsg_endpoint *ept, void *data, int len);
int (*trysendto)(struct rpmsg_endpoint *ept, void *data, int len, u32 dst);
int (*trysend_offchannel)(struct rpmsg_endpoint *ept, u32 src, u32 dst,
void *data, int len);
__poll_t (*poll)(struct rpmsg_endpoint *ept, struct file *filp,
poll_table *wait);
};

This is the operations table of an rpmsg_endpoint.

Endpoint creation

The core function is __rpmsg_create_ept(). The rpmsg_device operation create_ept is implemented by virtio_rpmsg_create_ept(), which is a thin wrapper around it:

/* for more info, see below documentation of rpmsg_create_ept() */
static struct rpmsg_endpoint *__rpmsg_create_ept(struct virtproc_info *vrp,
struct rpmsg_device *rpdev,
rpmsg_rx_cb_t cb,
void *priv, u32 addr)
{
int id_min, id_max, id;
struct rpmsg_endpoint *ept;
struct device *dev = rpdev ? &rpdev->dev : &vrp->vdev->dev;

ept = kzalloc(sizeof(*ept), GFP_KERNEL);
if (!ept)
return NULL;

kref_init(&ept->refcount);
mutex_init(&ept->cb_lock);

ept->rpdev = rpdev;
ept->cb = cb;
ept->priv = priv;
ept->ops = &virtio_endpoint_ops;

/* do we need to allocate a local address ? */
if (addr == RPMSG_ADDR_ANY) {
id_min = RPMSG_RESERVED_ADDRESSES;
id_max = 0;
} else {
id_min = addr;
id_max = addr + 1;
}

mutex_lock(&vrp->endpoints_lock);

/* bind the endpoint to an rpmsg address (and allocate one if needed) */
id = idr_alloc(&vrp->endpoints, ept, id_min, id_max, GFP_KERNEL);
if (id < 0) {
dev_err(dev, "idr_alloc failed: %d\n", id);
goto free_ept;
}
ept->addr = id;

mutex_unlock(&vrp->endpoints_lock);

return ept;

free_ept:
mutex_unlock(&vrp->endpoints_lock);
kref_put(&ept->refcount, __ept_release);
return NULL;
}

static struct rpmsg_endpoint *virtio_rpmsg_create_ept(struct rpmsg_device *rpdev,
rpmsg_rx_cb_t cb,
void *priv,
struct rpmsg_channel_info chinfo)
{
struct virtio_rpmsg_channel *vch = to_virtio_rpmsg_channel(rpdev);

return __rpmsg_create_ept(vch->vrp, rpdev, cb, priv, chinfo.src);
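}

__rpmsg_create_ept() does the following:

  1. Allocates a struct rpmsg_endpoint
  2. Initializes the reference count and the callback lock
  3. Records the callback, private data, and ops
  4. Assigns a local address to the endpoint
  5. Inserts the endpoint into the vrp->endpoints idr

Key logic:

if (addr == RPMSG_ADDR_ANY) {
	id_min = RPMSG_RESERVED_ADDRESSES;
	id_max = 0;
} else {
	id_min = addr;
	id_max = addr + 1;
}

  • If the caller does not specify an address, one is allocated dynamically (id_max = 0 means idr_alloc() has no upper bound)
  • Dynamic addresses start at 1024
  • Addresses 0 to 1023 are reserved for predefined services

The reserved range is defined as:

#define RPMSG_RESERVED_ADDRESSES	(1024)

Endpoint addresses are allocated with idr_alloc():

id = idr_alloc(&vrp->endpoints, ept, id_min, id_max, GFP_KERNEL);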
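The address selection above can be modeled in user space. This is only a sketch: a fixed-size array stands in for the idr, and `TABLE_SIZE`, `alloc_id_model()`, and `bind_addr_model()` are names assumed for illustration. The one property borrowed from the real idr_alloc() is that it returns the first free id in [start, end), with end <= 0 meaning "no upper bound".

```c
#include <assert.h>
#include <stddef.h>

#define RPMSG_ADDR_ANY            0xFFFFFFFFu
#define RPMSG_RESERVED_ADDRESSES  1024
#define TABLE_SIZE                2048   /* model-only limit, not a kernel value */

static void *table[TABLE_SIZE];          /* stand-in for vrp->endpoints */

/* Model of idr_alloc(): first free id in [start, end); end <= 0 is unbounded. */
static int alloc_id_model(void *ptr, int start, int end)
{
	int limit = (end <= 0 || end > TABLE_SIZE) ? TABLE_SIZE : end;

	if (start < 0)
		return -1;
	for (int id = start; id < limit; id++) {
		if (!table[id]) {
			table[id] = ptr;
			return id;
		}
	}
	return -1;   /* the kernel reports -ENOSPC in this case */
}

/* Same range selection as in __rpmsg_create_ept(). */
static int bind_addr_model(void *ept, unsigned int addr)
{
	int id_min, id_max;

	assert(ept != NULL);
	if (addr == RPMSG_ADDR_ANY) {
		id_min = RPMSG_RESERVED_ADDRESSES;   /* skip the reserved range */
		id_max = 0;                          /* no upper bound */
	} else {
		id_min = (int)addr;                  /* exactly this address ... */
		id_max = (int)addr + 1;              /* ... or fail if it is taken */
	}
	return alloc_id_model(ept, id_min, id_max);
}
```

In this model, two dynamic requests get 1024 and then 1025, a request for address 53 succeeds once, and a second request for 53 fails because the single-slot range is already occupied.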
Endpoint destruction

The core function is __rpmsg_destroy_ept(). The rpmsg_endpoint operation destroy_ept, implemented by virtio_rpmsg_destroy_ept(), calls it. It:

  1. Removes the endpoint from the idr
  2. Sets the callback to NULL
  3. Drops a reference, freeing the endpoint if that was the last one

Key code:

/**
* __rpmsg_destroy_ept() - destroy an existing rpmsg endpoint
* @vrp: virtproc which owns this ept
* @ept: endpoing to destroy
*
* An internal function which destroy an ept without assuming it is
* bound to an rpmsg channel. This is needed for handling the internal
* name service endpoint, which isn't bound to an rpmsg channel.
* See also __rpmsg_create_ept().
*/
static void
__rpmsg_destroy_ept(struct virtproc_info *vrp, struct rpmsg_endpoint *ept)
{
/* make sure new inbound messages can't find this ept anymore */
mutex_lock(&vrp->endpoints_lock);
idr_remove(&vrp->endpoints, ept->addr);
mutex_unlock(&vrp->endpoints_lock);

/* make sure in-flight inbound messages won't invoke cb anymore */
mutex_lock(&ept->cb_lock);
ept->cb = NULL;
mutex_unlock(&ept->cb_lock);

kref_put(&ept->refcount, __ept_release);
}

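Pay particular attention to concurrency here:

  • Removing from the idr: prevents new RX messages from finding this endpoint
  • Setting cb = NULL: prevents an in-flight RX path that already holds the endpoint from invoking the callback
  • kref: prevents the endpoint from being freed while the RX path is still using it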
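The three steps can be replayed in a single-threaded user-space model. This sketch leaves out the two mutexes and only shows the ordering and the reference counting; `ept_model`, the one-slot table, and all function names are assumptions made for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct ept_model {
	int refcount;
	int (*cb)(int msg);
	bool released;   /* set instead of calling free(), so it can be inspected */
};

static struct ept_model *slot;   /* one-entry stand-in for the endpoints idr */

static void ept_put_model(struct ept_model *e)
{
	assert(e->refcount > 0);
	if (--e->refcount == 0)
		e->released = true;      /* the kernel would run __ept_release() */
}

/* RX side: look up and take a reference, as the receive path does. */
static struct ept_model *lookup_get_model(void)
{
	if (slot)
		slot->refcount++;
	return slot;
}

/* Same order as __rpmsg_destroy_ept(): unpublish, clear cb, drop reference. */
static void destroy_model(struct ept_model *e)
{
	slot = NULL;
	e->cb = NULL;
	ept_put_model(e);
}

/* RX side: deliver only if the callback is still set, then drop the reference. */
static int deliver_model(struct ept_model *e, int msg)
{
	int handled = 0;

	if (e->cb)
		handled = e->cb(msg);
	ept_put_model(e);
	return handled;
}

static int echo_cb_model(int msg) { return msg; }
```

If a receiver takes its reference before destroy_model() runs, the endpoint survives the destroy call, the late delivery finds cb == NULL and does nothing, and the final put is what releases the object.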

struct virtproc_info

The core data structure is struct virtproc_info:

struct virtproc_info {
struct virtio_device *vdev;
struct virtqueue *rvq, *svq;
void *rbufs, *sbufs;
unsigned int num_bufs;
unsigned int buf_size;
int last_sbuf;
dma_addr_t bufs_dma;
struct mutex tx_lock;
struct idr endpoints;
struct mutex endpoints_lock;
wait_queue_head_t sendq;
atomic_t sleepers;
struct rpmsg_endpoint *ns_ept;
};

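This is the private state of the whole virtio rpmsg device, attached through vdev->priv = vrp. Grouped by function:

  • virtio basics:
    • vdev: the underlying virtio device
  • send/receive channels:
    • rvq, svq: the RX/TX virtqueues
  • buffer management:
    • rbufs, sbufs: virtual addresses of the RX/TX buffers
    • num_bufs: total number of buffers
    • buf_size: size of a single buffer
    • last_sbuf: the TX first-use cursor
    • bufs_dma: DMA base address of the buffers
  • send synchronization:
    • tx_lock: protects svq/sbufs/sleepers
    • sendq: wait queue for senders waiting for a TX buffer
    • sleepers: number of waiters (controls whether the tx-complete interrupt is enabled)
  • endpoint management:
    • endpoints: the endpoint idr (lookup by address)
    • endpoints_lock: protects the endpoint table
  • name service:
    • ns_ept: the name service endpoint (addr 53)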

struct virtio_device *vdev

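Purpose: points to the underlying virtio device, which is the root of this rpmsg instance. Through it you can reach:
- vdev->dev: the device node (used by dev_err/dev_dbg)
- vdev->config: the virtio config operations (reset, del_vqs, and so on)
- vdev->priv: a back-pointer to vrp itself

probe sets it up:

vrp->vdev = vdev;
...
vdev->priv = vrp;

The callbacks use it to get vrp back:

struct virtproc_info *vrp = rvq->vdev->priv;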

struct virtqueue *rvq, *svq

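Purpose:

  • rvq = receive virtqueue: remote -> Linux
  • svq = send virtqueue: Linux -> remote

These are the two channels for receiving and sending. In probe:

vrp->rvq = vqs[0];   /* input  */
vrp->svq = vqs[1];   /* output */

  • Receiving: virtqueue_get_buf(rvq) takes a filled buffer, virtqueue_add_inbuf(rvq) hands it back to the ring
  • Sending: virtqueue_add_outbuf(svq) queues a message, virtqueue_get_buf(svq) reclaims a consumed buffer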

void *rbufs, *sbufs

Purpose:

  • rbufs: start of the kernel virtual address range of the RX buffers
  • sbufs: start of the kernel virtual address range of the TX buffers

probe splits one contiguous block of DMA memory into two halves:

vrp->rbufs = bufs_va;                       /* first half: RX */
vrp->sbufs = bufs_va + total_buf_space / 2; /* second half: TX */

Layout:

bufs_va
├── rbufs: RX[0] RX[1] ...
└── sbufs: TX[0] TX[1] ...

get_a_tx_buf() picks TX buffers from sbufs by index:

ret = vrp->sbufs + vrp->buf_size * vrp->last_sbuf++;

unsigned int num_bufs

Purpose: the total number of buffers, RX plus TX (each gets half). probe computes it from the vring size:

if (virtqueue_get_vring_size(vrp->rvq) < MAX_RPMSG_NUM_BUFS / 2)
	vrp->num_bufs = virtqueue_get_vring_size(vrp->rvq) * 2;
else
	vrp->num_bufs = MAX_RPMSG_NUM_BUFS; /* 512 */

So:

RX buffer count = num_bufs / 2
TX buffer count = num_bufs / 2

Several places use num_bufs / 2 as a bound, for example get_a_tx_buf():

if (vrp->last_sbuf < vrp->num_bufs / 2)

unsigned int buf_size

Purpose: the size of a single buffer in bytes, currently fixed at 512 (MAX_RPMSG_BUF_SIZE).

In probe:

vrp->buf_size = MAX_RPMSG_BUF_SIZE;

It determines:

  • The address stride between buffers: sbufs + buf_size * i
  • The upper limit on the payload of a single message:

if (len > vrp->buf_size - sizeof(struct rpmsg_hdr))
	return -EMSGSIZE;

int last_sbuf

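Purpose: the "first-use cursor" for TX buffers; it records how far the initial sequential hand-out has progressed. get_a_tx_buf() uses it:

if (vrp->last_sbuf < vrp->num_bufs / 2)
	ret = vrp->sbufs + vrp->buf_size * vrp->last_sbuf++;
else
	ret = virtqueue_get_buf(vrp->svq, &len);

  • last_sbuf < num_bufs / 2: there are still never-used TX buffers, so take the next one in order
  • Otherwise: every TX buffer has been used at least once, so reclaim one from the used ring instead

Once it reaches the limit, last_sbuf stops growing, and all later buffers come from virtqueue_get_buf().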

dma_addr_t bufs_dma

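Purpose: the DMA base address of the whole buffer block (the address as seen by the device/DMA). In probe it is returned by dma_alloc_coherent():

bufs_va = dma_alloc_coherent(vdev->dev.parent,
			     total_buf_space, &vrp->bufs_dma, GFP_KERNEL);

rbufs/sbufs are the virtual addresses used by the CPU; bufs_dma is the physical/bus address used for DMA mapping and for freeing the block. remove uses it to free the memory:

dma_free_coherent(vdev->dev.parent, total_buf_space,
		  vrp->rbufs, vrp->bufs_dma);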

struct mutex tx_lock

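Purpose: protects the shared state on the send side: svq, sbufs, and sleepers. It lets multiple senders call rpmsg_send() concurrently. A mutex is used because sending may have to wake up a dozing remote processor, which can sleep, so a sleepable lock is required.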

struct idr endpoints

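Purpose: the idr index of all local endpoints, keyed by endpoint address. It is used to look up an endpoint quickly by address and is the core data structure of message dispatch. When an endpoint is created, an address is allocated and the endpoint is inserted:

id = idr_alloc(&vrp->endpoints, ept, id_min, id_max, GFP_KERNEL);
ept->addr = id;

When a message is received, the endpoint is looked up by dst:

ept = idr_find(&vrp->endpoints, virtio32_to_cpu(vrp->vdev, msg->dst));

So rpmsg is in effect address-based dispatch, and this idr is what makes it work.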
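A minimal user-space sketch of address-based dispatch follows. A plain array indexed by address stands in for the idr, and every name here (`endpoints_model`, `dispatch_model()`, `count_bytes_cb_model()`) is hypothetical. Reference counting and locking are deliberately left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_ADDR_MODEL 2048   /* model-only bound on addresses */

typedef int (*rx_cb_model_t)(const void *data, int len, uint32_t src);

/* Stand-in for vrp->endpoints: address -> callback. */
static rx_cb_model_t endpoints_model[MAX_ADDR_MODEL];

/* Route a message to the endpoint registered at dst, if there is one. */
static int dispatch_model(uint32_t src, uint32_t dst, const void *data, int len)
{
	assert(data != NULL);
	if (dst >= MAX_ADDR_MODEL || !endpoints_model[dst])
		return -1;   /* no recipient for this address */
	return endpoints_model[dst](data, len, src);
}

/* Example callback: reports how many payload bytes it was given. */
static int count_bytes_cb_model(const void *data, int len, uint32_t src)
{
	(void)data;
	(void)src;
	return len;
}
```

With count_bytes_cb_model registered at address 1024, a message to dst 1024 reaches the callback, while a message to an unregistered address is dropped by the dispatcher itself.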

struct mutex endpoints_lock

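Purpose: serializes access to the endpoints idr. It must be held whenever an endpoint is added, removed, or looked up:

mutex_lock(&vrp->endpoints_lock);
ept = idr_find(&vrp->endpoints, ...);
if (ept)
	kref_get(&ept->refcount); /* take a reference right after lookup so it cannot be freed */
mutex_unlock(&vrp->endpoints_lock);

Note that this lock and tx_lock are two separate locks:
- tx_lock protects the send path
- endpoints_lock protects the endpoint table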

wait_queue_head_t sendq

Purpose: senders waiting for a TX buffer sleep on this wait queue. When no TX buffer is available, rpmsg_send() sleeps:

wait_event_interruptible_timeout(vrp->sendq,
				 (msg = get_a_tx_buf(vrp)),
				 msecs_to_jiffies(15000));

After the remote side has consumed a TX buffer, the TX complete callback wakes the sender up:

static void rpmsg_xmit_done(struct virtqueue *svq)
{
	struct virtproc_info *vrp = svq->vdev->priv;
	wake_up_interruptible(&vrp->sendq);
}

atomic_t sleepers

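Purpose: the number of senders currently waiting for a TX buffer (the waiter count). It drives the dynamic enabling and disabling of the TX complete interrupt:

/* first waiter: enable the tx-complete callback */
if (atomic_inc_return(&vrp->sleepers) == 1)
	virtqueue_enable_cb(vrp->svq);

/* last waiter: disable the tx-complete callback */
if (atomic_dec_and_test(&vrp->sleepers))
	virtqueue_disable_cb(vrp->svq);

Design goals:

  • Nobody is waiting for a TX buffer: keep the tx-complete interrupt off to save overhead
  • Someone is waiting for a TX buffer: turn the tx-complete interrupt on, so the waiter is woken as soon as the remote side returns a buffer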
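The first-waiter/last-waiter pattern can be sketched in user space with C11 atomics. This is a model only: `tx_irq_model` and its `cb_enabled` flag stand in for the virtqueue callback state, and the kernel additionally holds tx_lock around both helpers, which this sketch omits.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

struct tx_irq_model {
	atomic_int sleepers;   /* number of senders currently waiting */
	bool cb_enabled;       /* stand-in for the tx-complete callback state */
};

/* Called before a sender goes to sleep. */
static void upref_sleepers_model(struct tx_irq_model *m)
{
	/* atomic_fetch_add returns the old value; old + 1 == 1 means first waiter */
	if (atomic_fetch_add(&m->sleepers, 1) + 1 == 1)
		m->cb_enabled = true;
}

/* Called after a sender wakes up (buffer obtained or timeout). */
static void downref_sleepers_model(struct tx_irq_model *m)
{
	assert(atomic_load(&m->sleepers) > 0);
	/* old - 1 == 0 means this was the last waiter */
	if (atomic_fetch_sub(&m->sleepers, 1) - 1 == 0)
		m->cb_enabled = false;
}
```

The callback state only changes on the 0 to 1 and 1 to 0 transitions, so any number of overlapping waiters share a single enable/disable pair.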

(This is the 5.10 design. In Linux 7.1.1, sleepers has been removed as part of introducing poll support.)

struct rpmsg_endpoint *ns_ept

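Purpose: the endpoint dedicated to the name service (fixed address 53). It does not belong to any ordinary rpmsg channel; the bus uses it internally to handle "service created/destroyed" notifications from the remote side. probe creates it when the remote side supports the NS feature:

if (virtio_has_feature(vdev, VIRTIO_RPMSG_F_NS)) {
	vrp->ns_ept = __rpmsg_create_ept(vrp, NULL, rpmsg_ns_cb,
					 vrp, RPMSG_NS_ADDR);
}

Its callback is rpmsg_ns_cb(), which creates/destroys rpmsg_device instances according to NS messages. remove destroys it separately:

if (vrp->ns_ept)
	__rpmsg_destroy_ept(vrp, vrp->ns_ept);

Note that rpdev is passed as NULL at creation, because this endpoint is not bound to any particular channel.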

Buffer design

The driver uses fixed-size buffers. The relevant definitions:

#define MAX_RPMSG_NUM_BUFS	(512)
#define MAX_RPMSG_BUF_SIZE	(512)

  • At most 512 buffers
  • Half for RX, half for TX
  • 512 bytes per buffer
  • Maximum total memory: 512 * 512 bytes = 256 KiB

That is:

Whole buffer area
+------------------------+ <-- vrp->rbufs = bufs_va = dma_alloc_coherent(vdev->dev.parent,
|                        |         total_buf_space, &vrp->bufs_dma, GFP_KERNEL);
|  RX buffers (1st half) |
|                        |
+------------------------+ <-- vrp->sbufs = bufs_va + total_buf_space / 2;
|                        |
|  TX buffers (2nd half) |
|                        |
+------------------------+ <-- vrp->rbufs + total_buf_space

Allocated in probe:

bufs_va = dma_alloc_coherent(vdev->dev.parent,
			     total_buf_space, &vrp->bufs_dma,
			     GFP_KERNEL);

Then split:

vrp->rbufs = bufs_va;
vrp->sbufs = bufs_va + total_buf_space / 2;

Every message sent starts with a common header:

struct rpmsg_hdr {
	__virtio32 src;
	__virtio32 dst;
	__virtio32 reserved;
	__virtio16 len;
	__virtio16 flags;
	u8 data[];
} __packed;

Meaning:

Field      Meaning
src        source endpoint address
dst        destination endpoint address
reserved   reserved
len        payload length
flags      message flags
data[]     the actual payload

When Linux receives a message, it looks up the local endpoint by the dst address and then calls that endpoint's callback.

get_a_tx_buf()

Code:

/* super simple buffer "allocator" that is just enough for now */
static void *get_a_tx_buf(struct virtproc_info *vrp)
{
	unsigned int len;
	void *ret;

	/* support multiple concurrent senders */
	mutex_lock(&vrp->tx_lock);

	/*
	 * either pick the next unused tx buffer
	 * (half of our buffers are used for sending messages)
	 */
	if (vrp->last_sbuf < vrp->num_bufs / 2)
		ret = vrp->sbufs + vrp->buf_size * vrp->last_sbuf++;
	/* or recycle a used one */
	else
		ret = virtqueue_get_buf(vrp->svq, &len);

	mutex_unlock(&vrp->tx_lock);

	return ret;
}

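This function is a very simple TX buffer allocator.

It has two phases:

Phase 1: some TX buffers have never been used
    take them from vrp->sbufs in array order

Phase 2: every TX buffer has been used at least once
    reclaim TX buffers that the remote side has finished reading from the svq used ring

last_sbuf only matters in phase 1.

Once:

vrp->last_sbuf == vrp->num_bufs / 2

every later call goes through:

virtqueue_get_buf(vrp->svq, &len);

In other words:

last_sbuf           = a one-shot "first-use cursor"
virtqueue_get_buf() = the source of recycled buffers afterwards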
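The two phases can be exercised in a small user-space model. This is a sketch under stated assumptions: the used ring is replaced by a tiny FIFO, the pool has only four TX buffers, locking is omitted, and `tx_pool_model`, `remote_returns_model()`, and `get_a_tx_buf_model()` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>

#define NUM_TX_MODEL   4     /* stands in for num_bufs / 2 */
#define BUF_SIZE_MODEL 512

struct tx_pool_model {
	unsigned char sbufs[NUM_TX_MODEL * BUF_SIZE_MODEL];
	int last_sbuf;                 /* first-use cursor */
	void *used[NUM_TX_MODEL];      /* FIFO standing in for the svq used ring */
	int used_head, used_count;
};

/* The remote side is done with a buffer: it shows up in the "used ring". */
static void remote_returns_model(struct tx_pool_model *p, void *buf)
{
	assert(p->used_count < NUM_TX_MODEL);
	p->used[(p->used_head + p->used_count) % NUM_TX_MODEL] = buf;
	p->used_count++;
}

static void *get_a_tx_buf_model(struct tx_pool_model *p)
{
	void *buf;

	/* phase 1: hand out never-used buffers in order */
	if (p->last_sbuf < NUM_TX_MODEL)
		return p->sbufs + BUF_SIZE_MODEL * p->last_sbuf++;

	/* phase 2: recycle from the used ring, or report "none available" */
	if (p->used_count == 0)
		return NULL;
	buf = p->used[p->used_head];
	p->used_head = (p->used_head + 1) % NUM_TX_MODEL;
	p->used_count--;
	return buf;
}
```

After four calls the cursor is exhausted and the fifth call returns NULL; a buffer becomes available again only once the remote side has returned one, which is the situation in which a blocking sender has to sleep.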

virtio API

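API                          Purpose
virtio_find_vqs()            find/create the virtqueues
virtqueue_add_inbuf()        give the device a buffer it may write into
virtqueue_add_outbuf()       give the device a buffer it may read from
virtqueue_add_sgs()          general buffer-add function that takes both in and out buffers
virtqueue_get_buf()          take back, from the used ring, a buffer the device has finished with
virtqueue_kick()             notify the other side that the queue has new buffers
virtqueue_kick_prepare()     decide whether a notification is needed
virtqueue_notify()           actually send the notification
virtqueue_enable_cb()        enable the virtqueue callback/interrupt
virtqueue_disable_cb()       disable the virtqueue callback/interrupt, e.g. to avoid interrupt storms while polling
virtqueue_get_vring_size()   get the ring size
virtio_has_feature()         check a virtio feature bit
virtio_device_ready()        set DRIVER_OK; the device may start working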

The probe function

static struct virtio_driver virtio_ipc_driver = {
.feature_table = features,
.feature_table_size = ARRAY_SIZE(features),
.driver.name = KBUILD_MODNAME,
.driver.owner = THIS_MODULE,
.id_table = id_table,
.probe = rpmsg_probe,
.remove = rpmsg_remove,
};

The virtio_driver.probe callback is defined as:

static int rpmsg_probe(struct virtio_device *vdev)
{
vq_callback_t *vq_cbs[] = { rpmsg_recv_done, rpmsg_xmit_done };
static const char * const names[] = { "input", "output" };
struct virtqueue *vqs[2];
struct virtproc_info *vrp;
void *bufs_va;
int err = 0, i;
size_t total_buf_space;
bool notify;

/* allocate struct virtproc_info */
vrp = kzalloc(sizeof(*vrp), GFP_KERNEL);
if (!vrp)
return -ENOMEM;

vrp->vdev = vdev;

/* initialize the members of struct virtproc_info */
idr_init(&vrp->endpoints);
mutex_init(&vrp->endpoints_lock);
mutex_init(&vrp->tx_lock);
init_waitqueue_head(&vrp->sendq);

/* We expect two virtqueues, rx and tx (and in this order) */
err = virtio_find_vqs(vdev, 2, vqs, vq_cbs, names, NULL);
if (err)
goto free_vrp;

vrp->rvq = vqs[0];
vrp->svq = vqs[1];

/* we expect symmetric tx/rx vrings */
WARN_ON(virtqueue_get_vring_size(vrp->rvq) !=
virtqueue_get_vring_size(vrp->svq));

/* we need less buffers if vrings are small */
if (virtqueue_get_vring_size(vrp->rvq) < MAX_RPMSG_NUM_BUFS / 2)
vrp->num_bufs = virtqueue_get_vring_size(vrp->rvq) * 2;
else
vrp->num_bufs = MAX_RPMSG_NUM_BUFS;

vrp->buf_size = MAX_RPMSG_BUF_SIZE;

total_buf_space = vrp->num_bufs * vrp->buf_size;

/* allocate coherent memory for the buffers */
bufs_va = dma_alloc_coherent(vdev->dev.parent,
total_buf_space, &vrp->bufs_dma,
GFP_KERNEL);
if (!bufs_va) {
err = -ENOMEM;
goto vqs_del;
}

dev_dbg(&vdev->dev, "buffers: va %pK, dma %pad\n",
bufs_va, &vrp->bufs_dma);

/* half of the buffers is dedicated for RX */
vrp->rbufs = bufs_va;

/* and half is dedicated for TX */
vrp->sbufs = bufs_va + total_buf_space / 2;

/* set up the receive buffers */
for (i = 0; i < vrp->num_bufs / 2; i++) {
struct scatterlist sg;
void *cpu_addr = vrp->rbufs + i * vrp->buf_size;

rpmsg_sg_init(&sg, cpu_addr, vrp->buf_size);

err = virtqueue_add_inbuf(vrp->rvq, &sg, 1, cpu_addr,
GFP_KERNEL);
WARN_ON(err); /* sanity check; this can't really happen */
}

/* suppress "tx-complete" interrupts */
virtqueue_disable_cb(vrp->svq);

vdev->priv = vrp;

/* if supported by the remote processor, enable the name service */
if (virtio_has_feature(vdev, VIRTIO_RPMSG_F_NS)) {
/* a dedicated endpoint handles the name service msgs */
vrp->ns_ept = __rpmsg_create_ept(vrp, NULL, rpmsg_ns_cb,
vrp, RPMSG_NS_ADDR);
if (!vrp->ns_ept) {
dev_err(&vdev->dev, "failed to create the ns ept\n");
err = -ENOMEM;
goto free_coherent;
}
}

/*
* Prepare to kick but don't notify yet - we can't do this before
* device is ready.
*/
notify = virtqueue_kick_prepare(vrp->rvq);

/* From this point on, we can notify and get callbacks. */
virtio_device_ready(vdev);

/* tell the remote processor it can start sending messages */
/*
* this might be concurrent with callbacks, but we are only
* doing notify, not a full kick here, so that's ok.
*/
if (notify)
virtqueue_notify(vrp->rvq);

dev_info(&vdev->dev, "rpmsg host is online\n");

return 0;

free_coherent:
dma_free_coherent(vdev->dev.parent, total_buf_space,
bufs_va, vrp->bufs_dma);
vqs_del:
vdev->config->del_vqs(vrp->vdev);
free_vrp:
kfree(vrp);
return err;
}

Step-by-step analysis:

Allocate virtproc_info

vrp = kzalloc(sizeof(*vrp), GFP_KERNEL);

Initialize it:

idr_init(&vrp->endpoints);
mutex_init(&vrp->endpoints_lock);
mutex_init(&vrp->tx_lock);
init_waitqueue_head(&vrp->sendq);

Find the virtqueues

The driver needs two virtqueues:

vq_callback_t *vq_cbs[] = { rpmsg_recv_done, rpmsg_xmit_done };
static const char * const names[] = { "input", "output" };

Then:

err = virtio_find_vqs(vdev, 2, vqs, vq_cbs, names, NULL);

The two queues:

vrp->rvq = vqs[0];
vrp->svq = vqs[1];

Meaning:

virtqueue      Use                                          Callback
rvq / input    Linux receives messages sent by the remote   rpmsg_recv_done()
svq / output   Linux sends messages to the remote           rpmsg_xmit_done()

Compute the number of buffers

if (virtqueue_get_vring_size(vrp->rvq) < MAX_RPMSG_NUM_BUFS / 2)
	vrp->num_bufs = virtqueue_get_vring_size(vrp->rvq) * 2;
else
	vrp->num_bufs = MAX_RPMSG_NUM_BUFS;

That is: if the vring is small, the buffer count is reduced to what the vring can hold; otherwise it is capped at 512 buffers.
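The sizing rule and the resulting memory layout can be checked with a short user-space calculation. This is a sketch: `buf_layout_model` and `layout_model()` are names assumed for illustration, and the vring size is passed in directly instead of being read with virtqueue_get_vring_size().

```c
#include <assert.h>
#include <stddef.h>

#define MAX_RPMSG_NUM_BUFS 512
#define MAX_RPMSG_BUF_SIZE 512

struct buf_layout_model {
	unsigned int num_bufs;   /* RX + TX buffers */
	size_t total;            /* total_buf_space in bytes */
	size_t tx_offset;        /* offset of sbufs from bufs_va */
};

/* Same rule as in rpmsg_probe(), applied to a given vring size. */
static struct buf_layout_model layout_model(unsigned int vring_size)
{
	struct buf_layout_model l;

	assert(vring_size > 0);
	if (vring_size < MAX_RPMSG_NUM_BUFS / 2)
		l.num_bufs = vring_size * 2;        /* small vring: one buffer per slot */
	else
		l.num_bufs = MAX_RPMSG_NUM_BUFS;    /* cap at 512 buffers */

	l.total = (size_t)l.num_bufs * MAX_RPMSG_BUF_SIZE;
	l.tx_offset = l.total / 2;              /* RX first, TX second */
	return l;
}
```

For a 64-entry vring this gives 128 buffers in 64 KiB with TX starting at offset 32 KiB; for vrings of 256 entries or more it gives the maximum of 512 buffers in 256 KiB.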

Allocate the coherent DMA buffer

bufs_va = dma_alloc_coherent(vdev->dev.parent,
			     total_buf_space, &vrp->bufs_dma,
			     GFP_KERNEL);

This buffer is shared DMA memory that both the host core and the virtio backend can access.

Pre-fill the RX virtqueue with RX buffers

for (i = 0; i < vrp->num_bufs / 2; i++) {
	struct scatterlist sg;
	void *cpu_addr = vrp->rbufs + i * vrp->buf_size;

	rpmsg_sg_init(&sg, cpu_addr, vrp->buf_size);

	err = virtqueue_add_inbuf(vrp->rvq, &sg, 1, cpu_addr,
				  GFP_KERNEL);
}

This step is critical:

Linux first places a batch of empty RX buffers on the available ring; only then can the remote processor write messages into them.

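Disable the TX complete interrupt by default

virtqueue_disable_cb(vrp->svq);

Most of the time the sender does not need an interrupt every time the remote side consumes a TX buffer. The TX complete interrupt is enabled temporarily only when Linux has run out of TX buffers and a sender has to sleep.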

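Create the name service endpoint

If the remote side supports:

VIRTIO_RPMSG_F_NS

a name service endpoint with address 53 is created:

vrp->ns_ept = __rpmsg_create_ept(vrp, NULL, rpmsg_ns_cb,
				 vrp, RPMSG_NS_ADDR);

The address is defined as:

#define RPMSG_NS_ADDR			(53)

This endpoint exists solely to handle service creation/destruction notifications from the remote side.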

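Mark the device ready and notify the remote side

notify = virtqueue_kick_prepare(vrp->rvq);
virtio_device_ready(vdev);
if (notify)
	virtqueue_notify(vrp->rvq);

The order matters here:

  1. The RX buffers are already queued; virtqueue_kick_prepare() only determines whether a notification is needed, without sending it
  2. The virtio device is marked ready
  3. The remote side is notified that it can start sending messages

Finally it prints:

dev_info(&vdev->dev, "rpmsg host is online\n");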

The remove function

static int rpmsg_remove_device(struct device *dev, void *data)
{
device_unregister(dev);

return 0;
}

static void rpmsg_remove(struct virtio_device *vdev)
{
struct virtproc_info *vrp = vdev->priv;
size_t total_buf_space = vrp->num_bufs * vrp->buf_size;
int ret;

vdev->config->reset(vdev);

ret = device_for_each_child(&vdev->dev, NULL, rpmsg_remove_device);
if (ret)
dev_warn(&vdev->dev, "can't remove rpmsg device: %d\n", ret);

if (vrp->ns_ept)
__rpmsg_destroy_ept(vrp, vrp->ns_ept);

idr_destroy(&vrp->endpoints);

vdev->config->del_vqs(vrp->vdev);

dma_free_coherent(vdev->dev.parent, total_buf_space,
vrp->rbufs, vrp->bufs_dma);

kfree(vrp);
}

Flow:

  1. Reset the virtio device

vdev->config->reset(vdev);

This stops the device first, so no more messages are sent or received.

  2. Remove all child rpmsg devices

device_for_each_child(&vdev->dev, NULL, rpmsg_remove_device);

rpmsg_remove_device() calls:

device_unregister(dev);

  3. Destroy the NS endpoint

if (vrp->ns_ept)
	__rpmsg_destroy_ept(vrp, vrp->ns_ept);

  4. Release the endpoints idr, the virtqueues, the DMA buffer, and vrp

idr_destroy(&vrp->endpoints);
vdev->config->del_vqs(vrp->vdev);
dma_free_coherent(...);
kfree(vrp);

Sending a message (rpmsg_send_offchannel_raw())

All send APIs eventually end up in rpmsg_send_offchannel_raw():

/**
* rpmsg_send_offchannel_raw() - send a message across to the remote processor
* @rpdev: the rpmsg channel
* @src: source address
* @dst: destination address
* @data: payload of message
* @len: length of payload
* @wait: indicates whether caller should block in case no TX buffers available
*
* This function is the base implementation for all of the rpmsg sending API.
*
* It will send @data of length @len to @dst, and say it's from @src. The
* message will be sent to the remote processor which the @rpdev channel
* belongs to.
*
* The message is sent using one of the TX buffers that are available for
* communication with this remote processor.
*
* If @wait is true, the caller will be blocked until either a TX buffer is
* available, or 15 seconds elapses (we don't want callers to
* sleep indefinitely due to misbehaving remote processors), and in that
* case -ERESTARTSYS is returned. The number '15' itself was picked
* arbitrarily; there's little point in asking drivers to provide a timeout
* value themselves.
*
* Otherwise, if @wait is false, and there are no TX buffers available,
* the function will immediately fail, and -ENOMEM will be returned.
*
* Normally drivers shouldn't use this function directly; instead, drivers
* should use the appropriate rpmsg_{try}send{to, _offchannel} API
* (see include/linux/rpmsg.h).
*
* Returns 0 on success and an appropriate error value on failure.
*/
static int rpmsg_send_offchannel_raw(struct rpmsg_device *rpdev,
u32 src, u32 dst,
void *data, int len, bool wait)
{
struct virtio_rpmsg_channel *vch = to_virtio_rpmsg_channel(rpdev);
struct virtproc_info *vrp = vch->vrp;
struct device *dev = &rpdev->dev;
struct scatterlist sg;
struct rpmsg_hdr *msg;
int err;

/* bcasting isn't allowed */
if (src == RPMSG_ADDR_ANY || dst == RPMSG_ADDR_ANY) {
dev_err(dev, "invalid addr (src 0x%x, dst 0x%x)\n", src, dst);
return -EINVAL;
}

/*
* We currently use fixed-sized buffers, and therefore the payload
* length is limited.
*
* One of the possible improvements here is either to support
* user-provided buffers (and then we can also support zero-copy
* messaging), or to improve the buffer allocator, to support
* variable-length buffer sizes.
*/
if (len > vrp->buf_size - sizeof(struct rpmsg_hdr)) {
dev_err(dev, "message is too big (%d)\n", len);
return -EMSGSIZE;
}

/* grab a buffer */
msg = get_a_tx_buf(vrp);
if (!msg && !wait)
return -ENOMEM;

/* no free buffer ? wait for one (but bail after 15 seconds) */
while (!msg) {
/* enable "tx-complete" interrupts, if not already enabled */
rpmsg_upref_sleepers(vrp);

/*
* sleep until a free buffer is available or 15 secs elapse.
* the timeout period is not configurable because there's
* little point in asking drivers to specify that.
* if later this happens to be required, it'd be easy to add.
*/
err = wait_event_interruptible_timeout(vrp->sendq,
(msg = get_a_tx_buf(vrp)),
msecs_to_jiffies(15000));

/* disable "tx-complete" interrupts if we're the last sleeper */
rpmsg_downref_sleepers(vrp);

/* timeout ? */
if (!err) {
dev_err(dev, "timeout waiting for a tx buffer\n");
return -ERESTARTSYS;
}
}

msg->len = cpu_to_virtio16(vrp->vdev, len);
msg->flags = 0;
msg->src = cpu_to_virtio32(vrp->vdev, src);
msg->dst = cpu_to_virtio32(vrp->vdev, dst);
msg->reserved = 0;
memcpy(msg->data, data, len);

dev_dbg(dev, "TX From 0x%x, To 0x%x, Len %d, Flags %d, Reserved %d\n",
src, dst, len, msg->flags, msg->reserved);
#if defined(CONFIG_DYNAMIC_DEBUG)
dynamic_hex_dump("rpmsg_virtio TX: ", DUMP_PREFIX_NONE, 16, 1,
msg, sizeof(*msg) + len, true);
#endif

rpmsg_sg_init(&sg, msg, sizeof(*msg) + len);

mutex_lock(&vrp->tx_lock);

/* add message to the remote processor's virtqueue */
err = virtqueue_add_outbuf(vrp->svq, &sg, 1, msg, GFP_KERNEL);
if (err) {
/*
* need to reclaim the buffer here, otherwise it's lost
* (memory won't leak, but rpmsg won't use it again for TX).
* this will wait for a buffer management overhaul.
*/
dev_err(dev, "virtqueue_add_outbuf failed: %d\n", err);
goto out;
}

/* tell the remote processor it has a pending message to read */
virtqueue_kick(vrp->svq);
out:
mutex_unlock(&vrp->tx_lock);
return err;
}

The call chain is roughly:

rpmsg_send()
→ ept->ops->send()
→ virtio_rpmsg_send()
→ rpmsg_send_offchannel_raw(..., wait = true)

rpmsg_trysend()
→ ept->ops->trysend()
→ virtio_rpmsg_trysend()
→ rpmsg_send_offchannel_raw(..., wait = false)

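Parameter checks

src and dst are checked first:

if (src == RPMSG_ADDR_ANY || dst == RPMSG_ADDR_ANY)
    return -EINVAL;

The wildcard address RPMSG_ADDR_ANY is not allowed as an actual send address.
The length is checked next:

if (len > vrp->buf_size - sizeof(struct rpmsg_hdr))
    return -EMSGSIZE;

Each buffer is a fixed 512 bytes, so the payload cannot exceed 512 - sizeof(struct rpmsg_hdr) bytes.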
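
To make the limit concrete, here is a small userspace sketch that mirrors the header layout and computes the maximum payload. The demo_* names are hypothetical. The field order is my reading of struct rpmsg_hdr in this kernel version, so treat the resulting 16-byte header as an assumption to verify against your tree.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_BUF_SIZE 512   /* fixed buffer size stated in the text */

/*
 * Userspace mirror of the rpmsg header (assumed layout).
 * __attribute__((packed)) is a GCC/Clang extension.
 */
struct demo_rpmsg_hdr {
    uint32_t src;
    uint32_t dst;
    uint32_t reserved;
    uint16_t len;
    uint16_t flags;
    uint8_t data[];
} __attribute__((packed));

/* Largest payload that fits into one buffer next to the header. */
static size_t demo_max_payload(void)
{
    return DEMO_BUF_SIZE - sizeof(struct demo_rpmsg_hdr);
}

/* Same comparison as the length check: 0 if it fits, -EMSGSIZE if not. */
static int demo_check_len(size_t len)
{
    assert(sizeof(struct demo_rpmsg_hdr) < DEMO_BUF_SIZE);
    if (len > demo_max_payload())
        return -EMSGSIZE;
    return 0;
}
```

Under these assumptions a sender can pass at most 496 bytes per message; anything larger has to be fragmented by the upper-layer protocol.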

Getting a TX buffer

msg = get_a_tx_buf(vrp);

Its logic:

if (vrp->last_sbuf < vrp->num_bufs / 2)
    ret = vrp->sbufs + vrp->buf_size * vrp->last_sbuf++;
else
    ret = virtqueue_get_buf(vrp->svq, &len);

Meaning:

  1. Initial phase: take a never-used buffer directly from the TX buffer pool
  2. Later phase: reclaim a TX buffer that the remote side has finished consuming from the svq used ring
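
The two phases can be modelled in userspace as follows. This is only a sketch: the demo_* names are hypothetical, the "used ring" is a plain array standing in for virtqueue_get_buf(), and no locking is shown.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_NUM_BUFS 8      /* total buffers; half of them are for TX */
#define DEMO_BUF_SIZE 512

struct demo_tx_pool {
    unsigned char sbufs[(DEMO_NUM_BUFS / 2) * DEMO_BUF_SIZE];
    unsigned int last_sbuf;            /* index of next never-used buffer */
    void *used[DEMO_NUM_BUFS / 2];     /* toy used ring */
    unsigned int used_count;
};

/* Stand-in for virtqueue_get_buf(): pop a returned buffer, or NULL. */
static void *demo_get_used(struct demo_tx_pool *p)
{
    if (p->used_count == 0)
        return NULL;
    return p->used[--p->used_count];
}

/* Stand-in for the remote side handing a consumed buffer back. */
static void demo_remote_return(struct demo_tx_pool *p, void *buf)
{
    assert(p->used_count < DEMO_NUM_BUFS / 2);
    p->used[p->used_count++] = buf;
}

/* Phase 1: fresh buffers from the pool. Phase 2: recycled buffers only. */
static void *demo_get_a_tx_buf(struct demo_tx_pool *p)
{
    if (p->last_sbuf < DEMO_NUM_BUFS / 2)
        return p->sbufs + DEMO_BUF_SIZE * p->last_sbuf++;
    return demo_get_used(p);
}
```

Once every fresh buffer has been handed out, the function returns NULL until the remote side returns one, which is exactly the situation the wait logic has to handle.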

What if no TX buffer is available?

For trysend:

if (!msg && !wait)
    return -ENOMEM;

For a regular send, wait for up to 15 seconds:

err = wait_event_interruptible_timeout(vrp->sendq,
            (msg = get_a_tx_buf(vrp)),
            msecs_to_jiffies(15000));

On timeout:

return -ERESTARTSYS;

Before waiting, it calls:

rpmsg_upref_sleepers(vrp);

Purpose: if this is the first sender sleeping while waiting for a TX buffer, enable the TX complete interrupt:

virtqueue_enable_cb(vrp->svq);

After the wait ends, it calls:

rpmsg_downref_sleepers(vrp);

If no waiters remain, the TX complete interrupt is disabled:

virtqueue_disable_cb(vrp->svq);

Filling in the rpmsg header and payload

msg->len = cpu_to_virtio16(vrp->vdev, len);
msg->flags = 0;
msg->src = cpu_to_virtio32(vrp->vdev, src);
msg->dst = cpu_to_virtio32(vrp->vdev, dst);
msg->reserved = 0;
memcpy(msg->data, data, len);

Note the use of cpu_to_virtio16/32() here: a virtio device may require a specific byte order.
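
As a rough illustration of why these helpers exist: to my understanding, a device that negotiated VIRTIO_F_VERSION_1 uses little-endian fields, while a legacy device uses the guest's native order. The sketch below models that choice in userspace. The demo_* functions and the modern flag are hypothetical; the real helpers take the virtio_device and decide for themselves.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Store a 32-bit value as little-endian bytes on any host. */
static void demo_put_le32(uint8_t out[4], uint32_t v)
{
    out[0] = (uint8_t)(v & 0xff);
    out[1] = (uint8_t)((v >> 8) & 0xff);
    out[2] = (uint8_t)((v >> 16) & 0xff);
    out[3] = (uint8_t)((v >> 24) & 0xff);
}

/* Read a little-endian 32-bit value on any host. */
static uint32_t demo_get_le32(const uint8_t in[4])
{
    return (uint32_t)in[0] | ((uint32_t)in[1] << 8) |
           ((uint32_t)in[2] << 16) | ((uint32_t)in[3] << 24);
}

/*
 * Assumption: "modern" devices want little-endian on the wire,
 * legacy devices take the host representation unchanged.
 */
static void demo_cpu_to_virtio32(bool modern, uint8_t out[4], uint32_t v)
{
    assert(out != NULL);
    if (modern)
        demo_put_le32(out, v);
    else
        memcpy(out, &v, sizeof(v));
}
```

On a little-endian host both branches produce the same bytes, which is why a missing conversion often goes unnoticed until the code runs on a big-endian machine.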

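Adding to the TX virtqueue and kicking the remote side

virtqueue_add_outbuf(vrp->svq, &sg, 1, msg, GFP_KERNEL);

Then:

virtqueue_kick(vrp->svq);

Put together:
Linux fills the TX buffer → adds the buffer to the output virtqueue → kicks the remote processor → the remote side takes the message from the virtqueue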

TX complete callback

The callbacks are registered in the probe function:

static int rpmsg_probe(struct virtio_device *vdev)
{
    vq_callback_t *vq_cbs[] = { rpmsg_recv_done, rpmsg_xmit_done };
    static const char * const names[] = { "input", "output" };
    struct virtqueue *vqs[2];

    .....

    err = virtio_find_vqs(vdev, 2, vqs, vq_cbs, names, NULL);

    .....
}

So the callback for the TX ("output") virtqueue is rpmsg_xmit_done.

Note: the TX complete interrupt is normally disabled; it is enabled only while at least one sender is sleeping, waiting for a buffer.

/*
 * This is invoked whenever the remote processor completed processing
 * a TX msg we just sent it, and the buffer is put back to the used ring.
 *
 * Normally, though, we suppress this "tx complete" interrupt in order to
 * avoid the incurred overhead.
 */
static void rpmsg_xmit_done(struct virtqueue *svq)
{
    struct virtproc_info *vrp = svq->vdev->priv;

    dev_dbg(&svq->vdev->dev, "%s\n", __func__);

    /* wake up potential senders that are waiting for a tx buffer */
    wake_up_interruptible(&vrp->sendq);
}

Once the remote side has consumed a TX buffer, the virtio backend places the buffer on the used ring and triggers the TX complete callback. Linux then wakes up the sending threads that are waiting for a TX buffer.

TX sleepers mechanism: rpmsg_upref_sleepers()

static void rpmsg_upref_sleepers(struct virtproc_info *vrp)
{
    /* support multiple concurrent senders */
    mutex_lock(&vrp->tx_lock);

    /* are we the first sleeping context waiting for tx buffers ? */
    if (atomic_inc_return(&vrp->sleepers) == 1)
        /* enable "tx-complete" interrupts before dozing off */
        virtqueue_enable_cb(vrp->svq);

    mutex_unlock(&vrp->tx_lock);
}

Meaning: before a sending thread goes to sleep because no TX buffer is available, sleepers is incremented; if this is the first waiter, the TX complete callback is enabled.
Its counterpart is rpmsg_downref_sleepers(): after the sending thread wakes up, sleepers is decremented; if this was the last waiter, the TX complete callback is disabled.

The purpose of this mechanism:

  • Nobody is waiting for a TX buffer: keep the TX complete interrupt disabled to reduce overhead
  • Somebody is waiting for a TX buffer: enable the TX complete interrupt so that waiters are woken as soon as the remote side returns a buffer
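
The first-in/last-out toggle can be sketched in a few lines of userspace C. This is a single-threaded model with hypothetical demo_* names. The kernel version additionally takes tx_lock and uses an atomic counter so that concurrent senders are handled safely.

```c
#include <assert.h>
#include <stdbool.h>

struct demo_sleep_state {
    int sleepers;        /* senders currently waiting for a TX buffer */
    bool cb_enabled;     /* models the svq callback (interrupt) state */
    int enable_calls;
    int disable_calls;
};

/* First sleeper switches the TX complete callback on. */
static void demo_upref_sleepers(struct demo_sleep_state *s)
{
    if (++s->sleepers == 1) {
        s->cb_enabled = true;
        s->enable_calls++;
    }
}

/* Last sleeper switches the TX complete callback off again. */
static void demo_downref_sleepers(struct demo_sleep_state *s)
{
    assert(s->sleepers > 0);
    if (--s->sleepers == 0) {
        s->cb_enabled = false;
        s->disable_calls++;
    }
}
```

With two overlapping waiters the callback is enabled once and disabled once, no matter in which order the waiters wake up.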

The complete chain:

TX buffers are exhausted
↓
rpmsg_send() is about to sleep
↓
rpmsg_upref_sleepers()
↓
the first sleeper enables the svq callback
↓
the remote side finishes reading a TX buffer
↓
rpmsg_xmit_done()
↓
wake_up_interruptible(&vrp->sendq)
↓
the sending thread wakes up
↓
get_a_tx_buf() / virtqueue_get_buf(svq)
↓
the recycled buffer is obtained

Receiving messages (rpmsg_recv_done)

The rpmsg_probe() excerpt shown earlier registers rpmsg_recv_done as the first entry of vq_cbs, so it is the callback for the RX ("input") virtqueue. It is defined as follows:

rpmsg_recv_done

/* called when an rx buffer is used, and it's time to digest a message */
static void rpmsg_recv_done(struct virtqueue *rvq)
{
    struct virtproc_info *vrp = rvq->vdev->priv;
    struct device *dev = &rvq->vdev->dev;
    struct rpmsg_hdr *msg;
    unsigned int len, msgs_received = 0;
    int err;

    msg = virtqueue_get_buf(rvq, &len);
    if (!msg) {
        dev_err(dev, "uhm, incoming signal, but no used buffer ?\n");
        return;
    }

    while (msg) {
        err = rpmsg_recv_single(vrp, dev, msg, len);
        if (err)
            break;

        msgs_received++;

        msg = virtqueue_get_buf(rvq, &len);
    }

    dev_dbg(dev, "Received %u messages\n", msgs_received);

    /* tell the remote processor we added another available rx buffer */
    if (msgs_received)
        virtqueue_kick(vrp->rvq);
}

Fetching used buffers from the RX virtqueue

msg = virtqueue_get_buf(rvq, &len);

If a msg is obtained, processing continues in a loop over all available messages:

while (msg) {
    err = rpmsg_recv_single(vrp, dev, msg, len);
    if (err)
        break;

    msgs_received++;

    msg = virtqueue_get_buf(rvq, &len);
}

Handling a single message: rpmsg_recv_single()

static int rpmsg_recv_single(struct virtproc_info *vrp, struct device *dev,
                 struct rpmsg_hdr *msg, unsigned int len)
{
    struct rpmsg_endpoint *ept;
    struct scatterlist sg;
    unsigned int msg_len = virtio16_to_cpu(vrp->vdev, msg->len);
    int err;

    dev_dbg(dev, "From: 0x%x, To: 0x%x, Len: %d, Flags: %d, Reserved: %d\n",
        virtio32_to_cpu(vrp->vdev, msg->src),
        virtio32_to_cpu(vrp->vdev, msg->dst), msg_len,
        virtio16_to_cpu(vrp->vdev, msg->flags),
        virtio32_to_cpu(vrp->vdev, msg->reserved));
#if defined(CONFIG_DYNAMIC_DEBUG)
    dynamic_hex_dump("rpmsg_virtio RX: ", DUMP_PREFIX_NONE, 16, 1,
             msg, sizeof(*msg) + msg_len, true);
#endif

    /*
     * We currently use fixed-sized buffers, so trivially sanitize
     * the reported payload length.
     */
    if (len > vrp->buf_size ||
        msg_len > (len - sizeof(struct rpmsg_hdr))) {
        dev_warn(dev, "inbound msg too big: (%d, %d)\n", len, msg_len);
        return -EINVAL;
    }

    /* use the dst addr to fetch the callback of the appropriate user */
    mutex_lock(&vrp->endpoints_lock);

    ept = idr_find(&vrp->endpoints, virtio32_to_cpu(vrp->vdev, msg->dst));

    /* let's make sure no one deallocates ept while we use it */
    if (ept)
        kref_get(&ept->refcount);

    mutex_unlock(&vrp->endpoints_lock);

    if (ept) {
        /* make sure ept->cb doesn't go away while we use it */
        mutex_lock(&ept->cb_lock);

        if (ept->cb)
            ept->cb(ept->rpdev, msg->data, msg_len, ept->priv,
                virtio32_to_cpu(vrp->vdev, msg->src));

        mutex_unlock(&ept->cb_lock);

        /* farewell, ept, we don't need you anymore */
        kref_put(&ept->refcount, __ept_release);
    } else
        dev_warn(dev, "msg received with no recipient\n");

    /* publish the real size of the buffer */
    rpmsg_sg_init(&sg, msg, vrp->buf_size);

    /* add the buffer back to the remote processor's virtqueue */
    err = virtqueue_add_inbuf(vrp->rvq, &sg, 1, msg, GFP_KERNEL);
    if (err < 0) {
        dev_err(dev, "failed to add a virtqueue buffer: %d\n", err);
        return err;
    }

    return 0;
}

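Received messages are not dispatched by channel name. Instead, msg->dst is used to look up the local endpoint in vrp->endpoints. At run time rpmsg therefore behaves more like address-based messaging; the channel name matters mainly for device discovery and driver matching.
The main steps:

  1. Parse the payload length

msg_len = virtio16_to_cpu(vrp->vdev, msg->len);

  2. Check that the message length is valid

if (len > vrp->buf_size ||
    msg_len > (len - sizeof(struct rpmsg_hdr))) {
    return -EINVAL;
}

This keeps an abnormal length reported by the remote side from causing an out-of-bounds access.

  3. Look up the endpoint by dst

ept = idr_find(&vrp->endpoints, virtio32_to_cpu(vrp->vdev, msg->dst));

This is the core of rpmsg dispatch: msg->dst == local endpoint address

  4. Take a reference on the endpoint

if (ept)
    kref_get(&ept->refcount);

This prevents the endpoint from being freed while the callback runs.

  5. Invoke the callback

if (ept->cb)
    ept->cb(ept->rpdev, msg->data, msg_len, ept->priv,
        virtio32_to_cpu(vrp->vdev, msg->src));

The arguments passed to the upper-layer callback are:

Argument      Meaning
ept->rpdev    the corresponding rpmsg device
msg->data     payload
msg_len       payload length
ept->priv     endpoint private data
msg->src      source address on the remote side

  6. Put the RX buffer back on the virtqueue
    After processing, the buffer has to be made available to the remote side again:

rpmsg_sg_init(&sg, msg, vrp->buf_size);
err = virtqueue_add_inbuf(vrp->rvq, &sg, 1, msg, GFP_KERNEL);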
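
The validation and address-based dispatch steps can be modelled in userspace like this. It is a sketch under stated assumptions: the demo_* names are hypothetical, a small array replaces the IDR, the header size is assumed to be 16 bytes, and the extra lower-bound check on used_len is my addition rather than something the driver does. The kernel also treats a missing recipient as a warning only, whereas this sketch returns a distinct code so the case is visible.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_BUF_SIZE 512
#define DEMO_HDR_SIZE 16      /* assumed sizeof(struct rpmsg_hdr) */
#define DEMO_MAX_EPTS 4

typedef void (*demo_rx_cb)(const uint8_t *payload, size_t len,
                           uint32_t src, void *priv);

struct demo_endpoint {
    uint32_t addr;            /* local endpoint address */
    demo_rx_cb cb;
    void *priv;
};

struct demo_bus {
    struct demo_endpoint epts[DEMO_MAX_EPTS];
    size_t num_epts;
};

/* Example callback: counts deliveries through the priv pointer. */
static void demo_count_cb(const uint8_t *payload, size_t len,
                          uint32_t src, void *priv)
{
    (void)payload; (void)len; (void)src;
    (*(int *)priv)++;
}

/* Linear lookup standing in for idr_find() on the endpoint table. */
static struct demo_endpoint *demo_find_ept(struct demo_bus *bus, uint32_t dst)
{
    for (size_t i = 0; i < bus->num_epts; i++)
        if (bus->epts[i].addr == dst)
            return &bus->epts[i];
    return NULL;
}

/* Returns 0 when delivered, -1 on a bad length, -2 without a recipient. */
static int demo_recv_single(struct demo_bus *bus, uint32_t src, uint32_t dst,
                            const uint8_t *payload, size_t msg_len,
                            size_t used_len)
{
    struct demo_endpoint *ept;

    assert(bus != NULL);
    if (used_len > DEMO_BUF_SIZE || used_len < DEMO_HDR_SIZE ||
        msg_len > used_len - DEMO_HDR_SIZE)
        return -1;

    ept = demo_find_ept(bus, dst);
    if (!ept)
        return -2;

    if (ept->cb)
        ept->cb(payload, msg_len, src, ept->priv);
    return 0;
}
```

Note that nothing in the dispatch path looks at a channel name; only the destination address decides who gets the payload.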

Kicking the remote side

Finally, if any messages were processed, rpmsg_recv_done() kicks the remote side:

if (msgs_received)
    virtqueue_kick(vrp->rvq);
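Name service mechanism

Note: name service is an optional feature. The feature bit is defined as:

#define VIRTIO_RPMSG_F_NS	0

If the remote side does not support this feature, dynamic service discovery is unavailable, and channels may have to be created statically.

rpmsg lets the remote side notify Linux dynamically that it has created or destroyed a service; this is called name service. The relevant structure:

struct rpmsg_ns_msg {
    char name[RPMSG_NAME_SIZE];
    __virtio32 addr;
    __virtio32 flags;
} __packed;

Fields:

Field    Meaning
name     service name
addr     address of the remote service
flags    create or destroy

flags values:

RPMSG_NS_CREATE  = 0
RPMSG_NS_DESTROY = 1

NS endpoint

Name service uses a fixed address:

#define RPMSG_NS_ADDR			(53)

The endpoint is created at probe time, when the VIRTIO_RPMSG_F_NS feature has been negotiated:

vrp->ns_ept = __rpmsg_create_ept(vrp, NULL, rpmsg_ns_cb,
                                 vrp, RPMSG_NS_ADDR);

Note that the rpdev argument is NULL here: the NS endpoint does not belong to any ordinary rpmsg channel; it is an internal endpoint used by the bus itself.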

NS callback: rpmsg_ns_cb()

/* invoked when a name service announcement arrives */
static int rpmsg_ns_cb(struct rpmsg_device *rpdev, void *data, int len,
               void *priv, u32 src)
{
    struct rpmsg_ns_msg *msg = data;
    struct rpmsg_device *newch;
    struct rpmsg_channel_info chinfo;
    struct virtproc_info *vrp = priv;
    struct device *dev = &vrp->vdev->dev;
    int ret;

#if defined(CONFIG_DYNAMIC_DEBUG)
    dynamic_hex_dump("NS announcement: ", DUMP_PREFIX_NONE, 16, 1,
             data, len, true);
#endif

    if (len != sizeof(*msg)) {
        dev_err(dev, "malformed ns msg (%d)\n", len);
        return -EINVAL;
    }

    /*
     * the name service ept does _not_ belong to a real rpmsg channel,
     * and is handled by the rpmsg bus itself.
     * for sanity reasons, make sure a valid rpdev has _not_ sneaked
     * in somehow.
     */
    if (rpdev) {
        dev_err(dev, "anomaly: ns ept has an rpdev handle\n");
        return -EINVAL;
    }

    /* don't trust the remote processor for null terminating the name */
    msg->name[RPMSG_NAME_SIZE - 1] = '\0';

    strncpy(chinfo.name, msg->name, sizeof(chinfo.name));
    chinfo.src = RPMSG_ADDR_ANY;
    chinfo.dst = virtio32_to_cpu(vrp->vdev, msg->addr);

    dev_info(dev, "%sing channel %s addr 0x%x\n",
         virtio32_to_cpu(vrp->vdev, msg->flags) & RPMSG_NS_DESTROY ?
         "destroy" : "creat", msg->name, chinfo.dst);

    if (virtio32_to_cpu(vrp->vdev, msg->flags) & RPMSG_NS_DESTROY) {
        ret = rpmsg_unregister_device(&vrp->vdev->dev, &chinfo);
        if (ret)
            dev_err(dev, "rpmsg_destroy_channel failed: %d\n", ret);
    } else {
        newch = rpmsg_create_channel(vrp, &chinfo);
        if (!newch)
            dev_err(dev, "rpmsg_create_channel failed\n");
    }

    return 0;
}

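After an NS message arrives from the remote side, the callback:

  1. Checks the length
  2. Makes sure the endpoint is not bound to a real rpdev
  3. Forces the name string to be null-terminated
  4. Builds an rpmsg_channel_info
  5. Creates or destroys the channel according to flags

Creating a channel:

newch = rpmsg_create_channel(vrp, &chinfo);

Destroying a channel:

ret = rpmsg_unregister_device(&vrp->vdev->dev, &chinfo);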
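
The parsing part of the NS callback (length check, forced termination, channel info, create-or-destroy decision) can be sketched in userspace as below. The demo_* names are hypothetical. The constants 32 and 0xFFFFFFFF are what I believe RPMSG_NAME_SIZE and RPMSG_ADDR_ANY to be, and the sketch assumes the wire byte order equals the host order, so no conversion helper is shown.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_NAME_SIZE  32            /* assumed RPMSG_NAME_SIZE */
#define DEMO_ADDR_ANY   0xFFFFFFFFu   /* assumed RPMSG_ADDR_ANY */
#define DEMO_NS_CREATE  0
#define DEMO_NS_DESTROY 1

struct demo_ns_msg {
    char name[DEMO_NAME_SIZE];
    uint32_t addr;
    uint32_t flags;
} __attribute__((packed));

struct demo_chinfo {
    char name[DEMO_NAME_SIZE];
    uint32_t src;
    uint32_t dst;
};

enum demo_ns_action {
    DEMO_ACT_INVALID = -1,
    DEMO_ACT_CREATE,
    DEMO_ACT_DESTROY,
};

/* Validate an announcement and turn it into channel info plus an action. */
static enum demo_ns_action demo_parse_ns(void *data, size_t len,
                                         struct demo_chinfo *out)
{
    struct demo_ns_msg *msg = data;

    assert(data != NULL && out != NULL);
    if (len != sizeof(*msg))
        return DEMO_ACT_INVALID;

    /* do not trust the sender to terminate the name */
    msg->name[DEMO_NAME_SIZE - 1] = '\0';

    memcpy(out->name, msg->name, sizeof(out->name));
    out->src = DEMO_ADDR_ANY;   /* local address gets assigned later */
    out->dst = msg->addr;       /* remote service address */

    return (msg->flags & DEMO_NS_DESTROY) ? DEMO_ACT_DESTROY
                                          : DEMO_ACT_CREATE;
}
```

Setting src to the wildcard matches the text: a channel created from a remote announcement only knows the remote address at this point.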

Local service announce mechanism

The driver can not only receive NS messages from the remote side but also announce local services to it.

The relevant functions:

virtio_rpmsg_announce_create()
virtio_rpmsg_announce_destroy()

When a local rpmsg channel that needs announcing is created, the driver builds an rpmsg_ns_msg and sends it to the remote side's NS address 53 (the announce is triggered from rpmsg_core.c). The create notification:

nsm.flags = cpu_to_virtio32(vrp->vdev, RPMSG_NS_CREATE);
err = rpmsg_sendto(rpdev->ept, &nsm, sizeof(nsm), RPMSG_NS_ADDR);

The destroy notification:

nsm.flags = cpu_to_virtio32(vrp->vdev, RPMSG_NS_DESTROY);
err = rpmsg_sendto(rpdev->ept, &nsm, sizeof(nsm), RPMSG_NS_ADDR);

rpmsg channel creation

/*
 * create an rpmsg channel using its name and address info.
 * this function will be used to create both static and dynamic
 * channels.
 */
static struct rpmsg_device *rpmsg_create_channel(struct virtproc_info *vrp,
                         struct rpmsg_channel_info *chinfo)
{
    struct virtio_rpmsg_channel *vch;
    struct rpmsg_device *rpdev;
    struct device *tmp, *dev = &vrp->vdev->dev;
    int ret;

    /* make sure a similar channel doesn't already exist */
    tmp = rpmsg_find_device(dev, chinfo);
    if (tmp) {
        /* decrement the matched device's refcount back */
        put_device(tmp);
        dev_err(dev, "channel %s:%x:%x already exist\n",
            chinfo->name, chinfo->src, chinfo->dst);
        return NULL;
    }

    vch = kzalloc(sizeof(*vch), GFP_KERNEL);
    if (!vch)
        return NULL;

    /* Link the channel to our vrp */
    vch->vrp = vrp;

    /* Assign public information to the rpmsg_device */
    rpdev = &vch->rpdev;
    rpdev->src = chinfo->src;
    rpdev->dst = chinfo->dst;
    rpdev->ops = &virtio_rpmsg_ops;

    /*
     * rpmsg server channels has predefined local address (for now),
     * and their existence needs to be announced remotely
     */
    rpdev->announce = rpdev->src != RPMSG_ADDR_ANY;

    strncpy(rpdev->id.name, chinfo->name, RPMSG_NAME_SIZE);

    rpdev->dev.parent = &vrp->vdev->dev;
    rpdev->dev.release = virtio_rpmsg_release_device;
    ret = rpmsg_register_device(rpdev);
    if (ret)
        return NULL;

    return rpdev;
}

It creates an rpmsg_device from the struct rpmsg_channel_info *chinfo passed in:

/**
 * struct rpmsg_channel_info - channel info representation
 * @name: name of service
 * @src: local address
 * @dst: destination address
 */
struct rpmsg_channel_info {
    char name[RPMSG_NAME_SIZE];
    u32 src;
    u32 dst;
};

The flow:

  1. Check whether an identical channel already exists

tmp = rpmsg_find_device(dev, chinfo);

If one already exists, it is not created again.

  2. Allocate a virtio_rpmsg_channel

vch = kzalloc(sizeof(*vch), GFP_KERNEL);

This structure:

struct virtio_rpmsg_channel {
    struct rpmsg_device rpdev;
    struct virtproc_info *vrp;
};

It is the channel type private to the virtio transport; only the embedded rpmsg_device is exposed to the outside.

  3. Fill in the rpmsg_device

rpdev->src = chinfo->src;
rpdev->dst = chinfo->dst;
rpdev->ops = &virtio_rpmsg_ops;

strncpy(rpdev->id.name, chinfo->name, RPMSG_NAME_SIZE);

Then register it:

ret = rpmsg_register_device(rpdev);

After registration, the rpmsg bus matches the device against the id tables of the upper-layer rpmsg drivers and calls the probe of the matching driver.
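
Because the rpmsg_device is embedded in the transport-private channel structure, the transport can get back from the public device pointer to its own data with the container_of idiom. A userspace sketch follows. The demo_* types and the macro are hypothetical re-creations for illustration, not the kernel definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Public part: what upper layers see. */
struct demo_rpdev {
    unsigned int src;
    unsigned int dst;
};

/* Transport-private state. */
struct demo_vrp_info {
    int id;
};

/* Private channel wrapping the public device. */
struct demo_channel {
    struct demo_rpdev rpdev;
    struct demo_vrp_info *vrp;
};

/* Recover the enclosing structure from a pointer to one of its members. */
#define demo_container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* From the public device back to the transport-private data. */
static struct demo_vrp_info *demo_rpdev_to_vrp(struct demo_rpdev *rpdev)
{
    struct demo_channel *vch;

    assert(rpdev != NULL);
    vch = demo_container_of(rpdev, struct demo_channel, rpdev);
    return vch->vrp;
}
```

This is why only a pointer to the embedded device needs to cross the layer boundary: the transport never has to store a separate back-pointer in the public structure.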

Overall send/receive flow

Linux sends a message to the remote side

Upper-layer rpmsg driver
    |
    | rpmsg_send()
    v
rpmsg core
    |
    v
virtio_rpmsg_send()
    |
    v
rpmsg_send_offchannel_raw()
    |
    | 1. check src/dst/len
    | 2. get a TX buffer
    | 3. fill in rpmsg_hdr
    | 4. memcpy the payload
    | 5. virtqueue_add_outbuf()
    | 6. virtqueue_kick()
    v
The remote processor takes the message from svq

Linux receives a message from the remote side

The remote processor writes into an RX buffer
    |
    | notifies the virtqueue
    v
rpmsg_recv_done()
    |
    v
virtqueue_get_buf()
    |
    v
rpmsg_recv_single()
    |
    | 1. check the length
    | 2. look up the endpoint by msg->dst
    | 3. call the endpoint callback
    | 4. hand the RX buffer back with add_inbuf()
    v
The upper-layer rpmsg driver receives the callback

The remote side publishes a service

The remote side sends an NS message to addr 53
    |
    v
rpmsg_recv_done()
    |
    v
rpmsg_recv_single()
    |
    v
callback of endpoint 53
    |
    v
rpmsg_ns_cb()
    |
    | RPMSG_NS_CREATE
    v
rpmsg_create_channel()
    |
    v
rpmsg_register_device()
    |
    v
Matched against the upper-layer rpmsg drivers

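RX buffer lifecycle

The lifecycle of an RX buffer:

add_inbuf at probe time
↓
the remote side writes a message
↓
Linux calls virtqueue_get_buf
↓
the callback is invoked
↓
Linux calls add_inbuf again
↓
the remote side uses the buffer again

So RX buffers are recycled in a loop.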

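TX buffer lifecycle

The lifecycle of a TX buffer:

Linux takes a buffer from the initial sbufs pool
↓
fills in the message
↓
virtqueue_add_outbuf
↓
the remote side consumes it
↓
the buffer enters the used ring
↓
Linux reclaims it with virtqueue_get_buf
↓
it is used for the next send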

References