一、线程
threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。
创建线程的两种方式 1.threading.Thread
import threadingdef f1(arg): print(arg)t = threading.Thread(target=f1,args=(123,))#t.start代表这个线程已经准备就绪,等待cpu的调度。t.start()
2.自定义,继承threading.Thread
class MyThread(threading.Thread): def __init__(self, func,args): self.func = func self.args = args super(MyThread, self).__init__() def run(self): self.func(self.args)def f2(arg): print(arg)t1 = MyThread(f2,1234)t1.start()
二、线程锁
当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。
##没使用锁import threadingimport timeNUM = 10def f1(): global NUM NUM -= 1 time.sleep(2) print(NUM)for i in range(10): t = threading.Thread(target=f1) t.start()##使用锁def f1(l): global NUM #上锁 l.acquire() NUM -= 1 time.sleep(2) print(NUM) #开锁 l.release()#创建锁,Rlock可以加多层锁lock = threading.RLock()for i in range(10): t = threading.Thread(target=f1,args=(lock,)) t.start()
三、线程池
#!/usr/bin/env python#-*- coding:utf-8 -*-import queueimport threadingimport timeclass ThreadPool: def __init__(self,maxsize=5): self.maxsize = maxsize self._q = queue.Queue(maxsize) for i in range(maxsize): self._q.put(threading.Thread) def get_thread(self): return self._q.get() def add_thread(self): self._q.put(threading.Thread)pool = ThreadPool(5)#把用完的线程再放回容器中def task(arg,p): print(arg) time.sleep(1) p.add_thread()for i in range(100):#t 是threading.Thread类 t = pool.get_thread()#创建线程对象 obj = t(target=task,args=(i,pool,)) obj.start()
进程
进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。并且python不能再Windows下创建进程!
并且在使用多进程的时候,最好是创建和CPU核数相等的进程
默认的进程之间相互是独立,如果想让进程之间数据共享,使用如下方法。
manager.dict() #共享数据
from multiprocessing import Processfrom multiprocessing import Managerdef foo(i,arg): arg[i] = i + 100 print(arg.values())if __name__ == '__main__': obj = Manager() li = obj.dict() for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() p.join()
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:apply和apply_async
#!/usr/bin/env python#-*- coding:utf-8 -*-from multiprocessing import Poolimport timedef f1(arg): time.sleep(1) print(arg)if __name__ == '__main__': pool = Pool(5) for i in range(20): # pool.apply(func=f1,args=(i,)) #一个一个执行 pool.apply_async(func=f1,args=(i,)) #并发 # pool.close() #所有的任务执行完毕(5个一起执行) #立即终止 time.sleep(1) pool.terminate() pool.join()
协程
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
#!/usr/bin/env python#-*- coding:utf-8 -*-from greenlet import greenletdef test1(): print(12) gr2.switch() print(34) gr2.switch()def test2(): print(56) gr1.switch() print(78)gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()
遇到IO操作自动切换
#!/usr/bin/env python#-*- coding:utf-8 -*-#应用场景 #####监控url(检测)from gevent import monkey; monkey.patch_all()import geventimport requestsdef f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url))gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'),])
memcache
简述: Memcache是一套分布式的高速缓存系统,目前被许多网站使用以提升网站的访问速度,尤其对于一些大型的、需要频繁访问数据库的网站访问速度提升效果十分显著。 工作原理: MemCache的工作流程如下:先检查客户端的请求数据是否在memcached中,如有,直接把请求数据返回,不再对数据库进行任何操作;如果请求的数据不在memcached中,就去查数据库,把从数据库中获取的数据返回给客户端,同时把数据缓存一份到memcached中(memcached客户端不负责,需要程序明确实现);每次更新数据库的同时更新memcached中的数据,保证一致性;当分配给memcached内存空间用完之后,会使用LRU(Least Recently Used,最近最少使用)策略加上到期失效策略,失效数据首先被替换,然后再替换掉最近未使用的数据。 Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。 Memcached是以守护程序(监听)方式运行于一个或多个服务器中,随时会接收客户端的连接和操作 memcahce的安装
yum install libevent-devel -ywget http://memcached.org/latesttar -zxvf memcached-1.x.x.tar.gzcd memcached-1.x.x./configure && make && make test && sudo make install
启动memcache
[root@linux-node1 ~]# memcached -d -m 10 -u root -l 192.168.1.11 -p 11211 -c 256 -P /tmp/memcached.pid[root@linux-node1 ~]# netstat -antlp|grep 11211tcp 0 0 192.168.1.11:11211 0.0.0.0:* LISTEN 2251/memcached##### 参数说明-d 是启动一个守护进程-m 是分配给Memcache使用的内存数量,单位是MB-u 是运行Memcache的用户-l 是监听的服务器IP地址-p 是设置Memcache监听的端口,最好是1024以上的端口-c 选项是最大运行的并发连接数,默认是1024,按照你服务器的负载量来设定-P 是设置保存Memcache的pid文件
memcache命令
存储命令: set/add/replace/append/prepend/cas获取命令: get/gets其他命令: delete/stats..
python操作memcache
python操作Memcached使用Python-memcached模块下载安装:https://pypi.python.org/pypi/python-memcached###############import memcachemc = memcache.Client(['192.168.1.11:11211'], debug=True)mc.set("foo", "bar")ret = mc.get('foo')print ret ###add #!/usr/bin/env python # -*- coding:utf-8 -*-
import
memcache
mc
=
memcache.Client([
'192.168.1.11:11211'
], debug
=
True
)
mc.add(
'k1'
,
'v1'
)
# mc.add('k1', 'v2') # 报错,对已经存在的key重复添加,失败!!!
支持集群
python-memcached模块原生支持集群操作,其原理是在内存维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现的次数成正比。
队列
Python队列方法
·put 放数据
·get 取数据
·qsize 返回队列的大小
·maxsize 最大支持个数
·join 等待队列为空的时候,在执行别的操作
·empty 当队列为空的时候,返回True,否则返回False
·full 当队列为满的时候,返回True,否则返回False
######先进先出
#!/usr/bin/env python#-*- coding:utf-8 -*-import queue#maxsize 最大支持多少个排队数q = queue.Queue(2)#put放数据,默认阻塞,block是否阻塞,timeout超时时间#empty 检查队列是否为空print(q.empty())q.put(123)q.put(456)q.get()#get取数据,默认阻塞,block是否阻塞,timeout超时时间#队列中的真实个数,qsizeprint(q.qsize())#join,task_done阻塞进程,当队列中任务执行完成后,不再阻塞,task_done表示任务执行完成
#######其他三种队列
#!/usr/bin/env python#-*- coding:utf-8 -*-import queue#后进先出队列q = queue.LifoQueue()q.put(123)q.put(456)print(q.get())#优先级队列#当优先级相同时,按放数据顺序取数据q1 = queue.PriorityQueue()q.put((1,"alex1"))q.put((2,"alex2"))#双向队列q2 = queue.deque()q2.append(123)q2.append(456)q2.appendleft(333)print(q2)
生产者消费者模型
消费者 ========》 队列(缓冲区) =========》 生产者
#!/usr/bin/env python#-*- coding:utf-8 -*-import threadingimport timeimport randomimport queue #队列模块def Producer(name,que): #生产者 while True: que.put('包子') #相当于把包子放到仓库里 print('%s:做了一个包子' %name) #打印出做了一个包子出来 time.sleep(random.randrange(5)) #厨师5秒内做出一个包子def Consumer(name,que): #消费者 while True: try: #异常处理,如果碰到没有包子可吃就等待厨师做包子 que.get_nowait() print('%s:吃了一个包子' %name) except Exception: print(u'没有包子了') time.sleep(random.randrange(3)) #消费者3秒内吃掉一个包子q = queue.Queue() #队列p1 = threading.Thread(target=Producer,args=['厨师1',q]) #目标是Producer这个函数,args是传参p2 = threading.Thread(target=Producer,args=['厨师2',q])p1.start()p2.start()c1 = threading.Thread(target=Consumer,args=['张三',q])c2 = threading.Thread(target=Consumer,args=['李四',q])c1.start()c2.start()