https://habr.com/ru/company/mailru/blog/462311/- Блог компании Mail.ru Group
- Python
- Программирование
Это двенадцатая подборка советов про Python и программирование из моего авторского канала @pythonetc.
←
Предыдущие подборки
Нельзя изменять переменные замыканий с помощью простого присваивания. Python расценивает присваивание как определение внутри тела функции и вообще не делает замыкание.
Работает отлично, выводит на экран
2
:
def make_closure(x):
def closure():
print(x)
return closure
make_closure(2)()
А этот код бросает
UnboundLocalError: local variable 'x' referenced before assignment
:
def make_closure(x):
def closure():
print(x)
x *= 2
print(x)
return closure
make_closure(2)()
Чтобы код работал, используйте
nonlocal
. Это явным образом говорит интерпретатору не рассматривать присвоение как определение:
def make_closure(x):
def closure():
nonlocal x
print(x)
x *= 2
print(x)
return closure
make_closure(2)()
Иногда в ходе итерации вам нужно узнать, какой это элемент обрабатывается, первый или последний. Это можно легко выяснить с помощью явного флага:
def sparse_list(iterable, num_of_zeros=1):
result = []
zeros = [0 for _ in range(num_of_zeros)]
first = True
for x in iterable:
if not first:
result += zeros
result.append(x)
first = False
return result
assert sparse_list([1, 2, 3], 2) == [
1,
0, 0,
2,
0, 0,
3,
]
Конечно, вы могли бы обрабатывать первый элемент за пределами цикла. Это выглядит чище, но приводит к частичному дублированию кода. Кроме того, сделать это будет не так просто при работе с абстрактным
iterable
:
def sparse_list(iterable, num_of_zeros=1):
result = []
zeros = [0 for _ in range(num_of_zeros)]
iterator = iter(iterable)
try:
result.append(next(iterator))
except StopIteration:
return []
for x in iterator:
result += zeros
result.append(x)
return result
Ещё вы можете использовать
enumerate
и выполнять проверку
i == 0
(работает только для определения первого элемента, а не последнего), однако наилучшим решением будет генератор, возвращающий вместе с элементом
iterable
флаги
first
и
last
:
def first_last_iter(iterable):
iterator = iter(iterable)
first = True
last = False
while not last:
if first:
try:
current = next(iterator)
except StopIteration:
return
else:
current = next_one
try:
next_one = next(iterator)
except StopIteration:
last = True
yield (first, last, current)
first = False
Теперь исходная функция может выглядеть так:
def sparse_list(iterable, num_of_zeros=1):
result = []
zeros = [0 for _ in range(num_of_zeros)]
for first, last, x in first_last_iter(iterable):
if not first:
result += zeros
result.append(x)
return result
Если вам нужно измерить время, прошедшее между двумя событиями, то используйте
time.monotonic()
вместо
time.time()
.
time.monotonic()
никогда не изменяется в меньшую сторону, даже при обновлении системных часов:
from contextlib import contextmanager
import time
@contextmanager
def timeit():
start = time.monotonic()
yield
print(time.monotonic() - start)
def main():
with timeit():
time.sleep(2)
main()
Вложенные менеджеры контекста обычно не знают, что они вложены. Вы можете сообщить им об этом, создавая внутренние менеджеры с помощью внешнего:
from contextlib import AbstractContextManager
import time
class TimeItContextManager(AbstractContextManager):
def __init__(self, name, parent=None):
super().__init__()
self._name = name
self._parent = parent
self._start = None
self._substracted = 0
def __enter__(self):
self._start = time.monotonic()
return self
def __exit__(self, exc_type, exc_value, traceback):
delta = time.monotonic() - self._start
if self._parent is not None:
self._parent.substract(delta)
print(self._name, 'total', delta)
print(self._name, 'outer', delta - self._substracted)
return False
def child(self, name):
return type(self)(name, parent=self)
def substract(self, n):
self._substracted += n
timeit = TimeItContextManager
def main():
with timeit('large') as large_t:
with large_t.child('medium') as medium_t:
with medium_t.child('small-1'):
time.sleep(1)
with medium_t.child('small-2'):
time.sleep(1)
time.sleep(1)
time.sleep(1)
main()
Когда вам нужно передать информацию по цепочке вызовов, то первое, что приходит в голову, это передавать данные в виде аргументов функций.
В некоторых случаях может быть гораздо удобнее модифицировать все функции в цепочке для передачи новой порции данных. Вместо этого вы можете задать некий контекст, который будет использоваться всеми функциями в цепочке. Как это сделать?
Самое простое решение — использовать глобальную переменную. В Python вы также можете использовать модули и классы в качестве хранителей контекста, потому что они, строго говоря, тоже являются глобальными переменными. Вероятно, вы и так уже это регулярно делаете, например, для журналирования.
Если ваше приложение многопоточное, то обычные глобальные переменные вам не подойдут, поскольку они не потокобезопасны. В каждый момент времени у вас может выполняться несколько цепочек вызовов, и каждой из них нужен собственный контекст. Вам поможет модуль
threading
, он предоставляет объект
threading.local()
, который потокобезопасен. Хранить в нём данные можно с помощью простого обращения к атрибутам:
threading.local().symbol = '@'
.
Тем не менее, оба описанных подхода не concurrency-safe, то есть они не подходят для цепочки вызовов корутин, в которой система не только вызывает функции, но и ожидает их исполнения. Когда корутина выполняет
await
, поток событий может запустить другую корутину из другой цепочки. Это не будет работать:
import asyncio
import sys
global_symbol = '.'
async def indication(timeout):
while True:
print(global_symbol, end='')
sys.stdout.flush()
await asyncio.sleep(timeout)
async def sleep(t, indication_t, symbol='.'):
loop = asyncio.get_event_loop()
global global_symbol
global_symbol = symbol
task = loop.create_task(
indication(indication_t)
)
await asyncio.sleep(t)
task.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
sleep(1, 0.1, '0'),
sleep(1, 0.1, 'a'),
sleep(1, 0.1, 'b'),
sleep(1, 0.1, 'c'),
))
Исправить это можно, заставив цикл задавать и восстанавливать контекст при каждом переключении между корутинами. Реализовать такое поведение можно с помощью модуля
contextvars
, который доступен начиная с Python 3.7.
import asyncio
import sys
import contextvars
global_symbol = contextvars.ContextVar('symbol')
async def indication(timeout):
while True:
print(global_symbol.get(), end='')
sys.stdout.flush()
await asyncio.sleep(timeout)
async def sleep(t, indication_t, symbol='.'):
loop = asyncio.get_event_loop()
global_symbol.set(symbol)
task = loop.create_task(indication(indication_t))
await asyncio.sleep(t)
task.cancel()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
sleep(1, 0.1, '0'),
sleep(1, 0.1, 'a'),
sleep(1, 0.1, 'b'),
sleep(1, 0.1, 'c'),
))