1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
/*
pmqlsi.c: LTP PMQ-based link service daemon. Receives
LTP segments via a POSIX message queue.
Author: Scott Burleigh, JPL
Copyright (c) 2010, California Institute of Technology.
ALL RIGHTS RESERVED. U.S. Government Sponsorship
acknowledged.
*/
#include "pmqlsa.h"
static void interruptThread()
{
isignal(SIGTERM, interruptThread);
}
/* * * Receiver thread functions * * */
typedef struct
{
mqd_t mq;
pthread_t mainThread;
int running;
} ReceiverThreadParms;
static void *handleMessages(void *parm)
{
/* Main loop for message reception and handling. */
ReceiverThreadParms *rtp = (ReceiverThreadParms *) parm;
int segLength;
char msgbuf[PMQLSA_MSGSIZE];
unsigned int mqp; /* Priority of rec'd msg. */
iblock(SIGTERM);
while (rtp->running)
{
segLength = mq_receive(rtp->mq, msgbuf, sizeof msgbuf, &mqp);
switch (segLength)
{
case 1: /* Normal stop. */
continue;
case -1:
putSysErrmsg("pmqlsi failed receiving msg", NULL);
pthread_kill(rtp->mainThread, SIGTERM);
rtp->running = 0;
continue;
}
if (ltpHandleInboundSegment(msgbuf, segLength) < 0)
{
putErrmsg("Can't handle inbound segment.", NULL);
pthread_kill(rtp->mainThread, SIGTERM);
rtp->running = 0;
continue;
}
/* Make sure other tasks have a chance to run. */
sm_TaskYield();
}
writeErrmsgMemos();
writeMemo("[i] pmqlsi receiver thread has ended.");
return NULL;
}
/* * * Main thread functions * * * */
#if defined (VXWORKS) || defined (RTEMS)
int pmqlsi(int a1, int a2, int a3, int a4, int a5,
int a6, int a7, int a8, int a9, int a10)
{
char *mqName = (char *) a1;
#else
int main(int argc, char *argv[])
{
char *mqName = (argc > 1 ? argv[1] : NULL);
#endif
LtpVdb *vdb;
struct mq_attr mqAttributes =
{ 0, PMQLSA_MAXMSG, PMQLSA_MSGSIZE, 0 };
ReceiverThreadParms rtp;
pthread_t receiverThread;
char stop = '0';
if (mqName == NULL)
{
puts("Usage: pmqlsi <message queue name>");
return 0;
}
/* Note that ltpadmin must be run before the first
* invocation of ltplsi, to initialize the LTP database
* (as necessary) and dynamic database. */
if (ltpInit(0) < 0)
{
putErrmsg("pmqlsi can't initialize LTP.", NULL);
return 1;
}
vdb = getLtpVdb();
if (vdb->lsiPid > 0 && vdb->lsiPid != sm_TaskIdSelf())
{
putErrmsg("LSI task is already started.", itoa(vdb->lsiPid));
return 1;
}
/* All command-line arguments are now validated. */
rtp.mq = mq_open(mqName, O_RDWR | O_CREAT, 0777, &mqAttributes);
if (rtp.mq == (mqd_t) -1)
{
putSysErrmsg("pmglsi can't open message queue", mqName);
return 1;
}
/* Set up signal handling; SIGTERM is shutdown signal. */
isignal(SIGTERM, interruptThread);
/* Start the receiver thread. */
rtp.running = 1;
rtp.mainThread = pthread_self();
if (pthread_create(&receiverThread, NULL, handleMessages, &rtp))
{
mq_close(rtp.mq);
putSysErrmsg("pmqlsi can't create receiver thread", NULL);
return 1;
}
/* Now sleep until interrupted by SIGTERM, at which point
* it's time to stop the link service. */
writeMemo("[i] pmqlsi is running");
snooze(2000000000);
/* Time to shut down. */
rtp.running = 0;
mq_send(rtp.mq, &stop, 1, 0); /* Tell receiver to stop. */
pthread_join(receiverThread, NULL);
mq_close(rtp.mq);
writeErrmsgMemos();
writeMemo("[i] pmqlsi duct has ended.");
return 0;
}
|