http://habrahabr.ru/post/262697/
Сегодня хочу поговорить о такой интереснейшей ORM, как peewee. Система лёгкая, быстрая, синтаксис запросов немного сложнее, чем у Django ORM, однако позволяет потенциально следить за тем SQL кодом, который получается на выходе.
Поскольку я работал над Python приложением, соединяющимся с БД, выбор пал на простое решение, которое позволило бы стандартизировать обращения к базе данных. До этого коллеги в аналогичных приложениях использовали Django, но его установка делала бы application излишне громоздким (тем более, что в его requirements и так значилось слишком много зависимостей).
Через недельку работы над проектом, руководитель попросил добавить в базу несколько полей и, соответственно, возник вопрос: как делать migrate. Миграции в peewee есть. их механизм описан
тут. Однако, каким образом нам производить эти миграции — не понятно.
Абсолютно чистый, девственный механизм миграций предполагает, что мы можем его использовать так, как захотим сами. Пожалуй, первое, что приходит на ум — сделать процедуру полностью автоматической и запускать её при каждом старте приложения. Тем более, что приложение не требует моментальной реакции или молниеносного выполнения.
Перед описанием логики автоматических миграций, хотелось бы обговорить общие условия работы:
- Соединение с базой живёт в отдельном Borg классе с соответствующим атрибутом — ссылкой на соединение.
- Об автоматическом создании файлов миграции не идёт и речи. Только об автоматическом применении изменений.
- Второй пункт позволяет нам делать миграции как мы хотим, и для каждой новой версии нашего приложения использовать любую логику для сохранения предыдущих данных.
Итак, начнём с того, что создадим абстрактный класс для миграций.
Абстрактный класс для мигратораimport abc
from playhouse.migrate import (migrate, MySQLMigrator)
class Migrator(object):
"""
Migration interface
"""
__metaclass__ = abc.ABCMeta
connection = db_connection.connection # db_connection is a Borg instance
migrator = MySQLMigrator(db_connection.connection)
@abc.abstractproperty
def migrations(self):
"""
List of the migrations dictionaries
:param self: class instance
:return: list
"""
return [
{'statement': 1 != 2, 'migration': ['list', 'of', 'migration', 'options'],
'migration_kwargs': {}, 'pre_migrations': list(), 'post_migrations': list()}
] # Just an example
def migrate(self):
"""
Run migrations
"""
for migration in self.migrations:
if migration['statement']:
# Run scripts before the migration
pre_migrations = migration.get('pre_migrations', list())
for pre_m in pre_migrations:
pre_m()
# Migrate
with db_connection.connection.transaction():
migration_kwargs = migration.get('migration_kwargs', {})
migrate(*migration['migration'], **migration_kwargs)
# Run scripts after the migration
post_migrations = migration.get('post_migrations', list())
for post_m in post_migrations:
post_m()
Собственно, класс состоит из трёх частей: предопределённых аргументов, которые ссылаются на коннектор с БД, свойства migrations — списка словарей для миграции, а также, ф-ии migrate, которая запускает сначала все процедуры, предваряющие миграцию, потом саму миграцию, а потом ф-ии, которые должны выполниться после миграции.
Теперь, создадим какой-нибудь контроллер, который будет автоматически искать миграции и запускать их.
Образец автопоиска миграцийimport sys
import re
def get_migration_modules(packages=[]):
"""
Get python modules with migrations
:param packages: iterable - list or tuple with packages names for the searching
:return: list - ('module.path', 'module_name')
"""
# List of the modules to migrate
migration_modules = list()
for pack in packages:
migration_module = __import__(pack, globals(), locals(), fromlist=[str('migrations')])
try:
# Check, that imported object is module
if inspect.ismodule(migration_module.migrations):
# Find submodules inside the module
for importer, modname, ispkg in pkgutil.iter_modules(migration_module.migrations.__path__):
if re.match(r'^\d{3,}_migration_[\d\w_]+$', modname) and not ispkg:
migration_modules.append((migration_module.migrations.__name__, modname))
# Unregister module
sys.modules.pop(migration_module.__name__)
except AttributeError:
pass
return migration_modules
def get_migration_classes(migration_modules):
"""
Get list of the migration classes
:type migration_modules: iterable
:param migration_modules: array with a migration modules
:return: list
"""
migration_classes = list()
for mig_mod, m in migration_modules:
mig = __import__(mig_mod, globals(), locals(), fromlist=[m])
try:
target_module = mig.__getattribute__(m)
# Check, that imported object is module
if inspect.ismodule(target_module):
for name, obj in inspect.getmembers(target_module):
# Get all containing elements
if inspect.isclass(obj) and issubclass(obj, Migrator) and obj != Migrator:
# Save this elements
migration_classes.append(obj)
# Remove imported module from the stack
sys.modules.pop(mig.__name__)
except AttributeError:
pass
return migration_classes
Запускается автопоиск следующим образом:
Автопоиск и выполнение миграций# Get modules with migrations
m_mods = get_migration_modules(packages=['package_1', 'package_2', 'package_3'])
# Get migration classes
m_classes = get_migration_classes(m_mods)
# Execute migrations
for m_class in m_classes:
mig = m_class()
mig.migrate()
Обратите внимание на строчку r'^\d{3,}_migration_[\d\w_]+$' — она нужна для поиска миграций по шаблону. Значит, по этому шаблону нам следует создать файлы миграций.
В приложениях package_1, package_2 и package_3 создадим пакеты (с __init__.py) migrations. И, например, в package_1.migrations создадим модуль 001_migration_add_first_fields.py
Попробуем изобразить содержимое этого файла:
001_migration_add_first_fields.pyfrom controllers.migrator import Migrator
from package_1.models import FirstModel
class AddNewFields(Migrator):
"""
Append new fields to the FirstModel
"""
table_name = FirstModel._meta.db_table # Get name of the table for target model
def __field_not_exists(self):
"""
Check, that new field does not exists
:return: bool
"""
q = 'SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_NAME = \'{0}\' AND COLUMN_NAME = \'my_new_field\''.format(self.table_name)
cursor = self.connection.execute_sql(q)
result = int(cursor.fetchone()[0])
return result == 0
@property
def migrations(self):
return [
# add my_new_field column
{
'statement': self.__field_not_exists(),
'migration': [self.migrator.add_column(self.table_name, 'my_new_field', FirstModel.my_new_field)],
}
]
Всё! Теперь, при запуске автопоиска, контроллер найдёт AddNewFields и, если выполняется statement, запустит migration.
Вообще, система описана кратко, и, как можно понять, имеет небольшой запас мощности:
- Позволяет выполнять процедуры до или после запуска (соответственно, бэкапить и разворачивать данные из бэкапа).
- Позволяет проверять необходимость проведения миграций.
- Можно делать NULL поля без указания defaults.
Последнюю фишку рассмотрим подробнее:
Модифицируем def migrations()@property
def migrations(self):
# Modify NULL field
my_new_field = FirstModel.my_new_field
my_new_field.default = 'Rewrite me'
return [
# add my_new_field column
{
'statement': self.__field_not_exists(),
'migration': [self.migrator.add_column(self.table_name, 'my_new_field', FirstModel.my_new_field)],
}
]
Ну и, соответственно, если нам нужно после миграций для каждой записи сформировать уникальное значение нового поля, просто добавим в возвращаемый словарь элемент 'post_migrations' со списком функций, которые последовательно запустятся после добавления поля.