/** * rpmsg_device - device that belong to the rpmsg bus * @dev: the device struct * @id: device id (used to match between rpmsg drivers and devices) * @driver_override: driver name to force a match; do not set directly, * because core frees it; use driver_set_override() to * set or clear it. * @src: local address * @dst: destination address * @ept: the rpmsg endpoint of this channel * @announce: if set, rpmsg will announce the creation/removal of this channel */ structrpmsg_device { structdevicedev; structrpmsg_device_idid; constchar *driver_override; u32 src; u32 dst; structrpmsg_endpoint *ept; bool announce;
/** * struct rpmsg_device_ops - indirection table for the rpmsg_device operations * @create_ept: create backend-specific endpoint, required * @announce_create: announce presence of new channel, optional * @announce_destroy: announce destruction of channel, optional * * Indirection table for the operations that a rpmsg backend should implement. * @announce_create and @announce_destroy are optional as the backend might * advertise new channels implicitly by creating the endpoints. */ structrpmsg_device_ops { structrpmsg_endpoint *(*create_ept)(structrpmsg_device *rpdev, rpmsg_rx_cb_tcb, void *priv, structrpmsg_channel_infochinfo);
int (*announce_create)(struct rpmsg_device *ept); int (*announce_destroy)(struct rpmsg_device *ept); };
/** * struct virtio_rpmsg_channel - rpmsg channel descriptor * @rpdev: the rpmsg channel device * @vrp: the virtio remote processor device this channel belongs to * * This structure stores the channel that links the rpmsg device to the virtio * remote processor device. */ structvirtio_rpmsg_channel { structrpmsg_devicerpdev;
structvirtproc_info *vrp; };
即一个rpmsg_device还要加上核心数据结构struct virtproc_info *vrp,这是 virtio 实现 rpmsg bus 的关键数据结构
/** * struct rpmsg_endpoint - binds a local rpmsg address to its user * @rpdev: rpmsg channel device * @refcount: when this drops to zero, the ept is deallocated * @cb: rx callback handler * @cb_lock: must be taken before accessing/changing @cb * @addr: local rpmsg address * @priv: private data for the driver's use * * In essence, an rpmsg endpoint represents a listener on the rpmsg bus, as * it binds an rpmsg address with an rx callback handler. * * Simple rpmsg drivers shouldn't use this struct directly, because * things just work: every rpmsg driver provides an rx callback upon * registering to the bus, and that callback is then bound to its rpmsg * address when the driver is probed. When relevant inbound messages arrive * (i.e. messages which their dst address equals to the src address of * the rpmsg channel), the driver's handler is invoked to process it. * * More complicated drivers though, that do need to allocate additional rpmsg * addresses, and bind them to different rx callbacks, must explicitly * create additional endpoints by themselves (see rpmsg_create_ept()). */ structrpmsg_endpoint { structrpmsg_device *rpdev; structkrefrefcount; rpmsg_rx_cb_t cb; structmutexcb_lock; u32 addr; void *priv;
/** * struct rpmsg_endpoint_ops - indirection table for rpmsg_endpoint operations * @destroy_ept: see @rpmsg_destroy_ept(), required * @send: see @rpmsg_send(), required * @sendto: see @rpmsg_sendto(), optional * @send_offchannel: see @rpmsg_send_offchannel(), optional * @trysend: see @rpmsg_trysend(), required * @trysendto: see @rpmsg_trysendto(), optional * @trysend_offchannel: see @rpmsg_trysend_offchannel(), optional * @poll: see @rpmsg_poll(), optional * * Indirection table for the operations that a rpmsg backend should implement. * In addition to @destroy_ept, the backend must at least implement @send and * @trysend, while the variants sending data off-channel are optional. */ structrpmsg_endpoint_ops { void (*destroy_ept)(struct rpmsg_endpoint *ept);
int (*send)(struct rpmsg_endpoint *ept, void *data, int len); int (*sendto)(struct rpmsg_endpoint *ept, void *data, int len, u32 dst); int (*send_offchannel)(struct rpmsg_endpoint *ept, u32 src, u32 dst, void *data, int len);
int (*trysend)(struct rpmsg_endpoint *ept, void *data, int len); int (*trysendto)(struct rpmsg_endpoint *ept, void *data, int len, u32 dst); int (*trysend_offchannel)(struct rpmsg_endpoint *ept, u32 src, u32 dst, void *data, int len); __poll_t (*poll)(struct rpmsg_endpoint *ept, struct file *filp, poll_table *wait); };
/* do we need to allocate a local address ? */ if (addr == RPMSG_ADDR_ANY) { id_min = RPMSG_RESERVED_ADDRESSES; id_max = 0; } else { id_min = addr; id_max = addr + 1; }
mutex_lock(&vrp->endpoints_lock);
/* bind the endpoint to an rpmsg address (and allocate one if needed) */ id = idr_alloc(&vrp->endpoints, ept, id_min, id_max, GFP_KERNEL); if (id < 0) { dev_err(dev, "idr_alloc failed: %d\n", id); goto free_ept; } ept->addr = id;
/** * __rpmsg_destroy_ept() - destroy an existing rpmsg endpoint * @vrp: virtproc which owns this ept * @ept: endpoing to destroy * * An internal function which destroy an ept without assuming it is * bound to an rpmsg channel. This is needed for handling the internal * name service endpoint, which isn't bound to an rpmsg channel. * See also __rpmsg_create_ept(). */ staticvoid __rpmsg_destroy_ept(struct virtproc_info *vrp, struct rpmsg_endpoint *ept) { /* make sure new inbound messages can't find this ept anymore */ mutex_lock(&vrp->endpoints_lock); idr_remove(&vrp->endpoints, ept->addr); mutex_unlock(&vrp->endpoints_lock);
/* super simple buffer "allocator" that is just enough for now */ staticvoid *get_a_tx_buf(struct virtproc_info *vrp) { unsignedint len; void *ret;
/* support multiple concurrent senders */ mutex_lock(&vrp->tx_lock);
4/* * either pick the next unused tx buffer * (half of our buffers are used for sending messages) */ if (vrp->last_sbuf < vrp->num_bufs / 2) ret = vrp->sbufs + vrp->buf_size * vrp->last_sbuf++; /* or recycle a used one */ else ret = virtqueue_get_buf(vrp->svq, &len);
mutex_unlock(&vrp->tx_lock);
return ret; }
这个函数是一个很简单的 TX buffer allocator。
它有两个阶段:
1 2 3 4 5
阶段 1:还有从未使用过的 TX buffer 按数组顺序从 vrp->sbufs 里拿
阶段 2:所有 TX buffer 都至少用过一次 从 svq used ring 回收远端已经读完的 TX buffer
/* We expect two virtqueues, rx and tx (and in this order) */ err = virtio_find_vqs(vdev, 2, vqs, vq_cbs, names, NULL); if (err) goto free_vrp;
vrp->rvq = vqs[0]; vrp->svq = vqs[1];
/* we expect symmetric tx/rx vrings */ WARN_ON(virtqueue_get_vring_size(vrp->rvq) != virtqueue_get_vring_size(vrp->svq));
/* we need less buffers if vrings are small */ if (virtqueue_get_vring_size(vrp->rvq) < MAX_RPMSG_NUM_BUFS / 2) vrp->num_bufs = virtqueue_get_vring_size(vrp->rvq) * 2; else vrp->num_bufs = MAX_RPMSG_NUM_BUFS;
vrp->buf_size = MAX_RPMSG_BUF_SIZE;
total_buf_space = vrp->num_bufs * vrp->buf_size;
/* allocate coherent memory for the buffers */ bufs_va = dma_alloc_coherent(vdev->dev.parent, total_buf_space, &vrp->bufs_dma, GFP_KERNEL); if (!bufs_va) { err = -ENOMEM; goto vqs_del; }
dev_dbg(&vdev->dev, "buffers: va %pK, dma %pad\n", bufs_va, &vrp->bufs_dma);
/* half of the buffers is dedicated for RX */ vrp->rbufs = bufs_va;
/* and half is dedicated for TX */ vrp->sbufs = bufs_va + total_buf_space / 2;
/* set up the receive buffers */ for (i = 0; i < vrp->num_bufs / 2; i++) { structscatterlistsg; void *cpu_addr = vrp->rbufs + i * vrp->buf_size;
/* if supported by the remote processor, enable the name service */ if (virtio_has_feature(vdev, VIRTIO_RPMSG_F_NS)) { /* a dedicated endpoint handles the name service msgs */ vrp->ns_ept = __rpmsg_create_ept(vrp, NULL, rpmsg_ns_cb, vrp, RPMSG_NS_ADDR); if (!vrp->ns_ept) { dev_err(&vdev->dev, "failed to create the ns ept\n"); err = -ENOMEM; goto free_coherent; } }
/* * Prepare to kick but don't notify yet - we can't do this before * device is ready. */ notify = virtqueue_kick_prepare(vrp->rvq);
/* From this point on, we can notify and get callbacks. */ virtio_device_ready(vdev);
/* tell the remote processor it can start sending messages */ /* * this might be concurrent with callbacks, but we are only * doing notify, not a full kick here, so that's ok. */ if (notify) virtqueue_notify(vrp->rvq);
/** * rpmsg_send_offchannel_raw() - send a message across to the remote processor * @rpdev: the rpmsg channel * @src: source address * @dst: destination address * @data: payload of message * @len: length of payload * @wait: indicates whether caller should block in case no TX buffers available * * This function is the base implementation for all of the rpmsg sending API. * * It will send @data of length @len to @dst, and say it's from @src. The * message will be sent to the remote processor which the @rpdev channel * belongs to. * * The message is sent using one of the TX buffers that are available for * communication with this remote processor. * * If @wait is true, the caller will be blocked until either a TX buffer is * available, or 15 seconds elapses (we don't want callers to * sleep indefinitely due to misbehaving remote processors), and in that * case -ERESTARTSYS is returned. The number '15' itself was picked * arbitrarily; there's little point in asking drivers to provide a timeout * value themselves. * * Otherwise, if @wait is false, and there are no TX buffers available, * the function will immediately fail, and -ENOMEM will be returned. * * Normally drivers shouldn't use this function directly; instead, drivers * should use the appropriate rpmsg_{try}send{to, _offchannel} API * (see include/linux/rpmsg.h). * * Returns 0 on success and an appropriate error value on failure. */ staticintrpmsg_send_offchannel_raw(struct rpmsg_device *rpdev, u32 src, u32 dst, void *data, int len, bool wait) { structvirtio_rpmsg_channel *vch = to_virtio_rpmsg_channel(rpdev); structvirtproc_info *vrp = vch->vrp; structdevice *dev = &rpdev->dev; structscatterlistsg; structrpmsg_hdr *msg; int err;
/* * We currently use fixed-sized buffers, and therefore the payload * length is limited. * * One of the possible improvements here is either to support * user-provided buffers (and then we can also support zero-copy * messaging), or to improve the buffer allocator, to support * variable-length buffer sizes. */ if (len > vrp->buf_size - sizeof(struct rpmsg_hdr)) { dev_err(dev, "message is too big (%d)\n", len); return -EMSGSIZE; }
/* grab a buffer */ msg = get_a_tx_buf(vrp); if (!msg && !wait) return -ENOMEM;
/* no free buffer ? wait for one (but bail after 15 seconds) */ while (!msg) { /* enable "tx-complete" interrupts, if not already enabled */ rpmsg_upref_sleepers(vrp);
/* * sleep until a free buffer is available or 15 secs elapse. * the timeout period is not configurable because there's * little point in asking drivers to specify that. * if later this happens to be required, it'd be easy to add. */ err = wait_event_interruptible_timeout(vrp->sendq, (msg = get_a_tx_buf(vrp)), msecs_to_jiffies(15000));
/* disable "tx-complete" interrupts if we're the last sleeper */ rpmsg_downref_sleepers(vrp);
/* timeout ? */ if (!err) { dev_err(dev, "timeout waiting for a tx buffer\n"); return -ERESTARTSYS; } }
/* add message to the remote processor's virtqueue */ err = virtqueue_add_outbuf(vrp->svq, &sg, 1, msg, GFP_KERNEL); if (err) { /* * need to reclaim the buffer here, otherwise it's lost * (memory won't leak, but rpmsg won't use it again for TX). * this will wait for a buffer management overhaul. */ dev_err(dev, "virtqueue_add_outbuf failed: %d\n", err); goto out; }
/* tell the remote processor it has a pending message to read */ virtqueue_kick(vrp->svq); out: mutex_unlock(&vrp->tx_lock); return err; }
/* * This is invoked whenever the remote processor completed processing * a TX msg we just sent it, and the buffer is put back to the used ring. * * Normally, though, we suppress this "tx complete" interrupt in order to * avoid the incurred overhead. */ staticvoidrpmsg_xmit_done(struct virtqueue *svq) { structvirtproc_info *vrp = svq->vdev->priv;
dev_dbg(&svq->vdev->dev, "%s\n", __func__);
/* wake up potential senders that are waiting for a tx buffer */ wake_up_interruptible(&vrp->sendq); }
/* are we the first sleeping context waiting for tx buffers ? */ if (atomic_inc_return(&vrp->sleepers) == 1) /* enable "tx-complete" interrupts before dozing off */ virtqueue_enable_cb(vrp->svq);
/* called when an rx buffer is used, and it's time to digest a message */ staticvoidrpmsg_recv_done(struct virtqueue *rvq) { structvirtproc_info *vrp = rvq->vdev->priv; structdevice *dev = &rvq->vdev->dev; structrpmsg_hdr *msg; unsignedint len, msgs_received = 0; int err;
msg = virtqueue_get_buf(rvq, &len); if (!msg) { dev_err(dev, "uhm, incoming signal, but no used buffer ?\n"); return; }
while (msg) { err = rpmsg_recv_single(vrp, dev, msg, len); if (err) break;
/* let's make sure no one deallocates ept while we use it */ if (ept) kref_get(&ept->refcount);
mutex_unlock(&vrp->endpoints_lock);
if (ept) { /* make sure ept->cb doesn't go away while we use it */ mutex_lock(&ept->cb_lock);
if (ept->cb) ept->cb(ept->rpdev, msg->data, msg_len, ept->priv, virtio32_to_cpu(vrp->vdev, msg->src));
mutex_unlock(&ept->cb_lock);
/* farewell, ept, we don't need you anymore */ kref_put(&ept->refcount, __ept_release); } else dev_warn(dev, "msg received with no recipient\n");
/* publish the real size of the buffer */ rpmsg_sg_init(&sg, msg, vrp->buf_size);
/* add the buffer back to the remote processor's virtqueue */ err = virtqueue_add_inbuf(vrp->rvq, &sg, 1, msg, GFP_KERNEL); if (err < 0) { dev_err(dev, "failed to add a virtqueue buffer: %d\n", err); return err; }
return0; }
接收时不是根据 channel name 分发,而是根据:msg->dst, 在vrp->endpoints 里查本地 endpoint。所以 rpmsg 实际运行时更像address-based messaging, channel name 主要用于设备发现和 driver 匹配。
主要步骤:
/* * the name service ept does _not_ belong to a real rpmsg channel, * and is handled by the rpmsg bus itself. * for sanity reasons, make sure a valid rpdev has _not_ sneaked * in somehow. */ if (rpdev) { dev_err(dev, "anomaly: ns ept has an rpdev handle\n"); return -EINVAL; }
/* don't trust the remote processor for null terminating the name */ msg->name[RPMSG_NAME_SIZE - 1] = '\0';
/* * create an rpmsg channel using its name and address info. * this function will be used to create both static and dynamic * channels. */ staticstruct rpmsg_device *rpmsg_create_channel(struct virtproc_info *vrp, struct rpmsg_channel_info *chinfo) { structvirtio_rpmsg_channel *vch; structrpmsg_device *rpdev; structdevice *tmp, *dev = &vrp->vdev->dev; int ret;
/* make sure a similar channel doesn't already exist */ tmp = rpmsg_find_device(dev, chinfo); if (tmp) { /* decrement the matched device's refcount back */ put_device(tmp); dev_err(dev, "channel %s:%x:%x already exist\n", chinfo->name, chinfo->src, chinfo->dst); returnNULL; }
vch = kzalloc(sizeof(*vch), GFP_KERNEL); if (!vch) returnNULL;
/* Link the channel to our vrp */ vch->vrp = vrp;
/* Assign public information to the rpmsg_device */ rpdev = &vch->rpdev; rpdev->src = chinfo->src; rpdev->dst = chinfo->dst; rpdev->ops = &virtio_rpmsg_ops;
/* * rpmsg server channels has predefined local address (for now), * and their existence needs to be announced remotely */ rpdev->announce = rpdev->src != RPMSG_ADDR_ANY;
远端发送 NS 消息到 addr 53 | v rpmsg_recv_done() | v rpmsg_recv_single() | v endpoint 53 的 callback | v rpmsg_ns_cb() | | RPMSG_NS_CREATE v rpmsg_create_channel() | v rpmsg_register_device() | v 匹配上层 rpmsg driver
RX buffer 生命周期
RX buffer 生命周期:
1 2 3 4 5 6 7 8 9 10 11
probe 时 add_inbuf ↓ 远端写入消息 ↓ Linux virtqueue_get_buf ↓ 调用 callback ↓ Linux 重新 add_inbuf ↓ 远端再次使用
所以 RX buffer 是循环复用的。
TX buffer 生命周期
TX buffer 生命周期:
1 2 3 4 5 6 7 8 9 10 11 12 13
Linux 从 sbufs 初始池拿 buffer ↓ 填消息 ↓ virtqueue_add_outbuf ↓ 远端消费 ↓ buffer 进入 used ring ↓ Linux virtqueue_get_buf 回收 ↓ 再次发送