MQLONG CompCode; MQLONG Reason; MQOD
/*************************************************************************************/ /* Имя программы: AMQSGAMA */ /* Описание: Основанная на модели Publish/Subscribe программа */ /* моделирует результаты футбольного матча и */ /* отправляет их от издателя к брокеру */ /* Statement: Licensed Materials - Property of IBM */ /* SupportPac MA0E */ /* (C) Copyright IBM Corp. 1999 */ /*************************************************************************************/ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <time.h> #include <cmqc.h> /* MQI */ #include <cmqpsc.h> /* MQI Publish/Subscribe */ #include <windows.h> #if MQAT_DEFAULT == MQAT_WINDOWS_NT #define msSleep(time) \ Sleep(time) #elif MQAT_DEFAULT == MQAT_UNIX #define msSleep(time) \ { \ struct timeval tval; \ tval.tv_sec = time / 1000; \ tval.tv_usec = (time % 1000) * 1000; \ select(0, NULL, NULL, NULL, &tval); \ } #endif #define STREAM "SAMPLE.BROKER.RESULTS.STREAM" #define TOPIC_PREFIX "Sport/Soccer/Event/" #define MATCH_STARTED "MatchStarted" #define MATCH_ENDED "MatchEnded" #define SCORE_UPDATE "ScoreUpdate" #define MATCH_LENGTH 30000 /* 30 Second match length */ #define REAL_TIME_RATIO 333 #define AVERAGE_NUM_OF_GOALS 5 #define DEFAULT_MESSAGE_SIZE 512 /* Maximum buffer size for a message */ static const MQRFH DefaultMQRFH = {MQRFH_DEFAULT}; typedef struct { MQCHAR32 Team1; MQCHAR32 Team2; } Match_Teams, *pMatch_Teams; void BuildMQRFHeader( PMQBYTE pStart , PMQLONG pDataLength , MQCHAR TopicType[] ); void PutPublication( MQHCONN hConn , MQHOBJ hObj , PMQBYTE pMessage , MQLONG messageLength , PMQLONG pCompCode , PMQLONG pReason ); int main(int argc, char **argv) { MQHCONN hConn = MQHC_UNUSABLE_HCONN; MQHOBJ hObj = MQHO_UNUSABLE_HOBJ; MQLONG CompCode; MQLONG Reason; MQOD od = { MQOD_DEFAULT }; MQLONG Options; PMQBYTE pMessageBlock = NULL; MQLONG messageLength; MQLONG timeRemaining; MQLONG delay; PMQCHAR pScoringTeam; pMatch_Teams pTeams; MQCHAR32 team1; MQCHAR32 team2; char 
QMName[MQ_Q_MGR_NAME_LENGTH+1] = ""; MQLONG randomNumber; MQLONG ConnReason; /* Проверка аргументов программы */ if( (argc < 3)||(argc > 4)||(strlen(argv[1]) > 31)||(strlen(argv[2]) > 31) ) { printf("Usage: amqsgam team1 team2 <QManager>\n"); printf(" Maximum 31 characters per team name,\n"); printf(" no spaces or '\"' characters allowed.\n"); exit(0); } else { strcpy(team1, argv[1]); strcpy(team2, argv[2]); } /* Использовать default queue manager или заданный в зависимости от наличия аргумена */ if (argc > 3) strcpy(QMName, argv[3]); MQCONN( QMName, &hConn, &CompCode, &ConnReason ); if( CompCode == MQCC_FAILED ) { printf("MQCONN failed with CompCode %d and Reason %d\n", CompCode, ConnReason); } else if( ConnReason == MQRC_ALREADY_CONNECTED ) { CompCode = MQCC_OK; } if( CompCode == MQCC_OK ) { strncpy(od.ObjectName, STREAM, (size_t)MQ_Q_NAME_LENGTH); Options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING; MQOPEN( hConn, &od, Options, &hObj, &CompCode, &Reason ); if( CompCode != MQCC_OK ) { printf("MQOPEN failed to open \"%s\"\nwith CompCode %d and Reason %d\n", od.ObjectName, CompCode, Reason); } } if( CompCode == MQCC_OK ) { srand( (unsigned)(time( NULL )) + (unsigned)(team1[0] + team2[(strlen(team2) - 1)]) ); timeRemaining = MATCH_LENGTH; messageLength = DEFAULT_MESSAGE_SIZE; pMessageBlock = (PMQBYTE)malloc(messageLength); if( pMessageBlock == NULL ) { printf("Unable to allocate storage\n"); } else { if( CompCode == MQCC_OK ) { /* создание MQRFH для публикации о начале матча */ BuildMQRFHeader( pMessageBlock, &messageLength, MATCH_STARTED ); pTeams = (pMatch_Teams)(pMessageBlock + messageLength); strcpy(pTeams->Team1, team1); strcpy(pTeams->Team2, team2); messageLength += sizeof(Match_Teams); printf("Match between %s and %s\n", team1, team2); /* помещение сообщения (публикации) в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) { printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, 
Reason); } else { /* Моделирование попытки забить один из 5 голов (каждые 30 сек) */ while( (timeRemaining > 0)&&(CompCode == MQCC_OK) ) { randomNumber = rand(); delay = REAL_TIME_RATIO + ( (randomNumber * MATCH_LENGTH) / (RAND_MAX * AVERAGE_NUM_OF_GOALS)); if( delay > timeRemaining ) delay = timeRemaining; msSleep(delay); timeRemaining -= delay; if( timeRemaining > 0 ) { if( (randomNumber % 2) == 0 ) /* Шанс забить гол - 50 процентов */ { messageLength = DEFAULT_MESSAGE_SIZE; BuildMQRFHeader( pMessageBlock , &messageLength, SCORE_UPDATE ); printf("GOAL! "); pScoringTeam = (PMQCHAR)pMessageBlock + messageLength; if( rand() < (RAND_MAX/2) ) { strcpy(pScoringTeam, team1); printf(team1); } else { strcpy(pScoringTeam, team2); printf(team2); } printf(" scores after %d minutes\n", ((MATCH_LENGTH - timeRemaining)/REAL_TIME_RATIO)); messageLength += sizeof(MQCHAR32); /* помещение сообщения о забитом голе в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason); } } } /* конец цикла по времени окончания матча ( timeRemaining ) */ if( CompCode == MQCC_OK ) { printf("Full time\n"); messageLength = DEFAULT_MESSAGE_SIZE; BuildMQRFHeader( pMessageBlock , &messageLength , MATCH_ENDED ); pTeams = (pMatch_Teams)(pMessageBlock + messageLength); strcpy(pTeams->Team1, team1); strcpy(pTeams->Team2, team2); messageLength += sizeof(Match_Teams); /* помещение сообщения о конце матча в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason); } } } free( pMessageBlock ); } /* end of else (pMessageBlock != NULL) */ } if( hObj != MQHO_UNUSABLE_HOBJ ) { MQCLOSE( hConn , &hObj, MQCO_NONE, &CompCode , &Reason ); if( CompCode != MQCC_OK ) printf("MQCLOSE failed with CompCode %d and Reason %d\n", CompCode, Reason); 
} if( (hConn != MQHC_UNUSABLE_HCONN) &&(ConnReason != MQRC_ALREADY_CONNECTED) ) { MQDISC( &hConn, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQDISC failed with CompCode %d and Reason %d\n", CompCode, Reason); } return(0); } /* end of main Function */ /* Function Name : BuildMQRFHeader */ void BuildMQRFHeader( PMQBYTE pStart, PMQLONG pDataLength, MQCHAR TopicType[] ) { PMQRFH pRFHeader = (PMQRFH)pStart; PMQCHAR pNameValueString; memset((PMQBYTE)pStart, 0, *pDataLength); memcpy( pRFHeader, &DefaultMQRFH, (size_t)MQRFH_STRUC_LENGTH_FIXED); memcpy( pRFHeader->Format, MQFMT_STRING, (size_t)MQ_FORMAT_LENGTH); pRFHeader->CodedCharSetId = MQCCSI_INHERIT; pNameValueString = (MQCHAR *)pRFHeader + MQRFH_STRUC_LENGTH_FIXED; strcpy(pNameValueString, MQPS_COMMAND_B); strcat(pNameValueString, MQPS_PUBLISH); strcat(pNameValueString, MQPS_PUBLICATION_OPTIONS_B); strcat(pNameValueString, MQPS_NO_REGISTRATION); strcat(pNameValueString, MQPS_TOPIC_B); strcat(pNameValueString, TOPIC_PREFIX); strcat(pNameValueString, TopicType); *pDataLength = MQRFH_STRUC_LENGTH_FIXED + ((strlen(pNameValueString)+15)/16)*16; pRFHeader->StrucLength = *pDataLength; } /* Function Name : PutPublication */ void PutPublication( MQHCONN hConn, MQHOBJ hObj, PMQBYTE pMessage, MQLONG messageLength, PMQLONG pCompCode, PMQLONG pReason ) { MQPMO pmo = { MQPMO_DEFAULT }; MQMD md = { MQMD_DEFAULT }; memcpy(md.Format, MQFMT_RF_HEADER, (size_t)MQ_FORMAT_LENGTH); md.MsgType = MQMT_DATAGRAM; md.Persistence = MQPER_PERSISTENT; pmo.Options |= MQPMO_NEW_MSG_ID; MQPUT( hConn, hObj, &md, &pmo, messageLength, pMessage, pCompCode, pReason ); } |
Листинг 10.1. Программа amqsgama
Закрыть окно
/* NOTE(review): this is the tail of main()'s declarations followed by its  */
/* body; the function header and the earlier declarations (hConn, hObj,     */
/* CompCode, Reason, od, ...) are not part of this copy of the listing.     */
MQCHAR32 team1;
MQCHAR32 team2;
char QMName[MQ_Q_MGR_NAME_LENGTH+1] = "";
MQLONG randomNumber;
MQLONG ConnReason;
/* Validate the program arguments */
if( (argc < 3)||(argc > 4)||(strlen(argv[1]) > 31)||(strlen(argv[2]) > 31) )
{
/* NOTE(review): the usage text appears to have lost its placeholder for   */
/* the optional third (queue manager) argument - confirm against the       */
/* original sample.                                                        */
printf("Usage: amqsgam team1 team2 \n");
printf(" Maximum 31 characters per team name,\n");
printf(" no spaces or '\"' characters allowed.\n");
exit(0);
}
else
{
/* Both names were checked above to fit the 32-byte buffers. */
strcpy(team1, argv[1]);
strcpy(team2, argv[2]);
}
/* Use the default queue manager, or the one named by the optional third argument */
/* NOTE(review): argv[3] is copied without a length check; a name longer   */
/* than MQ_Q_MGR_NAME_LENGTH overflows QMName.                             */
if (argc > 3) strcpy(QMName, argv[3]);
MQCONN( QMName, &hConn, &CompCode, &ConnReason );
if( CompCode == MQCC_FAILED )
{
printf("MQCONN failed with CompCode %d and Reason %d\n", CompCode, ConnReason);
}
else if( ConnReason == MQRC_ALREADY_CONNECTED )
{
/* An already existing connection is treated as success. */
CompCode = MQCC_OK;
}
if( CompCode == MQCC_OK )
{
/* Open the stream queue for output. */
strncpy(od.ObjectName, STREAM, (size_t)MQ_Q_NAME_LENGTH);
Options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;
MQOPEN( hConn, &od, Options, &hObj, &CompCode, &Reason );
if( CompCode != MQCC_OK )
{
printf("MQOPEN failed to open \"%s\"\nwith CompCode %d and Reason %d\n",
od.ObjectName, CompCode, Reason);
}
}
if( CompCode == MQCC_OK )
{
/* Seed the generator from the clock plus the first character of team1    */
/* and the last character of team2.                                       */
/* NOTE(review): an empty team2 makes strlen(team2) - 1 index before the   */
/* start of the array.                                                     */
srand( (unsigned)(time( NULL ))
+ (unsigned)(team1[0] + team2[(strlen(team2) - 1)]) );
timeRemaining = MATCH_LENGTH;
messageLength = DEFAULT_MESSAGE_SIZE;
pMessageBlock = (PMQBYTE)malloc(messageLength);
if( pMessageBlock == NULL )
{
printf("Unable to allocate storage\n");
}
else
{
if( CompCode == MQCC_OK )
{
/* Build the MQRFH for the match-started publication */
BuildMQRFHeader( pMessageBlock, &messageLength, MATCH_STARTED );
/* The message body (both team names) follows the header directly. */
pTeams = (pMatch_Teams)(pMessageBlock + messageLength);
strcpy(pTeams->Team1, team1);
strcpy(pTeams->Team2, team2);
messageLength += sizeof(Match_Teams);
printf("Match between %s and %s\n", team1, team2);
/* Put the message (publication) on the stream queue */
PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );
if( CompCode != MQCC_OK )
{
printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
else
{
/* Simulate scoring attempts until the match time is used up */
while( (timeRemaining > 0)&&(CompCode == MQCC_OK) )
{
randomNumber = rand();
/* NOTE(review): both products are integer arithmetic and can overflow   */
/* when RAND_MAX is large - confirm for the target platform.             */
delay = REAL_TIME_RATIO
+ ( (randomNumber * MATCH_LENGTH)
/ (RAND_MAX * AVERAGE_NUM_OF_GOALS));
/* Never sleep past the end of the match. */
if( delay > timeRemaining ) delay = timeRemaining;
msSleep(delay);
timeRemaining -= delay;
if( timeRemaining > 0 )
{
if( (randomNumber % 2) == 0 ) /* 50 percent chance of a goal */
{
messageLength = DEFAULT_MESSAGE_SIZE;
BuildMQRFHeader( pMessageBlock , &messageLength, SCORE_UPDATE );
printf("GOAL! ");
/* The scoring team's name is the message body, right after the header. */
pScoringTeam = (PMQCHAR)pMessageBlock + messageLength;
if( rand() < (RAND_MAX/2) )
{
strcpy(pScoringTeam, team1);
/* NOTE(review): team1 comes from argv and is used as the format string. */
printf(team1);
}
else
{
strcpy(pScoringTeam, team2);
/* NOTE(review): team2 comes from argv and is used as the format string. */
printf(team2);
}
printf(" scores after %d minutes\n", ((MATCH_LENGTH - timeRemaining)/REAL_TIME_RATIO));
messageLength += sizeof(MQCHAR32);
/* Put the goal message on the stream queue */
PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );
if( CompCode != MQCC_OK )
printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
}
} /* end of the loop over the remaining match time ( timeRemaining ) */
if( CompCode == MQCC_OK )
{
printf("Full time\n");
messageLength = DEFAULT_MESSAGE_SIZE;
BuildMQRFHeader( pMessageBlock , &messageLength , MATCH_ENDED );
pTeams = (pMatch_Teams)(pMessageBlock + messageLength);
strcpy(pTeams->Team1, team1);
strcpy(pTeams->Team2, team2);
messageLength += sizeof(Match_Teams);
/* Put the match-ended message on the stream queue */
PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );
if( CompCode != MQCC_OK )
printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
}
}
free( pMessageBlock );
} /* end of else (pMessageBlock != NULL) */
}
/* Close the queue if it was opened. */
if( hObj != MQHO_UNUSABLE_HOBJ )
{
MQCLOSE( hConn , &hObj, MQCO_NONE, &CompCode , &Reason );
if( CompCode != MQCC_OK )
printf("MQCLOSE failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
/* Disconnect only when MQCONN did not report an already existing connection. */
if( (hConn != MQHC_UNUSABLE_HCONN) &&(ConnReason != MQRC_ALREADY_CONNECTED) )
{
MQDISC( &hConn, &CompCode, &Reason );
if( CompCode != MQCC_OK )
printf("MQDISC failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
return(0);
}
/* end of main Function */
/*
 * Function Name : BuildMQRFHeader
 *
 * Lays out an MQRFH header and its name/value string at the start of a
 * message buffer.
 *
 *   pStart      - start of the message buffer.
 *   pDataLength - in:  number of bytes of the buffer to clear;
 *                 out: fixed header length plus the length of the
 *                      name/value string rounded up to a multiple of 16.
 *   TopicType   - topic suffix appended to TOPIC_PREFIX.
 *
 * The string is written without a bounds check; the caller has to provide
 * a buffer that is large enough for header and string.
 */
void BuildMQRFHeader( PMQBYTE pStart, PMQLONG pDataLength, MQCHAR TopicType[] )
{
  PMQRFH  pHeader = (PMQRFH)pStart;
  /* The name/value string begins right behind the fixed part of the header. */
  PMQCHAR pText = (PMQCHAR)pStart + MQRFH_STRUC_LENGTH_FIXED;
  size_t  paddedTextLength;

  /* Zero the buffer first so that the padding behind the string is zero. */
  memset(pStart, 0, *pDataLength);

  /* Start from the default header, then set the fields that differ. */
  memcpy(pHeader, &DefaultMQRFH, (size_t)MQRFH_STRUC_LENGTH_FIXED);
  pHeader->CodedCharSetId = MQCCSI_INHERIT;
  memcpy(pHeader->Format, MQFMT_STRING, (size_t)MQ_FORMAT_LENGTH);

  /* Same concatenation, in the same order, as a strcpy/strcat chain. */
  sprintf(pText, "%s%s%s%s%s%s%s",
          MQPS_COMMAND_B,
          MQPS_PUBLISH,
          MQPS_PUBLICATION_OPTIONS_B,
          MQPS_NO_REGISTRATION,
          MQPS_TOPIC_B,
          TOPIC_PREFIX,
          TopicType);

  /* Round the string length up to the next multiple of 16. */
  paddedTextLength = ((strlen(pText) + 15) / 16) * 16;

  *pDataLength = MQRFH_STRUC_LENGTH_FIXED + paddedTextLength;
  pHeader->StrucLength = *pDataLength;
}
/*
 * Function Name : PutPublication
 *
 * Puts one message on an open queue.
 *
 *   hConn         - connection handle passed to MQPUT.
 *   hObj          - object handle of the queue passed to MQPUT.
 *   pMessage      - start of the message data.
 *   messageLength - number of bytes of message data.
 *   pCompCode     - receives the completion code of MQPUT.
 *   pReason       - receives the reason code of MQPUT.
 *
 * The message descriptor starts from MQMD_DEFAULT with Format set to
 * MQFMT_RF_HEADER, MsgType to MQMT_DATAGRAM and Persistence to
 * MQPER_PERSISTENT; the put options start from MQPMO_DEFAULT with
 * MQPMO_NEW_MSG_ID added.
 */
void PutPublication( MQHCONN hConn, MQHOBJ hObj, PMQBYTE pMessage,
                     MQLONG messageLength, PMQLONG pCompCode, PMQLONG pReason )
{
  MQMD  msgDesc    = { MQMD_DEFAULT };
  MQPMO putOptions = { MQPMO_DEFAULT };

  /* Message descriptor: persistent datagram in MQFMT_RF_HEADER format. */
  msgDesc.MsgType     = MQMT_DATAGRAM;
  msgDesc.Persistence = MQPER_PERSISTENT;
  memcpy(msgDesc.Format, MQFMT_RF_HEADER, (size_t)MQ_FORMAT_LENGTH);

  /* Put options: the defaults plus MQPMO_NEW_MSG_ID. */
  putOptions.Options = putOptions.Options | MQPMO_NEW_MSG_ID;

  MQPUT( hConn, hObj, &msgDesc, &putOptions,
         messageLength, pMessage, pCompCode, pReason );
}