kernel-协议栈链路层分析

本篇文章主要分析一下经过纯物理层之后从设备读取的frame是如何进入协议栈的和对应封装好的frame是如何离开协议栈的,多亏协议栈的分层思路,每一层之间都可以独立分析和阅读,对于驱动来说,首先要面对的就是中断了, 本文基于4.7.2的内核简单的过了一遍.

中断处理

中断处理是分析链路层数据传输的入口。

例如在 drivers/net/ethernet/3com/3c59x.c
这个是一个华三的设备驱动, 观察这个驱动的中断处理函数可以知道,frame的接收的入口是 vortex_rx
,而当设备有足够的空间发送frame的时候就会调用 netif_wake_queue
来触发发送frame的例程。

/*
* This is the ISR for the vortex series chips.
* full_bus_master_tx == 0 && full_bus_master_rx == 0
*/

static irqreturn_t
vortex_interrupt(int irq, void *dev_id)
{
struct net_device *dev = dev_id;
struct vortex_private *vp = netdev_priv(dev);
void __iomem *ioaddr;
int status;
int work_done = max_interrupt_work;
int handled = 0;
unsigned int bytes_compl = 0, pkts_compl = 0;

ioaddr = vp->ioaddr;
spin_lock(&vp->lock);

status = ioread16(ioaddr + EL3_STATUS); // 从ioremap的地址当中读取网卡的状态,这个是设备规定的地址

if (vortex_debug > 6)
pr_debug("vortex_interrupt(). status=0x%4xn", status);

// 在中断处理的时候,设备是关中断的,这个时候可以通过观察这个status来检查设备是否有中断到来.
if ((status & IntLatch) == 0) // 有中断需要处理,但是可能已经被其他中断处理函数处理了
goto handler_exit;        /* No interrupt: shared IRQs cause this */
handled = 1;

if (status & IntReq) { // 中断请求
status |= vp->deferred;
vp->deferred = 0;
}

if (status == 0xffff)        /* h/w no longer present (hotplug)? */
goto handler_exit;

if (vortex_debug > 4)
pr_debug("%s: interrupt, status %4.4x, latency %d ticks.n",
dev->name, status, ioread8(ioaddr + Timer));

spin_lock(&vp->window_lock);
window_set(vp, 7);

do {
if (vortex_debug > 5)
pr_debug("%s: In interrupt loop, status %4.4x.n",
dev->name, status);
if (status & RxComplete) // 中断表示接收完成的时候调用 vortex_rx
vortex_rx(dev);

if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug("    TX room bit was handled.n");
/* Theres room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

if (status & DMADone) {// 表示发送完成可以清除sk_buff了
if (ioread16(ioaddr + Wn7_MasterStatus) & 0x1000) {
iowrite16(0x1000, ioaddr + Wn7_MasterStatus); /* Ack the event. */
pci_unmap_single(VORTEX_PCI(vp), vp->tx_skb_dma, (vp->tx_skb->len + 3) & ~3, PCI_DMA_TODEVICE);
pkts_compl++;
bytes_compl += vp->tx_skb->len;
dev_kfree_skb_irq(vp->tx_skb); /* Release the transferred buffer */
if (ioread16(ioaddr + TxFree) > 1536) {
/*
* AKPM: FIXME: I dont think we need this.  If the queue was stopped due to
* insufficient FIFO room, the TxAvailable test will succeed and call
* netif_wake_queue()
*/
netif_wake_queue(dev);
} else { /* Interrupt when FIFO has room for max-sized packet. */
iowrite16(SetTxThreshold + (1536>>2), ioaddr + EL3_CMD);
netif_stop_queue(dev);
}
}
}
/* Check for all uncommon interrupts at once. */
if (status & (HostError | RxEarly | StatsFull | TxComplete | IntReq)) {
if (status == 0xffff)
break;
if (status & RxEarly)
vortex_rx(dev);
spin_unlock(&vp->window_lock);
vortex_error(dev, status);
spin_lock(&vp->window_lock);
window_set(vp, 7);
}

if (--work_done < 0) { // 最多处理work_done个frame
pr_warn("%s: Too much work in interrupt, status %4.4xn",
dev->name, status);
/* Disable all pending interrupts. */
do {
vp->deferred |= status; // 把当前状态保存起来等下次中断的时候处理
iowrite16(SetStatusEnb | (~vp->deferred & vp->status_enable),
ioaddr + EL3_CMD);
iowrite16(AckIntr | (vp->deferred & 0x7ff), ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_CMD)) & IntLatch); // 把中断清掉?
/* The timer will reenable interrupts. */
mod_timer(&vp->timer, jiffies + 1*HZ);
break;
}
/* Acknowledge the IRQ. */
iowrite16(AckIntr | IntReq | IntLatch, ioaddr + EL3_CMD);
} while ((status = ioread16(ioaddr + EL3_STATUS)) & (IntLatch | RxComplete));// 当有pending的中断并且是接收完成的状态

netdev_completed_queue(dev, pkts_compl, bytes_compl);
spin_unlock(&vp->window_lock);

if (vortex_debug > 4)
pr_debug("%s: exiting interrupt, status %4.4x.n",
dev->name, status);
handler_exit:
spin_unlock(&vp->lock);
return IRQ_RETVAL(handled);
}

当中断到来的时候检查 RxComplete
,如果这个状态置位了说明有frame可以处理,当状态有 TxAvailable
的时候表示网卡的缓冲有空可以尝试发送frame,这样的检查会循环很多次,直到出现错误或者超过规定的循环次数 max_interrupt_work
,这个值对于这个设备来说是32,也就是说最多接受32帧就会退出中断处理。

frame的接收

vortex_rx
最终会调用 netif_rx
这个函数很关键, vortex_rx
会通过 netdev_alloc_skb
分配一个 sk_buff
,这个是一个会贯穿整个协议栈的一个buff,然后从设备中拷贝frame,拷贝的方式也有很多比如直接读取或者利用DMA。

从DMA读取到内存当中比如截取的这段代码

// 转换成总线地址给DMA设备,启动DMA传输,然后循环检查传输状态
// 最后取消总线地址的映射.
dma_addr_t dma = pci_map_single(VORTEX_PCI(vp), skb_put(skb, pkt_len),
pkt_len, PCI_DMA_FROMDEVICE);
iowrite32(dma, ioaddr + Wn7_MasterAddr);
iowrite16((skb->len + 3) & ~3, ioaddr + Wn7_MasterLen);
iowrite16(StartDMAUp, ioaddr + EL3_CMD);
while (ioread16(ioaddr + Wn7_MasterStatus) & 0x8000)
;
pci_unmap_single(VORTEX_PCI(vp), dma, pkt_len, PCI_DMA_FROMDEVICE);

也有不饶过CPU直接读取的

ioread32_rep(ioaddr + RX_FIFO,
skb_put(skb, pkt_len),
(pkt_len + 3) >> 2);

接着调用 eth_type_trans
确定对应的protocol,并且最终调用 netif_rx
来继续处理接收任务。

到了 netif_rx
就是一个通用流程了, netif_rx
调用了 netif_rx_internal
,这个函数会先通过 net_timestamp_check
检查 sk_buff
的时间戳,并且设置,然后调用 enqueue_to_backlog

/*
* enqueue_to_backlog is called to queue an skb to a per CPU backlog
* queue (may be a remote CPU queue).
*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;

sd = &per_cpu(softnet_data, cpu); // 获取per-cpu softnet_data

local_irq_save(flags);

rps_lock(sd);
if (!netif_running(skb->dev)) // 如果设备已经没有运行的直接丢frame
goto drop;
qlen = skb_queue_len(&sd->input_pkt_queue); // 获取softnet_data的&sk_buff的队列长度
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) { // 如果没有超过最大长度并且没有被限制
if (qlen) {
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags); // 加入队列并且开中断
return NET_RX_SUCCESS;
}
// 如果队列为空可以尝试调度 backlog device,
// 再把frame加入到队列当中。
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
}

drop:
sd->dropped++;
rps_unlock(sd);

local_irq_restore(flags);

atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}

进入到 ____napi_schedule
则很简单了,就是唤起把 backlog
这个 softnet_data
上的device加入到 poll_list
当中然后唤起softirq来处理。

/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

这里要提的是,kernel对于frame的处理有一个比较新的API,称之为NAPI。

对于NAPI来说, net_rx_action
结合了轮询和中断, 当设备有中断表示收到frame的时候会把它加入 softnet_data
poll_list
, 这个时候设备不再发出中断,然后通过轮询这些设备是否有剩余frame处理,并且限制最大处理数量,这样能保证cpu不会承受过大的中断load的压力,并且可以使得设备之间能够相对公平获得处理的机会,而不是中断更频繁的设备获得的机会更多。

对于使用NAPI的设备来说显然需要提供poll接口来供查询之用,同时用 NET_RX_SOFTIRQ
触发软中断执行 net_rx_action
这个软中断处理函数,而不需要 netif_rx
这个接口。

NET_RX_SOFTIRQ
做的事情就是把设备加入 poll
列表并且触发softirq. 对于没有使用NAPI的设备来说,会使用per-cpu 结构 softnet_data
中的 backlog_dev
(是一个假的胶水层的封装)来替代放入对应的 poll
列表中,然后再进入 netif_rx_schedule
的例程来处理。也就是说NAPI-unaware的设备用的是 backlog_dev
而NAPI-aware的设备用的是自己的device结构体。

相反的把设备从轮询列表中移除依靠的是 netif_rx_complete
. 这样轮询检查的时候就不会处理对应的设备了.

这里说了这么久的轮询不要误会frame的主力是轮询,显然这是不行的,因为网络数据要求接受很快,还是中断驱动的,只不过为了物尽其用,既然你有数据帧还在就不要中断告诉我,我继续处理就可以了。

现在看一下加入到 poll_list
之后, NET_RX_SOFTIRQ
软中断是如何工作的。关于软中断的实现可以参考参考列表中的第二个链接,我本来想自己总结一下,但是发现这篇文章确实总结的很好,所以保存一下就好了。

补充一点, softnet_data->input_pkt_queue
是frame的缓存队列,这个队列有一个最大值 netdev_max_backlog
,目前这个值是1000,也就是每个CPU最多有1000个frame没处理。对于有自己device的设备,frame需要通过设备的 poll
方法来获取。

net_rx_action
对应的是软中断 NET_RX_SOFTIRQ
的处理函数。之前说过 net_rx_action
的工作方式,这里看一下具体的代码。

static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies + 2;
int budget = netdev_budget; // 一个budget用于限制运行时间,目前是300
LIST_HEAD(list);
LIST_HEAD(repoll);

local_irq_disable();
list_splice_init(&sd->poll_list, &list); // 把poll_list合并到list上并且把poll_list清空
local_irq_enable();

for (;;) {
struct napi_struct *n;

if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
return;
break;
}

n = list_first_entry(&list, struct napi_struct, poll_list);
budget -= napi_poll(n, &repoll); // 每调用一次会减掉相应的budget,如果还有work没完成还需要poll就会加入到repoll链表里。

/* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
if (unlikely(budget <= 0 || // 如果budge耗尽或者超过了两个jiffies就会停止
time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}

__kfree_skb_flush();
local_irq_disable();

list_splice_tail_init(&sd->poll_list, &list); // 再把poll_list合并到list当中,并且清空poll_list。
list_splice_tail(&repoll, &list); // 然后把repoll合并到list当中。
list_splice(&list, &sd->poll_list); // 再把list合并到poll_list当中。
if (!list_empty(&sd->poll_list)) // 这几步的过程就是把需要repoll的设备和当前的poll_list合并
__raise_softirq_irqoff(NET_RX_SOFTIRQ); // 如果还有需要poll的设备就再触发软中断.

net_rps_action_and_irq_enable(sd);
}

整个过程就是检查 poll_list
并且轮询调用 poll
方法。接下来具体看一下 poll
方法的相关内容。以非NAPI设备为例,使用的是 backlog
poll
方法,对应的方法可以在 net/core/dev.c
当中找到,在初始化函数 net_dev_init
当中 sd->backlog.poll = process_backlog;
给每个per-cpu结构的 softnet_data
都是指向了这个poll函数。

static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}

napi->weight = weight_p; // 这个值目前是32
local_irq_disable();
while (1) {
struct sk_buff *skb;

while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
local_irq_enable();
__netif_receive_skb(skb); // 取出的skb交给__netif_receive_skb处理
rcu_read_unlock();
local_irq_disable();
input_queue_head_incr(sd);
if (++work >= quota) {
local_irq_enable();
return work;
}
}

rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
napi->state = 0;
rps_unlock(sd);

break;
}

skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
rps_unlock(sd);
}
local_irq_enable();

return work;
}

这个函数的主体就是把 &sd->input_pkt_queue
交给 &sd->process_queue
然后从 &sd->process_queue
当中取出 sk_buff
,再调用 __netif_receive_skb
,进一步处理.我想老是把 &sd->input_pkt_queue
拷贝并且清空应该是因为希望不因为锁独占这个队列太久的原因.

__netif_receive_skb
的内容主要是根据 skb->protocol
,遍历 &skb->dev->ptype_all
然后将frame交给L3的 ptype->func
处理,还有一些在这一层需要处理的特性,比如bridging.

frame的发送

在frame的发送路径,主要包含几个任务,开关设备的发送功能,调度设备进行发送,选择在设备发送队列的frame进行发送,还要传输过程的本身.而且发送的例程也是类似接收的过程,有对应的softirq( NET_TX_SOFTIRQ
)和对应的handler, net_tx_action
。和 poll_list
对应的是 output_queue
,也是一个等待发送的设备列表。

__LINK_STATE_START
__LINK_ STATE_XOFF
__LINK_STATE_RX_SCHED
__LINK_STATE_SCHED
也是对应的.回顾开头中断的代码

if (status & TxAvailable) { // 中断表示可以传输的时候
if (vortex_debug > 5)
pr_debug("    TX room bit was handled.n");
/* Theres room in the FIFO for a full-sized packet. */
iowrite16(AckIntr | TxAvailable, ioaddr + EL3_CMD);
netif_wake_queue (dev);
}

就是说当状态可用的时候,尝试调用 netif_wake_queue
来触发frame的发送。

首先来看一下如何选择发送的设备。

/**
*      netif_wake_queue - restart transmit
*      @dev: network device
*
*      Allow upper layers to call the device hard_start_xmit routine.
*      Used for flow control when transmit resources are available.
*/
static inline void netif_wake_queue(struct net_device *dev)
{
netif_tx_wake_queue(netdev_get_tx_queue(dev, 0));
}

这个函数内部调用了

static inline void __netif_reschedule(struct Qdisc *q)
{
struct softnet_data *sd;
unsigned long flags;

local_irq_save(flags);
sd = this_cpu_ptr(&softnet_data);
q->next_sched = NULL;
*sd->output_queue_tailp = q;
sd->output_queue_tailp = &q->next_sched;
raise_softirq_irqoff(NET_TX_SOFTIRQ);
local_irq_restore(flags);
}

主体就是把设备加入 output_queue
(通过 ->next_sched
连接)然后唤起软中断 NET_TX_SOFTIRQ

再来看看软中断的主体。

首先是遍历 completion_queue
来释放 sk_buff
这个连接是通过 dev_kfree_skb_irq
添加的,和正常的 dev_kfree_skb
不同,它是把 sk_buff
加入到释放列表中就快速返回了, 然后就会遍历设备列表寻找可运行的设备,调用 qdisk_run

void net_tx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);

if (sd->completion_queue) { // 检查completion_queue逐一释放sk_buff, 他们是通过dev_kfree_skb_irq,因为他不真的释放sk_buff,而是把sk_buff移到completio_queue当中.
struct sk_buff *clist;

local_irq_disable();
clist = sd->completion_queue;
sd->completion_queue = NULL;
local_irq_enable();

while (clist) {
struct sk_buff *skb = clist;
clist = clist->next;

WARN_ON(atomic_read(&skb->users));
if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
trace_consume_skb(skb);
else
trace_kfree_skb(skb, net_tx_action);

if (skb->fclone != SKB_FCLONE_UNAVAILABLE)
__kfree_skb(skb);
else
__kfree_skb_defer(skb);
}

__kfree_skb_flush();
}

if (sd->output_queue) {
struct Qdisc *head;

local_irq_disable();
head = sd->output_queue; // 拷贝并首尾相连.
sd->output_queue = NULL;
sd->output_queue_tailp = &sd->output_queue;
local_irq_enable();

while (head) {
struct Qdisc *q = head;
spinlock_t *root_lock;

head = head->next_sched;

root_lock = qdisc_lock(q);
if (spin_trylock(root_lock)) {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
qdisc_run(q); // qdisk_run
spin_unlock(root_lock);
} else {
if (!test_bit(__QDISC_STATE_DEACTIVATED,
&q->state)) {
__netif_reschedule(q);
} else {
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED,
&q->state);
}
}
}
}
}

qdisc_run
首先检查设备的运行状态然后调用 __qdisc_run
,这个函数的主体就是调用 qdisc_restart
,直到超过限制或者需要让出时间CPU了,最后清空qdisc的RUNNING状态。

void __qdisc_run(struct Qdisc *q)
{
int quota = weight_p;
int packets;

while (qdisc_restart(q, &packets)) {
/*
* Ordered by possible occurrence: Postpone processing if
* 1. weve exceeded packet quota
* 2. another process needs the CPU;
*/
quota -= packets;
if (quota <= 0 || need_resched()) {
__netif_schedule(q);
break;
}
}

qdisc_run_end(q);
}

qdisc_restart
的主体是从队列中取出 sk_buff
,然后调用 sch_direct_xmit
,它的功能是调用 dev_hard_start_xmit
来运行设备驱动的指定的传输方法 hard_start_xmit
,如果没有成功则把 sk_buff
重新加入到出队当中。

/*
* Transmit possibly several skbs, and handle the return status as
* required. Holding the __QDISC___STATE_RUNNING bit guarantees that
* only one CPU can execute this function.
*
* Returns to the caller:
*                              0  - queue is empty or throttled.
*                              >0 - queue is not empty.
*/
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate)
{
int ret = NETDEV_TX_BUSY;

/* And release qdisc */
spin_unlock(root_lock);

/* Note that we validate skb (GSO, checksum, ...) outside of locks */
if (validate)
skb = validate_xmit_skb_list(skb, dev);

if (likely(skb)) {
HARD_TX_LOCK(dev, txq, smp_processor_id());
if (!netif_xmit_frozen_or_stopped(txq))
skb = dev_hard_start_xmit(skb, dev, txq, &ret);

HARD_TX_UNLOCK(dev, txq);
} else {
spin_lock(root_lock);
return qdisc_qlen(q);
}
spin_lock(root_lock);

if (dev_xmit_complete(ret)) {
/* Driver sent out skb successfully or skb was consumed */
ret = qdisc_qlen(q);
} else {
/* Driver returned NETDEV_TX_BUSY - requeue skb */
if (unlikely(ret != NETDEV_TX_BUSY))
net_warn_ratelimited("BUG %s code %d qlen %dn",
dev->name, ret, q->q.qlen);

ret = dev_requeue_skb(skb, q);
}

if (ret && netif_xmit_frozen_or_stopped(txq))
ret = 0;

return ret;
}

至此整个L2的发送和接收就大致有了一个了解,并且能够理解整个的工作过程,这里有一点没有讲到的是L2的转发机制,比如STP等协议和实现,可能会在接下来的文章中继续剖析.

参考:

  1. Linux 下DMA浅析
  2. linux kernel的中断子系统之(八):softirq
  3. Linux Network Internals
  • 版权声明: 本文源自互联网, 于3个月前,由整理发表,共 13090字。
  • 原文链接:点此查看原文