line_profiler
line_profiler 模块是一个逐行性能分析工具。想要找到代码中运行较慢的部分或性能瓶颈时,可以直接使用这个模块,而不必再手动用 time 计时。line_profiler 可以记录每行代码的执行次数、运行时间和耗时百分比。
源码分析
class LineProfiler(CLineProfiler):
    """ A profiler that records the execution times of individual lines.
    """
    def __call__(self, func):
        """ Decorate a function to start the profiler on function entry and stop
        it on function exit.

        The function is first registered with ``add_function``; the wrapper
        is then selected by the kind of callable, tested in this order:
        classmethod, coroutine, generator, plain function.
        """
        self.add_function(func)
        if is_classmethod(func):
            wrapper = self.wrap_classmethod(func)
        elif is_coroutine(func):
            wrapper = self.wrap_coroutine(func)
        elif is_generator(func):
            wrapper = self.wrap_generator(func)
        else:
            wrapper = self.wrap_function(func)
        return wrapper
self.add_function(func):
# NOTE(review): presumably an IDE-generated stub (see the "real signature
# unknown" marker); the ``pass`` body is a placeholder, not the real code.
def add_function(self, *args, **kwargs):  # real signature unknown
    """ Record line profiling information for the given Python function. """
    pass
- add_function 用于注册需要逐行计时的函数:除了被包装的函数本身,还可以把它内部调用的其他函数也注册进来,这样 print_stats 会同时显示这些函数每行所用的时间(见下文 add_function 的使用实例)。
is_classmethod(func):
def is_classmethod(f):
    """ Return True if *f* is a ``classmethod`` object (subclasses included). """
    return isinstance(f, classmethod)
isinstance() 函数来判断一个对象是否是一个已知的类型,类似 type()。
- type() 不会认为子类是一种父类类型,不考虑继承关系。
- isinstance() 会认为子类是一种父类类型,考虑继承关系。
is_generator(func):
# Bit mask tested against ``co_flags`` by is_generator() below.
CO_GENERATOR = 0x0020


def is_generator(f):
    """ Return True if a function is a generator. """
    # A non-zero result means the CO_GENERATOR bit is set in the code flags.
    isgen = (f.__code__.co_flags & CO_GENERATOR) != 0
    return isgen
- f.__code__.co_flags:Python 的函数对象都有一个 __code__ 属性(代码对象),其 co_flags 是一个整数,按二进制位记录该函数的各种标志(flags);生成器标志的值为 32,即 0x0020。编译器在编译函数并生成字节码时,如果函数体中出现 yield 关键字,就会在该函数的 __code__.co_flags 中置上 0x0020 这一位;之后通过按位运算即可读取 flags 中包含的各项信息。
- is_generator 通过 co_flags & 0x0020 的结果是否非零,判断 f 是否为生成器函数。
is_coroutine(func):
def is_coroutine(f):
    # Delegates to inspect.iscoroutinefunction.
    return inspect.iscoroutinefunction(f)


def iscoroutinefunction(obj):
    """Return true if the object is a coroutine function.

    Coroutine functions are defined with "async def" syntax.
    """
    return _has_code_flag(obj, CO_COROUTINE)


def _has_code_flag(f, flag):
    """Return true if ``f`` is a function (or a method or functools.partial
    wrapper wrapping a function) whose code object has the given ``flag``
    set in its flags."""
    while ismethod(f):  # Return true if the object is an instance method.
        # Step from the method down to the function it wraps.
        f = f.__func__
    f = functools._unwrap_partial(f)
    if not isfunction(f):
        return False
    return bool(f.__code__.co_flags & flag)
- 如果对象是协程函数(一个用async def语法定义的函数)则返回True。
- f.__code__.co_flags & flag:原理同is_generator。
self.wrap_classmethod(func)
def wrap_classmethod(self, func):
    """
    Wrap a classmethod to profile it.

    Returns a plain function that enables the profiler, calls the function
    stored in ``func.__func__`` and then disables the profiler again.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwds):
        self.enable_by_count()
        try:
            # NOTE(review): ``func.__class__`` is the type of ``func`` itself.
            # If ``func`` is a classmethod object, that is ``classmethod``
            # rather than the class owning the method -- confirm this is the
            # intended first argument.
            result = func.__func__(func.__class__, *args, **kwds)
        finally:
            # Runs whether or not the call raised, balancing the enable above.
            self.disable_by_count()
        return result
    return wrapper
self.enable_by_count()
# NOTE(review): presumably an IDE-generated stub (see the "real signature
# unknown" marker); the ``pass`` body is a placeholder, not the real code.
def enable_by_count(self, *args, **kwargs):  # real signature unknown
    """ Enable the profiler if it hasn't been enabled before. """
    pass
self.disable_by_count()
# NOTE(review): presumably an IDE-generated stub (see the "real signature
# unknown" marker); the ``pass`` body is a placeholder, not the real code.
def disable_by_count(self, *args, **kwargs):  # real signature unknown
    """ Disable the profiler if the number of disable requests matches the
    number of enable requests. """
    pass
result = func.__func__(func.__class__, *args, **kwds)
:func.__func__ 取出的是被包装的原始函数。class 中的某个 method 其实只有一个实体:无论我们创建了多少个 instance,它们都共用同一个 function;但每个 instance 访问它时,都会得到一个把该 function 绑定到自己身上的 bound method。要拿到真正的(unbound)function,就使用 __func__,例如:
class a:
    def b(self):
        pass
c = a()
c.b.__func__  # <function a.b at 0x00000131E7440670>
c.b           # <bound method a.b of <__main__.a object at 0x00000131E743A3D0>>
classmethod 对象同样通过 __func__ 保存被它包装的函数。需要注意:func.__class__ 是 func 这个对象自身的类型(对 classmethod 对象来说就是 classmethod),而不是定义该方法的类。
self.wrap_coroutine(func)
def wrap_coroutine(self, func):
    """
    Wrap a Python 3.5 coroutine to profile it.

    Returns an ``async def`` wrapper that enables the profiler, awaits
    ``func`` and then disables the profiler again.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwds):
        self.enable_by_count()
        try:
            # Await the coroutine so the wrapper returns its final result.
            result = await func(*args, **kwds)
        finally:
            # Runs whether or not the await raised, balancing the enable above.
            self.disable_by_count()
        return result
    return wrapper
result = await func(*args, **kwds)
:因为 func 是协程函数,调用 func(*args, **kwds) 只会得到一个协程对象,所以必须加上 await 关键字等待它执行完毕,才能拿到返回值。
self.wrap_generator(func)
def wrap_generator(self, func):
    """ Wrap a generator to profile it.

    The returned wrapper is itself a generator function. The profiler is
    enabled only around each step of the wrapped generator (``next`` /
    ``send``); every ``yield`` back to the caller happens after the
    matching ``disable_by_count`` call.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwds):
        g = func(*args, **kwds)
        # The first iterate will not be a .send()
        self.enable_by_count()
        try:
            item = next(g)
        except StopIteration:
            # Wrapped generator finished without yielding: end the wrapper too.
            return
        finally:
            self.disable_by_count()
        # Hand the item to the caller and capture whatever value is sent back.
        input_ = (yield item)
        # But any following one might be.
        while True:
            self.enable_by_count()
            try:
                # Forward the caller's sent value into the wrapped generator.
                item = g.send(input_)
            except StopIteration:
                return
            finally:
                self.disable_by_count()
            input_ = (yield item)
    return wrapper
self.wrap_function(func)
def wrap_function(self, func):
    """ Wrap a function to profile it.

    Returns a wrapper that enables the profiler, calls ``func`` with the
    given arguments, disables the profiler again and returns the result.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwds):
        self.enable_by_count()
        try:
            result = func(*args, **kwds)
        finally:
            # Runs whether or not the call raised, balancing the enable above.
            self.disable_by_count()
        return result
    return wrapper
使用实例
__call__
:
import random
from line_profiler import LineProfiler


def calc(matrix_1: list[list[int]], matrix_2: list[list[int]]):
    new_matrix = [[0 for i in range(len(matrix_2[0]))] for j in range(len(matrix_1))]
    for i in range(len(matrix_1)):
        for j in range(len(matrix_2[0])):
            for k in range(len(matrix_2)):
                new_matrix[i][j] += matrix_1[i][k] * matrix_2[k][j]
    return new_matrix


def generate_matrix(x, y):  # x rows, each holding y random ints from randint(1, 10)
    return [[random.randint(1, 10) for i in range(y)] for j in range(x)]


def main_(x, y, z):
    m1 = generate_matrix(x, y)
    m2 = generate_matrix(y, z)
    calc(m1, m2)


if __name__ == '__main__':
    lp = LineProfiler()
    # Calling the profiler on a function (LineProfiler.__call__) returns a
    # wrapped version of it; only main_ is registered for profiling here.
    lp_wrapper = lp(main_)
    lp_wrapper(200, 100, 500)
    # Print the collected per-line timings.
    lp.print_stats()
C:\Users\ronie\Desktop\program\myenv\Scripts\python.exe C:/Users/ronie/Desktop/program/python/highPerformancePython/test.py
Timer unit: 1e-07 s
Total time: 7.05498 s
File: C:\Users\ronie\Desktop\program\python\highPerformancePython\test.py
Function: main_ at line 18
Line # Hits Time Per Hit % Time Line Contents
==============================================================
18 def main_(x, y, z):
19 1 993997.0 993997.0 1.4 m1 = generate_matrix(x, y)
20 1 2591529.0 2591529.0 3.7 m2 = generate_matrix(y, z)
21 1 66964227.0 66964227.0 94.9 calc(m1, m2)
Process finished with exit code 0
- Hits:该行代码被执行的次数;
- Time:该行代码的总耗时,单位为 Timer unit(上例中为 1e-07 秒);
- Per Hit:该行代码平均每次执行的耗时(Time / Hits);
- % Time:该行耗时占函数总耗时的百分比。
add_function
import random
from line_profiler import LineProfiler


def calc(matrix_1: list[list[int]], matrix_2: list[list[int]]):
    new_matrix = [[0 for i in range(len(matrix_2[0]))] for j in range(len(matrix_1))]
    for i in range(len(matrix_1)):
        for j in range(len(matrix_2[0])):
            for k in range(len(matrix_2)):
                new_matrix[i][j] += matrix_1[i][k] * matrix_2[k][j]
    return new_matrix


def generate_matrix(x, y):  # x rows, each holding y random ints from randint(1, 10)
    return [[random.randint(1, 10) for i in range(y)] for j in range(x)]


def main_(x, y, z):
    m1 = generate_matrix(x, y)
    m2 = generate_matrix(y, z)
    calc(m1, m2)


if __name__ == '__main__':
    lp = LineProfiler()
    # Wrap the entry point; main_ is registered by LineProfiler.__call__.
    lp_wrapper = lp(main_)
    # Additionally register calc, which main_ calls, so that its lines are
    # timed as well.
    lp.add_function(calc)
    lp_wrapper(200, 100, 500)
    # Print the collected per-line timings for every registered function.
    lp.print_stats()
C:\Users\ronie\Desktop\program\myenv\Scripts\python.exe C:/Users/ronie/Desktop/program/python/highPerformancePython/test.py
Timer unit: 1e-07 s
Total time: 8.02796 s
File: C:\Users\ronie\Desktop\program\python\highPerformancePython\test.py
Function: calc at line 5
Line # Hits Time Per Hit % Time Line Contents
==============================================================
5 def calc(matrix_1: list[list[int]], matrix_2: list[list[int]]):
6 1 170133.0 170133.0 0.2 new_matrix = [[0 for i in range(len(matrix_2[0]))] for j in range(len(matrix_1))]
7 200 581.0 2.9 0.0 for i in range(len(matrix_1)):
8 100000 276987.0 2.8 0.3 for j in range(len(matrix_2[0])):
9 10000000 26503798.0 2.7 33.0 for k in range(len(matrix_2)):
10 10000000 53328117.0 5.3 66.4 new_matrix[i][j] += matrix_1[i][k] * matrix_2[k][j]
11 1 7.0 7.0 0.0 return new_matrix
Total time: 12.9151 s
File: C:\Users\ronie\Desktop\program\python\highPerformancePython\test.py
Function: main_ at line 18
Line # Hits Time Per Hit % Time Line Contents
==============================================================
18 def main_(x, y, z):
19 1 1000846.0 1000846.0 0.8 m1 = generate_matrix(x, y)
20 1 2386930.0 2386930.0 1.8 m2 = generate_matrix(y, z)
21 1 125763279.0 125763279.0 97.4 calc(m1, m2)
Process finished with exit code 0
dump_stats
def dump_stats(self, filename):
    """ Dump a representation of the data to a file as a pickled LineStats
    object from `get_stats()`.

    `filename` is opened in binary write mode (``'wb'``).
    """
    lstats = self.get_stats()
    with open(filename, 'wb') as f:
        # Serialize with the highest pickle protocol available.
        pickle.dump(lstats, f, pickle.HIGHEST_PROTOCOL)
# NOTE(review): presumably an IDE-generated stub (see the "real signature
# unknown" marker); the ``pass`` body is a placeholder, not the real code.
def get_stats(self, *args, **kwargs):  # real signature unknown
    """ Return a LineStats object containing the timings. """
    pass
Comments | NOTHING