一、背景知识
1、地址空间进一步理解
在父子进程对同一变量进行修改时发生写时拷贝,这时候拷贝的基本单位是4KB,会将该变量所在的页框全拷贝一份,这是因为修改该变量很有可能会修改其周围的变量(局部性原理),这是一种以空间换时间的做法;malloc和new其实对申请内存做了封装,申请的也是4KB的整数倍。
OS如何管理页框?先描述再组织。描述:用一个结构体struct page,管理采用数组struct page memory[n] 等数据结构。一个4GB的内存有1048576(1MB)(计算方式:4*1024*1024*1024 / (4*1024))个页框,那么管理的数组有1048576个元素,每一个page有了下标,这里的下标*4KB就是对应的每个page的下标。
2、页表的深入理解
如果页表是真的如下图所示,左边是虚拟地址,右边是物理地址,那么每一个条目就要有8个字节,加上表示位置的一个字节,就是9个字节,一个进程的虚拟地址空间一般是4GB,那么一张页表就有4GB*9=36GB大小,这明显是不合适的。
物理地址一定是在物理内存中的某个页的。只要找到该物理地址在该页内的偏移量,就可以找到该物理地址对应的那个字节。
对于一个物理地址,其实就是一个32位的二进制数字。真实的页表是将这32位数字划分为10+10+12三类。
3、虚拟地址的本质
函数是有地址的,函数的地址一般指函数入口的地址,而函数这一段代码块是一段连续的地址。函数的本质就是一段连续的代码地址。
虚拟地址是一种资源。
二、线程的概念与代码实现
1、概念
线程是进程内部运行的CPU调度的基本单位。
线程是在进程的PCB中运行的,一个进程中可能有多个PCB(对应在LINUX中就是多个task_struct)
Linux中的线程:
复用pcb,用pcb统一表示执行流,这样就不需要为线程单独设计数据结构和调度算法了。
windows中的线程:对线程先描述再组织,有一个struct tcb结构体,内部有线程ID、优先级、状态、上下文、连接属性等属性。每一个线程都要与进程相连接。最后设计成的示意图如下:
Linux系统实现线程的方式是更优的,维护成本低,更简单也就更不容易出错。
Windows中是真的有进程,而Linux中叫轻量级进程。
2、代码实现
创建线程的函数:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void *newthread_process(void *str)
{
while (1)
{
sleep(1);
cout<<"new thread processing... pid:"<<getpid()<<endl;
}
}
int main()
{
pthread_t mythread1;
int thread_ret = pthread_create(&mythread1, nullptr, newthread_process, (void *)"new thread");
//主线程
while(1)
{
cout<<"main thread processing... pid:"<<getpid()<<endl;
sleep(1);
}
return 0;
}
.PHONY: all
all:test_thread
test_thread : test_thread.cpp
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY: clean
clean:
rm -f test_thread
在编译时需要带入pthread动态库,即-lpthread
ps-aL查看轻量级进程(ps axj查看进程)其中LWP表示的就是轻量级进程的ID
3、关于线程的几个问题:
(1)已经有多进程,为何要有多线程?
① 进程创建成本非常高,要创建PCB、地址空间、页表,并建立映射等;创建线程只需要创建PCB,然后将资源分配给该PCB。
② 运行时线程相对于进程,切换时不需要切换地址空间和页表。线程调度成本低。
线程调度成本为何低?
实际页表对应一个寄存器,仅仅是这个寄存器的切换不会有太大的效率影响。
CPU内部有一个硬件cache,根据局部性原理存储热数据;如果是进程间切换,cache中的热数据需要切换;而线程切换的cache热数据无需变化。
③ 删除时线程也成本低。
(2)那为何要有进程?
线程也有劣势。在一个进程中的多个线程,当一个线程出现野指针、除0等类似的错误时,整个进程都要崩溃回收,其他线程也就崩溃了,线程的健壮性比较差。一个线程出异常,可能会改变其他线程的数据,这样其他线程的正确性无法保证,所以全部崩溃。
(3)不同的OS实现线程的方式不一样?
Windows实现线程是先描述再组织;Linux则是通过复用代码。尽管实现方式不一样,但都满足线程的统一定义:进程内部运行的CPU调度的基本单位。只是具体的如何实现在进程内部,如何实现其是CPU调度的基本单位,是不一样的。线程的原理都是正确的。
(4)线程的页表划分
每一个线程有自己的代码,对应在页表中不同的区域。也就是说,对于同一张页表,不同的线程对应着页表上不同的区域。这样就是线程对页表的划分。
(5)OS与进程的关系
OS其实就是一个进程。虚拟机的原理就是这样,虚拟机中一个OS挂掉了不会影响其他OS。
(6)如何分几个线程?
对于计算密集型应用,对于一个单核CPU,分再多的线程,效率只会变低,因为线程切换需要成本。最好分的线程个数与CPU核数一样合适。
对于I/O密集型应用,可以多创建几个线程,因为IO操作的大部分时间都在等待,多分几个线程这样等待的时间重叠。
4、代码验证线程的健壮性较低
代码:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void *newthread_process(void *str)
{
while (1)
{
int x=rand()%5;
cout<<"new thread processing... pid:"<<getpid()<<endl;
sleep(1);
if(x==0)
{
int *p=nullptr;
*p=100;
}
}
}
int main()
{
srand(time(nullptr));
pthread_t mythread1,mythread2,mythread3;
int thread_ret1 = pthread_create(&mythread1, nullptr, newthread_process, (void *)"new thread");
int thread_ret2 = pthread_create(&mythread2, nullptr, newthread_process, (void *)"new thread");
int thread_ret3 = pthread_create(&mythread3, nullptr, newthread_process, (void *)"new thread");
//主线程
while(1)
{
sleep(1);
cout<<"main thread processing... pid:"<<getpid()<<endl;
}
return 0;
}
验证结果:当一个进程出现报错,所有进程都退出。
不仅健壮性较低,由于多线程共享地址空间的大部分资源,所以其缺乏访问控制。
5、进程与线程对比
线程独有的数据,比较重要的是:一组寄存器和栈。
寄存器:硬件中的上下文数据,反映了线程可以动态运行的;
栈:每个线程都要有自己独立的栈结构,因为函数执行要是独立的。线程在运行的时候,会形成各种临时变量,临时变量会被每个线程保存在自己的栈区。
三、线程控制
在编译以上代码时加上了-lpthread。由于linux中没有线程只有轻量级进程,所以系统调用的接口只会给用户提供创建轻量级进程的接口;而我们在写代码时直接使用的是线程的相关接口,这是通过pthread动态库实现的。
1、pthread_create函数详解
第一个参数是输出型参数,其实际是Linux对 unsigned long int 类型的一个封装,是一个地址;第二个参数是要修改的线程的属性,可以直接设置为nullptr,第三个参数是一个返回值为void*且参数也为void*的函数指针,第四个参数是回调的函数参数,传给第三个参数作为函数参数。
返回值为0,表示正确创建了线程,否则返回错误码,thread中的内容将会是未定义。
2、代码验证
(1)等待线程的函数
第一个参数即为pthread_create函数中的第一个参数,表示要等待哪一个线程;第二个参数是得到pthread_create函数中的第三个参数的函数最终的返回值.
(2)代码
①两个线程的代码
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
class ThreadData
{
public:
int x;
int y;
string name;
int Add()
{
return x+y;
}
private:
};
class ThreadResult
{
public:
int x;
int y;
int result;
private:
};
void* threadRun(void* args)
{
/*int cnt=5;
while(cnt)
{
cout<<"new thread run,cnt:"<<cnt--<<endl;
sleep(1);
}*/
auto td=static_cast<ThreadData*>(args);// 安全强转
//cout<<"id: "<<td->id<<" name:"<<td->name<<endl;
ThreadResult* ret=new ThreadResult();
ret->x=td->x;
ret->y=td->y;
ret->result=td->Add();
delete td;
return (void*)ret;
}
string PrintToHex(pthread_t& tid)
{
char cache[64];
snprintf(cache,sizeof(cache),"0x%lx",tid);
return cache;
}
int main()
{
pthread_t tid;
ThreadData* td=new ThreadData();//推荐在堆空间上开辟
td->x=10;
td->y=20;
td->name="thread-1";
int n=pthread_create(&tid,nullptr,threadRun,(void*)td);
if(n!=0)
{
cerr<<"create thread"<<endl;
return 1;
}
string tid_hex=PrintToHex(tid);
cout<<"tid的16进制形式为:"<<tid_hex<<endl;//验证tid
//join等待
cout<<"pthread join begin..."<<endl;
ThreadResult* ret=nullptr;
pthread_join(tid,(void**)&ret);//会阻塞到这等待 类似于wait
//如果不join,当主线程退出,所有的线程都退出了,因为进程都退出了 ,因为当main线程不退出,而new线程退出时,不join会造成类似僵尸线程的问题
cout<<"ret:"<<ret->result<<endl;
cout<<"pthread join success"<<endl;
return 0;
}
②多线程代码
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
using namespace std;
string PrintToHex(pthread_t& tid)
{
char cache[64];
snprintf(cache,sizeof(cache),"0x%lx",tid);
return cache;
}
void *ThreadRun(void *args)
{
string name = static_cast<char *>(args);
while (1)
{
cout << "new thread running,name:" << name << endl;
sleep(1);
//break;
}
//pthread_exit(args);//等价于return args
//return args;
}
int main()
{
int num = 10;
vector<pthread_t> tids;
for (int i = 0; i < num; i++)
{
pthread_t tid;
/*char name[64];*///这样写是在栈空间上开辟,会有问题——线程覆盖问题
char* name=new char[64];//这样在堆空间上开辟
sprintf(name, "thread-%d", i + 1);
pthread_create(&tid, nullptr, ThreadRun, (void *)name);
tids.emplace_back(tid);
}
//join等待
sleep(5);
for(auto tid:tids)
{
pthread_cancel(tid);
void* ret=nullptr;
pthread_join(tid,(void**)&ret);
cout<<PrintToHex(tid)<<" quit ... ret:"<<(long long int)ret<<endl;
// delete ret;
}
//sleep(100);
return 0;
}
(3)几个问题
①main线程和new线程,谁先运行是不确定的。
②我们期望main线程是最后退出,如何做到?
main线程需要对new线程进行回收,所以我们期望main线程最后退出。做到这一点是通过join函数做到的。如果new线程退出了,main线程还没退出,且main线程没有join,那么此时new线程会进入类似僵尸进程的状态。
③tid到底是什么?
虚拟地址,可以以16进制的形式打印出来方便观察。
打印tid的代码:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include<string>
using namespace std;
string PrintToHex(pthread_t& tid)
{
char cache[64];
snprintf(cache,sizeof(cache),"0x%lx",tid);
return cache;
}
void* ThreadRun(void* args)
{
string name=static_cast<const char*>(args);
while(1)
{
sleep(1);
pthread_t tid=pthread_self();
cout<<"new thread running...name:"<<name<<" tid:"<<PrintToHex(tid)<<endl;
}
}
int main()
{
pthread_t tid;
pthread_create(&tid,nullptr,ThreadRun,(void*)"thread-1");
cout<<"new thread tid:"<<PrintToHex(tid)<<endl;
pthread_join(tid,nullptr);
return 0;
}
运行结果:
而查到的LWP:
也就是说,给用户提供的线程ID,不是内核中的LWP,而是pthread维护的一个唯一值。库内部也要承担对线程的管理。(首先便是对线程ID的赋值)
要理解线程ID是一个地址,首先理解pthread库。pthread库本质是一个文件,进程未开启时是在磁盘上的。
多线程在创建之前,首先是一个进程;最开始加载时磁盘上的可执行程序加载到内存,然后建立pcb、虚拟地址空间、页表,然后页表在内存中作映射,这时候一个进程被创建好,然后才有多线程。
当进程创建好之后,要创建新线程,调用pthread_create方法,这是就需要将pthread库加载到内存中。同时要将被加载的库映射到地址空间的堆栈之间的共享区。此时就可以创建新线程了。这是加载的pthread库叫做共享库。原因是,当有新进程创建时,不需要再从磁盘中加载pthread库了,而只需要在新进程的共享区建立与内存中 pthread库的映射。这和以前的动态库加载是一样的。
总结:线程ID就是线程属性集合的起始虚拟地址,其是在pthread库中维护的。
④全面看待对线程运行函数传参
给线程运行函数传参是穿一个void*,即一个地址,我们可以通过这个地址传整数、字符串、甚至类对象地址。传递一个类对象,那么可以给线程传递多个参数、方法。
如果直接在主线程定义一个结构体,那么这是在主线程的栈上开辟的空间存储该变量,这与线程要有自己的独立栈空间矛盾了,更重要的是,如果有多个新线程,其中某个新线程对该栈空间上的变量做修改,那么就会导致全部都会变化。
所以一般采用new的写法,在堆空间上开辟空间。
⑤线程运行函数的返回值
pthread_join函数的第二个参数是输出型参数,用于获取线程运行函数的返回值。返回值void* ret是指针变量,意味着其是有空间来接收返回值的。
与进程退出的区别是,进程异常退出时会有退出信号,但线程异常退出时意味着整个进程退出(信号是发给进程的),其余线程也就退出了。所以线程退出时只关注正确退出的情况。
线程返回一个void*,即一个地址,我们可以通过这个地址传整数、字符串、甚至类对象地址。传递一个类对象,那么可以让线程返回多个参数、方法。
⑥如何创建多线程?
见(2)②多线程代码
⑦新线程如何终止?
a、线程函数return
b、exit() 不可以,其表示的是进程终止;return表示函数退出,只有main函数的return表示进程退出,exit则是进程退出
c、pthread_exit()专门用来终止一个线程,是新线程自己主动调用
d、pthread_cancel取消一个线程 一般都用主线程取消一个新线程;新线程退出结果是-1
注:main线程结束,表示进程结束,所以要尽量保证主线程最后终止。
⑧如何不join线程,而是让其结束后直接退出
使用线程分离。一个线程被创建,默认是必须要被 join的。如果一个线程被分离,那么该线程的工作状态为分离状态,不需要也不能被join;但被分离的进程依旧属于进程内部。
可以新线程自己把自己分离pthread_detach(pthread_self()),也可以主线程将新线程分离。
四、C++中的多线程
以上说的是linux中的原生线程库的操作;C++标准中的线程库其实是对linux中的原生线程库的封装,这意味着在编译时也要加上-lpthread.
真实情况是,C++标准对每个环境下的线程都做了封装,所以语言具有跨平台性。
文件操作也是类似的。
类似C++标准库,简单封装一下线程库
hpp文件代码:
#pragma once
#include <pthread.h>
#include <iostream>
#include <string>
class mythread
{
typedef void (*fun_t)(const std::string &name);
public:
mythread(const std::string name,fun_t func)
:_name(name),_func(func)
{
_isrunning=false;
}
static void *threadRun(void *args)
{
mythread *thread = static_cast<mythread *>(args);
thread->_func(thread->_name);
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadRun, (void *)this);
if (n != 0)
return false;
else
{
_isrunning = true;
return true;
}
}
void Stop()
{
if(_isrunning)
{
_isrunning=false;
::pthread_cancel(_tid);
}
}
void Join()
{
pthread_join(_tid,nullptr);
}
~mythread()
{
if(_isrunning)
{
Stop();
Join();
}
}
private:
pthread_t _tid;
std::string _name;
bool _isrunning;
fun_t _func;
};
注:这里的线程运行函数要注意,写在类内部默认有this指针参数,所以要加上static使其成为静态的。然后传入this指针调用类内部的函数fun_t.
#include<iostream>
#include<unistd.h>
#include<string>
#include"mythreadlib.hpp"
void mythreadrun(const std::string& name)
{
while(1)
{
std::cout<<"new thread is running,name:"<<name<<std::endl;
sleep(1);
}
}
int main()
{
std::string name="thread1";
mythread thread1(name,mythreadrun);
thread1.Start();
sleep(5);
thread1.Stop();
thread1.Join();
return 0;
}
以上就是我们自己模拟的对原生线程的管理。描述是通过这个类描述的,管理可以通过一个vector进行管理。
五、线程互斥
线程之间天然就很容易看到同一份资源,所以通信很容易,但容易造成数据不一致问题;对于多个线程能看到的资源,我们称之为共享资源,我们需要对这部分资源进行保护。保护资源的方式分为互斥和同步。
以下是一个抢票的代码,体现了数据不一致的问题。
#include <iostream>
#include <unistd.h>
#include <string>
#include "mythreadlib.hpp"
int g_tickets = 10000;
void mythreadrun(const std::string &name)
{
while (1)
{
if (g_tickets > 0)
{
usleep(1000); // 休息1000微秒 1ms ->抢票花费的时间
//std::cout << "here success" << std::endl;
std::cout << name << " get ticket:" << g_tickets << std::endl;
g_tickets--;
}
else
{
break;
}
}
}
int main()
{
// std::string name="thread1";
mythread thread1("thread1", mythreadrun);
mythread thread2("thread2", mythreadrun);
mythread thread3("thread3", mythreadrun);
mythread thread4("thread4", mythreadrun);
thread1.Start();
thread2.Start();
thread3.Start();
thread4.Start();
thread1.Join();
thread2.Join();
thread3.Join();
thread4.Join();
return 0;
}
抢到最后,程序运行结果:
会发现出现抢票负数的问题,但在代码逻辑中是对g_tickets做了是否大于0的判断的。
解释原因:
直接原因:首先,对g_tickets做是否大于0的判断是一种计算。该计算是通过CPU进行调度的。判断tickets是否大于0要通过三步:
CPU寄存器内部的数据可以有多套,属于线程私有,看起来放在了一套公共寄存器,但是属于线程私有,当线程切换时要带走自己的数据;回来的时候会恢复。tickets只剩一张时,线程1被调度,判断其是满足条件的,但当①步骤做完了,②步骤还没做时,线程被切换到线程2,此时全局变量tickets还没变化还是1,那么线程2也判断其是满足条件的,也可以抢这张票。tickets--和判断tickets是否大于0是两个独立的操作,其也需要分三步:重读数据,--数据,写回数据。那么在重读数据时就会发生tickets变为负数的情况。
1、相关接口使用
如果锁是全局的或者静态的,那么直接init即可,由于此时锁的生命周期与整个程序的生命周期是一致的,则不需要destory;如果锁是动态开辟的,那么则需要初始化函数对其初始化,且需要destory
锁被创建出来、初始化之后,需要加锁、解锁
我们使用锁对临界资源进行保护,本质是对临界区代码进行保护。我们对所有资源进行访问,本质都是通过代码进行访问的。所以我们保护资源,就是把访问资源的代码保护起来。
①加锁的原则是加锁的范围,粒度一定要尽量小(临界区包含的代码要尽量少)
②任何线程,要进行抢票,都要先申请锁,原则上不该有例外
③所有线程申请锁,前提是所有线程都得看到这把锁,锁本身也是共享资源;那么就要求加锁的过程,必须是原子的。
④原子性:要么不做,要么做完,没有中间状态,就是原子性。
⑦线程申请锁成功,执行临界区的代码期间,可以切换。(线程切换在任意时刻都可能做)但其他线程无法进入临界区,因为申请锁成功的线程未释放锁。也就是说,申请锁成功的线程可以放心的执行临界区代码,没有其他线程可以进入临界区。
这对于其他线程,要么本线程没有申请锁,要么释放了锁,对其他线程才有意义。这意味着,本线程访问临界区,对其他线程是原子的。
相关代码:
#include <iostream>
#include <unistd.h>
#include <string>
#include<vector>
#include "mythreadlib.hpp"
#include"LockGuard.hpp"
int g_tickets = 10000;
//pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER; // 定义全局锁并初始化
/*void mythreadrun(const std::string &name)
{
while (1)
{
//pthread_mutex_lock(&gmutex);
if (g_tickets > 0)
{
usleep(1000); // 休息1000微秒 1ms ->抢票花费的时间
// std::cout << "here success" << std::endl;
std::cout << name << " get ticket:" << g_tickets << std::endl;
g_tickets--;
//pthread_mutex_unlock(&gmutex);
}
else
{
//pthread_mutex_unlock(&gmutex);
break;
}
}
}*/ //全局变量锁的写法
void myRoute (ThreadData* td)
{
while(1)
{
lockguard lock(td->_lock); //临时变量 此段代码执行完时生命周期结束
if (g_tickets > 0)
{
usleep(1000); // 休息1000微秒 1ms ->抢票花费的时间
// std::cout << "here success" << std::endl;
std::cout << td->_name << " get ticket:" << g_tickets << std::endl;
g_tickets--;
//pthread_mutex_unlock(td->_lock);
}
else
{
//pthread_mutex_unlock(&gmutex);
//pthread_mutex_unlock(td->_lock);
break;
}
}
}
static int thread_num=4;
int main()
{
std::vector<mythread> threads;
pthread_mutex_t lock;
pthread_mutex_init(&lock,nullptr);
for(int i=0;i<thread_num;i++)
{
std::string name="thread-"+std::to_string(i+1);
ThreadData* td=new ThreadData(name,&lock);
threads.emplace_back(name,myRoute,td);
}
for(auto& thread:threads)
{
thread.Start();
}
for(auto& thread:threads)
{
thread.Join();
}
return 0;
}
#pragma once
//hpp代码一
#include <pthread.h>
#include <iostream>
#include <string>
class ThreadData
{
public:
ThreadData(const std::string &name, pthread_mutex_t *lock)
: _name(name), _lock(lock)
{
}
public:
std::string _name;
pthread_mutex_t *_lock;
};
class mythread
{
typedef void (*fun_t)(ThreadData *td);
public:
mythread(const std::string name, fun_t func, ThreadData *td)
: _name(name), _func(func), _td(td)
{
_isrunning = false;
}
static void *threadRun(void *args)
{
mythread *thread = static_cast<mythread *>(args);
thread->_func(thread->_td);
thread->_isrunning = false; // 运行结束
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadRun, (void *)this);
if (n != 0)
return false;
else
{
_isrunning = true;
return true;
}
}
void Stop()
{
if (_isrunning)
{
_isrunning = false;
::pthread_cancel(_tid);
}
}
void Join()
{
pthread_join(_tid, nullptr);
}
~mythread()
{
if (_isrunning)
{
Stop();
Join();
}
}
private:
pthread_t _tid;
std::string _name;
bool _isrunning;
fun_t _func;
ThreadData *_td;
};
#pragma once
//hpp代码2
#include<pthread.h>
class lockguard
{
public:
lockguard(pthread_mutex_t* lock)
:_lock(lock)
{
pthread_mutex_lock(_lock);
}
~lockguard()
{
pthread_mutex_unlock(_lock);
}
private:
pthread_mutex_t* _lock;
};
2、从原理角度理解锁
申请锁成功,允许进入临界区的本质是pthread_mutex_lock()函数会返回;反之,申请锁失败(表示锁没有就绪),pthread_mutex_lock()函数不返回,线程阻塞了。当申请锁成功的线程pthread_mutex_unlock()之后,库中的线程会被唤醒,从而重新申请锁。
3、从实现角度理解锁
前提:
①CPU的寄存器只有一套,被所有的线程共享。但是寄存器里面的数据,属于执行流的上下文,属于执行流私有的数据。
②CPU在执行代码的时候,一定要有对应的执行载体。进程&&线程。
③数据在内存中,是被所有线程共享的。
结论:把数据移动到寄存器,本质是把数据从共享的状态变为线程私有的状态。
六、线程同步
仅仅是线程互斥的话,可能会导致一个线程在某段时间内一直获取资源,因为某线程在第一次获取临界资源的时候,下一次其再次获取临界资源的可能性是更大的。我们为了让线程获取临界资源更合理、让其具有顺序性,这里的顺序性就是同步。需要注意的是,这里的顺序可以是严格的顺序性,也可以是相对的顺序性。
1、条件变量
(1)相关接口
(2)理解条件变量
一个线程队列+通知机制(唤醒一个或者唤醒全部)
用一个测试代码测试:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>
#include <vector>
const int threadnum = 4;
int gtickets = 10000;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
void *thread_Run(void *args)
{
char* name=static_cast<char*>(args);
while (1)
{
//加锁、同步
usleep(1000);
pthread_mutex_lock(&lock);
pthread_cond_wait(&cond,&lock);
std::cout<<"I am "<<name<<",running..."<<std::endl;
pthread_mutex_unlock(&lock);
}
}
int main()
{
// 对线程同步进行测试
std::vector<pthread_t> threads;
for (int i = 0; i < threadnum; i++)
{
pthread_t tid;
char *name = new char[128];
snprintf(name, 128, "thread-%d", i + 1);
pthread_create(&tid, nullptr, thread_Run, (void *)name);
threads.emplace_back(tid);
}
// 线程等待
while(1)
{
//唤醒线程
//pthread_cond_broadcast(&cond);
pthread_cond_signal(&cond);
sleep(1);
}
// sleep(10);
for (auto& thread : threads)
{
pthread_join(thread, nullptr);
}
return 0;
}
测试结果:
可以发现是按照一定的顺序打印,这就保证了线程同步。
条件变量的使用需要配合互斥锁、等待以及唤醒函数。
2、生产消费模型
生产消费模型有一个三原则:
①一个交易场所:(特定数据结构形式存在的一段内存空间)
③三种关系:生产者和生产者,消费者和消费者,生产者和消费者,以上的三种关系全都是互斥关系。
3、代码实现生产消费模型
首先是利用阻塞队列来实现生产消费模型,什么是阻塞队列:
hpp代码:
#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>
#define DEFAULT_CAP 4
template <typename T>
class Prod_Cons_Model
{
public:
Prod_Cons_Model() {}
Prod_Cons_Model(int max_cap = DEFAULT_CAP)
: _max_cap(max_cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cons_cond,nullptr);
pthread_cond_init(&_prod_cond,nullptr);
}
~Prod_Cons_Model()
{
pthread_mutex_destroy(&_mutex);
// pthread_cond_destroy(&_cons_cond);
// pthread_cond_destroy(&_prod_cond);
pthread_cond_destroy(&_cons_cond);
pthread_cond_destroy(&_prod_cond);
}
bool Is_Full()
{
return _block_queue.size() == _max_cap;
}
bool Is_Empty()
{
return _block_queue.size() == 0;
}
void Productor(const T &data)
{
// 临界资源
pthread_mutex_lock(&_mutex);
while (Is_Full()) // 是while而不是if
{
pthread_cond_wait(&_prod_cond, &_mutex);
}
_block_queue.push(data);
pthread_mutex_unlock(&_mutex);//和signal函数的顺序对调也可以
pthread_cond_signal(&_cons_cond);
}
// 消费数据
void Consume(T *ret)
{
pthread_mutex_lock(&_mutex);
while (Is_Empty())
{
pthread_cond_wait(&_cons_cond, &_mutex);
}
*ret = _block_queue.front();
_block_queue.pop();
// 唤醒生产者
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_prod_cond);
}
private:
std::queue<T> _block_queue;
int _max_cap;
pthread_mutex_t _mutex;
pthread_cond_t _cons_cond;
pthread_cond_t _prod_cond;
};
cpp代码:
#include "model.hpp"
#include <vector>
#include <cstdlib>
#include <pthread.h>
#include <ctime>
#include <iostream>
void *myconsume(void *args)
{
auto model = static_cast<Prod_Cons_Model<int> *>(args);
while (1)
{
int ret;
model->Consume(&ret);
std::cout << "I am customer,get data:" << ret << std::endl;
}
return nullptr;
}
void *myproductor(void *args)
{
auto model = static_cast<Prod_Cons_Model<int> *>(args);
while (1)
{
sleep(2);
int data = rand() % 5; // 生成5以内的随机数
model->Productor(data);
std::cout << "I am productor,prodece data:" << data << std::endl;
}
return nullptr;
}
int main()
{
Prod_Cons_Model<int> *Int_Model = new Prod_Cons_Model<int>(4);
pthread_t cons1;
pthread_t cons2;
pthread_t prod1;
pthread_t prod2;
pthread_t prod3;
srand(time(nullptr) ^ getpid());
pthread_create(&cons1, nullptr, myconsume, Int_Model);
pthread_create(&cons2, nullptr, myconsume, Int_Model);
pthread_create(&prod1, nullptr, myproductor, Int_Model);
pthread_create(&prod2, nullptr, myproductor, Int_Model);
pthread_create(&prod3, nullptr, myproductor, Int_Model);
pthread_join(cons1, nullptr);
pthread_join(cons2, nullptr);
pthread_join(prod1, nullptr);
pthread_join(prod2, nullptr);
pthread_join(prod3, nullptr);
return 0;
}
①pthread_cond_wait函数详解:
当pthread_cond_wait函数调用的时候,不仅让自己这个线程进入等待状态(停在该函数内部),还释放获取的锁,当被唤醒时再次进入队列中竞争锁。当再次竞争到锁时,函数才返回,这就是该函数第二个参数还要传入一个锁的原因。
pthread_cond_wait函数一定要在临界代码中,原因是等待函数要检测队列中数据是否满足条件,这个检测的过程需要在临界代码中。而信号量是不需要判断。
②用while而不是if
这里注意,在消费函数和生产函数内部需要等待的时候,要用while而不是if,原因是:当有2个线程进入等待状态,而唤醒用的是broad_cast的话,2个线程同时被唤醒,其中一个竞争到锁,另一个被阻塞在锁里而不是wait里。竞争到锁的线程对队列中的内容进行操作(以消耗数据为例),如果此时将队列中的数据全消耗完了,然后释放锁,而另一个线程竞争到锁之后,往后走会直接消耗队列中的数据,而队列中的数据已经为空了,这就会导致问题。所以要用while而不是if.
③解锁和唤醒函数的顺序
在生产函数和消费函数内部,解锁和唤醒两个函数的顺序是可以对调的。因为无论哪种顺序,唤醒线程之后,被唤醒的线程还是要竞争同一把锁,所以在解锁前和解锁后都一样。但是唤醒的操作一定要在对队列进行操作之后。
④分配任务
注意:这里生产消费模型,还可以用于任务的分配执行。例如,生产出一个任务交给消费者去执行。
⑤代码适用性
以上的代码不仅适合单生产单消费情景,也适合多生产多消费情景。具体在实际应用中的如何选择则要根据需要。
4、生产消费模型的特点
(1)协调忙闲不均
通过线程同步,实现对忙闲不均的协调
(2)生产者代码和消费者代码解耦
生产者代码和消费者代码互不影响。
(3)效率高
尽管在临界资源中永远只有一个线程在执行,但是生产任务、处理任务也是需要花费时间的,对于消费者来说,获取任务和处理任务是并发的;对于生产者来说,发送任务和产生任务是并发的;这里的并发对于整个工作流程是效率高的。
5、信号量
(1)信号量概念
见《共享内存与信号量》一文。
(2)信号量相关接口
初始化:
value表示的是多少信号量。
信号量的P操作:
该操作的意思是,申请信号量,申请成功时信号量--;申请失败时则会阻塞在这个函数中。
信号量的V操作:
(3)环形队列配合信号量实现生产消费模型
环形队列的特点:当head==end时,队列为空或者队列为满。如何判断空还是满:引入一个计数器。
多线程如何在环形队列中生产和消费:
①当队列为空,让生产者先生产
②当队列为满时,让消费者先消费
③为空为满是少数情况,大部分情况是既不为空,也不为满;此时head(生产者下标)和end(消费者下标)一定不指向一个位置,此时允许生产和消费同时进行。此时可以看出环形队列一定是比阻塞队列实现的生产消费模型快的。
以上的这些条件,可以直接使用信号量实现。所以说信号量是用来实现互斥与同步的。
对于消费者来说,数据资源是他真正的资源;而对于生产者而言,空间资源是其真正的资源。所以我们得设置两个信号量,一个是对应数据资源,一个是对应空间资源。在初始化时设置数据资源为空,空间资源为满(等于环形队列的容量)。
对于生产者,要申请空间资源(P一个空间资源),但释放的是一个数据资源(V一个空间资源),因为生产者申请到一个空间资源后,是向这个空间资源中放数据的,放完数据后空间资源并未增多,而是数据资源增多了一个;对于消费者,要申请数据资源,而要释放一个空间资源,因为消费者申请一个数据资源后,是拿到该数据,拿到之后该数据已经没有用了,所以是释放空间资源,可以让生产者再在这个空间生产数据。
(4)代码实现
先写上层调用逻辑:
对于单生产单消费模型,让生产者生产,消费者消费,让两者看到同一份环形队列资源,生产者不断地生产,消费者不断地消费(对于队列空和满的情况在.hpp文件中实现即可)。
Main.cc代码:
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *Consume(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (1)
{
// 消费
int out;
rq->Pop(&out);
// 处理数据
std::cout << "得到的数据为:" << out << std::endl;
sleep(1);
}
}
void *Productor(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
sleep(2);
while (1)
{
// 构造数据
int in = rand() % 10 + 1;
// 生产
rq->Push(in);
std::cout << "构建的数据为:" << in << std::endl;
sleep(2);
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, Consume, rq);
pthread_create(&p, nullptr, Productor, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
hpp代码:
#pragma once
#include<pthread.h>
#include<iostream>
#include<string>
#include<vector>
#include <semaphore.h>
#define DEFAULT_SIZE 5
template<typename t>
class RingQueue
{
private:
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
public:
RingQueue(int size=DEFAULT_SIZE)
:_max_cap(size),
_head(0),
_end(0)
{
_queue.resize(_max_cap);
sem_init(&_data_sem,0,0);
sem_init(&_space_sem,0,_max_cap);
pthread_mutex_init(&_c_mutex,nullptr);
pthread_mutex_init(&_p_mutex,nullptr);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
void Push(const t& data)//生产
{
P(_space_sem);
pthread_mutex_lock(&_p_mutex);
_queue[_head]=data;
_head++;
_head%=_max_cap;
pthread_mutex_unlock(&_p_mutex);
V(_data_sem);
}
void Pop(t* out)//消费
{
P(_data_sem);
pthread_mutex_lock(&_c_mutex);
*out=_queue[_end];
_end++;
_end%=_max_cap;
pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
private:
std::vector<t> _queue;
int _max_cap;
int _head;//生产者下标
int _end;//消费者下标
// int _data_num; //不太需要
sem_t _data_sem;
sem_t _space_sem;
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
代码验证结果:(保持了同步以及互斥)
但如果要实现多生产多消费,则要加上锁。