Введение в REDIS-PY

В этой статье начнём изучать библиотеку на языке Python для управления NoSQL хранилищем Redis.

redis-py

redis-py - это библиотека на языке Python для управления NoSQL хранилищем Redis.

Установка

Для установки redis-py требуется установленный и запущенный Redis сервер . В [redis’s quickstart] вы найдете инструкцию по установке.

Чтобы установить redis-py выполните:

$ sudo pip install redis

или воспользуйтесь pip

$ sudo easy_install redis

или из исходников

$ sudo python setup.py install

Проверка

>>> import redis
>>> r = redis.StrictRedis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
True
>>> r.get('foo')
'bar'

Описание API

В официальной документации по Redis{target="_blank"} вы найдете подробное описание команд. Redis-py предоставляет два клиентских класса для работы с этими командами. Класс StrictRedis максимально приближен к официальному синтаксису команд самого Redis, за некоторым исключением:

  • SELECT - не используется. В разделе “Многопоточность” вы найдете этому объяснение.
  • DEL - del - зарезервированное слово в языке Python. Поэтому в redis-py используется delete
  • CONFIG GET|SET - реализованы как отдельные команды config_get и config_set.
  • MULTI/EXEC - реализованы как часть класса Pipline. Pipline команды окружены инструкциями MULTI и EXEC. Такое поведение можно отключить установив переменную transaction в значение False. Подробнее о Pipline читайте ниже.
  • SUBSCRIBE/LISTEN - идентично pipeline, PubSub реализован как отдельный класс. Он использует свое соединение так, чтобы можно было выполнять не pubsub команды. Вызвав pubsub метод из клиента Redis вы получаете экземпляр PubSub, при помощи которого можно подписаться на разные каналы и отслеживать поступающие сообщения. PUBLISH можно вызывать только из клиента Redis.
  • SCAN/SSCAN/HSCAN/ZSCAN - все эти команды повторяют соответствующую реализация из Redis. В дополнение каждая команда поддерживает итерационные методы. За счет этих команд пользователю не приходится отслеживать положения курсора при работе в цикле. Для этого используйте следующие команды: scan_iter/sscan_iter/hscan_iter/zscan_iter

Так же класс Redis будучи наследником StrictRedis использует некоторые команды для обеспечения поддержки старых версий redis-py:

  • LREM - устанавливает порядок аргументов ‘num’ и ‘value’ таким образом, что значение ‘num’ по-умолчанию равно нулю
  • ZADD - Redis указывает аргумент ‘score’ до ‘value’. Эта замена произошла по ошибке и была замечена после выхода рабочей версии. Аргументы для Redis следует передавать в порядке - name1, score1, name2, score2, …
  • SETEX - обратный порядок аргументов ‘time’ и ‘value’

Пул соединений

За кулисами redis-py использует пул соединений для управления соединениями с сервером Redis. По-умолчанию, каждый экземпляр Redis создает свой собственный пул соединений. Такого поведения можно избежать, передав экземпляр уже существующего пула Redis классу как аргумент connection_pool. Такое решение может потребоваться при реализации сегментирования на клиентской стороне или получения полного контроля над соединениями.

>>> pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
>>> r = redis.Redis(connection_pool=pool)

Соединения

ConnectionPools управляет набором экземпляров Connection. Redis-py поддерживает два типа соединений. По-умолчанию используется стандартное соединение TCP. UnixDomainSocketConnection позволяет клиентам использующим одну машину в качестве сервера и клиента соединяться через Доменный сокет Unix (unix domain socket). Для использования UnixDomainSocketConnection просто передайте путь к файлу unix domain socket через строковый аргумент unix_socket_path. Так же проверьте что параметр unixsocket указан в файле redis.conf. По-умолчанию он закомментирован.

>>> r = redis.Redis(unix_socket_path='/tmp/redis.sock')

Так же вы можете создавать свои подклассы Connection. Такие классы полезны если требуется полностью контролировать поведение сокетов в пределах async фреймворка. Для инициализации клиентского класса с собственным соединением, сначала необходимо создать пул соединений, передав свой класс соединения в параметре connection_class. Остальные параметры необходимые пулу будут переданы классу указанному в процессе инициализации.

>>> pool = redis.ConnectionPool(connection_class=YourConnectionClass,  your_arg='...', ...)

Парсеры

Классы парсера помогают контролировать парсинг ответов сервера Redis. Redis-py содержит два класса парсеров: PythonParser и HiredisParser. Сначала redis-py применит HiredisParser если активирован модуль hiredis. В случае неудачи будет использован класс PythonParser.

Hiredis - библиотека написанная на языке С и поддерживаемая разработчиками ядра Redis. Применение Hiredis позволяет ускорить парсинг ответов сервера Redis до 10 раз. Такое улучшение заметно при выборке большого количества данных, например результат операций LRANGE и SMEMBERS. Hiredis доступен в PyPl и может быть установлен при помощи pip или easy_install:

$ pip install hiredis

или

$ easy_install hiredis

Callback функции ответа

Клиентский класс использует ряд callback функций для трансформации ответа от сервера к корректному виду для Python. Существует определённый набор этих функций заданных в клиентском классе Redis в словаре RESPONSE_CALLBACKS.

Собственные виды таких функций можно задать каждому экземпляру Redis используя метод set_response_callback. Этот метод принимает два параметра: название команды и функцию callback. Функция, добавленная таким образом, действительна только для экземпляра, для которого она была задана. Если вам необходимо задать функцию callback глобально, то следует создать подкласс клиента Redis и добавить свою функцию в словарь REDIS_CALLBACKS.

Callback ответа должен получить минимум один параметр: ответ от сервера Redis. Также можно передавать ключевые параметры для контроля интерпретации ответа. Такие параметры указываются при вызове команды execute_command. Реализация ZRANGE отлично показывает применение ключевых аргументов в callback функциях.

Многопоточность

Экземпляры клиентов Redis можно использовать в нескольких потоках одновременно. Соединения извлекаются из пула только при выполнении команд, и возвращаются после их завершения. Выполнение команды никогда не изменяет состояние экземпляра клиентского класса.

Но существует одно исключение - команда Redis SELECT. Эта команда позволяет переключиться на другую базу данных. Эта БД остается выбранной до тех пор пока не будет выбрана другая или закрыто соединение. Такое поведение может вернуть не то соединение в пул, которое было установлено изначально.

Поэтому redis-py не поддерживает команду SELECT на стороне клиента. При использовании нескольких БД в пределах одного приложения, следует создавать отдельные экземпляры клиентов, а по возможности и отдельные пулы соединений, для каждой базы данных. Не рекомендуется передавать PubSub или Pipline объекты между потоками.

Pipelines

Pipline - подкласс базового класса Redis, обеспечивающий поддержку буферизации нескольких команд в пределах одного запроса к серверу. Такая методика значительно ускоряет производительность за счет сокращения числа передачи TCP пакетов от клиента к серверу.

Piplines довольны просты в применении:

>>> r = redis.Redis(...)
>>> r.set('bing', 'baz')
>>> # Используйте метод pipeline() для создания экземпляра pipeline
>>> pipe = r.pipeline()
>>> # Следующие команды SET будут буферезированы
>>> pipe.set('foo', 'bar')
>>> pipe.get('bing')
>>> # метод EXECUTE выполнит все буферезированные команды
>>> # и вернёт список ответов для каждой команды
>>> pipe.execute()
[True, 'baz']

Для упрощения, каждая команда после буферизации возвращает объект pipeline. То есть можно выполнять команды по цепочке:

>>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
[True, True, 6]

Так же pipeline автоматически выполняет команды группами. Чтобы изменить такое поведение, достаточно отключить транзакции:

>>> pipe = r.pipeline(transaction=False)

Довольно частая проблема - необходимость выборки дополнительных данных перед выполнением транзакции. Например, допустим команды INCR не существует, а нам необходимо создать атомарную версию команды INCR в Python.

Самое простое решение - получить значение при помощи GET, инкрементировать его в Python и вернуть значение командой SET. Но это не атомарное решение, так как разные клиенты могут выполнять эту операцию одновременно и каждый получит одинаковое значение от GET.

Используйте команду WATCH. Она позволяет отслеживать один или несколько ключей до начала транзакции. Если одно из этих значений изменится до начала транзакции, то она будет отменена и появится исключение типа WatchError. Поэтому решение нашей проблемы на стороне клиента может выглядеть следующим образом:

>>> with r.pipeline() as pipe:
...    while 1:
...        try:
...            pipe.watch('OUR-SEQUENCE-KEY')
...            current_value = pipe.get('OUR-SEQUENCE-KEY')
...            next_value = int(current_value) + 1
...            pipe.multi()
...            pipe.set('OUR-SEQUENCE-KEY', next_value)
...            pipe.execute()
...            break
...       except WatchError:
...            continue

Заметьте, так как Pipeline должен быть привязан к одному соединению в течении WATCH, следует взять на себя ответственность за возврат соединения в пул после вызова метода reset(). Если Pipeline используется в качестве контекста (как в примере выше), то метод reset() вызывается автоматически. Конечно вы можете вручную вызвать reset():

>>> pipe = r.pipeline()
>>> while 1:
...    try:
...        pipe.watch('OUR-SEQUENCE-KEY')
...        ...
...        pipe.execute()
...        break
...    except WatchError:
...        continue
...    finally:
...        pipe.reset()

Метод transaction упрощает отслеживание ошибок. Он принимает исполняемую функцию в качестве параметра, объект типа pipeline и любой набор ключей для отслеживания. Таким образом мы можем переписать наше решение следующим образом:

>>> def client_side_incr(pipe):
...    current_value = pipe.get('OUR-SEQUENCE-KEY')
...    next_value = int(current_value) + 1
...    pipe.multi()
...    pipe.set('OUR-SEQUENCE-KEY', next_value)
>>>
>>> r.transaction(client_side_incr, 'OUR-SEQUENCE-KEY')
[True]

Публикация/Подписка

Redis-py включает в себя объект PubSub, который может подписываться на разные каналы и отслеживать появление новых сообщений. Создать такой объект довольно просто:

>>> r = redis.StrictRedis(...)
>>> p = r.pubsub()

После создания объекта можно подписываться на каналы:

>>> p.subscribe('my-first-channel', 'my-second-channel', ...)
>>> p.psubscribe('my-*', ...)

Теперь PubSub подписан на указанные каналы. Подтверждение этому являются поступающие сообщения:

>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': 'my-second-channel', 'data': 1L}
>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': 'my-first-channel', 'data': 2L}
>>> p.get_message()
{'pattern': None, 'type': 'psubscribe', 'channel': 'my-*', 'data': 3L}

Каждое сообщение представляет собой словарь со следующими ключами:

  • type - одно из следующих значений: subscribe, unsubscribe, psubscribe, punsubscribe, message, pmessage
  • channel - имя канала, от которого поступило сообщение
  • pattern - паттерн соответствующий каналу опубликовавшему сообщение. Принимает значение None за исключением типов pmessage
  • data - данные сообщения. Содержит номер каналов и паттернов, на которое данное соединение подписано. При сообщениях типа pmessage это поле содержит текст самого сообщения.

Давайте отправим сообщение.

# метод  publish возвращает количество  каналов или паттернов подписки. 
# 'my-first-channel' совпадает с 'my-first-channel'
# подпиской и паттенном подписки 'my-*', так что эт осообщение
# будет доставлено в 2 канала
>>> r.publish('my-first-channel', 'some data')
2
>>> p.get_message()
{'channel': 'my-first-channel', 'data': 'some data', 'pattern': None, 'type': 'message'}
>>> p.get_message()
{'channel': 'my-first-channel', 'data': 'some data', 'pattern': 'my-*', 'type': 'pmessage'}

Отписаться также просто. Если не передавать аргументов команде, то мы отпишемся от всех каналов.

>>> p.unsubscribe()
>>> p.punsubscribe('my-*')
>>> p.get_message()
{'channel': 'my-second-channel', 'data': 2L, 'pattern': None, 'type': 'unsubscribe'}
>>> p.get_message()
{'channel': 'my-first-channel', 'data': 1L, 'pattern': None, 'type': 'unsubscribe'}
>>> p.get_message()
{'channel': 'my-*', 'data': 0L, 'pattern': None, 'type': 'punsubscribe'}

Redis-py также позволяет регистрировать callback функции для управления публикацией сообщений. Обраотчик сообщения получает один аргумент - сообщение, которое представляет из себя словарь. Для того, чтобы подписаться на канал или паттерн с обработчиком сообщения, необходимо передать канал или название паттерна как ключевой аргумент, значением которого является сама callback функция.

При поступлении сообщения через канал или паттерн, такой словарь создается и передается обработчику сообщения. В нашем случае, мы получаем None в результате выполнения get_message(), так как сообщение было обработано ранее.

>>> def my_handler(message):
...    print 'MY HANDLER: ', message['data']
>>> p.subscribe(**{'my-channel': my_handler})
# читаем сообщение
>>> p.get_message()
{'pattern': None, 'type': 'subscribe', 'channel': 'my-channel', 'data': 1L}
>>> r.publish('my-channel', 'awesome data')
1
>>> message = p.get_message()
MY HANDLER:  awesome data
# `message` является None потому что сообщение было обработано нашим обработчиком.
>>> print message
None

Если такой функционал не требуется вашему приложению, вы можете просто игнорировать сообщения передав ignore_subscribe_messages=True в r.pubsub(). Таким образом все сообщения будут прочитаны, но в вашем приложении они не отобразятся.

>>> p = r.pubsub(ignore_subscribe_messages=True)
>>> p.subscribe('my-channel')
>>> p.get_message()  # прячет сообщения и возвращает None
>>> r.publish('my-channel')
1
>>> p.get_message()
{'channel': 'my-channel', data': 'my data', 'pattern': None, 'type': 'message'}

Существует три стратегии чтения сообщений.

Вышеприведенные примеры использовали pubsub.get_message(). Этот метод использует системный модуль select, чтобы быcтро получить сокет соединения. Если присутствуют данные, то get_message() читает их, форматирует и возвращает. Если данных нет, то в ответ получаем None. Таким образом довольно легко внедрить такую методику в ваше приложение.

>>> while True:
>>>    message = p.get_message()
>>>    if message:
>>>        # делаем что-то с сообщением
>>>    time.sleep(0.001)

Более ранние версии redis-py читают сообщения при помощи метода pubsub.listen(). По сути listen() - это генератор блокирующий систему до получения сообщения. Если вам требуется только прочитать сообщение и выполнить какое-либо действие в зависимости от него, метод listen() будет самым простым решением.

>>> for message in p.listen():
...    # что-то делаем с сообщением

Третий вариант выполняет цикл в отдельном потоке. Pubsub.run_in_thread() создает новый поток и запускает цикл. Объект потока возвращается в результате выполнения run_in_thread(). Метод thread.stop() останавливает поток и цикл в нем. По сути это просто оболочка для метода get_messages(), которая работает в отдельном потоке, тем самым отрабатывая без блокировки приложения. Run_in_thread() получает дополнительный параметр sleep_time. Если он указан, то в теле цикла при каждой итерации будет выполнена команда time.sleep().

Заметка: так как мы работаем в отдельном потоке, то обрабатывать сообщения можно только автоматически через обработчик. Таким образом, redis-py запрещает выполнять run_in_thread() если вы подписаны на паттерны или каналы без привязанных обработчиков.

>>> p.subscribe(**{'my-channel': my_handler})
>>> thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
>>> thread.stop()

Объект PubSub придерживается в той же семантической кодировке что и экземплер клиентского класса. Любой канал или паттерн использующий юникод будет передан Redis с тем же набором символов, что и на клиентской стороне. Если флаг decode_responses установлен в False (по-умолчанию), то значения 'channel', 'pattern' и 'data' в словаре будут байтовыми строками (str на Python2, byte на Python3). Если же этот флаг установлен в True, то все этим значения будут автоматические переведены в юникод строки.

Объект PubSub запоминает на какие каналы он подписан. В случае разрыва соединения, например ошибки сети, этот объект восстанавливает свое исходное состояние. Сообщения опубликованные в момент отключения не будут получены. Когда вы заканчиваете работу с объектом PubSub, необходимо вызвать метод close() для закрытия соединения.

>>> p = r.pubsub()
>>> ...
>>> p.close()

Скрипты LUA

Redis-py поддерживает команды EVAL, EVALSHA и SCRIPT. Хотя существует ряд нюансов при работе с ними. redis-py использует свой объект Script для упрощения работы. Для создания экземпляра скрипта, используйте функцию register_script на стороне клиента передав код LUA в качестве первого параметра. register_script возвращает экземпляр Script, который вы можете использовать дальше в своем коде.

Следующий пример LUA скрипта принимает два параметра: имя ключа и значение множителя. Скрипт выбирает значение ключа, умножает его на значение множителя и возвращает результат.

>>> r = redis.StrictRedis()
>>> lua = """
... local value = redis.call('GET', KEYS[1])
... value = tonumber(value)
... return value * ARGV[1]"""
>>> multiply = r.register_script(lua)

Теперь multiply - это скрипт, который можно выполнять как функцию. Script принимает следующие дополнительные параметры:

  • keys - список имен ключей, к которым будет получен доступ. Этот список становится KEYS в LUA
  • args - список значений параметров. Становится ARGS в LUA
  • client - Client Redis или Pipeline, который выполняет скрипт. Если клиент не указан, то используется клиент, который изначально использовался при создании экземпляра Script.

Продолжение примера:

>>> r.set('foo', 2)
>>> multiply(keys=['foo'], args=[5])
10

Значение ключа ‘foo’ - 2. При вызове multiply, ключ ‘foo’ и значение множителя передаются в скрипт. LUA выполняет скрипт и возвращает результат - 10. Экземпляры Script могут быть исполнены разными экземплярами клиента, даже если последний использует совсем другой Redis сервер.

>>> r2 = redis.StrictRedis('redis2.example.com')
>>> r2.set('foo', 3)
>>> multiply(keys=['foo'], args=[5], client=r2)
15

Объект Script заносит скрипт LUA в кеш Redis. В случае ошибки NOSCRIPT, он загрузит сохраненную версию скрипта и попробует выполнить её. Также объекты Script могут работать с Pipelines. Экземпляр pipeline следует передать при вызове скрипта в качестве аргумента клиента. Автоматически скрипт регистрируется в кеше Redis переде запуском pipeline.

>>> pipe = r.pipeline()
>>> pipe.set('foo', 5)
>>> multiply(keys=['foo'], args=[5], client=pipe)
>>> pipe.execute()
[True, 25]

Поддержка Sentinel

Redis-py может работать с Redis Sentinel для обнаружения узлов Redis. Потребуется как минимум одна служба Sentinel. Подключение redis-py к Sentinel довольно просто. Вы можете использовать соединение Sentinel для обнаружения главных и дочерних сетевых адресов.

>>> from redis.sentinel import Sentinel
>>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
>>> sentinel.discover_master('mymaster')
('127.0.0.1', 6379)
>>> sentinel.discover_slaves('mymaster')
[('127.0.0.1', 6380)]

Вы так же можете создать соединения клиента Redis при помощи экземпляра Sentinel. Можно просто подключиться к главной ветке (при записи) или к дочерней (при операциях чтения).

>>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
>>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
>>> master.set('foo', 'bar')
>>> slave.get('foo')
'bar'

Главный и дочерние объекты являются обычными экземплярами StrictRedis со своими пулами соединений привязанными к экземплярам Sentinel. При попытке подключения клиентов к Sentinel, сначала он обращается к серверам Sentinel для определения необходимого хоста. Если сервер не был найден, появляется исключение MasterNotFoundError или SlaveNotFoundError. Оба исключения являются подклассами ConnectionError.

При попытке подключения к дочернему хосту, пул соединений Sentinel просматривает все возможные соединения пока не найдет тот, к которому можно подключиться. Если нет доступных дочерних хостов, то соединение будет установлено с главным хостом.

Scan итераторы

С командами *SCAN, которые были представлены в Redis 2.8, могут возникнуть некотороые проблемы. Хотя эти команды полностью поддерживаются, redis-py использует следующие методы, которые возвращают итераторы для Python, для упрощения: scan_iter, hscan_iter, sscan_iter, zscan_iter.

>>> for key, value in (('A', '1'), ('B', '2'), ('C', '3')):
...    r.set(key, value)
>>> for key in r.scan_iter():
...    print key, r.get(key)
A 1
B 2
C 3

Комментарии

0
Войти
Комментариев нет.
Войдите чтобы оставлять комментарии.