Linux网络编程 - TCP Socket 简单练习:线程池实现并发
分类:网络频道
时间:2019-11-05
关键词:
这里讲的仅仅是一个简单的server的模型!为了处理同时来到很多小的链接请求(
解释:就是请求很简单,持续时间很短,那么if
server在请求到来时在fork来处理它,有可能fork的时间比应答请求的还要少,那么就是不合理的服务设计
),所以我们采用的是“prefork”和“prethread”模型!
Unix 网络编程 上的4个模型是:prefork:主进程accept
服务器函数执行流程
main
init_system
creat_pthread_pool
child_work
thread_manager
task_manager
process_client
monitor
sys_clean
子进程accept
Makefile文件
[plain] view
plain copy
print?

- CC = gcc
- TARGET = pthread_pool
- SRC = pthread_pool.c base.c
- OBJECT = pthread_pool.o base.o
- INCLUDES = -I./
- LDFLAGS = -lpthread
-
- all:$(TARGET)
-
- $(OBJECT):$(SRC)
- $(CC) -c $(INCLUDES) ${SRC}
-
- $(TARGET):$(OBJECT)
- $(CC) -o $@ $(OBJECT) $(LDFLAGS)
-
- .PHONY:clean
-
- clean:
- @rm -rf $(OBJECT) $(TARGET) *~
prethread:
服务器代码
主线程accept
头文件
[cpp] view
plain copy
print?

- #ifndef __PTHREAD_POOL_H__
-
- #define __PTHREAD_POOL_H__
-
- #include <stdio.h>
- #include <pthread.h>
- #include <stdlib.h>
- #include <string.h>
- #include <time.h>
- #include <sys/syscall.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <assert.h>
- #include <sys/stat.h>
- #include <sys/types.h>
- #include <fcntl.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <net/if.h>
- #include <sys/ioctl.h>
- #include <errno.h>
-
- #define THREAD_MAX_NUM 100 /* max number of thread. */
- #define THREAD_DEF_NUM 20 /* by default ,number of thread. */
- #define THREAD_MIN_NUM 5 /* min number of thread pool. */
- #define LISNUM<span style="white-space:pre"> </span>5
- #define PORT 9001
- #define MAXBUF 1024
-
-
- /*
- * *ds of the every task. make all task in a single link
- */
- //任务结构节点,用于描述每个任务的具体属性
- typedef struct task_node
- {
- void *arg; /* fun arg. */
- void *(*fun)(void *); /* the real work of the task. */
- pthread_t tid; /* which thread exec this task. */
- int work_id; /* task id. */
- int flag; /* 1: assigned, 0: unassigned. */
- struct task_node *next;
- pthread_mutex_t mutex; /* when modify this ds and exec the work,lock the task ds. */
- } TASK_NODE;
-
-
- /*
- * *the ds of the task_queue
- */
- //任务队列结构,用于控制整个任务队列
- typedef struct task_queue
- {
- pthread_mutex_t mutex;
- pthread_cond_t cond; /* when no task, the manager thread wait for ;when a new task come, signal. */
- struct task_node *head; /* point to the task_link. */
- int number; /* current number of task, include unassinged and assigned but no finished. */
- } TASK_QUEUE_T;
-
-
- /*
- * *the ds of every thread, make all thread in a double link queue.
- */
- //线程结构节点,用于描述每个线程的具体属性
- typedef struct pthread_node
- {
- pthread_t tid; /* the pid of this thread in kernel,the value is syscall return . */
- int flag; /* 1:busy, 0:free. */
- struct task_node *work; /* if exec a work, which work. */
- struct pthread_node *next;
- struct pthread_node *prev;
- pthread_cond_t cond; /* when assigned a task, signal this child thread by manager. */
- pthread_mutex_t mutex;
- } THREAD_NODE;
-
-
- /*
- * *the ds of the thread queue
- */
- //线程队列结构,用于控制空闲线程队列和忙碌线程队列
- typedef struct pthread_queue
- {
- int number; /* the number of thread in this queue. */
- struct pthread_node *head;
- struct pthread_node *rear;
- pthread_cond_t cond; /* when no idle thread, the manager wait for ,or when a thread return with idle, signal. */
- pthread_mutex_t mutex;
- } PTHREAD_QUEUE_T;
-
- //在pthread_poll()中定义的三个结构的指针
- extern PTHREAD_QUEUE_T *pthread_queue_idle; /* the idle thread double link queue. */
- extern PTHREAD_QUEUE_T *pthread_queue_busy; /* the work thread double link queue. */
- extern TASK_QUEUE_T *task_queue_head; /* the task queuee single link list. */
-
- void *child_work( void *ptr );
-
- void create_pthread_pool( void );
-
- void init_system( void );
-
- void *thread_manager( void *ptr );
-
- void *prcoess_client( void *ptr );
-
- void *task_manager( void *ptr );
-
- void *monitor( void *ptr );
-
- void sys_clean( void );
-
- #endif
子线程accept ( 姑且使用主线程和子线程来描述 )
基础函数
[cpp] view
plain copy
print?

- #include "pthread_pool.h"
-
-
- /*
- * *child_work:the code exec in child thread
- * *ptr: the ds of thread_node of current thread.
- * *return :nothing.void * just avoid warning.
- */
- /*
- child_work为创建的线程执行的函数
- 主要用来等待线程属性状态的变化,来判断是否有任务要执行
- 并且判断线程的工作状态的变化,来决定加入哪个线程队列(空闲还是忙碌)
- */
- void *
- child_work( void *ptr )
- {
- //这里的ptr为(void *) &temp[i]
- THREAD_NODE * self = (THREAD_NODE *) ptr;
-
- /*modify the tid attribute the first time exec */
- pthread_mutex_lock( &self->mutex );
- self->tid = syscall( SYS_gettid );//获得线程自身id
- pthread_mutex_unlock( &self->mutex );
-
- while ( 1 )
- {
- pthread_mutex_lock( &self->mutex );
-
- /*if no task exec,blocked */
- /*
- 关键的一句话
- 从线程的属性struct task_node *work(即为self->work)
- 判断是否已给当前线程分配任务
- */
- //如果该线程尚没有分配任务,则通过条件变量阻塞等待条件变量self->cond
- if ( NULL == self->work )
- {
- pthread_cond_wait( &self->cond, &self->mutex );
- }
-
- pthread_mutex_lock( &self->work->mutex );
-
- /*execute the real work.
- 开始执行任务
- */
- self->work->fun( self->work->arg );
-
- /*after finished the work
- 任务执行完后,撤销任务的属性,并销毁任务本身,释放其占用的资源
- */
- self->work->fun = NULL;
- self->work->flag = 0;
- self->work->tid = 0;
- self->work->next = NULL;
-
- free( self->work->arg );
-
- pthread_mutex_unlock( &self->work->mutex ); /* unlock the task */
- pthread_mutex_destroy( &self->work->mutex );
-
- /*free the task space */
- free( self->work );
-
- /*make self thread no work */
- self->work = NULL;
- self->flag = 0;
-
-
- /*
- * *get new task from the task_link if not NULL.
- * *there no idle thread if there are task to do.
- * *if on task ,make self idle and add to the idle queue.
- */
- /*
- 执行完上一个任务后,查看任务队列中是否还有任务
- */
- pthread_mutex_lock( &task_queue_head->mutex );
- if ( task_queue_head->head != NULL )//如果有任务,则分配任务
- {
- TASK_NODE * temp = task_queue_head->head;
-
- /*get the first task */
- task_queue_head->head = task_queue_head->head->next;
-
- /*modify self thread attribute */
- self->flag = 1;
- self->work = temp;
- temp->tid = self->tid;
- temp->next = NULL;
- temp->flag = 1;
-
- task_queue_head->number--;
-
- pthread_mutex_unlock( &task_queue_head->mutex );
-
- pthread_mutex_unlock( &self->mutex );
-
- continue;
- }
- else //如果没有任务,从忙碌线程队列中删除此线程,将其加入空闲线程队列中
- {
- /*no task need to exec, add self to idle queue and del from busy queue */
- pthread_mutex_unlock( &task_queue_head->mutex );
-
- pthread_mutex_lock( &pthread_queue_busy->mutex );
-
- /*self is the last execte thread
- 如果此线程是忙碌的线程队列中的仅剩的一个线程
- */
- if ( pthread_queue_busy->head == self
- && pthread_queue_busy->rear == self )
- {
- pthread_queue_busy->head = pthread_queue_busy->rear = NULL;
- self->next = self->prev = NULL;
- }
- /*the first one thread in busy queue
- 如果此线程是忙碌的线程队列中的第一个线程
- */
- else if ( pthread_queue_busy->head == self
- && pthread_queue_busy->rear != self )
- {
- pthread_queue_busy->head = pthread_queue_busy->head->next;
- pthread_queue_busy->head->prev = NULL;
-
- self->next = self->prev = NULL;
- }
- /*the last one thread in busy queue
- 如果此线程是忙碌的线程队列中的末尾的一个线程
- */
- else if ( pthread_queue_busy->head != self
- && pthread_queue_busy->rear == self )
- {
- pthread_queue_busy->rear = pthread_queue_busy->rear->prev;
- pthread_queue_busy->rear->next = NULL;
-
- self->next = self->prev = NULL;
- }
- /*middle one
- 如果此线程是忙碌的线程队列中的中间的某个线程
- */
- else{
- self->next->prev = self->prev;
- self->prev->next = self->next;
- self->next = self->prev = NULL;
- }
-
- pthread_mutex_unlock( &pthread_queue_busy->mutex );
-
- /*add self to the idle queue
- 将此线程加入空闲线程队列中
- */
- pthread_mutex_lock( &pthread_queue_idle->mutex );
-
- /*now the idle queue is empty
- 判断空闲线程队列的情况,根据不同的情况将此线程加入不同的位置
- */
- if ( pthread_queue_idle->head == NULL
- || pthread_queue_idle->rear == NULL )
- {
- pthread_queue_idle->head = pthread_queue_idle->rear = self;
- self->next = self->prev = NULL;
- }else {
- self->next = pthread_queue_idle->head;
- self->prev = NULL;
- self->next->prev = self;
-
- pthread_queue_idle->head = self;
- pthread_queue_idle->number++;
- }
-
- pthread_mutex_unlock( &pthread_queue_idle->mutex );
-
- pthread_mutex_unlock( &self->mutex );
-
- /*signal have idle thread
- 告知阻塞等待条件变量pthread_queue_idle->cond的位置已有空闲线程
- */
- pthread_cond_signal( &pthread_queue_idle->cond );
- }
- }
- }
-
-
- /*
- * *create thread pool when the system on, and thread number is THREAD_DEF_NUM.
- * *when init, initial all the thread into a double link queue and all wait fo self->cond.
- */
- void
- create_pthread_pool( void )
- {
- //分配线程节点
- THREAD_NODE * temp =
- (THREAD_NODE *) malloc( sizeof(THREAD_NODE) * THREAD_DEF_NUM );
-
- if ( temp == NULL )
- {
- printf( " malloc failuren" );
- exit( EXIT_FAILURE );
- }
-
- /*init as a double link queue
- 初始化为双向链式队列
- */
- int i;
-
- //THREAD_DEF_NUM为线程池中线程的最大数量
- //for循环开始创建线程池
- for ( i = 0; i < THREAD_DEF_NUM; i++ )
- {
- temp[i].tid = i + 1;
- temp[i].work = NULL;
- temp[i].flag = 0;
-
- if ( i == THREAD_DEF_NUM - 1 )
- temp[i].next = NULL;
-
- if ( i == 0 )
- temp[i].prev = NULL;
-
- //双向链表的体现
- temp[i].prev = &temp[i - 1];
- temp[i].next = &temp[i + 1];
-
- pthread_cond_init( &temp[i].cond, NULL );
- pthread_mutex_init( &temp[i].mutex, NULL );
-
- /*create this thread
- 在此创建线程,各个线程执行的函数为child_work
- */
- pthread_create( &temp[i].tid, NULL, child_work, (void *) &temp[i] );
- }
-
- /*modify the idle thread queue attribute
- 修改空闲线程队列的属性
- */
- pthread_mutex_lock( &pthread_queue_idle->mutex );
-
- pthread_queue_idle->number = THREAD_DEF_NUM;
- //此句就将刚创建的那些线程给空闲线程队列
- pthread_queue_idle->head = &temp[0];
- pthread_queue_idle->rear = &temp[THREAD_DEF_NUM - 1];
-
- pthread_mutex_unlock( &pthread_queue_idle->mutex );
- }
-
-
- /*
- * *init_system :init the system glob pointor.
- */
- void
- init_system( void )
- {
- /*init the pthread_queue_idle
- 初始化空闲线程队列,采用的是普通的双向链式结构(未循环)
- */
- pthread_queue_idle =
- (PTHREAD_QUEUE_T *) malloc( sizeof(PTHREAD_QUEUE_T) );
-
- pthread_queue_idle->number = 0;
- pthread_queue_idle->head = NULL;
- pthread_queue_idle->rear = NULL;
- pthread_mutex_init( &pthread_queue_idle->mutex, NULL );
- pthread_cond_init( &pthread_queue_idle->cond, NULL );
-
- /*init the pthread_queue_busy
- 初始化空闲线程队列,采用的是普通的双向链式结构(未循环)
- */
- pthread_queue_busy =
- (PTHREAD_QUEUE_T *) malloc( sizeof(PTHREAD_QUEUE_T) );
-
- pthread_queue_busy->number = 0;
- pthread_queue_busy->head = NULL;
- pthread_queue_busy->rear = NULL;
- pthread_mutex_init( &pthread_queue_busy->mutex, NULL );
- pthread_cond_init( &pthread_queue_busy->cond, NULL );
-
- /*init the task_queue_head
- 初始化任务队列,采用单向链表
- */
- task_queue_head = (TASK_QUEUE_T *) malloc( sizeof(TASK_QUEUE_T) );
-
- task_queue_head->head = NULL;
- task_queue_head->number = 0;
- pthread_cond_init( &task_queue_head->cond, NULL );
- pthread_mutex_init( &task_queue_head->mutex, NULL );
-
- /*create thread poll
- 创建线程池
- */
- create_pthread_pool();
- }
-
-
- /*
- * *thread_manager:code exec in manager thread.
- * block on task_queue_head->cond when no task come.
- * block on pthread_queue_idle->cond when no idle thread
- **ptr:no used ,in order to avoid warning.
- **return :nothing.
- */
-
- void *
- thread_manager( void *ptr )
- {
- while ( 1 )
- {
- THREAD_NODE * temp_thread = NULL;
- TASK_NODE * temp_task = NULL;
-
- /*
- * *get a new task, and modify the task_queue.
- * *if no task block om task_queue_head->cond.
- */
- pthread_mutex_lock( &task_queue_head->mutex );
- //如果任务队列为空,则阻塞等待条件变量task_queue_head->cond
- if ( task_queue_head->number == 0 )
- pthread_cond_wait( &task_queue_head->cond,
- &task_queue_head->mutex );
-
- //如果不为空,则开始准备分配任务,并修改任务队列属性
- temp_task = task_queue_head->head;
- task_queue_head->head = task_queue_head->head->next;
- task_queue_head->number--;
-
- pthread_mutex_unlock( &task_queue_head->mutex );
-
-
- /*
- * *get a new idle thread, and modify the idle_queue.
- * *if no idle thread, block on pthread_queue_idle->cond.
- */
- //有了任务之后,开始判断是否有空闲线程
- pthread_mutex_lock( &pthread_queue_idle->mutex );
-
- //如果没有空闲线程,则阻塞等待条件变量pthread_queue_idle->cond
- if ( pthread_queue_idle->number == 0 )
- pthread_cond_wait( &pthread_queue_idle->cond,
- &pthread_queue_idle->mutex );
-
- //如果有空闲线程则取出一个空闲线程,然后修改空闲线程队列属性
- temp_thread = pthread_queue_idle->head;
-
- /*if this is the last idle thread ,modiry the head and rear pointor */
- if ( pthread_queue_idle->head == pthread_queue_idle->rear )
- {
- pthread_queue_idle->head = NULL;
- pthread_queue_idle->rear = NULL;
- }
- /*if idle thread number>2, get the first one,modify the head pointor */
- else{
- pthread_queue_idle->head = pthread_queue_idle->head->next;
- pthread_queue_idle->head->prev = NULL;
- }
-
- pthread_queue_idle->number--;//将空闲线程队列数量减一
-
- pthread_mutex_unlock( &pthread_queue_idle->mutex );
-
- /*modify the task attribute.
- 修改取出的线程的线程结构属性和相关的任务结构属性
- */
- pthread_mutex_lock( &temp_task->mutex );
-
- temp_task->tid = temp_thread->tid;
- temp_task->next = NULL;
- temp_task->flag = 1;
-
- pthread_mutex_unlock( &temp_task->mutex );
-
- /*modify the idle thread attribute. */
- pthread_mutex_lock( &temp_thread->mutex );
-
- temp_thread->flag = 1;
- temp_thread->work = temp_task;
- temp_thread->next = NULL;
- temp_thread->prev = NULL;
-
- pthread_mutex_unlock( &temp_thread->mutex );
-
- /*add the thread assinged task to the busy queue. */
- //将已分配任务的线程加入忙碌线程队列中
- pthread_mutex_lock( &pthread_queue_busy->mutex );
-
- /*if this is the first one in busy queue */
- if ( pthread_queue_busy->head == NULL )
- {
- pthread_queue_busy->head = temp_thread;
- pthread_queue_busy->rear = temp_thread;
- temp_thread->prev = temp_thread->next = NULL;
- }else {
- /*insert in thre front of the queue */
- pthread_queue_busy->head->prev = temp_thread;
- temp_thread->prev = NULL;
- temp_thread->next = pthread_queue_busy->head;
- pthread_queue_busy->head = temp_thread;
- pthread_queue_busy->number++;
- }
- pthread_mutex_unlock( &pthread_queue_busy->mutex );
-
- /*signal the child thread to exec the work */
- //告知阻塞等待条件变量temp_thread->cond的位置,开始执行任务
- pthread_cond_signal( &temp_thread->cond );
- }
- }
-
-
- /*
- * *code to process the new client in every chilld pthead.
- * *ptr: the fd come from listen thread that can communicate to the client.
- * *return:nothing. void * only used to avoid waring.
- */
- //用来处理任务的函数
- void *
- prcoess_client( void *ptr )
- {
- int net_fd;
- net_fd = atoi( (char *) ptr );
-
- socklen_t len;
- char buf[MAXBUF + 1];
- /*下面是select用到的变量的定义 */
- fd_set rfds;
- struct timeval tv;
- int retval;
- int maxfd = -1;
-
- while ( 1 )
- {
- FD_ZERO( &rfds ); /* 初始化rfds为空 */
- FD_SET( 0, &rfds ); /* 将标准输入的描述符0加入到集合rfds中 */
- FD_SET( net_fd, &rfds ); /* 将net_fd加入到集合rfds中 */
- maxfd = net_fd + 1;
- tv.tv_sec = 1; /* 阻塞等待时间为1s */
- tv.tv_usec = 0;
-
- retval = select( maxfd, &rfds, NULL, NULL, &tv ); /* 多路复用,同时监测描述符0和net_fd */
- if ( retval == -1 ) /* select函数执行出错 */
- {
- perror( "select" );
- exit( EXIT_FAILURE );
- }else if ( retval == 0 ) /* select函数执行超时 */
- continue;
- else{ /*有描述符引起异常 */
- if ( FD_ISSET( 0, &rfds ) ) /* 判断是不是标准输入0引起的异常 */
- {
- bzero( buf, sizeof(buf) ); /* 清空buf */
- fgets( buf, sizeof(buf) - 1, stdin ); /* 从终端接收输入 */
-
- if ( !strncasecmp( buf, "quit", 4 ) ) /* 判断是否为退出 */
- {
- printf( "i will close the connect!n" );
- close( net_fd );
- goto clean;
- }
-
- len = send( net_fd, buf, strlen( buf ) - 1, 0 ); /* 向客户端发送消息 */
- if ( len > 0 )
- {
- printf( "send successful,%d byte send!n", len );
- }else {
- printf( "message '%s' send failure !n", buf );
- printf( "errno code is %d, errno message is '%s'n", errno, strerror( errno ) );
- close( net_fd );
- goto clean;
- }
- }
-
- if ( FD_ISSET( net_fd, &rfds ) ) /* 判断是不是net_fd引起的异常 */
- {
- bzero( buf, sizeof(buf) );
- len = recv( net_fd, buf, sizeof(buf) - 1, 0 ); /* 从客户端接收消息 */
- if ( len > 0 )
- printf( "message recv successful : '%s', %d Byte recvn", buf, len );
- else if ( len < 0 )
- {
- printf( "recv failure !nerrno code is %d, errno message is '%s'n", errno, strerror( errno ) );
- close( net_fd );
- goto clean;
- }else { /* 如果客户端已关闭 */
- printf( "the other one close quitn" );
- close( net_fd );
- return;
- }
- }
- }
- }
- close( net_fd );
- return;
-
- clean:
- sys_clean();
- }
-
-
- /*
- * *task_manager: get new task and add to the tail of the task_link.
- * *ptr: no used. just avoid warning.
- * *return:nothing.
- */
- //用来监听客户端的连接,产生任务
- void *
- task_manager( void *ptr )
- {
- int listen_fd;
-
- if ( -1 == (listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) )
- {
- perror( "socket" );
- goto clean;
- }
-
- struct ifreq ifr;
-
- //eno16777736类似于eth0,在Linux系统中可以修改为eth0
- strcpy( ifr.ifr_name, "eno16777736" );
- //获取eno16777736的ip地址
- if ( ioctl( listen_fd, SIOCGIFADDR, &ifr ) < 0 )
- {
- perror( "ioctl" );
- goto clean;
- }
-
- struct sockaddr_in myaddr;
-
- myaddr.sin_family = AF_INET;
- myaddr.sin_port = htons( PORT );//PORT为9001,在头文件中设置,是全局变量
- myaddr.sin_addr.s_addr =
- ( (struct sockaddr_in *) &(ifr.ifr_addr) )->sin_addr.s_addr;
-
- <span style="white-space:pre"> </span>//输出ip和port信息
- <span style="white-space:pre"> </span>printf("server_ip = %snserver_port = %dnlisnum = %dn", inet_ntoa(myaddr.sin_addr), PORT, LISNUM);
-
- //绑定IP地址和端口port
- if ( -1 == bind( listen_fd, (struct sockaddr *) &myaddr, sizeof(myaddr) ) )
- {
- perror( "bind" );
- goto clean;
- }
- //监听
- if ( -1 == listen( listen_fd, 5 ) )
- {
- perror( "listen" );
- goto clean;
- }
-
- /*i is the id of the task */
- int i;
- //开始接受客户端的连接,产生任务
- for ( i = 1; ; i++ )
- {
- int newfd;
- struct sockaddr_in client;
- socklen_t len = sizeof(client);
-
- if ( -1 ==
- (newfd = accept( listen_fd, (struct sockaddr *) &client, &len ) ) )
- {
- perror( "accept" );
- goto clean;
- }
- /* 打印本次连接的客户端的地址信息 inet_ntoa ntohs */
- printf( "server: got connection from %s, port %d, socket %dn", inet_ntoa( client.sin_addr ), ntohs( client.sin_port ), newfd );
-
- //开始将产生的新任务加入任务队列之中
- TASK_NODE * temp = NULL;
- TASK_NODE * newtask = (TASK_NODE *) malloc( sizeof(TASK_NODE) );
- if ( newtask == NULL )
- {
- printf( "malloc error" );
- goto clean;
- }
-
- /*
- * *initial the attribute of the task.
- * *because this task havn't add to system,so,no need lock the mutex.
- */
-
- newtask->arg = (void *) malloc( 128 );
-
- memset( newtask->arg, ' ', 128 );
- //任务执行的参数为连接的客户端的socket描述符
- sprintf( newtask->arg, "%d", newfd );
- //新任务的处理函数均为prcoess_client,newfd即为函数prcoess_client的参数
- newtask->fun = prcoess_client;
- newtask->tid = 0;
- newtask->work_id = i;
- newtask->next = NULL;
- pthread_mutex_init( &newtask->mutex, NULL );
-
- /*add new task to task_link */
- pthread_mutex_lock( &task_queue_head->mutex );
-
- /*find the tail of the task link and add the new one to tail
- 开始将产生的新任务加入到任务队列中
- */
- temp = task_queue_head->head;
-
- if ( temp == NULL )
- {
- task_queue_head->head = newtask;
- }else {
- while ( temp->next != NULL )
- temp = temp->next;
-
- temp->next = newtask;
- }
- task_queue_head->number++;//任务队列数量加一
-
- pthread_mutex_unlock( &task_queue_head->mutex );
-
- /*signal the manager thread , task coming
- 告知阻塞等待条件变量task_queue_head->cond的位置,已有未执行的任务
- */
- pthread_cond_signal( &task_queue_head->cond );
- }
-
- return;
-
- clean:
- sys_clean();
- }
-
-
- /*
- * *monitor: get the system info
- * *ptr: not used ,only to avoid warning for pthread_create
- * *return: nothing.
- */
- //用来输出哪些线程在工作
- void *
- monitor( void *ptr )
- {
- /*in order to prevent warning. */
- ptr = ptr;
-
- THREAD_NODE * temp_thread = NULL;
-
- while ( 1 )
- {
- pthread_mutex_lock( &pthread_queue_busy->mutex );
-
- /*output the busy thread works one by one */
- temp_thread = pthread_queue_busy->head;
-
- printf( "n*******************************n" );
- while ( temp_thread )
- {
- printf( "thread %ld is execute work_number %dn",
- temp_thread->tid, temp_thread->work->work_id );
- temp_thread = temp_thread->next;
- }
- printf( "*******************************nn" );
-
- pthread_mutex_unlock( &pthread_queue_busy->mutex );
-
- sleep( 10 );
- }
-
- return;
- }
-
-
- /*
- * *sys_clean: clean the system .
- * *this function code need to do to make it more stronger.
- */
- //清理函数
- void
- sys_clean( void )
- {
- printf( "the system exit abnormallyn" );
- exit( EXIT_FAILURE );
- }
第一部分是:使用“预先生成进程”处理
主函数
[cpp] view
plain copy
print?

- #include "pthread_pool.h"
-
- //定义三个结构的指针
- PTHREAD_QUEUE_T * pthread_queue_idle; /* the idle thread double link queue. */
- PTHREAD_QUEUE_T *pthread_queue_busy; /* the work thread double link queue. */
- TASK_QUEUE_T *task_queue_head; /* the task queuee single link list. */
-
- int
- main( int argc, char *argv[] )
- {
- pthread_t thread_manager_tid, task_manager_tid, monitor_id;
-
- //初始化空闲线程、在工作线程和要完成的任务
- init_system();
-
- //创建线程池管理线程、创建任务管理线程和线程状态监视线程
- pthread_create( &thread_manager_tid, NULL, thread_manager, NULL ); /* create thread to manage the thread pool. */
- pthread_create( &task_manager_tid, NULL, task_manager, NULL ); /* create thread recive task from client. */
- pthread_create( &monitor_id, NULL, monitor, NULL ); /* create thread to monitor the system info. */
-
- //等待线程退出
- pthread_join( thread_manager_tid, NULL );
- pthread_join( task_manager_tid, NULL );
- pthread_join( monitor_id, NULL );
-
- //清理服务器,准备退出主函数
- sys_clean();
-
- return(0);
- }
CODE_1 :
server是:主进程accept,那么这是4种方法中最复杂的,因为要涉及到进程间传递socket描述符的问题!(
进程间传递描述符在上一篇bolg中有过
!),server采用轮询的方式将socket传递给子进程!
客户端代码
[cpp] view
plain copy
print?

- /*************************************************************************
- > File Name: socket_select_client.c
- > Author: genglut
- > Mail: genglut@163.com
- > Created Time: 2014年12月22日 星期一 18时06分06秒
- ************************************************************************/
-
- #include <stdio.h>
- #include <string.h>
- #include <errno.h>
- #include <sys/socket.h>
- #include <resolv.h>
- #include <stdlib.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <unistd.h>
-
- #define MAXBUF 1024
-
- int main(int argc, char *argv[])
- {
- int sockfd;
- socklen_t len;
- struct sockaddr_in server_addr;
- char buf[MAXBUF + 1];
-
- //下面是select用到的变量的定义
- fd_set rfds;
- struct timeval tv;
- int retval;
- int maxfd = -1;
-
- if(argc != 3)
- {
- printf("error failure, it must be:ntt%s IP port n", argv[0]);
- exit(EXIT_FAILURE);
- }
-
- if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
- {
- perror("socket");
- exit(EXIT_FAILURE);
- }
-
- bzero(&server_addr, sizeof(server_addr));
- server_addr.sin_family = AF_INET;
- server_addr.sin_port = htons(atoi(argv[2]));
- server_addr.sin_addr.s_addr = inet_addr(argv[1]);
-
- if(connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) == -1)
- {
- perror("connect");
- exit(EXIT_FAILURE);
- }
-
- printf("already connected to server %sn", argv[1]);
-
-
- while(1)
- {
- FD_ZERO(&rfds);//初始化rfds为空
- FD_SET(0, &rfds);//将标准输入的描述符0加入到集合rfds中
- FD_SET(sockfd, &rfds);//将newfd加入到集合rfds中
- maxfd = sockfd + 1;
- tv.tv_sec = 1;//阻塞等待时间为1s
- tv.tv_usec = 0;
-
- retval = select(maxfd, &rfds, NULL, NULL, &tv);//多路复用,同时监测描述符0和newfd
-
- if(retval == -1)//select函数执行出错
- {
- perror("select");
- exit(EXIT_FAILURE);
- }
- else if(retval == 0)//select函数执行超时
- continue;
- else//有描述符引起异常
- {
- if(FD_ISSET(0, &rfds))//判断是不是标准输入0引起的异常
- {
- bzero(buf, sizeof(buf));//清空buf
- fgets(buf, sizeof(buf)-1, stdin);//从终端接收输入
-
- if(!strncasecmp(buf, "quit", 4))//判断是否为退出
- {
- printf("i will quit!n");
- break;
- }
-
- len = send(sockfd, buf, strlen(buf)-1, 0);//向客户端发送消息
- if(len > 0)
- {
- printf ("send successful,%d byte send!n",len);
- }
- else
- {
- printf("message '%s' send failure !n", buf);
- printf("errno code is %d, errno message is '%s'n", errno, strerror(errno));
- break;
- }
- }
-
- if(FD_ISSET(sockfd, &rfds))//判断是不是newfd引起的异常
- {
- bzero(buf, sizeof(buf));
- len = recv(sockfd, buf, sizeof(buf)-1, 0);//从客户端接收消息
- if(len > 0 )
- printf("message recv successful : '%s', %d Byte recvn", buf, len);
- else if(len < 0)
- {
- printf("recv failure !nerrno code is %d, errno message is '%s'n", errno, strerror(errno));
- break;
- }
- else//如果客户端已关闭
- {
- printf("the other one close, quitn");
- break;
- }
- }
- }
- }
-
- close(sockfd);
- printf("i quited!n");
- return 0;
- }
话不多说,贴上代码:
运行结果
Server:
服务器:
[cpp] view
plain copy
print?

- $ ./pthread_pool
-
- -----------------------------------------
- -----------------------------------------
-
- server_ip = 172.18.229.60
- server_port = 9001
- lisnum = 5
-
- -----------------------------------------
- -----------------------------------------
-
- server: got connection from 172.18.229.60, port 56023, socket 4
- message recv successful : 'hello', 5 Byte recv
-
- -----------------------------------------
- thread 40159 is execute work_number 1
- -----------------------------------------
-
- server: got connection from 172.18.229.60, port 56024, socket 5
- message recv successful : 'hi', 2 Byte recv
- the other one close quit
-
- -----------------------------------------
- thread 40159 is execute work_number 1
- -----------------------------------------
-
- the other one close quit
- /*
- 基本思路:
- server预先创建几个子进程,他们都在各自与server独立链接的pipe socket等待数据,
- server调用accept,然后遍历子进程,将connfd通过pipe给字进程处理!
- 一旦子进程结束就返回任意内容告知已经处理结束!那么server将其状态位置为闲置!!!
- 注意:这里应该使用select或者epoll,因为server接受的不仅仅是listen的msg,还有child
- 处理完成返回的标志信息!
- */
-
- #include <stdio.h>
- #include <unistd.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/select.h>
- #include <sys/types.h>
- #include <errno.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <sys/epoll.h>
- #include <fcntl.h>
-
- #define PORT 6000
- #define MAXBACK 100
- #define MAXLINE 1024
- #define CHILD_NUM 10
-
- typedef struct child_process
- {
- pid_t s_pid; //!> 子进程的pid
- int s_pipe_fd; //!> 与子进程通信的pipe口
- int s_status; //!> 子进程的状态!0:闲 1:忙
- }child_process;
-
- child_process child[CHILD_NUM]; //!> 定义10个子进程( 此处以10个为例 )
-
- static int n_child_use = 0; //!> 几个child在工作( if 全忙就不给他们 )
-
-
- //!> 发送socket描述符( 这个代码在上一篇博文上有 )
- //!>
- int send_fd( int fd_send_to, void * data, size_t len, int sock_fd )
- {
- struct msghdr msghdr_send; //!> the info struct
- struct iovec iov[1]; //!> io vector
- size_t n; //!>
-
- union
- {
- struct cmsghdr cm; //!> control msg
- char ctl[CMSG_SPACE(sizeof( int ))]; //!> the pointer of char
- }ctl_un;
-
- struct cmsghdr * pCmsghdr = NULL; //!> the pointer of control
-
- msghdr_send.msg_control = ctl_un.ctl;
- msghdr_send.msg_controllen = sizeof( ctl_un.ctl );
-
- //!> design : the first info
- pCmsghdr = CMSG_FIRSTHDR( &msghdr_send ); //!> the info of head
- pCmsghdr->cmsg_len = CMSG_LEN(sizeof(int)); //!> the msg len
- pCmsghdr->cmsg_level = SOL_SOCKET; //!> -> stream mode
- pCmsghdr->cmsg_type = SCM_RIGHTS; //!> -> file descriptor
- *((int *)CMSG_DATA( pCmsghdr )) = sock_fd; //!> data: the file fd
-
- //!> these infos are nosignification
- msghdr_send.msg_name = NULL; //!> the name
- msghdr_send.msg_namelen = 0; //!> len of name
-
- iov[0].iov_base = data; //!> no data here
- iov[0].iov_len = len; //!> the len of data
-
- msghdr_send.msg_iov = iov; //!> the io/vector info
- msghdr_send.msg_iovlen = 1; //!> the num of iov
-
- return ( sendmsg( fd_send_to, &msghdr_send, 0 ) ); //!> send msg now
- }
-
- //!> 接收socket描述符
- //!>
- int recv_sock_fd( int fd, void * data, size_t len, int * recv_fd )
- {
- struct msghdr msghdr_recv; //!> the info struct
- struct iovec iov[1]; //!> io vector
- size_t n; //!>
-
- union
- {
- struct cmsghdr cm; //!> control msg
- char ctl[CMSG_SPACE(sizeof( int ))]; //!> the pointer of char
- }ctl_un;
-
- struct cmsghdr * pCmsghdr = NULL; //!> the pointer of control
-
- msghdr_recv.msg_control = ctl_un.ctl;
- msghdr_recv.msg_controllen = sizeof( ctl_un.ctl );
-
- //!> these infos are nosignification
- msghdr_recv.msg_name = NULL; //!> the name
- msghdr_recv.msg_namelen = 0; //!> len of name
-
- iov[0].iov_base = data; //!> no data here
- iov[0].iov_len = len; //!> the len of data
-
- msghdr_recv.msg_iov = iov; //!> the io/vector info
- msghdr_recv.msg_iovlen = 1; //!> the num of iov
-
- if( ( n = recvmsg( fd, &msghdr_recv, 0 ) ) < 0 ) //!> recv msg
- { //!> the msg is recv by msghdr_recv
- printf("recv error : %dn", errno);
- exit(EXIT_FAILURE);
- }
-
- //!> now, we not use 'for' just because only one test_data_
- if( ( pCmsghdr = CMSG_FIRSTHDR( &msghdr_recv ) ) != NULL //!> now we need only one,
- && pCmsghdr->cmsg_len == CMSG_LEN( sizeof( int ) ) //!> we should use 'for' when
- ) //!> there are many fds
- {
- if( pCmsghdr->cmsg_level != SOL_SOCKET )
- {
- printf("Ctl level should be SOL_SOCKET :%d n", errno);
- exit(EXIT_FAILURE);
- }
-
- if( pCmsghdr->cmsg_type != SCM_RIGHTS )
- {
- printf("Ctl type should be SCM_RIGHTS : %dn", errno);
- exit(EXIT_FAILURE);
- }
-
- *recv_fd =*((int*)CMSG_DATA(pCmsghdr)); //!> get the data : the file des*
- }
- else
- {
- *recv_fd = -1;
- }
-
- return n;
- }
-
-
- //!> 子进程具体的执行过程
- //!>
- void web_child( int con_fd )
- {
- char buf[MAXLINE];
- int n_read;
- int i = 0;
-
- while( strcmp( buf, "Q" ) != 0 && strcmp( buf, "q" ) != 0 )
- {
- memset( buf, 0, sizeof( buf ) );
-
- if( ( n_read = read( conn_fd, buf, MAXLINE ) ) < 0 )
- {
- printf( "Read errnr! :%d n", errno );
- exit( EXIT_FAILURE );
- }
- else if( n_read == 0 )
- {
- continue;
- }
- else
- {
- while( buf[i] )
- {
- buf[i] = toupper( buf[i] );
- i++;
- }
- buf[i] = ' ';
-
- printf("Child %d done! n", ( unsigned int )pthread_self());
- printf("Child %d send %sn", ( unsigned int )pthread_self(), buf);
- write( conn_fd, buf, strlen( buf ) ); //!> 写回给client
- }
- }
-
- printf("Child %d : Dating end!n", ( unsigned int )pthread_self());
-
- }
-
-
- //!> child process 的主函数
- //!>
- void child_main( int i )
- {
- char data; //!> 由于此处我们主要是传递socket,那么data一般就给一个” “做一个标志就好
- int con_fd; //!> 接受con_fd
- int n_read; //!> 读取长度
-
- printf( "Child %d starting ... n", i );
-
- while( 1 )
- {
- if( ( n_read = recv_sock_fd( STDERR_FILENO, &data, 1, &con_fd ) ) == 0 )
- {
- continue; //!> 此处理论上应该是阻塞,但是简化为轮询
- //printf( " Child process %d read errnr! : %dn", i, errno );
- //exit( EXIT_FAILURE );
- }
-
- if( con_fd < 0 )
- {
- printf("Child %d read connfd errnr! : %dn", i, errno);
- exit( EXIT_FAILURE );
- }
-
- web_child( con_fd ); //!> child具体的执行过程
-
- write( STDERR_FILENO, " ", 1 ); //!> 随便写点什么让server知道我处理完成了,那么就可以将状态位置为0了
- }
- }
-
- //!> 产生子进程及相关处理
- //!>
- void child_make( int i, int listen_fd )
- {
- int sock_fd[2]; //!> 为了和主进程通信创建socket pair
- pid_t pid;
-
- //!> 创建 socketpair
- if( socketpair( AF_LOCAL, SOCK_STREAM, 0, sock_fd ) == -1 )
- {
- printf( "create socketpair error : %dn", errno );
- exit( EXIT_FAILURE );
- }
-
- if( ( pid = fork() ) > 0 ) //!> 父进程
- {
- close( sock_fd[1] );
- child[i].s_pid = pid;
- child[i].s_pipe_fd = sock_fd[0];
- child[i].s_status = 0;
- return;
- }
-
- if( dup2( sock_fd[0], STDERR_FILENO ) == -1 ) //!> 现在可以使用STDERR_FILENO替换刚刚创建的sock描述符
- { //!> 往后的child的操作就可以STDERR_FILENO中进行!
- printf("socket pair errnr! : %dn", errno);
- exit( EXIT_FAILURE );
- }
-
- close( sock_fd[0] ); //!> 这些描述符都bu需要了!
- close( sock_fd[1] );
- close( listen_fd );
-
- child_main( i ); //!> child 主循环
- }
-
- //!> MAIN PROCESS
- //!>
- int main( int argc, char ** argv )
- {
- int i;
- int listen_fd;
- int conn_fd;
- int max_fd;
- int n_select;
- int n_read;
- char buf[5];
- fd_set all_set, now_set;
- struct sockaddr_in servaddr;
- struct sockaddr_in cliaddr;
- int len = sizeof( struct sockaddr_in );
-
- //!> server 套接口
- //!>
- bzero( &servaddr, sizeof( servaddr ) );
- servaddr.sin_family = AF_INET;
- servaddr.sin_addr.s_addr = htonl( INADDR_ANY );
- servaddr.sin_port = htons( PORT );
-
- //!> 建立套接字
- if( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 )
- {
- printf("Socket Error...n" , errno );
- exit( EXIT_FAILURE );
- }
-
- //!> 绑定
- //!>
- if( bind( listen_fd, ( struct sockaddr *)&servaddr, sizeof( servaddr ) ) == -1 )
- {
- printf("Bind Error : %dn", errno);
- exit( EXIT_FAILURE );
- }
-
- //!> 监听
- //!>
- if( listen( listen_fd, MAXBACK ) == -1 )
- {
- printf("Listen Error : %dn", errno);
- exit( EXIT_FAILURE );
- }
-
- FD_ZERO( &all_set );
- FD_SET( listen_fd, &all_set ); //!> 将listenfd加入select
- max_fd = listen_fd;
-
- for( i = 0; i < CHILD_NUM; i++ )
- {
- child_make( i, listen_fd );
- FD_SET( child[i].s_pipe_fd, &all_set ); //!> 将子进程socket加入
- max_fd = max_fd > child[i].s_pipe_fd ? max_fd : child[i].s_pipe_fd;
- }
-
- while( 1 ) //!> 主进程循环
- {
- now_set = all_set;
- if( n_child_use >= CHILD_NUM ) //!> 没有可以使用的child 了
- { //!> 那么就将listenfd从中清空,也就是不在响应listen了,直到有child空闲
- FD_CLR( listen_fd, &now_set );
- }
-
- if( (n_select = select( max_fd + 1, &now_set, NULL, NULL, NULL )) == -1)
- {
- printf(" Main process select errnr~ :%dn", errno);
- exit( EXIT_FAILURE );
- }
-
- if( FD_ISSET( listen_fd, &now_set ) ) //!> if来了请求
- {
- if( ( conn_fd = accept( listen_fd, ( struct sockaddr *)&cliaddr , &len ) ) == -1 )
- {
- printf("Server accept errnr! : %dn", errno);
- exit( EXIT_FAILURE );
- }
-
- for( i = 0; i < CHILD_NUM; i++ )
- {
- if( child[i].s_status == 0 ) //!> 此child闲置
- {
- break;
- }
- }
-
- if( i == CHILD_NUM ) //!> 说明child已经全部处于忙态
- {
- printf("All childs are busy! n");
- exit( EXIT_FAILURE ); //!> 此处可以等待哦,或者丢弃数据
- }
-
- child[i].s_status = 1; //!> busy
- n_child_use++; //!> busy child ++
-
- send_fd( child[i].s_pipe_fd, " ", 1, conn_fd ); //!> 发送socket描述符
- close( conn_fd ); //!> server不需要处理了
-
- if( --n_select == 0 ) //!> 没有其他的请求了
- {
- continue;
- }
- }
-
- for( i = 0; i < CHILD_NUM; i++ ) //!> 看看那些child发来了msg,其实server知道肯定是child完成处理的提示标志
- {
- if( FD_ISSET( child[i].s_pipe_fd, &now_set ) )
- {
- if( ( n_read = read( child[i].s_pipe_fd, buf, 5 ) ) == 0 ) //!> 这里的buf中data没有用,仅仅是child告诉server我完成了
- {
- printf("Child %d exit error! : %dn", i, errno);
- exit( EXIT_FAILURE );
- }
-
- child[i].s_status = 0; //!> 状态位置闲
-
- if( --n_select == 0 ) //!> if没有其他child回送消息就不要浪费时间for了
- {
- break;
- }
- }
- }
-
- }
-
- return 0;
- }
客户端1:
[cpp] view
plain copy
print?

- $ ./client 172.18.229.60 9001
- already connected to server 172.18.229.60
- hello
- send successful,5 byte send!
- quit
- i will quit!
- i quited!
Client:
客户端2:
[cpp] view
plain copy
print?

- $ ./client 172.18.229.60 9001
- already connected to server 172.18.229.60
- hi
- send successful,2 byte send!
- quit
- i will quit!
- i quited!
- #include <stdio.h>
- #include <unistd.h>
- #include <stdlib.h>
- #include <string.h>
- #include <errno.h>
- #include <netinet/in.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <sys/select.h>
-
- #define MAXLINE 1024
- #define SERV_PORT 6000
-
- //!> 注意输入是由stdin,接受是由server发送过来
- //!> 所以在client端也是需要select进行处理的
- void send_and_recv( int connfd )
- {
- FILE * fp = stdin;
- int lens;
- char send[MAXLINE];
- 宝马娱乐在线,char recv[MAXLINE];
- fd_set rset;
- FD_ZERO( &rset );
- int maxfd = ( fileno( fp ) > connfd ? fileno( fp ) : connfd + 1 );
- //!> 输入和输出的最大值
- int n;
-
- while( 1 )
- {
- FD_SET( fileno( fp ), &rset );
- FD_SET( connfd, &rset ); //!> 注意不要把rset看作是简单的一个变量
- //!> 注意它其实是可以包含一组套接字的哦,
- //!> 相当于是封装的数组!每次都要是新的哦!
-
- if( select( maxfd, &rset, NULL, NULL, NULL ) == -1 )
- {
- printf("Client Select Error..n");
- exit(EXIT_FAILURE );
- }
-
- //!> if 连接口有信息
- if( FD_ISSET( connfd, &rset ) ) //!> if 连接端口有信息
- {
- printf( "client get from server ...n" );
- memset( recv, 0, sizeof( recv ) );
- n = read( connfd, recv, MAXLINE );
- if( n == 0 )
- {
- printf("Recv ok...n");
- break;
- }
- else if( n == -1 )
- {
- printf("Recv error...n");
- break;
- }
- else
- {
- lens = strlen( recv );
- recv[lens] = ' ';
- //!> 写到stdout
- write( STDOUT_FILENO, recv, MAXLINE );
- printf("n");
- }
-
- }
-
- //!> if 有stdin输入
- if( FD_ISSET( fileno( fp ), &rset ) ) //!> if 有输入
- {
- //!> printf("client stdin ...n");
-
- memset( send, 0, sizeof( send ) );
- if( fgets( send, MAXLINE, fp ) == NULL )
- {
- printf("End...n");
- exit( EXIT_FAILURE );
- }
- else
- {
- //!>if( str )
- lens = strlen( send );
- send[lens-1] = ' '; //!> 减一的原因是不要回车字符
- //!> 经验值:这一步非常重要的哦!!!!!!!!
- if( strcmp( send, "q" ) == 0 )
- {
- printf( "Bye..n" );
- return;
- }
-
- printf("Client send : %sn", send);
- write( connfd, send, strlen( send ) );
- }
- }
-
- }
-
- }
-
- int main( int argc, char ** argv )
- {
- //!> char * SERV_IP = "10.30.97.188";
- char buf[MAXLINE];
- int connfd;
- struct sockaddr_in servaddr;
-
- if( argc != 2 )
- {
- printf("Input server ip !n");
- exit( EXIT_FAILURE );
- }
-
- //!> 建立套接字
- if( ( connfd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 )
- {
- printf("Socket Error...n" , errno );
- exit( EXIT_FAILURE );
- }
-
- //!> 套接字信息
- bzero(&servaddr, sizeof(servaddr));
- servaddr.sin_family = AF_INET;
- servaddr.sin_port = htons(SERV_PORT);
- inet_pton(AF_INET, argv[1], &servaddr.sin_addr);
-
- //!> 链接server
- if( connect( connfd, ( struct sockaddr * )&servaddr, sizeof( servaddr ) ) < 0 )
- {
- printf("Connect error..n");
- exit(EXIT_FAILURE);
- }
- /*else
- {
- printf("Connet ok..n");
- }*/
-
- //!>
- //!> send and recv
- send_and_recv( connfd );
-
- //!>
-
- close( connfd );
- printf("Exitn");
-
- return 0;
- }
原文链接
http://blog.csdn.net/geng823/article/details/42144461
本文由宝马娱乐在线发布于网络频道,转载请注明出处:Linux网络编程 - TCP Socket 简单练习:线程池实现并发