博客再开张

最开始写博客应该是大一下学期左右就开始了,当时是在CSDN上写,主要记录一些平时学习和开发时遇到的一些问题和解决方式的经验文章,也有一部分是算法题的题解。这是我CSDN主页地址,截止目前已经有10w+的访问量了。 后来觉得有自己的独立博客更酷一些,也整过hexo这类的博客引擎,觉得限制较多,部署繁琐,觉得从头到尾自己设计开发一个博客更拽,当时就真的开始从头写博客网站,数据库设计、后端接口、前端样式、简单的评论功能、后台管理,后台新建文章的页面样式都自己在大学宿舍里一点点地写,印象比较深刻的有两个地方,一个是实现了文章发布之后保存为静态文件,之后再访问时直接托管到nginx,请求不用到达后端。另一个是markdown解析器和代码高亮的调研,当时调研了好多的markdown解析,最终终于找到了自己满意的markdown-it,还实现了例如自定义图片大小和图片浮动方向的语法和对应的样式。之前那个博客的域名已经迁移到这里了,目前可访问的地址是https://123.206.178.92,之后会将那个博客的文章陆续迁移到这里来。 时间久了之后,发现维护起来比较麻烦,而且技术含量不高,就想选一个比较方便管理、扩展的博客系统,偶然的机会发现了typecho,发现挺不错,整个框架很轻,主题和插件修改起来也方便,而且是开源的,可以自己随便写一些扩展或者做一些修改。然后又找到了handsome 这款typecho主题,这是一款功能丰富、做的很用心的主题,虽然是收费的,但是没关系。后面会基于这个主题做一些样式上的设置和定制。 写博客是个好习惯,博客的本质是记录,记录自己的成长,记录自己的心得体会、对生活的感悟。写博客时就像跟自己对话,这种感觉有些微妙。 我的博客又开张了,希望自己坚持写下去。

March 22, 2021 · 1 min · Egbert Ke

Unix中的信号是怎么工作的

信号的作用 信号是一种异步通知机制,用来告知进程一个事件已经发生 信号的产生 信号可以由用户调用kill命令发送,也可以由操作系统在某些事件发生时产生,如计时器到期、子进程结束、访问了不改访问的内存等。 信号的工作过程1 当一个信号发送给某个进程时,内核会查看该进程的PCB以决定改信号的处理方式,如果该信号的处理动作是SIG_IGN,则会忽略改信号,如果处理动作是SIG_DFL,则内核会找到该信号的默认处理程序地址并执行。 如果该进程定义了该信号的处理程序,则会执行该程序。 如果进程注册了信号处理程序,则会在进程的待处理信号表中添加一项。当进程下次被调度执行时,内核会首先往该进程的堆栈空间添加一些数据,然后改变执行指令的地址(相当于改变8086架构CPU的CS和IP寄存器的值),就像进程自己调用了信号处理程序一样。 当信号处理程序结束时,会继续执行之前的代码。 内核通常需要知道信号处理程序何时返回,比如当信号处理函数执行时,需要阻止相当的信号被再次传递,或者当信号处理函数执行完后,需要重新调用被信号中断的系统调用。要做到这一点,还需要改变堆栈和指令指针。 内核怎么知道进程的信号处理程序结束了2 内核往进程中映射了一页内存,改内存中有一个用来通知内核处理程序已完成的系统调用,然后将信号处理程序的返回地址改为该内存页的地址(就是改变栈顶的值)。 how-do-unix-signals-work ↩︎ how-signals-work-internally ↩︎

February 6, 2021 · 1 min · Egbert Ke

aiohttp ClientSession 用法踩坑

一般是这么用的: async with aiohttp.ClientSession() as ses: res = await ses.post(xxx) text = await res.text() xxx 没有问题 但是为了减少缩进,可能想这样封装一下: class HTTP: @staticmethod async def post(*args, **kwargs): async with aiohttp.ClientSession() as ses: return await ses.post(*args, **kwargs) 这是有问题的,with 上下文之后会关闭session的连接和资源,如果payload比较大,在连接关闭之后还没读完的话,可能会卡在await ses.text()那里,导致超时 所以需要在上下文关闭之前就把内容读取完毕并返回。 可以这样: class HTTP: @staticmethod async def post(*args, **kwargs): async with aiohttp.ClientSession() as ses: async with await ses.post(*args, **kwargs) as res: return res, await res.text() 或者这样: class HTTP: @classmethod async def get(cls, *args, **kargs): await cls....

September 1, 2020 · 1 min · Egbert Ke

问题记录:Python 协程相关

import asyncio import multiprocessing q = multiprocessing.Queue(10000) for i in range(100): q.put(i) async def coro(i): print('coro... {}'.format(i)) async def device_video_main(j): loop = asyncio.get_event_loop() for i in range(5): asyncio.ensure_future(coro(j), loop=loop) # await asyncio.sleep(1) async def run_integrate(): while True: j = q.get() print('j: ', j) if True: loop = asyncio.get_event_loop() coro = device_video_main(j) loop.create_task(coro) # asyncio.ensure_future(coro, loop=loop) # await asyncio.sleep(1) # print(loop.is_running()) # async def main(): # loop = asyncio.get_event_loop() # for i in range(3): # asyncio....

May 14, 2020 · 1 min · Egbert Ke

asyncio.sleep 和 time.sleep 的区别

time.sleep是针对整个线程,整个线程会挂起,不再执行任何操作。 asyncio.sleep是针对当前协程而言,告诉事件循环:请去执行别的操作,相当于模拟了一次网络IO,不会阻塞其他协程的执行。 import time import asyncio async def hello(): print('Hello ...') await asyncio.sleep(5) # time.sleep(5) print('... World!') async def main(): await asyncio.gather(hello(), hello()) loop = asyncio.get_event_loop() loop.run_until_complete(main()) 运行结果: Hello ... Hello ... ... World! ... World! import time import asyncio async def hello(): print('Hello ...') # await asyncio.sleep(5) time.sleep(5) print('... World!') async def main(): await asyncio.gather(hello(), hello()) loop = asyncio.get_event_loop() loop.run_until_complete(main()) 运行结果: Hello ... ... World! Hello ... ... World!

May 10, 2020 · 1 min · Egbert Ke

SQLAlchemy中常见的query filter

equals query.filter(User.name == 'leela') not equals: query.filter(User.name != 'leela') LIKE query.filter(User.name.like('%leela%')) IN query.filter(User.name.in_(['leela', 'akshay', 'santanu'])) # works with query objects too: query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%santanu%')))) NOT IN query.filter(~User.name.in_(['lee', 'sonal', 'akshay'])) IS NULL filter(User.name == None) IS NOT NULL filter(User.name != None) AND from sqlalchemy import and_ filter(and_(User.name == 'leela', User.fullname == 'leela dharan')) #or, default without and_ method comma separated list of conditions are AND filter(User.name == 'leela', User.fullname == 'leela dharan') # or call filter()/filter_by() multiple times filter(User....

December 8, 2019 · 1 min · Egbert Ke

Python 在CSV文件中写入中文字符

对于UTF-8编码,Excel要求BOM(字节顺序标记)写在文件的开始,否则它会假设这是ANSI编码,这个就是与locale有依赖性了。 Python2 #!python2 #coding:utf8 import csv data = [[u'American',u'美国人'], [u'Chinese',u'中国人']] with open('results.csv','wb') as f: f.write(u'\ufeff'.encode('utf8')) w = csv.writer(f) for row in data: w.writerow([item.encode('utf8') for item in row]) Python3 #!python3 #coding:utf8 import csv data = [[u'American',u'美国人'], [u'Chinese',u'中国人']] with open('results.csv','w',newline='',encoding='utf-8-sig') as f: w = csv.writer(f) w.writerows(data) unicodecsv #!python2 #coding:utf8 import unicodecsv data = [[u'American',u'美国人'], [u'Chinese',u'中国人']] with open('results.csv','wb') as f: w = unicodecsv.writer(f,encoding='utf-8-sig') w.writerows(data) 转载自简书

September 25, 2019 · 1 min · Egbert Ke

Python 获取线程中的异常信息

通常情况下我们无法将多线程中的异常带回主线程,所以也就无法打印线程中的异常,而通过traceback模块,我们可以对线程做如下修改,从而实现捕获线程异常的目的1。 import threading import traceback def my_func(): raise BaseException("thread exception") class ExceptionThread(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): """ Redirect exceptions of thread to an exception handler. """ threading.Thread.__init__(self, group, target, name, args, kwargs, verbose) if kwargs is None: kwargs = {} self._target = target self._args = args self._kwargs = kwargs self._exc = None def run(self): try: if self._target: self._target() except BaseException as e: import sys self._exc = sys....

September 12, 2019 · 1 min · Egbert Ke

Python 限制函数运行时间

实际项目中会涉及到需要对有些函数的响应时间做一些限制,如果超时就退出函数的执行,停止等待。 使用signal 使用signal有所限制,需要在linux系统上,并且需要在主线程中使用。方法二使用线程计时,不受此限制。 # coding=utf-8 import signal import time def set_timeout(num, callback): def wrap(func): def handle(signum, frame): # 收到信号 SIGALRM 后的回调函数,第一个参数是信号的数字,第二个参数是the interrupted stack frame. raise RuntimeError def to_do(*args, **kwargs): try: signal.signal(signal.SIGALRM, handle) # 设置信号和回调函数 signal.alarm(num) # 设置 num 秒的闹钟 print('start alarm signal.') r = func(*args, **kwargs) print('close alarm signal.') signal.alarm(0) # 关闭闹钟 return r except RuntimeError as e: callback() return to_do return wrap def after_timeout(): # 超时后的处理函数 print("Time out!") @set_timeout(2, after_timeout) # 限时 2 秒超时 def connect(): # 要执行的函数 time....

September 11, 2019 · 2 min · Egbert Ke

Python中的operator模块

operator 模块提供了一套与Python的内置运算符对应的高效率函数。例如,operator.add(x, y) 与表达式 x+y 相同。 许多函数名与特殊方法名相同,只是没有双下划线。为了向后兼容性,也保留了许多包含双下划线的函数。为了表述清楚,建议使用没有双下划线的函数。 operator.attrgetter def attrgetter(*items): if any(not isinstance(item, str) for item in items): raise TypeError('attribute name must be a string') if len(items) == 1: attr = items[0] def g(obj): return resolve_attr(obj, attr) else: def g(obj): return tuple(resolve_attr(obj, attr) for attr in items) return g def resolve_attr(obj, attr): for name in attr.split("."): obj = getattr(obj, name) return obj operator.itemgetter def itemgetter(*items): if len(items) == 1: item = items[0] def g(obj): return obj[item] else: def g(obj): return tuple(obj[item] for item in items) return g examples...

September 9, 2019 · 1 min · Egbert Ke