Data science и качественный код
- вторник, 18 апреля 2017 г. в 03:13:16
Обычно модели машинного обучения строят в jupyter-ноутбуках, код которых выглядит, мягко говоря, не очень — длинные простыни из лапши выражений и вызовов "на коленке" написанных функций. Понятно, что такой код почти невозможно поддерживать, поэтому каждый проект переписывается чуть ли не с нуля. А о внедрении этого кода в production даже подумать страшно.
Поэтому сегодня представляем на ваш строгий суд превью python'овской библиотеки по работе с датасетами и data science моделями. С ее помощью ваш код на python'е может выглядеть так:
my_dataset.
load('/some/path').
normalize().
resize(shape=(256, 256, 256)).
random_rotate(angle=(-30, 30)).
random_crop(shape=(64, 64, 64))
for i in range(MAX_ITER):
batch = my_dataset.next_batch(BATCH_SIZE, shuffle=True)
# обучаем модель, подавая ей батчи с данными
В этой статье вы узнаете об основных классах и методах, которые помогут сделать ваш код простым, понятным и удобным.
Библиотека пока проходит финальную полировку и еще не выложена в открытый доступ.
Данная статья не является полной документацией, а лишь кратким описанием библиотеки и примеров ее использования.
Ваши комментарии помогут доработать библиотеку и включить в нее нужные вам возможности.
Объем данных может быть очень большим, да и к началу обработки данных у вас может вообще не быть всех данных, например, если они поступают постепенно. Поэтому класс Dataset
и не хранит в себе данные. Он включает в себя индекс — перечень элементов ваших данных (это могут быть идентификаторы или просто порядковые номера), а также Batch
-класс, в котором определены методы работы с данными.
dataset = Dataset(index = some_index, batch_class=DataFrameBatch)
Основное назначение Dataset
— формирование батчей.
batch = dataset.next_batch(BATCH_SIZE, shuffle=True)
# batch - объект класса DataFrameBatch,
# содержащий BATCH_SIZE элементов датасета
или можно вызвать генератор:
for batch in dataset.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True):
# batch - объект класса DataFrameBatch
Батчи можно собирать строго упорядоченно или хаотически, итерировать бесконечно или сделать ровно 1 цикл по вашим данным. Можно даже на каждом шаге создавать батчи разного размера, если в вашей ситуации это имеет смысл.
Кроме итерирования в Dataset
доступна еще одна полезная операция — cv_split
— которая делит датасет на train, test и validation. И, что особенно удобно, каждый из них снова является датасетом.
dataset.cv_split([0.7, 0.2, 0.1]) # делим в пропорции 70 / 20 / 10
# а дальше все обычно
for i in range(MAX_ITER):
batch = dataset.train.next_batch(BATCH_SIZE, shuffle=True)
# обучаем модель, подавая ей батчи с данными
Адресация элементов датасета осуществляется с помощью индекса. Это может быть набор идентификаторов (клиентов, транзакций, КТ-снимков) или просто порядковые номера (например, numpy.arange(N)
). Датасет может быть (почти) сколь угодно большим и не помещаться в оперативную память. Но это и не требуется. Ведь обработка данных выполняется батчами.
Создать индекс очень просто:
ds_index = DatasetIndex(sequence_of_item_ids)
В качестве последовательности может выступать список, numpy
-массив, pandas.Series
или любой другой итерируемый тип данных.
Когда исходные данные хранятся в отдельных файлах, то удобно строить индекс сразу из списка этих файлов:
ds_index = FilesIndex(path='/some/path/*.dat', no_ext=True)
Тут элементами индекса станут имена файлов (без расширений) из заданной директории.
Бывает, что элементы датасета (например, 3-мерные КТ снимки) хранятся в отдельных директориях.
ds_index = FilesIndex(path='/ct_images_??/*', dirs=True)
Так будет построен общий индекс всех поддиректорий из /ct_images_01
, /ct_images_02
, /ct_images_02
и т.д. Файловый индекс помнит полные пути своих элементов. Поэтому позднее в методе load
или save
можно удобно получить путь index.get_fullpath(index_item)
.
Хотя чаще всего вам вообще не придется оперировать индексами — вся нужная работа выполняется внутри, а вы уже работаете только с батчем целиком.
Вся логика хранения и методы обработки ваших данных определяются в Batch
-классе. Давайте в качестве примера создадим класс для работы с КТ-снимками. Базовый класс Batch
, потомком которого и станет наш CTImagesBatch
, уже имеет атрибут index
, который хранит список элементов данного батча, а также атрибут data
, который инициализируется в None
. И поскольку нам этого вполне хватает, то конструктор переопределять не будем.
Поэтому сразу перейдем к созданию action
-метода load
:
class CTImagesBatch(Batch):
@action
def load(self, src, fmt):
if fmt == 'dicom':
self.data = self._load_dicom(src)
elif fmt == 'blosc':
self.data = self._load_blosc(src)
elif fmt == 'npz':
self.data = self._load_npz(src)
else:
raise ValueError("Incorrect format")
return self
Во-первых, метод обязательно должен предваряться декоратором @action
(чуть позже вы узнаете зачем).
Во-вторых, он должен возвращать Batch
-объект. Это может быть новый объект того же самого класса (в данном случае CTImagesBatch), или объект другого класса (но обязательно потомка Batch
), или можно просто вернуть self
.
Такой подход позволяет описывать цепочки действий над данными. Причем в ходе обработки данные могут меняться не только по содержанию, но и по формату и структуре.
Не будем сейчас тратить время на приватные методы _load_dicom
, _load_blosc
и _load_npz
. Они умеют загружать данные из файлов определенного формата и возвращают 3-мерный numpy
-массив — [размер батча, ширина изображения, высота изображения]. Главное, что именно здесь мы определили, как устроены данные каждого батча, и дальше будем работать с этим массивом.
Теперь напишем метод very_complicated_processing
, который выполняет какую-то чрезвычайно сложную обработку снимков. Поскольку снимки в батче независимы друг от друга, то было бы удобно обрабатывать их параллельно.
class CTImagesBatch(Batch):
...
@action
@inbatch_parallel(target='threads')
def very_complicated_processing(self, item, *args, **kwargs):
# очень сложные вычисления...
return processed_image_as_array
То есть метод следует писать так словно он обрабатывает один снимок, и индекс этого снимка передается в первом параметре.
Чтобы магия параллелизма сработала, метод необходимо обернуть декоратором, где задается технология параллелизма (процессы, потоки и т.д.), а также функции пре- и постпроцессинга, которые вызываются до и после распараллеливания.
Кстати, операции с интенсивным вводом-выводом лучше писать как async
-методы и распараллеливать через target=’async’
, что позволит значительно ускорить загрузку-выгрузку данных.
Понятно, что это все добавляет удобства при программировании, однако совсем не избавляет от "думания", нужен ли тут параллелизм, какой именно и не станет ли от этого хуже.
Когда все action
-методы написаны, можно работать с батчем:
for i in range(MAX_ITER):
batch = ct_images_dataset.next_batch(BATCH_SIZE, shuffle=True)
processed_batch = batch.load('/some/path/', 'dicom')
.very_complicated_processing(some_arg=some_value)
.resize(shape=(256, 256, 256))
.random_rotate(angle=(-30, 30))
.random_crop(shape=(64, 64, 64))
# обучаем модель, подавая ей processed_batch с готовыми данными
Выглядит неплохо… но как-то это неправильно, что итерация по батчам смешана с обработкой данных. Да и цикл обучения модели хочется предельно сократить, чтобы там вообще ничего кроме next_batch
не было.
В общем, надо вынести цепочку action
-методов на уровень датасета.
И это можно сделать. Мы ведь не зря городили все эти action
-декораторы. В них скрывается хитрая магия переноса методов на уровень датасета. Поэтому просто пишите:
ct_images_pipeline = ct_images_dataset.pipeline().
.load('/some/path/', 'dicom')
.very_complicated_processing(some_arg=some_value)
.resize(shape=(256, 256, 256)).
.random_rotate(angle=(-30, 30))
.random_crop(shape=(64, 64, 64))
# ...
for i in range(MAX_ITER):
batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True)
# обучаем модель, подавая ей батчи с обработанными данными
Вам не нужно создавать новый класс-потомок Dataset
и описывать в нем все эти методы. Они есть в соответствующих Batch
-классах и отмечены декоратором @action
— значит вы их можете смело вызывать словно они есть в классе Dataset
.
Еще одна хитрость заключается в том, что при таком подходе все action
-методы становятся "ленивыми" (lazy) и выполняются отложенно. То есть загрузка, обработка, ресайз и прочие действия выполняются для каждого батча в момент формирования этого батча при вызове next_batch
.
И поскольку обработка каждого батча может занимать много времени, то было бы неплохо формировать батчи заблаговременно. Это особенно важно, если обучение модели выполняется на GPU, ведь тогда простой GPU в ожидании нового батча может запросто "съесть" все преимущества ее высокой производительности.
batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True, prefetch=3)
Параметр prefetch
указывает, что надо параллельно считать 3 батча. Дополнительно можно указать технологию распараллеливания (процессы, потоки).
В реальных задачах машинного обучения вам редко придется иметь дело с единственным датасетом. Чаще всего у вас будет как минимум два набора данных: X и Y. Например, данные о параметрах домов и данные о их стоимости. В задачах компьютерного зрения кроме самих изображений еще есть метки классов, сегментирующие маски и bounding box’ы.
В общем, полезно уметь формировать параллельные батчи из нескольких датасетов. И для этого вы можете выполнить операцию join
или создать JointDataset
.
Если вам нужна лишь параллельная итерация по батчам, то удобнее будет создать единый датасет:
joint_dataset = JointDataset((ds_X, ds_Y))
Если ds_X
и ds_Y
основаны не на одном и том же индексе, то важно, чтобы индексы были одинаковой длины и одинаково упорядочены, то есть значение ds_Y[i]
соответствовало значению ds_X[i]
. В этом случае создание датасета будет выглядеть немного иначе:
joint_dataset = JointDataset((ds_X, ds_Y), align='order')
А дальше все происходит совершенно стандартным образом:
for i in range(MAX_ITER):
batch_X, batch_Y = joint_dataset.next_batch(BATCH_SIZE, shuffle=True)
Только теперь next_batch
возвращает не один батч, а tuple с батчами из каждого датасета.
Естественно, JointDataset
можно состоять и из пайплайнов:
pl_images = ct_images_ds.pipeline()
.load('/some/path', 'dicom')
.hu_normalize()
.resize(shape=(256,256,256))
.segment_lungs()
pl_labels = labels_ds.pipeline()
.load('/other/path', 'csv')
.apply(lambda x: (x['diagnosis'] == 'C').astype('int'))
full_ds = JointDataset((pl_images, pl_labels), align=’same’)
for i in range(MAX_ITER):
images_batch, labels_batch = full_ds.next_batch(BATCH_SIZE, shuffle=True)
# и теперь обучаем нейросеть, подавая в нее изображения и метки классов
И поскольку компонентами датасета являются пайплайны, то загрузка и обработка изображений и меток запускается лишь при вызове next_batch
. То есть все вычисления выполняются и батч формируется только тогда, когда он нужен.
Однако бывают и иные ситуации, когда нужно выполнить операцию с датасетом, применяя к нему данные из другого датасета.
Это лучше продемонстрировать на примере с КТ-снимками. Загружаем координаты и размеры раковых новообразований и формируем из них 3-мерные маски.
pl_masks = nodules_ds.pipeline()
.load('/other/path', 'csv')
.calculate_3d_masks()
Загружаем КТ-снимки и применяем к ним маски, чтобы выделить только раковые области.
pl_images = ct_images_ds.pipeline().
.load('/some/path', 'dicom')
.hu_normalize()
.resize(shape=(256, 256, 256))
.join(pl_masks)
.apply_masks(op=’mult’)
В join
вы указываете датасет. Благодаря чему в следующий action
-метод (в данном примере в apply_masks
) в качестве первого аргумента будут передаваться батчи из этого датасета. И не какие попало батчи, а ровно те, которые и нужны. Например, если текущий батч из ct_images_ds
содержит снимки 117, 234, 186 и 14, то и присоединяемый батч с масками также будет относиться к снимкам 117, 234, 186 и 14.
Естественно, метод apply_masks
должен быть написан с учетом данного аргумента, ведь его можно передать и явно, без предварительного join
'а. Причем в action
-методе можно уже не задумываться об индексах и идентификаторах элементов батча — вы просто к массиву снимков применяете массив масок.
И снова отмечу, что никакие загрузки и вычисления, ни с изображениями, ни с масками не будут запущены, пока вы не вызовете pl_images.next_batch
Итак, посмотрим как будет выглядет полный workflow data science проекта.
ct_images_index = FilesIndex(path='/ct_images_??/*', dirs=True)
ct_images_dataset = Dataset(index = ct_images_index, batch_class=CTImagesBatch)
Выполняем препроцессинг и сохраняем обработанные снимки
ct_images_dataset.pipeline()
.load(None, 'dicom') # загружаем данные в формате dicom по путям из индекса
.hu_normalize()
.resize(shape=(256, 256, 256))
.segment_lungs()
.save('/preprocessed/images', 'blosc')
.run(BATCH_SIZE, shuffle=False, one_pass=True)
Описываем подготовку и аугментацию данных для модели
ct_preprocessed_index = FilesIndex(path='/preprocessed/images/*')
ct_preprocessed_dataset = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch)
#
ct_images_pipeline = ct_preprocessed_dataset.pipeline()
.load(None, 'blosc')
.split_to_patches(shape=(64, 64, 64))
#
ct_masks_ds = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch)
ct_masks_pipeline = ct_masks_ds.pipeline().
.load('/preprocessed/masks', 'blosc')
.split_to_patches(shape=(64, 64, 64))
#
full_ds = JointDataset((ct_images_pipeline, ct_masks_pipeline))
Формируем тренировочные батчи и обучаем модель
full_ds.cv_split([0.8, 0.2])
for i in range(MAX_ITER):
images, masks = full_ds.train.next_batch(BATCH_SIZE, shuffle=True)
# обучаем модель, подавая в нее снимки и маски
for images, masks in full_ds.test.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True):
# рассчитываем метрики качества модели
Вот такая вот удобная библиотека, которая помогает значительно быстрее разрабатывать понятный код высокого качества, повторно использовать ранее созданные модели со сложным препроцессингом данных и даже разрабатывать production-ready системы.
А теперь вопрос: что еще стоит добавить в библиотеку? чего вам остро не хватает при работе с данными и моделями?