Python Performance Profiling [line_profiler]


line_profiler

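The line_profiler module is a performance profiling tool. To find the slow spots or bottlenecks in your code, you can use this module instead of timing sections by hand with the time module. line_profiler records, for every line of a profiled function, how long it ran and what percentage of the function's total time it took.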

Source code analysis

class LineProfiler(CLineProfiler):
    """ A profiler that records the execution times of individual lines.
    """

    def __call__(self, func):
        """ Decorate a function to start the profiler on function entry and stop
        it on function exit.
        """
        self.add_function(func)
        if is_classmethod(func):
            wrapper = self.wrap_classmethod(func)
        elif is_coroutine(func):
            wrapper = self.wrap_coroutine(func)
        elif is_generator(func):
            wrapper = self.wrap_generator(func)
        else:
            wrapper = self.wrap_function(func)
        return wrapper
  • self.add_function(func):

      def add_function(self, *args, **kwargs): # real signature unknown
          """ Record line profiling information for the given Python function. """
          pass
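    • Registers the function with the profiler so that the time spent on each of its lines is recorded. Functions that it calls are timed line by line only if they are registered as well.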
  • is_classmethod(func):

      def is_classmethod(f):
          return isinstance(f, classmethod)
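    • isinstance() checks whether an object is an instance of a given type, similar to type().

      • type() does not treat an instance of a subclass as an instance of the parent class; it ignores inheritance.
      • isinstance() treats an instance of a subclass as an instance of the parent class; it takes inheritance into account.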
  • is_generator(func):

      CO_GENERATOR = 0x0020
      def is_generator(f):
          """ Return True if a function is a generator.
          """
          isgen = (f.__code__.co_flags & CO_GENERATOR) != 0
          return isgen
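    • f.__code__.co_flags: every Python function has a code object (f.__code__) whose co_flags attribute is an integer used as a bit field to record flags for that function. The generator flag has the value 32, i.e. 0x0020. When the compiler compiles a function body that contains the yield keyword, it sets the 0x0020 bit in the function's __code__.co_flags; the individual flags can then be read back with bitwise operations.
    • is_generator tests co_flags & 0x0020 to decide whether f is a generator function.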
  • is_coroutine(func):

      def is_coroutine(f):
          return inspect.iscoroutinefunction(f)
    
      def iscoroutinefunction(obj):
          """Return true if the object is a coroutine function.
    
          Coroutine functions are defined with "async def" syntax.
          """
          return _has_code_flag(obj, CO_COROUTINE)
    
      def _has_code_flag(f, flag):
          """Return true if ``f`` is a function (or a method or functools.partial
          wrapper wrapping a function) whose code object has the given ``flag``
          set in its flags."""
          while ismethod(f):  # Return true if the object is an instance method.
              f = f.__func__
          f = functools._unwrap_partial(f)
          if not isfunction(f):
              return False
          return bool(f.__code__.co_flags & flag)
    • Returns True if the object is a coroutine function (a function defined with the async def syntax).
    • f.__code__.co_flags & flag: same principle as is_generator.
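
The three predicates above can be tried out in isolation. The following is a minimal sketch that re-implements them outside line_profiler, together with the isinstance()/type() difference. The names Base, Child, Demo, gen, coro and plain are made up for illustration.

```python
import inspect

CO_GENERATOR = 0x0020  # same value as inspect.CO_GENERATOR


def is_classmethod(f):
    return isinstance(f, classmethod)


def is_generator(f):
    return (f.__code__.co_flags & CO_GENERATOR) != 0


def is_coroutine(f):
    return inspect.iscoroutinefunction(f)


class Base:
    pass


class Child(Base):
    pass


class Demo:
    @classmethod
    def make(cls):
        return cls()


def gen():
    yield 1


async def coro():
    return 1


def plain():
    return 1


child = Child()
print(isinstance(child, Base))  # True: inheritance is considered
print(type(child) is Base)      # False: the exact type is Child

# The raw classmethod object lives in the class __dict__;
# attribute access (Demo.make) already returns a bound method.
print(is_classmethod(Demo.__dict__["make"]))  # True
print(is_classmethod(Demo.make))              # False

print(is_generator(gen), is_generator(plain))   # True False
print(is_coroutine(coro), is_coroutine(plain))  # True False
print(is_generator(coro))                       # False: async def sets a different flag
```

Note that is_classmethod only matches the raw classmethod object, so in this sketch the check succeeds for Demo.__dict__["make"] but not for Demo.make.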

self.wrap_classmethod(func)

    def wrap_classmethod(self, func):
        """
        Wrap a classmethod to profile it.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            self.enable_by_count()
            try:
                result = func.__func__(func.__class__, *args, **kwds)
            finally:
                self.disable_by_count()
            return result
        return wrapper
  • self.enable_by_count()

    def enable_by_count(self, *args, **kwargs): # real signature unknown
        """ Enable the profiler if it hasn't been enabled before. """
        pass
    
  • self.disable_by_count()

    def disable_by_count(self, *args, **kwargs): # real signature unknown
        """
        Disable the profiler if the number of disable requests matches the
                number of enable requests.
        """
        pass
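  • result = func.__func__(func.__class__, *args, **kwds): a method defined in a class exists only once. No matter how many instances we create, they all share the same function func, but each instance gets its own bound method that binds func to that instance. To get at the underlying plain (unbound) function, use func.__func__, for example: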

    class a:
        def b(self):
            pass
    
    c = a()
    c.b.__func__
    # <function a.b at 0x00000131E7440670>
    c.b
    # <bound method a.b of <__main__.a object at 0x00000131E743A3D0>>
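
A raw classmethod object exposes the function it wraps through the same __func__ attribute. A small sketch of this, where the class Greeter and its method hello are hypothetical:

```python
class Greeter:
    @classmethod
    def hello(cls, name):
        return f"{cls.__name__} greets {name}"


raw = Greeter.__dict__["hello"]      # the classmethod object itself
print(type(raw).__name__)            # classmethod

underlying = raw.__func__            # plain function wrapped by classmethod
print(underlying(Greeter, "Ann"))    # Greeter greets Ann (cls passed by hand)

bound = Greeter.hello                # bound method, cls already filled in
print(bound.__func__ is underlying)  # True
print(bound("Ann"))                  # Greeter greets Ann
```

Calling the underlying function directly means the class has to be supplied explicitly as the first argument, which is what the first positional argument in the wrapper's call is for.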

self.wrap_coroutine(func)

    def wrap_coroutine(self, func):
        """
        Wrap a Python 3.5 coroutine to profile it.
        """

        @functools.wraps(func)
        async def wrapper(*args, **kwds):
            self.enable_by_count()
            try:
                result = await func(*args, **kwds)
            finally:
                self.disable_by_count()
            return result

        return wrapper
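  • result = await func(*args, **kwds): because func is a coroutine function, calling it only creates a coroutine object. It has to be awaited for the body to run and for the result to be returned.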
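
To see why the await is needed, here is a minimal sketch with a wrapper of the same shape. The names timed, fetch and elapsed are illustrative, and time.perf_counter merely stands in for the profiler's enable/disable calls.

```python
import asyncio
import functools
import inspect
import time

elapsed = []  # durations recorded by the hypothetical wrapper


def timed(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwds):
        start = time.perf_counter()
        try:
            # Without await, result would be a coroutine object, not a value.
            result = await func(*args, **kwds)
        finally:
            elapsed.append(time.perf_counter() - start)
        return result
    return wrapper


@timed
async def fetch(x):
    await asyncio.sleep(0.01)
    return x * 2


pending = fetch(21)
print(inspect.iscoroutine(pending))  # True: nothing has run yet
print(len(elapsed))                  # 0

value = asyncio.run(pending)
print(value)         # 42
print(len(elapsed))  # 1
```

Because the wrapper is itself declared with async def, the decorated function is still a coroutine function and can be awaited by callers exactly like the original.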

self.wrap_generator(func)

    def wrap_generator(self, func):
        """ Wrap a generator to profile it.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            g = func(*args, **kwds)
            # The first iterate will not be a .send()
            self.enable_by_count()
            try:
                item = next(g)
            except StopIteration:
                return
            finally:
                self.disable_by_count()
            input_ = (yield item)
            # But any following one might be.
            while True:
                self.enable_by_count()
                try:
                    item = g.send(input_)
                except StopIteration:
                    return
                finally:
                    self.disable_by_count()
                input_ = (yield item)
        return wrapper
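
The wrapper forwards both the yielded items and any values passed in with .send(), and it switches the profiler on only while the inner generator is actually running. Below is a minimal sketch of the same pattern, with a list of events standing in for enable_by_count/disable_by_count. The names traced, events and running_total are hypothetical.

```python
import functools

events = []  # stand-in for the profiler being switched on and off


def traced(func):
    @functools.wraps(func)
    def wrapper(*args, **kwds):
        g = func(*args, **kwds)
        # The first step must be next(), not send().
        events.append("on")
        try:
            item = next(g)
        except StopIteration:
            return
        finally:
            events.append("off")
        input_ = (yield item)
        # Later steps forward whatever the caller sent (None for plain next()).
        while True:
            events.append("on")
            try:
                item = g.send(input_)
            except StopIteration:
                return
            finally:
                events.append("off")
            input_ = (yield item)
    return wrapper


@traced
def running_total():
    total = 0
    while total < 10:
        received = yield total
        total += received if received is not None else 1


gen = running_total()
first = next(gen)     # 0
second = gen.send(4)  # 4: the sent value reached the inner generator
third = next(gen)     # 5
rest = list(gen)      # [6, 7, 8, 9]
print(first, second, third, rest)
print(events.count("on") == events.count("off"))  # True
```

Every "on" is paired with an "off" by the finally clause, including the last step where the inner generator finishes and raises StopIteration.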

self.wrap_function(func)

    def wrap_function(self, func):
        """ Wrap a function to profile it.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            self.enable_by_count()
            try:
                result = func(*args, **kwds)
            finally:
                self.disable_by_count()
            return result
        return wrapper
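
All four wrappers share the same enable_by_count() / try / finally / disable_by_count() shape. Going by the two docstrings, the counter is what lets nested profiled calls work: the profiler is switched on by the first request and switched off only once every request has been matched. The real methods live in the compiled extension, so the pure-Python sketch below only illustrates that counting behavior under this reading. CountingSwitch, wrap, outer and inner are hypothetical names.

```python
import functools


class CountingSwitch:
    """Hypothetical stand-in that mimics count-based enabling."""

    def __init__(self):
        self.count = 0
        self.log = []

    def enable_by_count(self):
        if self.count == 0:
            self.log.append("enabled")  # only the first request switches on
        self.count += 1

    def disable_by_count(self):
        if self.count > 0:
            self.count -= 1
            if self.count == 0:
                self.log.append("disabled")  # last matching request switches off

    def wrap(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            self.enable_by_count()
            try:
                return func(*args, **kwds)
            finally:
                self.disable_by_count()
        return wrapper


switch = CountingSwitch()


@switch.wrap
def inner(x):
    return x + 1


@switch.wrap
def outer(x):
    return inner(x) * 2


result = outer(1)
print(result)        # 4
print(switch.log)    # ['enabled', 'disabled']
print(switch.count)  # 0
```

The try/finally guarantees that the disable request is issued even when the wrapped function raises, so the counter cannot be left stuck above zero.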

Usage examples

__call__

import random
from line_profiler import LineProfiler


def calc(matrix_1: list[list[int]], matrix_2: list[list[int]]):
    new_matrix = [[0 for i in range(len(matrix_2[0]))] for j in range(len(matrix_1))]
    for i in range(len(matrix_1)):
        for j in range(len(matrix_2[0])):
            for k in range(len(matrix_2)):
                new_matrix[i][j] += matrix_1[i][k] * matrix_2[k][j]
    return new_matrix


def generate_matrix(x, y):
    return [[random.randint(1, 10) for i in range(y)] for j in range(x)]


def main_(x, y, z):
    m1 = generate_matrix(x, y)
    m2 = generate_matrix(y, z)
    calc(m1, m2)


if __name__ == '__main__':
    lp = LineProfiler()
    lp_wrapper = lp(main_)
    lp_wrapper(200, 100, 500)
    lp.print_stats()
C:\Users\ronie\Desktop\program\myenv\Scripts\python.exe C:/Users/ronie/Desktop/program/python/highPerformancePython/test.py
Timer unit: 1e-07 s

Total time: 7.05498 s
File: C:\Users\ronie\Desktop\program\python\highPerformancePython\test.py
Function: main_ at line 18

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    18                                           def main_(x, y, z):
    19         1     993997.0 993997.0      1.4      m1 = generate_matrix(x, y)
    20         1    2591529.0 2591529.0      3.7      m2 = generate_matrix(y, z)
    21         1   66964227.0 66964227.0     94.9      calc(m1, m2)


Process finished with exit code 0

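  • Hits: the number of times the line was executed;
  • Time: the total time spent on the line, in timer units (the Timer unit shown at the top of the report, here 1e-07 s);
  • Per Hit: the average time per execution of the line, in timer units;
  • % Time: the line's share of the total time recorded for the function.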
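
As a worked check of these columns against the report above: multiplying Time by the Timer unit gives seconds, and % Time is each line's share of the function's total. The numbers below are copied from the report for main_; the variable names are only for illustration.

```python
timer_unit = 1e-07  # "Timer unit" from the report, in seconds
line_times = {19: 993997.0, 20: 2591529.0, 21: 66964227.0}  # "Time" column

total_units = sum(line_times.values())
total_seconds = total_units * timer_unit
print(f"Total time: {total_seconds:.5f} s")  # Total time: 7.05498 s

for lineno, units in line_times.items():
    share = 100 * units / total_units
    print(f"line {lineno}: {units * timer_unit:.4f} s, {share:.1f} %")
# line 19: 0.0994 s, 1.4 %
# line 20: 0.2592 s, 3.7 %
# line 21: 6.6964 s, 94.9 %
```

The sum of the Time column reproduces the reported Total time, and the call to calc on line 21 accounts for almost all of it.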

add_function

import random
from line_profiler import LineProfiler


def calc(matrix_1: list[list[int]], matrix_2: list[list[int]]):
    new_matrix = [[0 for i in range(len(matrix_2[0]))] for j in range(len(matrix_1))]
    for i in range(len(matrix_1)):
        for j in range(len(matrix_2[0])):
            for k in range(len(matrix_2)):
                new_matrix[i][j] += matrix_1[i][k] * matrix_2[k][j]
    return new_matrix


def generate_matrix(x, y):
    return [[random.randint(1, 10) for i in range(y)] for j in range(x)]


def main_(x, y, z):
    m1 = generate_matrix(x, y)
    m2 = generate_matrix(y, z)
    calc(m1, m2)


if __name__ == '__main__':
    lp = LineProfiler()
    lp_wrapper = lp(main_)
    lp.add_function(calc)
    lp_wrapper(200, 100, 500)
    lp.print_stats()
C:\Users\ronie\Desktop\program\myenv\Scripts\python.exe C:/Users/ronie/Desktop/program/python/highPerformancePython/test.py
Timer unit: 1e-07 s

Total time: 8.02796 s
File: C:\Users\ronie\Desktop\program\python\highPerformancePython\test.py
Function: calc at line 5

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     5                                           def calc(matrix_1: list[list[int]], matrix_2: list[list[int]]):
     6         1     170133.0 170133.0      0.2      new_matrix = [[0 for i in range(len(matrix_2[0]))] for j in range(len(matrix_1))]
     7       200        581.0      2.9      0.0      for i in range(len(matrix_1)):
     8    100000     276987.0      2.8      0.3          for j in range(len(matrix_2[0])):
     9  10000000   26503798.0      2.7     33.0              for k in range(len(matrix_2)):
    10  10000000   53328117.0      5.3     66.4                  new_matrix[i][j] += matrix_1[i][k] * matrix_2[k][j]
    11         1          7.0      7.0      0.0      return new_matrix

Total time: 12.9151 s
File: C:\Users\ronie\Desktop\program\python\highPerformancePython\test.py
Function: main_ at line 18

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    18                                           def main_(x, y, z):
    19         1    1000846.0 1000846.0      0.8      m1 = generate_matrix(x, y)
    20         1    2386930.0 2386930.0      1.8      m2 = generate_matrix(y, z)
    21         1  125763279.0 125763279.0     97.4      calc(m1, m2)


Process finished with exit code 0

dump_stats

    def dump_stats(self, filename):
        """ Dump a representation of the data to a file as a pickled LineStats
        object from `get_stats()`.
        """
        lstats = self.get_stats()
        with open(filename, 'wb') as f:
            pickle.dump(lstats, f, pickle.HIGHEST_PROTOCOL)

    def get_stats(self, *args, **kwargs): # real signature unknown
        """ Return a LineStats object containing the timings. """
        pass
