TCP协议的核心守护进程echod
/*
TCP协议的核心守护进程echod
开始
insmod tcpechod.o
结束
rmmod tcpechod
*/
/* Makefile */
all: tcpechod.o
.c.o:
gcc -I/usr/src/linux/include -O2 -c $<
clean:
rm -f *.o
/* tcpechod.c */
#define MODULE
#define __KERNEL__
#define __KERNEL_SYSCALLS__
#include <linux/config.h>
#if CONFIG_MODVERSIONS
#define MODVERSIONS
#include <linux/modversions.h>
#endif
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <asm/processor.h>
#include <linux/tqueue.h>
#include <linux/interrupt.h>
#include <linux/malloc.h>
#include <linux/spinlock.h>
#include <linux/net.h>
#include <net/sock.h>
#include <asm/uaccess.h>
int errno;
#include <asm/unistd.h>
#define MAXTHREAD 16
/*
 * Minimal doubly linked queue of sockets.
 *
 * Nodes are inserted at the head and normally consumed from the tail.
 * The helpers do no locking of their own; callers that share a queue
 * between threads must serialise access themselves.
 */
struct sock_node {
	struct sock_node *prev;	/* neighbour towards the head, NULL for the head */
	struct sock_node *next;	/* neighbour towards the tail, NULL for the tail */
	struct socket *sock;	/* socket carried by this node */
};

struct sock_queue {
	struct sock_node *head;	/* most recently inserted node, NULL if empty */
	struct sock_node *tail;	/* oldest node, NULL if empty */
};

/* Global hand-off queue between the accepting thread and the workers. */
struct sock_queue sockqueue;

/* Reset a queue to the empty state. Nodes still linked are not freed. */
void init_sockqueue(struct sock_queue *sockqueue)
{
	sockqueue->head = NULL;
	sockqueue->tail = NULL;
}

/*
 * Insert tmp at the head of the queue.
 *
 * Both link fields of tmp are overwritten, so a node that was just taken
 * out of another queue can be inserted without carrying stale pointers
 * into this one.
 */
static void put_sock_head(struct sock_queue *sockqueue, struct sock_node *tmp)
{
	tmp->prev = NULL;
	tmp->next = sockqueue->head;

	if (sockqueue->head)
		sockqueue->head->prev = tmp;
	else
		sockqueue->tail = tmp;	/* first node is head and tail at once */

	sockqueue->head = tmp;
}

/*
 * Detach and return the tail node, or NULL when the queue is empty.
 *
 * The returned node is fully unlinked (prev == next == NULL) and the new
 * tail no longer points at it.
 */
static struct sock_node* out_sock_tail(struct sock_queue *sockqueue)
{
	struct sock_node *tmp = sockqueue->tail;

	if (!tmp)
		return NULL;

	if (sockqueue->head == tmp) {
		sockqueue->head = NULL;
		sockqueue->tail = NULL;
	} else {
		sockqueue->tail = tmp->prev;
		if (sockqueue->tail)
			sockqueue->tail->next = NULL;
	}

	tmp->prev = NULL;
	tmp->next = NULL;
	return tmp;
}

/* Return non-zero when the queue holds no nodes. */
static int sockqueue_empty(struct sock_queue *sockqueue)
{
	return (NULL == sockqueue->head);
}

/* Return the head node without removing it (NULL if the queue is empty). */
static struct sock_node* get_sock_head(struct sock_queue *sockqueue)
{
	return sockqueue->head;
}

/*
 * Return the node following cur, or NULL when cur is NULL or the last node.
 * Every path returns a value, so passing NULL is well defined.
 */
static struct sock_node * get_sock_next(struct sock_node *cur)
{
	return cur ? cur->next : NULL;
}

/*
 * Unlink cur from the queue. The node itself and its socket are left
 * untouched; releasing them is the caller's job. A NULL cur is ignored.
 */
static void rm_sock_node(struct sock_queue *sockqueue, struct sock_node *cur)
{
	if (!cur)
		return;

	if (cur->next)
		cur->next->prev = cur->prev;
	if (cur->prev)
		cur->prev->next = cur->next;
	if (cur == sockqueue->head)
		sockqueue->head = cur->next;
	if (cur == sockqueue->tail)
		sockqueue->tail = cur->prev;
}
/*
 * Dispose of every node in the queue: each node is unlinked, its socket
 * is released and the node memory is freed.
 */
static void rm_sock_queue(struct sock_queue *sockqueue)
{
	struct sock_node *node;
	struct sock_node *following;

	for (node = get_sock_head(sockqueue); node; node = following) {
		/* Fetch the successor first: node is freed at the end of the pass. */
		following = get_sock_next(node);
		rm_sock_node(sockqueue, node);
		sock_release(node->sock);
		kfree(node);
	}
}
/* Taken around every access to the global sockqueue by echod() and doecho(). */
spinlock_t sock_lock = SPIN_LOCK_UNLOCKED;
/* NOTE(review): not referenced anywhere in the visible source. */
static struct timer_list tm;
/* cleanup_module() sleeps here; echod() wakes it once shutdown has completed. */
DECLARE_WAIT_QUEUE_HEAD(wm);
/* Worker threads sleep here; echod() wakes it after queuing a new connection. */
DECLARE_WAIT_QUEUE_HEAD(wq_con);
/* Set to 1 by end_damone(); polled by all thread main loops as the stop request. */
volatile int finish = 0;
/* Module parameter (default 2); also compared against connectcount in acceptcon(). */
static int threadnum = 2;
/* Module parameter: TCP port to listen on (default 7). */
static int port = 7;
/* Number of worker threads echod() starts; taken from threadnum when below 1. */
static int acthreadnum;
/* Per-worker index storage; a pointer to thread[i] is the argument of doecho(). */
static int thread[MAXTHREAD];
/* canquit[i] is set to 1 by worker i just before it returns. */
static int canquit[MAXTHREAD];
/* current connection count: incremented in echod(), decremented in echowork() */
static atomic_t connectcount;
/* init a server sock and begin listen */
int init_sock(struct socket **s, int port)
{
struct sockaddr_in sin;
int error = 0;
error = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, s);
if (error < 0) {
printk("Error during creation of socket\n");
goto out;
}
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons((unsigned short)port);
error = (*s) -> ops -> bind(*s, (struct sockaddr*)&sin, sizeof(sin));
if (error <0) {
printk("Error bind: %d\n", error);
goto out;
}
(*s) -> sk -> reuse = 1;
error = (*s) -> ops -> listen(*s, 48);
if (0 != error) {
printk("error to listen\n");
goto out;
}
out:
return error;
}
/*
 * Receive whatever is pending on socket s (without blocking) and send
 * the same bytes back to the peer.
 *
 * Nothing happens when the receive queue is empty, when no buffer can be
 * allocated (the data stays queued for a later call) or when the receive
 * call yields no data or an error.
 *
 * NOTE(review): the return value of sock_sendmsg() is not examined, so a
 * short send silently drops the remainder.
 */
void echomsg(struct socket *s)
{
	enum { ECHO_BUF_SIZE = 4096 };	/* one byte is reserved for a terminator */
	struct msghdr msg;
	struct iovec iov;
	mm_segment_t oldfs;
	char *buf;
	int len;

	if (skb_queue_empty(&(s->sk->receive_queue)))
		return;

	buf = kmalloc(ECHO_BUF_SIZE, GFP_KERNEL);
	if (!buf)
		return;

	iov.iov_base = buf;
	iov.iov_len = (size_t)(ECHO_BUF_SIZE - 1);
	msg.msg_name = 0;
	msg.msg_namelen = 0;
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	/* The buffer lives in kernel space: widen the address limit for the call. */
	oldfs = get_fs();
	set_fs(get_ds());
	len = sock_recvmsg(s, &msg, ECHO_BUF_SIZE - 1, MSG_DONTWAIT);
	set_fs(oldfs);

	/*
	 * Validate len before it is used as an index: a negative error code
	 * must never reach buf[len]. A completely filled buffer
	 * (ECHO_BUF_SIZE - 1 bytes) is valid data and is echoed as well.
	 */
	if (len <= 0 || len >= ECHO_BUF_SIZE)
		goto outdo;
	buf[len] = 0;

	/* The receive call may have advanced the iovec; point it at the data again. */
	iov.iov_base = buf;
	iov.iov_len = len;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = MSG_NOSIGNAL;

	oldfs = get_fs();
	set_fs(get_ds());
	len = sock_sendmsg(s, &msg, len);
	set_fs(oldfs);

outdo:
	kfree(buf);
}
/* accept a new sock connecttion */
struct socket *acceptcon(struct socket *sock)
{
int error = 0;
struct socket *newsock = NULL;
if (atomic_read(&connectcount) > threadnum)
return NULL;
if (NULL == sock)
return NULL;
if (NULL == sock -> sk -> tp_pinfo.af_tcp.accept_queue)
return NULL;
while (error >= 0) {
printk("accept routine\n");
newsock = sock_alloc();
if (NULL == newsock) {
printk("error to alloc socket\n");
break;
}
newsock -> type = sock -> type;
newsock -> ops = sock -> ops;
error = sock -> ops -> accept(sock, newsock, O_NONBLOCK);
if (error < 0) {
sock_release(newsock);
printk("error to accept: %d\n", error);
break;
}
if (TCP_CLOSE == newsock -> sk -> state) {
printk("continue\n");
sock_release(newsock);
continue;
}
else
break;
}
if (error < 0)
return NULL;
else
return newsock;
}
/*
 * Make one pass over a worker's private socket queue.
 *
 * Sockets found in TCP_CLOSE_WAIT are removed from the queue, released
 * and freed, and the connection counter is decremented. All others get
 * one echomsg() call. The thread sleeps for one tick after each node.
 */
void echowork(struct sock_queue *sq)
{
	struct sock_node *node = get_sock_head(sq);
	struct sock_node *dead;
	DECLARE_WAIT_QUEUE_HEAD(wq);

	while (node) {
		if (TCP_CLOSE_WAIT != node->sock->sk->state) {
			echomsg(node->sock);
			node = get_sock_next(node);
		} else {
			atomic_dec(&connectcount);
			dead = node;
			/* Advance before the node is unlinked and freed. */
			node = get_sock_next(dead);
			rm_sock_node(sq, dead);
			sock_release(dead->sock);
			kfree(dead);
		}
		interruptible_sleep_on_timeout(&wq, 1);
	}
}
/* echo work thread */
int doecho(void *ptr)
{
struct sock_queue sq;
struct sock_node *tmp;
sigset_t tmpsig;
int nothread = (int)(*((int*)ptr));
DECLARE_WAIT_QUEUE_HEAD(wq);
sprintf(current -> comm, "echod-tcp");
init_sockqueue(&sq);
spin_lock_irq(¤t -> sigmask_lock);
tmpsig = current -> blocked;
siginitsetinv(¤t -> blocked, sigmask(SIGKILL) | sigmask(SIGSTOP));
recalc_sigpending(current);
spin_unlock_irq(¤t -> sigmask_lock);
daemonize();
while (!finish) {
interruptible_sleep_on_timeout(&wq_con, 1);
tmp = NULL;
spin_lock(&sock_lock);
if (!sockqueue_empty(&sockqueue))
tmp = out_sock_tail(&sockqueue);
spin_unlock(&sock_lock);
if (tmp) {
put_sock_head(&sq, tmp);
}
if (!sockqueue_empty(&sq)) {
echowork(&sq);
}
interruptible_sleep_on_timeout(&wq, 1);
}
rm_sock_queue(&sq);
canquit[nothread] = 1;
return 0;
}
/* main damone */
int echod(void *ptr)
{
struct socket *s = NULL;
sigset_t tmpsig;
int i;
DECLARE_WAIT_QUEUE_HEAD(wq);
sprintf(current -> comm, "echod-man");
spin_lock_irq(¤t -> sigmask_lock);
tmpsig = current -> blocked;
siginitsetinv(¤t -> blocked, sigmask(SIGKILL) | sigmask(SIGSTOP));
recalc_sigpending(current);
spin_unlock_irq(¤t -> sigmask_lock);
daemonize();
if (init_sock(&s, port))
return -1;
init_sockqueue(&sockqueue);
atomic_set(&connectcount, 0);
if (acthreadnum < 1)
acthreadnum = threadnum;
i = 0;
while (i < acthreadnum) {
canquit[i] = 0;
thread[i] = i;
(void)kernel_thread(doecho, &thread[i], CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
i++;
}
while (!finish) {
struct socket *newsock = acceptcon(s);
if (newsock) {
struct sock_node *tmp = (struct sock_node *)kmalloc(sizeof(struct sock_node), GFP_KERNEL);
tmp -> sock = newsock;
tmp -> next = NULL;
tmp -> prev = NULL;
spin_lock(&sock_lock);
put_sock_head(&sockqueue, tmp);
spin_unlock(&sock_lock);
atomic_inc(&connectcount);
wake_up_interruptible(&wq_con);
}
interruptible_sleep_on_timeout(&wq, 1);
waitpid(-1, NULL, __WCLONE | WNOHANG);
}
while(waitpid(-1, NULL, __WCLONE | WNOHANG) > 0);
for(i = 0;i< acthreadnum; i++) {
while(1 != canquit[i])
interruptible_sleep_on_timeout(&wq, 1);
}
sock_release(s);
printk("echod end\n");
wake_up_interruptible(&wm);
return 0;
}
/*
 * Request shutdown of all daemon threads by raising the global `finish`
 * flag. The argument is not used.
 */
void end_damone(unsigned long ptr)
{
	(void)ptr;

	printk("wait up it\n");
	finish = 1;
}
/* Both values can be overridden at load time: insmod tcpechod.o port=N threadnum=N */
MODULE_PARM(port, "i");
MODULE_PARM(threadnum, "i");
/*
 * Module entry point: start the main daemon thread.
 *
 * Returns 0 on success, or the negative error code from kernel_thread()
 * so that a failed start makes the module load fail instead of leaving a
 * module without a daemon behind.
 */
int init_module(void)
{
	int pid;

	pid = kernel_thread(echod, NULL, CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
	if (pid < 0) {
		printk("echod: cannot start daemon thread: %d\n", pid);
		return pid;
	}
	return 0;
}
/*
 * Module exit point: request shutdown, then sleep on `wm` until echod()
 * issues its wake-up at the end of its shutdown sequence.
 *
 * NOTE(review): the sleep is interruptible and there is no "done" flag to
 * re-check, so a signal, or a wake-up issued before this thread sleeps,
 * would not be handled correctly — confirm and consider a completion-style
 * handshake.
 */
void cleanup_module()
{
/* Raise the global finish flag polled by all thread loops. */
end_damone(0);
/* Wait for echod() to announce that it has finished. */
interruptible_sleep_on(&wm);
}
发布人:tengel 来自: