Интеграция приложений на основе WebSphere MQ

         

MQLONG CompCode; MQLONG Reason; MQOD


/*************************************************************************************/ /* Имя программы: AMQSGAMA */ /* Описание: Основанная на модели Publish/Subscribe программа */ /* моделирует результаты футбольного матча и */ /* отправляет их от издателя к брокеру */ /* Statement: Licensed Materials - Property of IBM */ /* SupportPac MA0E */ /* (C) Copyright IBM Corp. 1999 */ /*************************************************************************************/ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <time.h> #include <cmqc.h> /* MQI */ #include <cmqpsc.h> /* MQI Publish/Subscribe */ #include <windows.h> #if MQAT_DEFAULT == MQAT_WINDOWS_NT #define msSleep(time) \ Sleep(time) #elif MQAT_DEFAULT == MQAT_UNIX #define msSleep(time) \ { \ struct timeval tval; \ tval.tv_sec = time / 1000; \ tval.tv_usec = (time % 1000) * 1000; \ select(0, NULL, NULL, NULL, &tval); \ } #endif #define STREAM "SAMPLE.BROKER.RESULTS.STREAM" #define TOPIC_PREFIX "Sport/Soccer/Event/" #define MATCH_STARTED "MatchStarted" #define MATCH_ENDED "MatchEnded" #define SCORE_UPDATE "ScoreUpdate" #define MATCH_LENGTH 30000 /* 30 Second match length */ #define REAL_TIME_RATIO 333 #define AVERAGE_NUM_OF_GOALS 5 #define DEFAULT_MESSAGE_SIZE 512 /* Maximum buffer size for a message */

/* Default-initialised MQRFH; BuildMQRFHeader copies its fixed part into each message. */
static const MQRFH DefaultMQRFH = { MQRFH_DEFAULT };

/* Pair of team names placed after the MQRFH in MatchStarted / MatchEnded messages. */
typedef struct
{
  MQCHAR32 Team1;
  MQCHAR32 Team2;
} Match_Teams, *pMatch_Teams;

/* Writes an MQRFH publish header for TOPIC_PREFIX + TopicType at pStart. */
void BuildMQRFHeader( PMQBYTE pStart,
                      PMQLONG pDataLength,
                      MQCHAR  TopicType[] );

/* Puts one message of messageLength bytes to the object hObj. */
void PutPublication( MQHCONN hConn,
                     MQHOBJ  hObj,
                     PMQBYTE pMessage,
                     MQLONG  messageLength,
                     PMQLONG pCompCode,
                     PMQLONG pReason );

int main(int argc, char **argv) { MQHCONN hConn = MQHC_UNUSABLE_HCONN; MQHOBJ hObj = MQHO_UNUSABLE_HOBJ; MQLONG CompCode; MQLONG Reason; MQOD od = { MQOD_DEFAULT }; MQLONG Options; PMQBYTE pMessageBlock = NULL; MQLONG messageLength; MQLONG timeRemaining; MQLONG delay; PMQCHAR pScoringTeam; pMatch_Teams pTeams; MQCHAR32 team1; MQCHAR32 team2; char QMName[MQ_Q_MGR_NAME_LENGTH+1] = ""; MQLONG randomNumber; MQLONG ConnReason;

/* Проверка аргументов программы */ if( (argc < 3)||(argc > 4)||(strlen(argv[1]) > 31)||(strlen(argv[2]) > 31) ) { printf("Usage: amqsgam team1 team2 <QManager>\n"); printf(" Maximum 31 characters per team name,\n"); printf(" no spaces or '\"' characters allowed.\n"); exit(0); } else { strcpy(team1, argv[1]); strcpy(team2, argv[2]); } /* Использовать default queue manager или заданный в зависимости от наличия аргумена */ if (argc > 3) strcpy(QMName, argv[3]); MQCONN( QMName, &hConn, &CompCode, &ConnReason ); if( CompCode == MQCC_FAILED ) { printf("MQCONN failed with CompCode %d and Reason %d\n", CompCode, ConnReason); } else if( ConnReason == MQRC_ALREADY_CONNECTED ) { CompCode = MQCC_OK; } if( CompCode == MQCC_OK ) { strncpy(od.ObjectName, STREAM, (size_t)MQ_Q_NAME_LENGTH); Options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING; MQOPEN( hConn, &od, Options, &hObj, &CompCode, &Reason ); if( CompCode != MQCC_OK ) { printf("MQOPEN failed to open \"%s\"\nwith CompCode %d and Reason %d\n", od.ObjectName, CompCode, Reason); } }

if( CompCode == MQCC_OK ) { srand( (unsigned)(time( NULL )) + (unsigned)(team1[0] + team2[(strlen(team2) - 1)]) ); timeRemaining = MATCH_LENGTH; messageLength = DEFAULT_MESSAGE_SIZE; pMessageBlock = (PMQBYTE)malloc(messageLength); if( pMessageBlock == NULL ) { printf("Unable to allocate storage\n"); } else { if( CompCode == MQCC_OK ) {

/* создание MQRFH для публикации о начале матча */ BuildMQRFHeader( pMessageBlock, &messageLength, MATCH_STARTED ); pTeams = (pMatch_Teams)(pMessageBlock + messageLength); strcpy(pTeams->Team1, team1); strcpy(pTeams->Team2, team2); messageLength += sizeof(Match_Teams); printf("Match between %s and %s\n", team1, team2);

/* помещение сообщения (публикации) в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) { printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason); } else { /* Моделирование попытки забить один из 5 голов (каждые 30 сек) */ while( (timeRemaining > 0)&&(CompCode == MQCC_OK) ) { randomNumber = rand(); delay = REAL_TIME_RATIO + ( (randomNumber * MATCH_LENGTH) / (RAND_MAX * AVERAGE_NUM_OF_GOALS)); if( delay > timeRemaining ) delay = timeRemaining; msSleep(delay); timeRemaining -= delay; if( timeRemaining > 0 ) { if( (randomNumber % 2) == 0 ) /* Шанс забить гол - 50 процентов */ { messageLength = DEFAULT_MESSAGE_SIZE; BuildMQRFHeader( pMessageBlock , &messageLength, SCORE_UPDATE ); printf("GOAL! "); pScoringTeam = (PMQCHAR)pMessageBlock + messageLength; if( rand() < (RAND_MAX/2) ) { strcpy(pScoringTeam, team1); printf(team1); } else { strcpy(pScoringTeam, team2); printf(team2); } printf(" scores after %d minutes\n", ((MATCH_LENGTH - timeRemaining)/REAL_TIME_RATIO)); messageLength += sizeof(MQCHAR32);

/* помещение сообщения о забитом голе в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason); } } } /* конец цикла по времени окончания матча ( timeRemaining ) */ if( CompCode == MQCC_OK ) { printf("Full time\n"); messageLength = DEFAULT_MESSAGE_SIZE; BuildMQRFHeader( pMessageBlock , &messageLength , MATCH_ENDED );

pTeams = (pMatch_Teams)(pMessageBlock + messageLength); strcpy(pTeams->Team1, team1); strcpy(pTeams->Team2, team2); messageLength += sizeof(Match_Teams);

/* помещение сообщения о конце матча в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason); } } } free( pMessageBlock ); } /* end of else (pMessageBlock != NULL) */ }

if( hObj != MQHO_UNUSABLE_HOBJ ) { MQCLOSE( hConn , &hObj, MQCO_NONE, &CompCode , &Reason ); if( CompCode != MQCC_OK ) printf("MQCLOSE failed with CompCode %d and Reason %d\n", CompCode, Reason); }

if( (hConn != MQHC_UNUSABLE_HCONN) &&(ConnReason != MQRC_ALREADY_CONNECTED) ) { MQDISC( &hConn, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQDISC failed with CompCode %d and Reason %d\n", CompCode, Reason); } return(0); } /* end of main Function */

/*
 * Function Name : BuildMQRFHeader
 *
 * Writes an MQRFH header carrying a Publish command at pStart.
 *
 *   pStart       start of the message buffer
 *   pDataLength  in:  number of bytes at pStart that are cleared first
 *                out: header length, i.e. the fixed MQRFH part plus the
 *                     NameValueString rounded up to a multiple of 16 bytes
 *   TopicType    appended to TOPIC_PREFIX to form the topic
 *
 * No check is made that the NameValueString fits into the buffer.
 */
void BuildMQRFHeader( PMQBYTE pStart, PMQLONG pDataLength, MQCHAR TopicType[] )
{
  PMQRFH  header = (PMQRFH)pStart;
  MQCHAR *nameValue = (MQCHAR *)pStart + MQRFH_STRUC_LENGTH_FIXED;
  size_t  paddedLength;

  /* Start from a zeroed buffer so the padding after the string is null bytes. */
  memset(pStart, 0, (size_t)*pDataLength);

  /* Fixed part: defaults, then the format and character set of the user data. */
  memcpy(header, &DefaultMQRFH, (size_t)MQRFH_STRUC_LENGTH_FIXED);
  memcpy(header->Format, MQFMT_STRING, (size_t)MQ_FORMAT_LENGTH);
  header->CodedCharSetId = MQCCSI_INHERIT;

  /* NameValueString: command, publication options and topic, back to back. */
  sprintf(nameValue, "%s%s%s%s%s%s%s",
          MQPS_COMMAND_B,
          MQPS_PUBLISH,
          MQPS_PUBLICATION_OPTIONS_B,
          MQPS_NO_REGISTRATION,
          MQPS_TOPIC_B,
          TOPIC_PREFIX,
          TopicType);

  /* Round the string length up to the next multiple of 16. */
  paddedLength = ((strlen(nameValue) + 15) / 16) * 16;

  *pDataLength = (MQLONG)(MQRFH_STRUC_LENGTH_FIXED + paddedLength);
  header->StrucLength = *pDataLength;
}

/*
 * Function Name : PutPublication
 *
 * Puts the message at pMessage (messageLength bytes) to the object hObj on
 * connection hConn.  The message is sent as a persistent datagram whose
 * format is MQFMT_RF_HEADER, with a new message id requested for the put.
 * The completion and reason codes of MQPUT are returned through pCompCode
 * and pReason.
 */
void PutPublication( MQHCONN hConn,
                     MQHOBJ  hObj,
                     PMQBYTE pMessage,
                     MQLONG  messageLength,
                     PMQLONG pCompCode,
                     PMQLONG pReason )
{
  MQMD  msgDesc = { MQMD_DEFAULT };
  MQPMO putOpts = { MQPMO_DEFAULT };

  /* Message descriptor: the body starts with an MQRFH structure. */
  msgDesc.MsgType     = MQMT_DATAGRAM;
  msgDesc.Persistence = MQPER_PERSISTENT;
  memcpy(msgDesc.Format, MQFMT_RF_HEADER, (size_t)MQ_FORMAT_LENGTH);

  /* Put options: defaults plus a fresh message id. */
  putOpts.Options = putOpts.Options | MQPMO_NEW_MSG_ID;

  MQPUT( hConn, hObj, &msgDesc, &putOpts,
         messageLength, pMessage,
         pCompCode, pReason );
}
Листинг 10.1. Программа amqsgama
Закрыть окно


MQCHAR32 team1;

MQCHAR32 team2;

char QMName[MQ_Q_MGR_NAME_LENGTH+1] = "";

MQLONG randomNumber;

MQLONG ConnReason;

/* Проверка аргументов программы */

if( (argc < 3)||(argc > 4)||(strlen(argv[1]) > 31)||(strlen(argv[2]) > 31) )

{

printf("Usage: amqsgam team1 team2 \n");

printf(" Maximum 31 characters per team name,\n");

printf(" no spaces or '\"' characters allowed.\n");

exit(0);

}

else

{

strcpy(team1, argv[1]);

strcpy(team2, argv[2]);

}

/* Использовать default queue manager или заданный в зависимости от наличия аргумена */

if (argc > 3) strcpy(QMName, argv[3]);

MQCONN( QMName, &hConn, &CompCode, &ConnReason );

if( CompCode == MQCC_FAILED )

{

printf("MQCONN failed with CompCode %d and Reason %d\n", CompCode, ConnReason);

}

else if( ConnReason == MQRC_ALREADY_CONNECTED )

{

CompCode = MQCC_OK;

}

if( CompCode == MQCC_OK )

{

strncpy(od.ObjectName, STREAM, (size_t)MQ_Q_NAME_LENGTH);

Options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;

MQOPEN( hConn, &od, Options, &hObj, &CompCode, &Reason );

if( CompCode != MQCC_OK )

{

printf("MQOPEN failed to open \"%s\"\nwith CompCode %d and Reason %d\n",

od.ObjectName, CompCode, Reason);

}

}

if( CompCode == MQCC_OK )

{

srand( (unsigned)(time( NULL ))

+ (unsigned)(team1[0] + team2[(strlen(team2) - 1)]) );

timeRemaining = MATCH_LENGTH;

messageLength = DEFAULT_MESSAGE_SIZE;

pMessageBlock = (PMQBYTE)malloc(messageLength);

if( pMessageBlock == NULL )

{

printf("Unable to allocate storage\n");

}

else

{

if( CompCode == MQCC_OK )

{

/* создание MQRFH для публикации о начале матча */

BuildMQRFHeader( pMessageBlock, &messageLength, MATCH_STARTED );

pTeams = (pMatch_Teams)(pMessageBlock + messageLength);

strcpy(pTeams->Team1, team1);

strcpy(pTeams->Team2, team2);

messageLength += sizeof(Match_Teams);

printf("Match between %s and %s\n", team1, team2);



/* помещение сообщения (публикации) в очередь потока */

PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );

if( CompCode != MQCC_OK )

{

printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);

}

else

{

/* Моделирование попытки забить один из 5 голов (каждые 30 сек) */

while( (timeRemaining > 0)&&(CompCode == MQCC_OK) )

{

randomNumber = rand();

delay = REAL_TIME_RATIO

+ ( (randomNumber * MATCH_LENGTH)

/ (RAND_MAX * AVERAGE_NUM_OF_GOALS));

if( delay > timeRemaining ) delay = timeRemaining;

msSleep(delay);

timeRemaining -= delay;

if( timeRemaining > 0 )

{

if( (randomNumber % 2) == 0 ) /* Шанс забить гол - 50 процентов */

{

messageLength = DEFAULT_MESSAGE_SIZE;

BuildMQRFHeader( pMessageBlock , &messageLength, SCORE_UPDATE );

printf("GOAL! ");

pScoringTeam = (PMQCHAR)pMessageBlock + messageLength;

if( rand() < (RAND_MAX/2) )

{

strcpy(pScoringTeam, team1);

printf(team1);

}

else

{

strcpy(pScoringTeam, team2);

printf(team2);

}

printf(" scores after %d minutes\n", ((MATCH_LENGTH - timeRemaining)/REAL_TIME_RATIO));

messageLength += sizeof(MQCHAR32);

/* помещение сообщения о забитом голе в очередь потока */

PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );

if( CompCode != MQCC_OK )

printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);

}

}

} /* конец цикла по времени окончания матча ( timeRemaining ) */

if( CompCode == MQCC_OK )

{

printf("Full time\n");

messageLength = DEFAULT_MESSAGE_SIZE;

BuildMQRFHeader( pMessageBlock , &messageLength , MATCH_ENDED );

pTeams = (pMatch_Teams)(pMessageBlock + messageLength);

strcpy(pTeams->Team1, team1);

strcpy(pTeams->Team2, team2);

messageLength += sizeof(Match_Teams);

/* помещение сообщения о конце матча в очередь потока */

PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );



if( CompCode != MQCC_OK )

printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);

}

}

}

free( pMessageBlock );

} /* end of else (pMessageBlock != NULL) */

}

if( hObj != MQHO_UNUSABLE_HOBJ )

{

MQCLOSE( hConn , &hObj, MQCO_NONE, &CompCode , &Reason );

if( CompCode != MQCC_OK )

printf("MQCLOSE failed with CompCode %d and Reason %d\n", CompCode, Reason);

}

if( (hConn != MQHC_UNUSABLE_HCONN) &&(ConnReason != MQRC_ALREADY_CONNECTED) )

{

MQDISC( &hConn, &CompCode, &Reason );

if( CompCode != MQCC_OK )

printf("MQDISC failed with CompCode %d and Reason %d\n", CompCode, Reason);

}

return(0);

}

/* end of main Function */

/*
 * Function Name : BuildMQRFHeader
 *
 * Writes an MQRFH header carrying a Publish command at pStart.
 *
 *   pStart       start of the message buffer
 *   pDataLength  in:  number of bytes at pStart that are cleared first
 *                out: header length, i.e. the fixed MQRFH part plus the
 *                     NameValueString rounded up to a multiple of 16 bytes
 *   TopicType    appended to TOPIC_PREFIX to form the topic
 *
 * No check is made that the NameValueString fits into the buffer.
 */
void BuildMQRFHeader( PMQBYTE pStart, PMQLONG pDataLength, MQCHAR TopicType[] )
{
  PMQRFH  header = (PMQRFH)pStart;
  MQCHAR *nameValue = (MQCHAR *)pStart + MQRFH_STRUC_LENGTH_FIXED;
  size_t  paddedLength;

  /* Start from a zeroed buffer so the padding after the string is null bytes. */
  memset(pStart, 0, (size_t)*pDataLength);

  /* Fixed part: defaults, then the format and character set of the user data. */
  memcpy(header, &DefaultMQRFH, (size_t)MQRFH_STRUC_LENGTH_FIXED);
  memcpy(header->Format, MQFMT_STRING, (size_t)MQ_FORMAT_LENGTH);
  header->CodedCharSetId = MQCCSI_INHERIT;

  /* NameValueString: command, publication options and topic, back to back. */
  sprintf(nameValue, "%s%s%s%s%s%s%s",
          MQPS_COMMAND_B,
          MQPS_PUBLISH,
          MQPS_PUBLICATION_OPTIONS_B,
          MQPS_NO_REGISTRATION,
          MQPS_TOPIC_B,
          TOPIC_PREFIX,
          TopicType);

  /* Round the string length up to the next multiple of 16. */
  paddedLength = ((strlen(nameValue) + 15) / 16) * 16;

  *pDataLength = (MQLONG)(MQRFH_STRUC_LENGTH_FIXED + paddedLength);
  header->StrucLength = *pDataLength;
}

/*
 * Function Name : PutPublication
 *
 * Puts the message at pMessage (messageLength bytes) to the object hObj on
 * connection hConn.  The message is sent as a persistent datagram whose
 * format is MQFMT_RF_HEADER, with a new message id requested for the put.
 * The completion and reason codes of MQPUT are returned through pCompCode
 * and pReason.
 */
void PutPublication( MQHCONN hConn,
                     MQHOBJ  hObj,
                     PMQBYTE pMessage,
                     MQLONG  messageLength,
                     PMQLONG pCompCode,
                     PMQLONG pReason )
{
  MQMD  msgDesc = { MQMD_DEFAULT };
  MQPMO putOpts = { MQPMO_DEFAULT };

  /* Message descriptor: the body starts with an MQRFH structure. */
  msgDesc.MsgType     = MQMT_DATAGRAM;
  msgDesc.Persistence = MQPER_PERSISTENT;
  memcpy(msgDesc.Format, MQFMT_RF_HEADER, (size_t)MQ_FORMAT_LENGTH);

  /* Put options: defaults plus a fresh message id. */
  putOpts.Options = putOpts.Options | MQPMO_NEW_MSG_ID;

  MQPUT( hConn, hObj, &msgDesc, &putOpts,
         messageLength, pMessage,
         pCompCode, pReason );
}


Содержание раздела