python

Чтение больших объемов данных в Python/Postgresql

  • среда, 6 апреля 2016 г. в 03:16:13
https://habrahabr.ru/post/280822/
  • Python
  • PostgreSQL


Стек рассматриваемых технологий: Postgresql 9.3, Python 2.7 с установленным модулем «psycopg2».

Проблема


Как часто в вашей практике приходилось сталкиваться с задачей обработки таблиц большого объема (более 10 млн. записей)? Думаю вы согласитесь, что данная задача является довольно ресурсоемкой как в плане времени обработки, так и задействованных ресурсов системы. Сегодня я постараюсь показать альтернативный способ решения задачи.

Предложение:


В СУБД Postgresql есть прекрасный оператор для работы с большими объемами информации, а именно «COPY». Применение данного оператора позволяет нам читать и записывать огромные объемы информации в таблицу. В данной статье мы будем рассматривать режим чтения.

Согласно документации оператора «COPY» нам доступны несколько режимов чтения в файл либо в поток STDOUT, а также различные форматы, в том числе и «csv». Как раз его мы и постараемся использовать с максимальной пользой.

Подготовка:


В качестве «подопытного кролика» мы создадим таблицу с 1 млн. записей и напишем небольшой скрипт, отражающий суть метода. Sql файл можно найти в моем репозитории git (ссылку можно найти внизу статьи).

Также не забудьте установить расширение psycopg2!

Реализация:


Для выборки данных мы будем использовать чудесную функцию «copy_expert», которая позволяет нам исполнять «COPY» запросы из Python клиента.

query = """
    SELECT * from big_data inner join big_data as t1 USING(fname)
"""
output = StringIO()
self.cursor.copy_expert("COPY (%s) TO STDOUT (FORMAT 'csv', HEADER true)" % query, output)
data = output.getvalue()
output.close()
result = list()
for item in getResults(data):
    # do whatever we need
    item = {k: None if v == "" else v for k, v in item.items()}
    result.append(item)

Пояснения к коду:

  1. В запросе делаем объединение на себя, для его усложнения (замечено, что преимущество в скорости прямо пропорционально сложности запроса);
  2. В качестве буфера используем объект «StringIO», куда мы будем записывать данные из курсора;
  3. Парсить строку будем генератором «getResults»;
  4. Для удобства интерпретации я преобразую все пустые строки в тип «None», т.к. после использования «COPY» мы получаем строковые значения;
  5. Хочу отметить, что формат я будем использовать «csv» с лидирующей строкой заголовков, почему именно так поймете, немного позже.

Код генератора:

def getResults(stream):
    """
    get result generator
    """
    f = StringIO(stream)
    result = csv.DictReader(f, restkey=None)
    for item in result:
        yield item
    f.close()

Пояснения:

  1. Как видно из листинга, опять же используем уже знакомый буфер «StringIO»;
  2. Для преобразования строки «csv» в словарь (dictionary) используем метод «DictReader» родной библиотеки csv. По умолчанию, данный метод принимает первую строку за список полей словаря.

Вот, собственно, и все, что нам требуется!

Моя конфигурация: MacBook Air 2013 Processor: 1,3 GHz Intel Core i5, Ram: 4 ГБ 1600 МГц DDR3, SSD.

PS:
Хочу отметить, что данный подход к ускорению чтения не всегда работает, а именно, если у вас довольно простая таблица из 3-5 полей, ощутимой разницы вы не заметите (по крайней мере до 1 млн.). Однако данный метод показывает просто сумасшедший прирост в скорости, при сложных запросах ускорение достигает до 10-20 раз! Также очень сильно влияет конфигурация железа, на котором исполняется скрипт.

Весь код можно найти в git репозитории https://github.com/drizgolovicha/python_bulk_read.

Буду рад замечаниям и предложениям по оптимизации!

Спасибо что дочитали до конца.

UPD:
Результаты замеров выборка (14к) записей:
  1. Прямой SELECT, Where условие по неиндексированному полю — 21,4с
  2. COPY предыдущего запроса — 13,1с
  3. Выборка того же SELECT, но из materialized view с индексом по полю — 12,6с
  4. COPY materialized view — 1.8с