Технология разработки приложений для модели публикация-подписка
Задача создания собственных приложений Издателя и подписчика рассматривается на следующей упрощенной модели: один издатель, один брокер и один подписчик (рис.10.2). Будет использоваться формат сообщений WebSphere MQ – MQRFH и классический MQI интерфейс. Для создания приложений необходимо иметь руководство по командам и опциям Publish/Subscribe, а также некоторые знания по программированию на С.
Рис. 10.2. Модель Издатель/Подписчик
Приложение Издателя должно публиковать символьные строки, задаваемые пользователем на определенные темы. Следовательно, такая программа с именем publisher может иметь следующий формат запуска:
publisher Topic Command QMgrName PubQueue
где Topic – тема публикации, символьная строка длиной не более 256 байт; Command – команды Издателя для брокера (RegPub, Publish, ReqUpdate, DeletePub, DeregPub), PubQueue - очередь издателя. Текст публикации вводится в командном окне с запущенной программой.
Приложение подписчика должно иметь возможность зарегистрироваться (подписаться) на заданные темы и отображать результаты полученных публикаций. Программа подписчика с именем subscriber может иметь формат запуска:
subscriber Topic QMgrName SubQueue
где Topic – тема подписки, SubQueue - очередь подписчика. Текст публикации на тему Topic отображается в командном окне с запущенной программой subscriber.
Для работы приложений требуется создание очередей: одна у Издателя и одна у подписчика, наименования которых, соответственно, Publisher_queue и Subscriber_queue. издатель будет получать сообщения от брокера через Publisher_queue. подписчик - регистрировать темы и получать публикации через Subscriber_queue. Создание этих очередей осуществляется простой командой runmqsc:
runmqsc QMgrName define qlocal(имя очереди) end
Следует рассмотреть потоки сообщений и то, как сообщения по определенной теме находят своего подписчика.
- подписчик посылает заявки на подписку в управляющую очередь брокера: SYSTEM.BROKER.CONTROL.QUEUE, содержащие подписку на определенную тему, например, TestTopic. брокер читает это сообщение и запоминает детали подписки, такие как тема и очередь, в которой подписчик хочет получать сообщения. Потоки являются средством группирования разных тем. Состояния брокера и заявок на подписку запоминаются во внутренних очередях брокера: SYSTEM.BROKER.*.
- Приложение-издатель публикует информацию по определенной теме, например, по TestTopic и оно попадает в потоковую очередь брокера. По умолчанию это очередь:
SYSTEM.BROKER.DEFAULT.STREAM
- Сообщение о публикации читается брокером и направляется в очередь подписчика.
- Приложение подписчика читает публикацию из очереди подписчика.
Сообщения публикация-подписка имеет время жизни, по истечению которого они удаляются. Сообщения публикация-подписка являются постоянными сообщениями (persistent) и восстанавливаются после перезагрузки менеджера и/или брокера.
Также как и сообщения WebSphere MQ потоки информации от издателя к подписчику являются асинхронными. Если брокер не работает, то публикация будет оставаться в потоковой очереди до тех пор, пока брокер не стартует. Если приложение подписчика не активно, то публикация будет оставаться в очереди подписчика.
Основные шаги, из которых складывается создание Приложения издателя. Таких шагов для программы publisher будет 7.
Шаг 1. Для того чтобы положить сообщение в очередь потоков брокера, необходимо подключиться к менеджеру брокера и открыть очередь командой MQOPEN для помещения сообщений.
Шаг 2. Для публикации необходимо сформировать MQRFH структуру (см. руководство по WebSphere MQ Publish/Subscribe). Все поля этой структуры должны иметь определенные значения. Некоторые значения MQRFH должны быть изменены по сравнению со значениями по умолчанию перед публикацией или в тот момент, когда мы определим значения этих полей.
Шаг 3. Сразу за структурой MQRFH должна следовать символьная строка NameValueString. Указатель pNameValueString определяет начальную позицию NameValueString в теле сообщения.
Шаг 4. Содержимое строки NameValueString должно включать всю необходимую информацию о публикации и всю необходимую последовательность команд для формирования непрерывной символьной строки нашей публикации. Содержимое строки NameValueString позволяет брокеру определить Publish/Subscribe команды и порядок их обработки.
Шаг 5. Поле StrucLength должно содержать длину MQRFH структуры и сопровождающей его строки NameValueString. Длина MQRFH фиксирована и длина NameValueString – переменная. Значение StrucLength позволяет не применять разделитель конца строки в NameValueString, хотя и он может быть использован, если это необходимо приложению-подписчику. MQRFH и NameValueString выравниваются на границе 16 байт автоматически.
Шаг 6. Данные о публикации ( в данном случае строка данных для публикации) помещаются сразу после MQFRH и NameValueString структуры, указатель pUserData должен быть установлен на начальную позицию строки введенных данных о публикации.
Шаг 7. Осуществляется процедура тестирования работы программы. Программа компилируется, устраняются ошибки компиляции. Она вызывается, например, командой:
publisher TestTopic Publish QM_broker Publisher_queue
В командном окне программы publisher вводится сообщение по теме: Hello
На этом этапе еще нет приложения подписчика, поэтому необходимо:
- проверить, нет ли сообщений об ошибках от брокера в очереди Publisher_queue и исправить это; есть ли успешные сообщения от брокера;
- посмотреть сгенерированное сообщение о публикации в очереди SYSTEM.DEFAULT.LOCAL.STREAM, используя MQ explorer или команду amqsbcg и убедиться, что в формате сообщения о публикации нет ошибок.
Основные шаги, из которых складывается создание Приложения подписчика.
Шаг 1. Прежде всего, командой MQOPEN необходимо открыть очередь брокера для того, чтобы затем положить командное сообщение в соответствующую очередь.
Шаг 2. На этом шаге необходимо зарегистрировать интересы в виде темы и послать брокеру запрос на регистрацию подписки. Поскольку команды из основной программы будут посылаться брокеру довольно часто, необходимо разработать функцию SendBrokerCommand, одним из аргументов которой является командная строка для помещения в NameValueString командного сообщения. Эта функция SendBrokerCommand должна быть аналогичной по коду, как и для приложения Издателя.
Шаг 3. Теперь, когда подписчик зарегистрирован, можно ждать поступление публикаций в очередь подписчика, и как только они поступят, их можно прочесть командой MQGET. Первое, что необходимо сделать при получении сообщения, это проверить, что оно в нужном формате MQRFH.
Шаг 4. После распознавания формата MQRFH сообщения, можно выделить порцию наиболее интересного сообщения. В данном случае нет необходимости смотреть на строку NameValueString так как интересуют данные, которые следуют за этой строкой, они задаются указателем pUserData и могут быть напечатаны на экране или выведены в файл.
Шаг 5. На этом последнем шаге необходимо отказаться от регистрации и, как это делалось раньше на шаге 2, вызвать функцию SendBrokerCommand, добавив соответствующую команду для отказа от регистрации. Теперь, так же как и раньше, надо скомпилировать код и затем исправить ошибки компиляции. После этого программа готова для работы, ее можно выполнить:
subscriber TestTopic QM_broker Subscriber_queue
Этот запуск должен отобразить информацию об успешной регистрации у брокера. Если сообщение об успешной регистрации не поступило, то следует посмотреть сообщение об ошибке и исправить ее. После того как прошла успешная регистрация, можно попробовать получить сообщение по подписке. Для этого запустить программу publisher на тему TestTopic с сообщением, например, PublisherReady. В результате, в программе-подписчике должна появиться информация: PublisherReady.
Если этого не произойдет, то необходимо найти ошибку.
Получив успешно одну публикацию от одного издателя, можно расширить эксперимент и попробовать запустить много программ-издателей и программ-подписчиков. Каждой из них понадобиться своя очередь. Если сделать публикации на разные темы, можно ожидать получение определенных сообщений на экранах подписчиков, подписавшихся на соответствующие темы.