python

Подборка @pythonetc, июль 2019

  • четверг, 8 августа 2019 г. в 00:18:11
https://habr.com/ru/company/mailru/blog/462311/
  • Блог компании Mail.ru Group
  • Python
  • Программирование



Это двенадцатая подборка советов про Python и программирование из моего авторского канала @pythonetc.

Предыдущие подборки


Нельзя изменять переменные замыканий с помощью простого присваивания. Python расценивает присваивание как определение внутри тела функции и вообще не делает замыкание. 

Работает отлично, выводит на экран 2:

def make_closure(x):
    def closure():
        print(x)

    return closure

make_closure(2)()

А этот код бросает UnboundLocalError: local variable 'x' referenced before assignment:

def make_closure(x):
    def closure():
        print(x)
        x *= 2
        print(x)

    return closure

make_closure(2)()


Чтобы код работал, используйте nonlocal. Это явным образом говорит интерпретатору не рассматривать присвоение как определение:

def make_closure(x):
    def closure():
        nonlocal x
        print(x)
        x *= 2
        print(x)

    return closure

make_closure(2)()


Иногда в ходе итерации вам нужно узнать, какой это элемент обрабатывается, первый или последний. Это можно легко выяснить с помощью явного флага:

def sparse_list(iterable, num_of_zeros=1):
    result = []
    zeros = [0 for _ in range(num_of_zeros)]

    first = True
    for x in iterable:
        if not first:
            result += zeros
        result.append(x)

        first = False

    return result

assert sparse_list([1, 2, 3], 2) == [
    1,
    0, 0,
    2,
    0, 0,
    3,
]

Конечно, вы могли бы обрабатывать первый элемент за пределами цикла. Это выглядит чище, но приводит к частичному дублированию кода. Кроме того, сделать это будет не так просто при работе с абстрактным iterable:

def sparse_list(iterable, num_of_zeros=1):
    result = []
    zeros = [0 for _ in range(num_of_zeros)]

    iterator = iter(iterable)
    try:
        result.append(next(iterator))
    except StopIteration:
        return []

    for x in iterator:
       result += zeros
       result.append(x)

    return result

Ещё вы можете использовать enumerate и выполнять проверку i == 0 (работает только для определения первого элемента, а не последнего), однако наилучшим решением будет генератор, возвращающий вместе с элементом iterable флаги first и last:

def first_last_iter(iterable):
    iterator = iter(iterable)

    first = True
    last = False
    while not last:
    if first:
        try:
            current = next(iterator)
            except StopIteration:
                return
    else:
        current = next_one

    try:
        next_one = next(iterator)
    except StopIteration:
        last = True

    yield (first, last, current)

    first = False

Теперь исходная функция может выглядеть так:

def sparse_list(iterable, num_of_zeros=1):
    result = []
    zeros = [0 for _ in range(num_of_zeros)]

    for first, last, x in first_last_iter(iterable):
        if not first:
            result += zeros
        result.append(x)

    return result


Если вам нужно измерить время, прошедшее между двумя событиями, то используйте time.monotonic() вместо time.time(). time.monotonic() никогда не изменяется в меньшую сторону, даже при обновлении системных часов:

from contextlib import contextmanager
import time


@contextmanager
def timeit():
    start = time.monotonic()
    yield
    print(time.monotonic() - start)

def main():
    with timeit():
           time.sleep(2)

main()


Вложенные менеджеры контекста обычно не знают, что они вложены. Вы можете сообщить им об этом, создавая внутренние менеджеры с помощью внешнего:

from contextlib import AbstractContextManager
import time


class TimeItContextManager(AbstractContextManager):
    def __init__(self, name, parent=None):
        super().__init__()

        self._name = name
        self._parent = parent
        self._start = None
        self._substracted = 0

    def __enter__(self):
        self._start = time.monotonic()
        return self
        
    def __exit__(self, exc_type, exc_value, traceback):
        delta = time.monotonic() - self._start
        if self._parent is not None:
            self._parent.substract(delta)

    print(self._name, 'total', delta)
    print(self._name, 'outer', delta - self._substracted)

    return False

    def child(self, name):
        return type(self)(name, parent=self)

    def substract(self, n):
        self._substracted += n


timeit = TimeItContextManager


def main():
    with timeit('large') as large_t:
        with large_t.child('medium') as medium_t:
            with medium_t.child('small-1'):
                time.sleep(1)
            with medium_t.child('small-2'):
                time.sleep(1)
        time.sleep(1)
    time.sleep(1)


main()


Когда вам нужно передать информацию по цепочке вызовов, то первое, что приходит в голову, это передавать данные в виде аргументов функций.

В некоторых случаях может быть гораздо удобнее модифицировать все функции в цепочке для передачи новой порции данных. Вместо этого вы можете задать некий контекст, который будет использоваться всеми функциями в цепочке. Как это сделать?

Самое простое решение — использовать глобальную переменную. В Python вы также можете использовать модули и классы в качестве хранителей контекста, потому что они, строго говоря, тоже являются глобальными переменными. Вероятно, вы и так уже это регулярно делаете, например, для журналирования.

Если ваше приложение многопоточное, то обычные глобальные переменные вам не подойдут, поскольку они не потокобезопасны. В каждый момент времени у вас может выполняться несколько цепочек вызовов, и каждой из них нужен собственный контекст. Вам поможет модуль threading, он предоставляет объект threading.local(), который потокобезопасен. Хранить в нём данные можно с помощью простого обращения к атрибутам: threading.local().symbol = '@'.

Тем не менее, оба описанных подхода не concurrency-safe, то есть они не подходят для цепочки вызовов корутин, в которой система не только вызывает функции, но и ожидает их исполнения. Когда корутина выполняет await, поток событий может запустить другую корутину из другой цепочки. Это не будет работать:

import asyncio
import sys

global_symbol = '.'

async def indication(timeout):
    while True:
        print(global_symbol, end='')
        sys.stdout.flush()
        await asyncio.sleep(timeout)

async def sleep(t, indication_t, symbol='.'):
    loop = asyncio.get_event_loop()

    global global_symbol
    global_symbol = symbol
    task = loop.create_task(
            indication(indication_t)
    )
    await asyncio.sleep(t)
    task.cancel()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    sleep(1, 0.1, '0'),
    sleep(1, 0.1, 'a'),
    sleep(1, 0.1, 'b'),
    sleep(1, 0.1, 'c'),
))

Исправить это можно, заставив цикл задавать и восстанавливать контекст при каждом переключении между корутинами. Реализовать такое поведение можно с помощью модуля contextvars, который доступен начиная с Python 3.7.

import asyncio
import sys
import contextvars

global_symbol = contextvars.ContextVar('symbol')

async def indication(timeout):
    while True:
        print(global_symbol.get(), end='')
        sys.stdout.flush()
        await asyncio.sleep(timeout)

async def sleep(t, indication_t, symbol='.'):
    loop = asyncio.get_event_loop()

    global_symbol.set(symbol)
    task = loop.create_task(indication(indication_t))
    await asyncio.sleep(t)
    task.cancel()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    sleep(1, 0.1, '0'),
    sleep(1, 0.1, 'a'),
    sleep(1, 0.1, 'b'),
    sleep(1, 0.1, 'c'),
))