aiohttp ClientSession 用法踩坑

一般是这么用的: async with aiohttp.ClientSession() as ses: res = await ses.post(xxx) text = await res.text() xxx 没有问题 但是为了减少缩进,可能想这样封装一下: class HTTP: @staticmethod async def post(*args, **kwargs): async with aiohttp.ClientSession() as ses: return await ses.post(*args, **kwargs) 这是有问题的,with 上下文之后会关闭session的连接和资源,如果payload比较大,在连接关闭之后还没读完的话,可能会卡在await ses.text()那里,导致超时 所以需要在上下文关闭之前就把内容读取完毕并返回。 可以这样: class HTTP: @staticmethod async def post(*args, **kwargs): async with aiohttp.ClientSession() as ses: async with await ses.post(*args, **kwargs) as res: return res, await res.text() 或者这样: class HTTP: @classmethod async def get(cls, *args, **kargs): await cls....

September 1, 2020 · 1 min · Egbert Ke

问题记录:Python 协程相关

import asyncio import multiprocessing q = multiprocessing.Queue(10000) for i in range(100): q.put(i) async def coro(i): print('coro... {}'.format(i)) async def device_video_main(j): loop = asyncio.get_event_loop() for i in range(5): asyncio.ensure_future(coro(j), loop=loop) # await asyncio.sleep(1) async def run_integrate(): while True: j = q.get() print('j: ', j) if True: loop = asyncio.get_event_loop() coro = device_video_main(j) loop.create_task(coro) # asyncio.ensure_future(coro, loop=loop) # await asyncio.sleep(1) # print(loop.is_running()) # async def main(): # loop = asyncio.get_event_loop() # for i in range(3): # asyncio....

May 14, 2020 · 1 min · Egbert Ke

asyncio.sleep 和 time.sleep 的区别

time.sleep是针对整个线程,整个线程会挂起,不再执行任何操作。 asyncio.sleep是针对当前协程而言,告诉事件循环:请去执行别的操作,相当于模拟了一次网络IO,不会阻塞其他协程的执行。 import time import asyncio async def hello(): print('Hello ...') await asyncio.sleep(5) # time.sleep(5) print('... World!') async def main(): await asyncio.gather(hello(), hello()) loop = asyncio.get_event_loop() loop.run_until_complete(main()) 运行结果: Hello ... Hello ... ... World! ... World! import time import asyncio async def hello(): print('Hello ...') # await asyncio.sleep(5) time.sleep(5) print('... World!') async def main(): await asyncio.gather(hello(), hello()) loop = asyncio.get_event_loop() loop.run_until_complete(main()) 运行结果: Hello ... ... World! Hello ... ... World!

May 10, 2020 · 1 min · Egbert Ke

SQLAlchemy中常见的query filter

equals query.filter(User.name == 'leela') not equals: query.filter(User.name != 'leela') LIKE query.filter(User.name.like('%leela%')) IN query.filter(User.name.in_(['leela', 'akshay', 'santanu'])) # works with query objects too: query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%santanu%')))) NOT IN query.filter(~User.name.in_(['lee', 'sonal', 'akshay'])) IS NULL filter(User.name == None) IS NOT NULL filter(User.name != None) AND from sqlalchemy import and_ filter(and_(User.name == 'leela', User.fullname == 'leela dharan')) #or, default without and_ method comma separated list of conditions are AND filter(User.name == 'leela', User.fullname == 'leela dharan') # or call filter()/filter_by() multiple times filter(User....

December 8, 2019 · 1 min · Egbert Ke

Python 在CSV文件中写入中文字符

对于UTF-8编码,Excel要求BOM(字节顺序标记)写在文件的开始,否则它会假设这是ANSI编码,这个就是与locale有依赖性了。 Python2 #!python2 #coding:utf8 import csv data = [[u'American',u'美国人'], [u'Chinese',u'中国人']] with open('results.csv','wb') as f: f.write(u'\ufeff'.encode('utf8')) w = csv.writer(f) for row in data: w.writerow([item.encode('utf8') for item in row]) Python3 #!python3 #coding:utf8 import csv data = [[u'American',u'美国人'], [u'Chinese',u'中国人']] with open('results.csv','w',newline='',encoding='utf-8-sig') as f: w = csv.writer(f) w.writerows(data) unicodecsv #!python2 #coding:utf8 import unicodecsv data = [[u'American',u'美国人'], [u'Chinese',u'中国人']] with open('results.csv','wb') as f: w = unicodecsv.writer(f,encoding='utf-8-sig') w.writerows(data) 转载自简书

September 25, 2019 · 1 min · Egbert Ke

Python 获取线程中的异常信息

通常情况下我们无法将多线程中的异常带回主线程,所以也就无法打印线程中的异常,而通过traceback模块,我们可以对线程做如下修改,从而实现捕获线程异常的目的1。 import threading import traceback def my_func(): raise BaseException("thread exception") class ExceptionThread(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): """ Redirect exceptions of thread to an exception handler. """ threading.Thread.__init__(self, group, target, name, args, kwargs, verbose) if kwargs is None: kwargs = {} self._target = target self._args = args self._kwargs = kwargs self._exc = None def run(self): try: if self._target: self._target() except BaseException as e: import sys self._exc = sys....

September 12, 2019 · 1 min · Egbert Ke

Python 限制函数运行时间

实际项目中会涉及到需要对有些函数的响应时间做一些限制,如果超时就退出函数的执行,停止等待。 使用signal 使用signal有所限制,需要在linux系统上,并且需要在主线程中使用。方法二使用线程计时,不受此限制。 # coding=utf-8 import signal import time def set_timeout(num, callback): def wrap(func): def handle(signum, frame): # 收到信号 SIGALRM 后的回调函数,第一个参数是信号的数字,第二个参数是the interrupted stack frame. raise RuntimeError def to_do(*args, **kwargs): try: signal.signal(signal.SIGALRM, handle) # 设置信号和回调函数 signal.alarm(num) # 设置 num 秒的闹钟 print('start alarm signal.') r = func(*args, **kwargs) print('close alarm signal.') signal.alarm(0) # 关闭闹钟 return r except RuntimeError as e: callback() return to_do return wrap def after_timeout(): # 超时后的处理函数 print("Time out!") @set_timeout(2, after_timeout) # 限时 2 秒超时 def connect(): # 要执行的函数 time....

September 11, 2019 · 2 min · Egbert Ke

Python中的operator模块

operator 模块提供了一套与Python的内置运算符对应的高效率函数。例如,operator.add(x, y) 与表达式 x+y 相同。 许多函数名与特殊方法名相同,只是没有双下划线。为了向后兼容性,也保留了许多包含双下划线的函数。为了表述清楚,建议使用没有双下划线的函数。 operator.attrgetter def attrgetter(*items): if any(not isinstance(item, str) for item in items): raise TypeError('attribute name must be a string') if len(items) == 1: attr = items[0] def g(obj): return resolve_attr(obj, attr) else: def g(obj): return tuple(resolve_attr(obj, attr) for attr in items) return g def resolve_attr(obj, attr): for name in attr.split("."): obj = getattr(obj, name) return obj operator.itemgetter def itemgetter(*items): if len(items) == 1: item = items[0] def g(obj): return obj[item] else: def g(obj): return tuple(obj[item] for item in items) return g examples...

September 9, 2019 · 1 min · Egbert Ke

Python中的async/await

async/await是实现异步IO的语法糖,是Python3.7新出的关键字。async def可创建协程,而await可用来等待一个可等待对象的执行完成。这大大简化了协程的创建(在Python2中创建协程需要yield和send协同操作) 下面这个例子很简洁的说明了什么是异步IO1: import asyncio async def count(): print("One") await asyncio.sleep(1) print("Two") async def main(): await asyncio.gather(count(), count(), count()) if __name__ == "__main__": import time s = time.perf_counter() asyncio.run(main()) elapsed = time.perf_counter() - s print(f"{__file__} executed in {elapsed:0.2f} seconds.") 运行结果: $ python3 countasync.py One One One Two Two Two countasync.py executed in 1.01 seconds. gather会等待所有协程都返回后再返回一个结果列表,as_completed会当协程返回后立即返回: >>> import asyncio >>> async def coro(seq) -> list: ... """'IO' wait time is proportional to the max element....

July 9, 2019 · 1 min · Egbert Ke

Python 中的协程

Python 中的协程 函数也叫子程序,其调用过程一般为:Main中调用A,等待A结束后调用B,等待B结束后调用C… 函数的调用一般是单入口。 相比于函数,协程可以有多个入口来暂停(切换到其他协程执行)和恢复协程的执行。另外,协程的调用不像函数调用需要主函数按照特定顺序依次调用子程序,协程之间是协作关系,可以来回切换。 相比于线程,他们都是通过切换达到协作的目的。线程是由操作系统调度来实现切换,而协程是语言级别的切换,开销更小。 Python中,可以用生成器中的yield实现协程(支持不完全) 协程实现生产者/消费者模型1 import time def consumer(): r = '' while True: n = yield r if not n: return print('consuming {}'.format(n)) time.sleep(1) r = '200 OK' def produce(c): next(c) # 启动协程,Python2写法: c.next() n = 0 while n < 5: n = n + 1 print('producing: {}'.format(n)) r = c.send(n) print('consumer return: {}'.format(r)) c.close() # 关闭协程 c = consumer() produce(c) 运行结果: producing: 1 consuming 1 consumer return: 200 OK producing: 2 consuming 2 consumer return: 200 OK producing: 3 consuming 3 consumer return: 200 OK producing: 4 consuming 4 consumer return: 200 OK producing: 5 consuming 5 consumer return: 200 OK 用连接协程的方式创建管道2 def producer(sentence, next_coroutine): tokens = sentence....

July 5, 2019 · 1 min · Egbert Ke