当前位置:Linux教程 - Linux - TCP协议的核心守护进程echod

TCP协议的核心守护进程echod



         /*
    TCP协议的核心守护进程echod
    开始
    insmod tcpechod.o
    结束
    rmmod tcpechod
    */



    /* Makefile */
    all: tcpechod.o

    .c.o:
    gcc -I/usr/src/linux/include -O2 -c $<
    clean:
    rm -f *.o



    /* tcpechod.c */
    #define MODULE
    #define __KERNEL__
    #define __KERNEL_SYSCALLS__

    #include <linux/config.h>

    #if CONFIG_MODVERSIONS
    #define MODVERSIONS
    #include <linux/modversions.h>
    #endif

    #include <linux/module.h>
    #include <linux/kernel.h>
    #include <linux/sched.h>
    #include <asm/processor.h>
    #include <linux/tqueue.h>
    #include <linux/interrupt.h>
    #include <linux/malloc.h>
    #include <linux/spinlock.h>
    #include <linux/net.h>
    #include <net/sock.h>
    #include <asm/uaccess.h>

    int errno;
    #include <asm/unistd.h>

    /* Size of the thread[] and canquit[] arrays, i.e. the number of worker slots available. */
    #define MAXTHREAD 16

    /* Doubly linked list element that carries one socket. */
    struct sock_node {
    struct sock_node *prev; /* neighbour on the head side of the list */
    struct sock_node *next; /* neighbour on the tail side of the list */
    struct socket *sock; /* socket held by this node; released together with the node by the removal paths */
    };

    /* Queue of sock_node elements; both pointers are NULL while the queue is empty. */
    struct sock_queue {
    struct sock_node *head; /* insertion end used by put_sock_head() */
    struct sock_node *tail; /* removal end used by out_sock_tail() */
    };

    /*
     * Shared hand-off queue: echod() pushes freshly accepted connections and the
     * doecho() workers pop them; both sides take sock_lock around the access.
     */
    struct sock_queue sockqueue;

    /* Put a queue into the empty state (no head, no tail). */
    void init_sockqueue(struct sock_queue *sockqueue)
    {
        sockqueue->tail = NULL;
        sockqueue->head = sockqueue->tail;
    }

    static void put_sock_head(struct sock_queue *sockqueue, struct sock_node *tmp)
    {
    if (!sockqueue -> head) {
    sockqueue -> head = sockqueue -> tail = tmp;
    }
    else {
    tmp -> next = sockqueue -> head;
    sockqueue -> head -> prev = tmp;
    sockqueue -> head = tmp;
    }
    }

    static struct sock_node* out_sock_tail(struct sock_queue *sockqueue)
    {
    struct sock_node *tmp = sockqueue -> tail;
    if (sockqueue -> head == sockqueue -> tail)
    sockqueue -> head = sockqueue -> tail = NULL;
    else
    sockqueue -> tail = sockqueue -> tail -> prev;

    return tmp;
    }

    /* Return 1 when the queue holds no node, 0 otherwise. */
    static int sockqueue_empty(struct sock_queue *sockqueue)
    {
        if (sockqueue->head != NULL)
            return 0;
        return 1;
    }

    /* Return the first node of the queue without removing it (NULL when empty). */
    static struct sock_node* get_sock_head(struct sock_queue *sockqueue)
    {
        struct sock_node *first = sockqueue->head;

        return first;
    }

    static struct sock_node * get_sock_next(struct sock_node *cur)
    {
    if (cur)
    return cur -> next;
    }

    static void rm_sock_node(struct sock_queue *sockqueue, struct sock_node *cur)
    {
    if (!cur)
    return;
    if (cur -> next)
    cur -> next -> prev = cur -> prev;
    if (cur -> prev)
    cur -> prev -> next = cur -> next;
    if (cur == sockqueue -> head)
    sockqueue -> head = cur -> next;
    if (cur == sockqueue -> tail)
    sockqueue -> tail = cur -> prev;
    }

    static void rm_sock_queue(struct sock_queue *sockqueue)
    {
    struct sock_node *tmp, *tmp1;
    tmp = get_sock_head(sockqueue);
    while (tmp) {
    tmp1 = tmp;
    tmp = get_sock_next(tmp);
    rm_sock_node(sockqueue, tmp1);
    sock_release(tmp1 -> sock);
    kfree(tmp1);
    }
    }

    /* Taken around every access to the shared sockqueue in echod() and doecho(). */
    spinlock_t sock_lock = SPIN_LOCK_UNLOCKED;

    /* Timer object; not referenced by any function in this file. */
    static struct timer_list tm;
    /* cleanup_module() sleeps here; echod() wakes it as its last step. */
    DECLARE_WAIT_QUEUE_HEAD(wm);
    /* doecho() workers nap here; echod() wakes it after queuing a new connection. */
    DECLARE_WAIT_QUEUE_HEAD(wq_con);
    /* Shutdown flag: set to 1 by end_damone(), polled by the echod() and doecho() loops. */
    volatile int finish = 0;
    /* Module parameter: default worker count, also the connection limit checked in acceptcon(). */
    static int threadnum = 2;
    /* Module parameter: TCP port handed to init_sock(). */
    static int port = 7;
    /* Number of workers echod() starts; echod() copies threadnum into it when it is < 1. */
    static int acthreadnum;
    /* Per-worker index values; echod() passes &thread[i] to worker i. */
    static int thread[MAXTHREAD];

    /* canquit[i] is cleared by echod() before starting worker i and set to 1 by the worker when it is done. */
    static int canquit[MAXTHREAD];


    /*
     * Current connection count: incremented by echod() for each queued
     * connection, decremented by echowork() when it drops a connection.
     */
    static atomic_t connectcount;


    /* init a server sock and begin listen */
    int init_sock(struct socket **s, int port)
    {
    struct sockaddr_in sin;
    int error = 0;

    error = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, s);
    if (error < 0) {
    printk("Error during creation of socket\n");
    goto out;
    }
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port = htons((unsigned short)port);

    error = (*s) -> ops -> bind(*s, (struct sockaddr*)&sin, sizeof(sin));
    if (error <0) {
    printk("Error bind: %d\n", error);
    goto out;
    }

    (*s) -> sk -> reuse = 1;

    error = (*s) -> ops -> listen(*s, 48);
    if (0 != error) {
    printk("error to listen\n");
    goto out;
    }

    out:
    return error;
    }

    /*
     * Read whatever is pending on the socket without blocking (at most 4095
     * bytes) and send the same bytes back.
     *
     * Nothing is done when the receive queue is empty, when the scratch buffer
     * cannot be allocated, or when the receive yields no data or an error.
     */
    void echomsg(struct socket *s)
    {
        struct msghdr msg;
        struct iovec iov;
        mm_segment_t oldfs;
        char *buf;
        int len;

        if (skb_queue_empty(&(s->sk->receive_queue)))
            return;

        buf = kmalloc(4096, GFP_KERNEL);
        if (!buf)
            return;

        msg.msg_name = 0;
        msg.msg_namelen = 0;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_control = NULL;
        msg.msg_controllen = 0;
        msg.msg_flags = 0;

        /* one byte of the buffer stays reserved for the terminator below */
        iov.iov_base = buf;
        iov.iov_len = (size_t) 4095;

        /* the buffer is kernel memory, so widen the address limit for the call */
        oldfs = get_fs();
        set_fs(get_ds());
        len = sock_recvmsg(s, &msg, 4095, MSG_DONTWAIT);
        set_fs(oldfs);

        /* a negative len is an error code and must never be used as an index */
        if (len <= 0)
            goto outdo;

        /* len <= 4095, so this stays inside the 4096 byte buffer */
        buf[len] = 0;

        /* the receive call consumed the iovec; set it up again for sending */
        msg.msg_control = NULL;
        msg.msg_controllen = 0;
        iov.iov_base = buf;
        iov.iov_len = len;
        msg.msg_flags = MSG_NOSIGNAL;

        oldfs = get_fs();
        set_fs(get_ds());
        len = sock_sendmsg(s, &msg, len);
        set_fs(oldfs);

    outdo:
        kfree(buf);
    }

    /*
     * Try to take one pending connection from the listening socket without
     * blocking.
     *
     * Returns the connected socket, or NULL when the connection count is already
     * above threadnum, sock is NULL, nothing is waiting in the accept queue, no
     * socket could be allocated, or accept failed.  Connections that are already
     * in TCP_CLOSE state are released and the next one is tried.
     */
    struct socket *acceptcon(struct socket *sock)
    {
        struct socket *conn;
        int rc;

        if (atomic_read(&connectcount) > threadnum)
            return NULL;
        if (sock == NULL)
            return NULL;
        if (sock->sk->tp_pinfo.af_tcp.accept_queue == NULL)
            return NULL;

        for (;;) {
            printk("accept routine\n");

            conn = sock_alloc();
            if (conn == NULL) {
                printk("error to alloc socket\n");
                return NULL;
            }
            conn->type = sock->type;
            conn->ops = sock->ops;

            rc = sock->ops->accept(sock, conn, O_NONBLOCK);
            if (rc < 0) {
                sock_release(conn);
                printk("error to accept: %d\n", rc);
                return NULL;
            }

            if (conn->sk->state != TCP_CLOSE)
                return conn;

            /* peer is already gone: drop this one and look for another */
            printk("continue\n");
            sock_release(conn);
        }
    }

    void echowork(struct sock_queue *sq)
    {
    struct sock_node *cur, *tmp = get_sock_head(sq);
    DECLARE_WAIT_QUEUE_HEAD(wq);
    while (tmp) {
    if (TCP_CLOSE_WAIT == tmp -> sock -> sk -> state) {
    atomic_dec(&connectcount);
    cur = tmp;
    tmp = get_sock_next(cur);
    rm_sock_node(sq, cur);
    sock_release(cur -> sock);
    kfree(cur);
    } else {
    echomsg(tmp -> sock);
    tmp = get_sock_next(tmp);
    }

    interruptible_sleep_on_timeout(&wq, 1);
    }
    }

    /*
     * Echo worker thread.
     *
     * ptr points to an int holding this worker's index, which is used as its
     * slot in canquit[].  The worker keeps a private queue of connections: on
     * each round it moves at most one node from the shared sockqueue into that
     * private queue and then services the private queue with echowork().  Once
     * the finish flag is set it releases its remaining connections, sets its
     * canquit[] slot to 1 and returns 0.
     */
    int doecho(void *ptr)
    {
    /* connections owned by this worker only, so no locking is used on it */
    struct sock_queue sq;
    struct sock_node *tmp;
    /* receives a copy of the previous blocked mask; not read again in this function */
    sigset_t tmpsig;
    int nothread = (int)(*((int*)ptr));
    DECLARE_WAIT_QUEUE_HEAD(wq);

    /* task name of this thread */
    sprintf(current -> comm, "echod-tcp");

    init_sockqueue(&sq);

    /* rewrite the blocked-signal set from the SIGKILL | SIGSTOP mask */
    spin_lock_irq(&current -> sigmask_lock);
    tmpsig = current -> blocked;
    siginitsetinv(&current -> blocked, sigmask(SIGKILL) | sigmask(SIGSTOP));
    recalc_sigpending(current);
    spin_unlock_irq(&current -> sigmask_lock);

    daemonize();

    while (!finish) {
    /* nap for up to one tick; echod() wakes wq_con when a connection is queued */
    interruptible_sleep_on_timeout(&wq_con, 1);
    tmp = NULL;
    /* take at most one connection from the shared queue per round */
    spin_lock(&sock_lock);
    if (!sockqueue_empty(&sockqueue))
    tmp = out_sock_tail(&sockqueue);
    spin_unlock(&sock_lock);
    if (tmp) {
    put_sock_head(&sq, tmp);
    }

    if (!sockqueue_empty(&sq)) {
    echowork(&sq);
    }
    /* nothing ever wakes this local queue, so this is a plain one-tick pause */
    interruptible_sleep_on_timeout(&wq, 1);
    }
    /* shutting down: release every connection this worker still holds */
    rm_sock_queue(&sq);
    /* echod() waits for this flag before it finishes */
    canquit[nothread] = 1;
    return 0;
    }

    /* main damone */
    int echod(void *ptr)
    {
    struct socket *s = NULL;
    sigset_t tmpsig;
    int i;
    DECLARE_WAIT_QUEUE_HEAD(wq);

    sprintf(current -> comm, "echod-man");

    spin_lock_irq(&current -> sigmask_lock);
    tmpsig = current -> blocked;
    siginitsetinv(&current -> blocked, sigmask(SIGKILL) | sigmask(SIGSTOP));
    recalc_sigpending(current);
    spin_unlock_irq(&current -> sigmask_lock);

    daemonize();

    if (init_sock(&s, port))
    return -1;

    init_sockqueue(&sockqueue);

    atomic_set(&connectcount, 0);
    if (acthreadnum < 1)
    acthreadnum = threadnum;

    i = 0;
    while (i < acthreadnum) {
    canquit[i] = 0;
    thread[i] = i;
    (void)kernel_thread(doecho, &thread[i], CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
    i++;
    }

    while (!finish) {
    struct socket *newsock = acceptcon(s);
    if (newsock) {
    struct sock_node *tmp = (struct sock_node *)kmalloc(sizeof(struct sock_node), GFP_KERNEL);
    tmp -> sock = newsock;
    tmp -> next = NULL;
    tmp -> prev = NULL;
    spin_lock(&sock_lock);
    put_sock_head(&sockqueue, tmp);
    spin_unlock(&sock_lock);
    atomic_inc(&connectcount);
    wake_up_interruptible(&wq_con);
    }
    interruptible_sleep_on_timeout(&wq, 1);
    waitpid(-1, NULL, __WCLONE | WNOHANG);
    }

    while(waitpid(-1, NULL, __WCLONE | WNOHANG) > 0);

    for(i = 0;i< acthreadnum; i++) {
    while(1 != canquit[i])
    interruptible_sleep_on_timeout(&wq, 1);
    }

    sock_release(s);
    printk("echod end\n");
    wake_up_interruptible(&wm);
    return 0;
    }

    /*
     * Request shutdown: log a message and set the finish flag that the echod()
     * and doecho() loops poll.  ptr is unused.
     * NOTE(review): the (unsigned long) signature looks like a timer callback,
     * presumably meant for tm; the only caller in this file is cleanup_module().
     */
    void end_damone(unsigned long ptr)
    {
    printk("wait up it\n");
    finish = 1;
    }

    /* Expose port and threadnum as integer module parameters (defaults: 7 and 2). */
    MODULE_PARM(port, "i");
    MODULE_PARM(threadnum, "i");

    /*
     * Module load hook: start the echod() manager thread.
     *
     * Returns 0 on success.  When the thread cannot be created the negative
     * error code is returned so that loading fails, instead of leaving a module
     * without a daemon whose unload would wait for a wake-up that never comes.
     */
    int init_module(void)
    {
        int pid;

        pid = kernel_thread(echod, NULL, CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
        if (pid < 0) {
            printk("echod: unable to start daemon thread: %d\n", pid);
            return pid;
        }
        return 0;
    }

    /*
     * Module unload hook: request shutdown through end_damone() and then sleep
     * on wm, which echod() wakes right after printing "echod end".
     * NOTE(review): if echod() never reaches that wake-up, or issues it before
     * this sleep starts, this call looks like it would wait until a signal
     * arrives - confirm.
     */
    void cleanup_module()
    {
    end_damone(0);
    interruptible_sleep_on(&wm);
    }
    发布人:tengel 来自: