Python中的IO编程和多进程/线程

Posted by Waynerv on

category: Python

Tags: 多线程

IO编程

文件读写

  1. 在磁盘上读写文件的功能都是由操作系统提供的,现代操作系统不允许普通的程序直接操作磁盘,所以,读写文件就是请求操作系统打开一个文件对象(通常称为文件描述符),然后,通过操作系统提供的接口从这个文件对象中读取数据(读文件),或者把数据写入这个文件对象(写文件)。
  2. 打开文件对象方法:使用Python内置的open()函数打开文件对象,传入文件名和mode标识符。
  3. open()函数完整表示:

    open(file, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None)`
    
    打开文件并返回对应的文件对象,如果文件无法打开,则引起OSError异常。
    

读文件

  1. 打开文件对象,使用mode标识符'r'表示读取(默认值):

    >>> f = open('/Users/michael/test.txt', 'r')
    
  2. 读取文件内容:

    • f.read()一次性读取全部文件对象的全部内容到内存,用一个str对象表示,可以反复调用read(size)方法,每次最多读取size个字节的内容
    • f.readline()每次读取一行内容,可以通过循环结构输出所有内容
    • f.readlines()读取全部内容并自动将文件内容分析成一个行的列表,该列表可以由 Python 的 for … in … 结构进行处理
  3. 关闭文件,调用文件对象的close()方法。文件使用完毕后必须关闭,因为文件对象会占用操作系统的资源。
  4. 无错读取,文件读写容易产生IOError,一旦出错则无法正常调用f.close(),可以通过try ... finally实现,但更推荐用with语句: with open('/path/to/file', 'r') as f: print(f.read()) # 自动调用f.close()方法

写文件

  1. 打开文件对象,使用mode标识符'w'表示覆盖写(追加写为'a'): f = open('/Users/michael/test.txt', 'w') f.write('Hello, world!') f.close()

  2. 同样需要在写入后调用f.close()关闭文件对象,才能保证已经写入磁盘。同样推荐使用with语句: with open('/Users/michael/test.txt', 'w') as f: f.write('Hello, world!')

字符编码

  1. 读取或写入非utf-8编码(Python3默认编码)的文本文件,调用open()函数时传入encoding=参数即可。编码错误可通过errors参数忽略。 f = open('/Users/michael/gbk.txt', 'r', encoding='gbk', errors='ignore')

二进制文件

  1. 要读取二进制文件,比如图片、视频等等,用'rb'模式打开文件即可(默认为't'模式即文本模式)。

StringIO和BytesIO

  1. 在内存中创建具备与文件对象一致的读写接口的对象。

文件读取指针

  1. 文件对象的tell()方法,返回文件的当前位置,即文件指针当前位置。
  2. 文件对象的seek()方法,用于移动文件读取指针到指定位置,参数为偏移量。

StringIO

  1. 创建一个StringIO,然后像文件一样写入即可。

    >>> from io import StringIO
    >>> f = StringIO() # 可以直接使用str参数初始化
    >>> f.write('hello')
    5
    >>> print(f.getvalue())
    hello
    
  2. getvalue()方法用于获得写入后的str。

  3. 可以用一个str初始化StringIO,然后像读文件一样读取。

BytesIO

  1. StringIO只能操作str,如果要操作二进制数据需要使用BytesIO。
  2. 读写方法与StringIO类似,但注意写入的对象是bytes:f.write('中文'.encode('utf-8'))

操作文件和目录

环境变量

  1. 查看所有环境变量:os.environ(实际是个字典对象)。
  2. 获取某个环境变量:os.environ.get('PATH')

操作文件和目录

  1. 操作目录:

    # 查看当前目录的绝对路径:
    >>> os.path.abspath('.')
    '/Users/michael'
    # 在某个目录下创建一个新目录,首先把新目录的完整路径表示出来:
    >>> os.path.join('/Users/michael', 'testdir')
    '/Users/michael/testdir'
    # 然后创建一个目录:
    >>> os.mkdir('/Users/michael/testdir')
    # 删掉一个目录:
    >>> os.rmdir('/Users/michael/testdir')
    
  2. 尽量通过os.path.join合成路径而不是拼接字符串,因为不同操作系统路径分隔符不同。

  3. 拆分路径:os.path.split()函数,后一部分总是最后级别的目录或文件名。os.path.splitext()拆分出文件拓展名。
  4. 操作文件:

    # 对文件重命名:
    >>> os.rename('test.txt', 'test.py')
    # 删掉文件:
    >>> os.remove
    
  5. 复制文件需要借助其他的模块如shutil提供的copyfile()函数。

  6. 运用示例: # 列出当前目录下的所有目录 >>> [x for x in os.listdir('.') if os.path.isdir(x)] ['.lein', '.local', '.m2', '.npm', '.ssh', '.Trash', '.vim', 'Applications', 'Desktop', ...] # 列出当前目录下的所有.py文件 >>> [x for x in os.listdir('.') if os.path.isfile(x) and os.path.splitext(x)[1]=='.py'] ['apis.py', 'config.py', 'models.py', 'pymonitor.py', 'test_db.py', 'urls.py', 'wsgiapp.py']

序列化

对象从内存中变成可存储或传输的过程称之为序列化,在Python中叫pickling。 序列化之后,可以把序列化后的内容写入磁盘,或者通过网络传输到别的机器上。 反过来,把变量内容从序列化的对象重新读到内存里称之为反序列化,即unpickling。

pickle

序列化
  1. 使用pickle.dumps()函数把任意对象序列化成一个bytes,然后再写入文件。

    >>> import pickle
    >>> d = dict(name='Bob', age=20, score=88)
    >>> pickle.dumps(d)
    b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K...'
    
  2. 使用pickle.dump(object, fileobject)函数直接把对象序列化后写入一个file-like-object。

    >>> f = open('dump.txt', 'wb')
    >>> pickle.dump(d, f)
    >>> f.close()
    
反序列化
  1. 使用pickle.loads()函数从bytes中反序列化出对象,需要先读取内容到bytes
  2. 使用pickle.load(fileobject)函数从一个file-like-object中直接反序列化出对象。 >>> f = open('dump.txt', 'rb') # 注意为二进制读写 >>> d = pickle.load(f) >>> f.close() >>> d {'age': 20, 'score': 88, 'name': 'Bob'}

JSON

  1. Python内置的json模块可以将对象转换为JSON格式,使用的函数及用法与pickle模块基本相同(JSON规定编码为utf-8)。 >>> import json >>> d = dict(name='Bob', age=20, score=88) >>> json.dumps(d) '{"age": 20, "score": 88, "name": "Bob"}' >>> json_str = '{"age": 20, "score": 88, "name": "Bob"}' >>> json.loads(json_str) {'age': 20, 'score': 88, 'name': 'Bob'}

JSON进阶

  1. Python的dict对象可以直接序列化为JSON的{},但实例对象无法直接序列化,需要手写转换函数,再用可选参数default传递进序列化函数。
  2. 示例: def student2dict(std): return { 'name': std.name, 'age': std.age, 'score': std.score } >>> print(json.dumps(s, default=student2dict)) {"age": 20, "name": "Bob", "score": 88}

进程和线程

  1. 分时系统:操作系统可以轮流让各个任务在单核心中交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。
  2. 真正的并行执行多任务只能在多核CPU上实现,由于任务数量远远多于CPU的核心数量,所以操作系统也会自动把很多任务轮流调度到每个核心上执行。
  3. 多任务实现的3中方式:多进程模式;多线程模式;多进+多线程模式。
  4. 进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
  5. 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程 ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。线程没有自己的系统资源,只拥有在运行时必不可少的资源。但线程可以与同属与同一进程的其他线程共享进程所拥有的其他资源。

多进程

  1. 操作系统提供fork()系统函数调用来复制当前进程启动子进程,fork()调用一次返回两次,父进程返回子进程的ID,子进程永远返回0

multiprocessing

  1. Python通过multiprocessing模块编写多进程,它提供了一个Process类来代表一个进程对象。
  2. 创建子进程:创建一个Process类实例,使用targetargs关键字传入一个执行函数和函数的参数,用start()方法启动,join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
  3. 实例代码:

    import os
    from multiprocessing import Process
    
    # 子进程要执行的代码
    def run_proc(name):
        print(f'Run child process {name} {os.getpid()}...')
    
    if __name__ == '__main__':
        print(f'Parent process {os.getpid()}.')
        p = Process(target=run_proc, args=('test',))
        print('Child process will start.')
        p.start()
        p.join()
        print('Child process end.')
    

Pool

  1. 启动大量的子进程时,可以用进程池的方式批量创建子进程:

    from multiprocessing import Pool
    import os, time, random
    
    
    def long_time_task(name):
        print(f'Run task {name} {os.getpid()}...')
        start = time.time()
        time.sleep(random.random()*3)
        end = time.time()
        print('Task {} runs {:.2f} seconds'.format(name, (end - start)))
    
    
    if __name__ == '__main__':
        print(f'Parent process {os.getpid()}.')
        p = Pool(4) # 初始化进程池
        for i in range(5):
            p.apply_async(long_time_task, args=(i,)) # 加入任务到进程池
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
    
  2. 调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。同时执行的Pool进程有大小限制,默认大小为CPU的核数。

子进程

  1. subprocess.call()执行由参数提供的命令,返回状态码(成功为‘0’)。我们可以用列表作为参数运行命令,也可以用字符串作为参数运行命令(通过设置参数shell=True),参数shell默认为False,示例:

    subprocess.call(['df', '-h'])
    

    把shell设置为True

    subprocess.call('du -hs $HOME', shell=True)
    
  2. subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

    import subprocess
    
    print('$ nslookup www.python.org')
    r = subprocess.call(['nslookup', 'www.python.org'])
    print('Exit code:', r)
    
  3. 可以通过communicate()方法进行输入(相当于执行命令后再手动输入):

    import subprocess
    
    print('$ nslookup')
    p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, err = p.communicate(b'set q=mx\npython.org\nexit\n') 
    print(output.decode('utf-8'))
    print('Exit code:', p.returncode)
    
  4. communicate()返回元祖(stdout_data, stderr_data). The data will be strings if streams were opened in text mode; otherwise, bytes.Note that if you want to send data to the process’s stdin, you need to create the Popen object with stdin=PIPE. Similarly, to get anything other than None in the result tuple, you need to give stdout=PIPE and/or stderr=PIPE too.

进程间通信

  1. Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。
  2. 示例:

    from multiprocessing import Process, Queue
    import os, time, random
    
    # 写数据进程
    def write(q):
        print(f'Process to write:{os.getpid()}')
        for value in ['A', 'B', 'C']:
            print(f'Put {value} to queue...')
            q.put(value)
            time.sleep(random.random())
    
    # 读数据进程
    def read(q):
        print(f'Process to read:{os.getpid()}')
        while True:
            value = q.get(True)
            print(f'Get {value} from queue.')
    
    if __name__ == '__main__':
        # 父进程创建Queue,并传给各个子进程
        q = Queue()
        pw = Process(target=write, args=(q,))
        pr = Process(target=read, args=(q,))
        # 启动子进程pw,写入:
        pw.start()
        # 启动子进程pr,读取:
        pr.start()
        # 等待pw结束
        pw.join()
        # pr进程里是死循环,无法等待其结束,只能强行终止:
        pr.terminate()
    

多线程

Threading

  1. 绝大多数情况下,我们只需要使用threading这个高级模块。构造方法:

    class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    
  2. 启动一个线程:创建Thread实例并把一个函数传入,然后调用start()开始执行。

    import time, threading
    
    # 新线程执行的代码:
    def loop():
        print('thread %s is running...' % threading.current_thread().name)
        n = 0
        while n < 5:
            n = n + 1
            print('thread %s >>> %s' % (threading.current_thread().name, n))
            time.sleep(1)
        print('thread %s ended.' % threading.current_thread().name)
    
    print('thread %s is running...' % threading.current_thread().name)
    t = threading.Thread(target=loop, name='LoopThread')
    t.start()
    t.join()
    print('thread %s ended.' % threading.current_thread().name)
    
  3. Python的threading模块提供current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定。

Lock

  1. 多线程和多进程最大的不同在于,多线程拥有独立的内存空间,多进程共享内存空间,多个线程可以同时修改同一变量。
  2. 修改全局变量的操作在CPU执行时是若干条语句,而操作系统对线程的调度具有随机性,可导致操作顺序打乱,对变量的修改产生非预期的结果。如双线程计算自增自减。
  3. 为了防止以上现象出现,可以给指定操作加锁,只有持有唯一的锁才能执行操作,不会造成修改变量的冲突。创建一个锁通过threading.Lock()来实现:

    balance = 0
    lock = threading.Lock()
    
    def run_thread(n):
        for i in range(100000):
            # 先要获取锁:
            lock.acquire()
            try:
                # 放心地改吧:
                change_it(n)
            finally:
                # 改完了一定要释放锁:
                lock.release()
    
  4. 锁的好处:确保了某段关键代码只能由一个线程从头到尾完整地执行;坏处:阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,多个不同锁的获取释放可能造成死锁。

GIL

  1. Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
  2. Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

ThreadLocal

  1. 多线程环境下,一个线程使用局部变量比使用全局变量好,全局变量的修改必须加锁。但是局部变量在一个线程中各个函数之间互相传递很麻烦。
  2. 存放各个线程局部变量的全局变量:ThreadLocal对象,每个线程都可以读写,对象的每个属性都是线程的局部变量,可以任意读写互不干扰,不用手动管理锁。可以理解为ThreadLocal对象是一个dict,存放各个线程与局部变量的映射。
  3. 示例:

    import threading
    
    # 创建全局的ThreadLocak对象:
    local_school = threading.local()
    
    def process_thread(name):
        # 绑定ThreadLocal的student:
        local_school.student = name
        process_student()
    
    def process_student():
        # 获取当前线程关联的student:
        std = local_school.student
        print(f'Hello, {std} (in {threading.current_thread().name})')
    
    t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
    t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    

进程vs 线程

  1. 多进程模式稳定性高,进程之间互不影响;但创建进程的代价大且操作系统同时运行的进程数有限制。
  2. 多线程模式创建快,但任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存
  3. 线程切换:CPU切换任务有一定消耗(保存及准备环境),因此多任务数量具有一定限度
  4. 计算密集型任务主要消耗CPU资源,适合C语言编写。
  5. IO密集型大部分时间都在等待IO操作完成,适合用开发效率高的脚本语言。
  6. 异步IO:单核CPU上采用单进程模型就可以高效地支持多任务,Python语言中单线程的异步编程模型称为协程。

分布式进程

  1. multiprocessing模块的managers子模块支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。
  2. ...

异步IO

  1. 同步IO:发出IO指令后,必须等待IO结果返回,才能继续执行后续代码。IO操作过程中当前线程被挂起,其他需要CPU执行的代码无法被当前线程执行。
  2. 异步IO:只发出IO指令,并不等待IO结果,然后就去执行其他代码。异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程:

    loop = get_event_loop()
    while True:
        event = loop.get_event()
        process_event(event)
    
  3. 在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

协程

  1. 协程:子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用在执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
  2. 协程的优势:没有线程切换开销,执行效率极高;不需要多线程的锁机制,效率比多线程高。多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
  3. Python对协程的支持通过generator实现。
  4. 生成器:如果一个def的主体包含yield,这个函数会自动变成一个生成器(即使它包含一个return)。yield会将值传给next()的调用方,同时还会保存生成器函数的“状态”,通过使用next()来从generator中获取下一个值。
  5. yield关键字返回number的值,而像other = yield foo这样的语句的意思是,”返回foo的值,这个值返回给调用者的同时,将other的值也设置为那个值”。你可以通过send(n)方法来将n值”发送“给生成器中的other。
  6. 协程的生产者-消费者模型:

    def consumer():
        r = ''
        while True:
            n = yield r # 接收send(n)传递过来的n值并继续执行
            if not n:
                return
            print('[CONSUMER] Consuming %s...' % n)
            r = '200 OK'
    
    def produce(c):
        c.send(None)
        n = 0
        while n < 5:
            n = n + 1
            print('[PRODUCER] Producing %s...' % n)
            r = c.send(n) # 接收consumer回传的r值
            print('[PRODUCER] Consumer return: %s' % r)
        c.close()
    
    c = consumer()
    produce(c)
    

注:转载本文,请与作者联系




如果觉得文章对您有价值,请作者喝杯咖啡吧

|
donate qrcode

欢迎通过微信与我联系

wechat qrcode

0 Comments latest

No comments.