File: AMQueueImplP.nc

package info (click to toggle)
tinyos 2.1.2%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster, jessie, jessie-kfreebsd, stretch
  • size: 47,476 kB
  • ctags: 36,607
  • sloc: ansic: 63,646; cpp: 14,974; java: 10,358; python: 5,215; makefile: 1,724; sh: 902; asm: 597; xml: 392; perl: 74; awk: 46
file content (221 lines) | stat: -rw-r--r-- 7,639 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
// $Id: AMQueueImplP.nc,v 1.11 2010-06-29 22:07:56 scipio Exp $
/*
* Copyright (c) 2005 Stanford University. All rights reserved.
*
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the
 *   distribution.
 * - Neither the name of the copyright holders nor the names of
 *   its contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
 * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
*/

/**
 * An AM send queue that provides a Service Instance pattern for
 * formatted packets and calls an underlying AMSend in a round-robin
 * fashion. Used to share L2 bandwidth between different communication
 * clients.
 *
 * @author Philip Levis
 * @date   Jan 16 2006
 */ 

#include "AM.h"

// numClients is the number of client slots; it sizes the queue and the
// cancel bitmask declared in the implementation.
generic module AMQueueImplP(int numClients) @safe() {
    // One Send instance per queue client; the parameter is the client's slot index.
    provides interface Send[uint8_t client];
    uses{
        // Underlying sender, indexed by the AM type read from each queued packet.
        interface AMSend[am_id_t id];
        // Used to read the AM type and destination of queued packets.
        interface AMPacket;
        // Used to store and later retrieve the payload length of queued packets.
        interface Packet;
    }
}

implementation {
    // One queue slot per client. A NULL msg means the slot is free.
    typedef struct {
        message_t* ONE_NOK msg;
    } queue_entry_t;
  
    // Index of the client whose packet is currently handed to AMSend.
    // Any value >= numClients means that no send is in progress.
    uint8_t current = numClients; // mark as empty
    queue_entry_t queue[numClients];
    // One bit per client (bit clientId%8 of byte clientId/8). A set bit marks
    // a queued packet whose cancellation is pending in CancelTask.
    uint8_t cancelMask[numClients/8 + 1];

    // Forward declaration: sendDone() calls tryToSend() before its definition.
    void tryToSend();
  
    /**
     * Advance 'current' in round-robin order to the next client that has
     * a packet queued and is not marked for cancellation. The search
     * starts at the slot after 'current' and examines numClients slots.
     * If none qualifies, 'current' is set to numClients (queue empty).
     */
    void nextPacket() {
        uint8_t examined;
        uint8_t slot = (current + 1) % numClients;

        for (examined = 0; examined < numClients; examined++) {
            // A slot is eligible when it holds a packet whose cancel bit is clear.
            if (queue[slot].msg != NULL &&
                !(cancelMask[slot / 8] & (1 << (slot % 8)))) {
                current = slot;
                return;
            }
            slot = (slot + 1) % numClients;
        }

        // Every slot was free or cancelled.
        current = numClients;
    }

    /**
     * Queue a fully formatted AM packet for the given client. The caller
     * must already have filled in the AM fields (destination, AM type);
     * they are read back from the packet when it is handed to AMSend.
     *
     * If no send is in progress the packet is passed to the underlying
     * AMSend immediately; otherwise it waits in the client's slot.
     *
     * @param msg - the message to send
     * @param len - the length of the payload
     * @return FAIL if clientId is out of range, EBUSY if the client
     *         already has a packet queued, the result of AMSend.send
     *         when the packet is sent immediately, SUCCESS when it is
     *         queued behind another packet.
     */
    command error_t Send.send[uint8_t clientId](message_t* msg,
                                                uint8_t len) {
        error_t status;
        am_id_t type;
        am_addr_t addr;

        if (clientId >= numClients) {
            return FAIL;
        }
        if (queue[clientId].msg != NULL) {
            return EBUSY;
        }
        dbg("AMQueue", "AMQueue: request to send from %hhu (%p): passed checks\n", clientId, msg);

        queue[clientId].msg = msg;
        call Packet.setPayloadLength(msg, len);

        if (current < numClients) {
            // Another packet is in flight; this one is picked up by tryToSend().
            dbg("AMQueue", "AMQueue: request to send from %hhu (%p): queue not empty\n", clientId, msg);
            return SUCCESS;
        }

        // Queue was empty: hand the packet to the radio right away.
        type = call AMPacket.type(msg);
        addr = call AMPacket.destination(msg);

        dbg("AMQueue", "%s: request to send from %hhu (%p): queue empty\n", __FUNCTION__, clientId, msg);
        current = clientId;

        status = call AMSend.send[type](addr, msg, len);
        if (status != SUCCESS) {
            // Roll back: release the slot and mark the queue empty again.
            dbg("AMQueue", "%s: underlying send failed.\n", __FUNCTION__);
            current = numClients;
            queue[clientId].msg = NULL;
        }
        return status;
    }

    /**
     * Complete every pending cancellation recorded in cancelMask: free
     * the client's slot, clear its cancel bit and signal sendDone with
     * ECANCEL. Bits are tested one at a time as the scan proceeds, so a
     * bit set by a sendDone handler further along is handled in the
     * same run.
     */
    task void CancelTask() {
        uint8_t byteIdx, bitIdx;

        for (byteIdx = 0; byteIdx < numClients/8 + 1; byteIdx++) {
            if (!cancelMask[byteIdx]) {
                continue;   // no cancellations recorded in this byte
            }
            for (bitIdx = 0; bitIdx < 8; bitIdx++) {
                uint8_t bit = 1 << bitIdx;

                if (cancelMask[byteIdx] & bit) {
                    uint8_t client = byteIdx*8 + bitIdx;
                    message_t *cancelled = queue[client].msg;

                    // Release slot and flag before signalling, so the
                    // handler sees a free slot.
                    queue[client].msg = NULL;
                    cancelMask[byteIdx] &= ~bit;
                    signal Send.sendDone[client](cancelled, ECANCEL);
                }
            }
        }
    }
    
    /**
     * Cancel a client's pending packet.
     *
     * If the packet is the one currently handed to AMSend, the request
     * is forwarded to AMSend.cancel and its result returned. Otherwise
     * the client's cancel bit is set and CancelTask is posted, which
     * later signals sendDone with ECANCEL.
     *
     * @param msg - the packet to cancel; must be the one the client queued
     * @return FAIL if clientId is out of range, the client has nothing
     *         queued, or msg is not the queued packet; otherwise SUCCESS
     *         or the result of AMSend.cancel.
     */
    command error_t Send.cancel[uint8_t clientId](message_t* msg) {
        am_id_t type;

        if (clientId >= numClients) {           // Not a valid client
            return FAIL;
        }
        if (queue[clientId].msg == NULL ||      // No packet pending
            queue[clientId].msg != msg) {       // Not the right packet
            return FAIL;
        }

        if (current != clientId) {
            // Still waiting in the queue: defer the completion to CancelTask.
            cancelMask[clientId/8] |= 1 << clientId % 8;
            post CancelTask();
            return SUCCESS;
        }

        // In flight: only the underlying layer can cancel it.
        type = call AMPacket.type(msg);
        return call AMSend.cancel[type](msg);
    }

    /**
     * Finish the send attempt of client 'last' and move the queue on.
     *
     * @param last - index of the client whose packet completed
     * @param msg  - the packet that completed
     * @param err  - completion status passed on to the client
     */
    void sendDone(uint8_t last, message_t * ONE msg, error_t err) {
        // Free the slot first: nextPacket() skips NULL slots, and Send.send
        // returns EBUSY for this client while the slot is occupied.
        queue[last].msg = NULL;
        // Start the next queued packet (this changes 'current') before the
        // client is notified.
        tryToSend();
        signal Send.sendDone[last](msg, err);
    }

    /**
     * Deferred failure report, posted by tryToSend() when the underlying
     * AMSend.send rejects the current client's packet. Completes that
     * packet with FAIL through sendDone(), which also moves the queue on.
     */
    task void errorTask() {
        // 'current' is re-read when the task runs, not when it was posted.
        // If the queue has been marked empty in between (current ==
        // numClients), indexing queue[current] would read and write one
        // slot past the array, so bail out like AMSend.sendDone does.
        if (current >= numClients) {
            return;
        }
        sendDone(current, queue[current].msg, FAIL);
    }

    /**
     * Select the next eligible packet and hand it to the underlying
     * AMSend. If that send is rejected, errorTask is posted to report
     * the failure.
     *
     * NOTE: Increments current! (nextPacket() moves it to the next
     * eligible client, or to numClients when nothing is queued.)
     */
    void tryToSend() {
        message_t* pending;
        am_id_t type;
        am_addr_t addr;
        uint8_t payloadLen;

        nextPacket();
        if (current >= numClients) {
            return;     // queue empty, nothing to send
        }

        pending = queue[current].msg;
        type = call AMPacket.type(pending);
        addr = call AMPacket.destination(pending);
        payloadLen = call Packet.payloadLength(pending);

        if (call AMSend.send[type](addr, pending, payloadLen) != SUCCESS) {
            post errorTask();
        }
    }
  
    /**
     * Completion event from the underlying AMSend. It is passed on to
     * the current client only if a send is in progress and the reported
     * packet is the one stored in that client's slot.
     */
    event void AMSend.sendDone[am_id_t id](message_t* msg, error_t err) {
        // Bug fix from John Regehr: if the underlying radio mixes things
        // up, we must not index the queue with an out-of-range 'current'.
        // This can occur on the mica2. Since all AM packets go through
        // this queue, hitting this case means the radio has a problem. -pal
        if (current >= numClients) {
            return;
        }

        if (queue[current].msg != msg) {
            // Completion for a packet that is not the one in flight: log and drop it.
            dbg("PointerBug", "%s received send done for %p, signaling for %p.\n",
                __FUNCTION__, msg, queue[current].msg);
            return;
        }

        sendDone(current, msg, err);
    }
    
    // Report the maximum payload length of the underlying AMSend. The client
    // id is not used; the query is always forwarded to AMSend instance 0.
    command uint8_t Send.maxPayloadLength[uint8_t id]() {
        return call AMSend.maxPayloadLength[0]();
    }

    // Return the payload pointer of packet m as reported by the underlying
    // AMSend. The client id is not used; the request is always forwarded to
    // AMSend instance 0.
    command void* Send.getPayload[uint8_t id](message_t* m, uint8_t len) {
      return call AMSend.getPayload[0](m, len);
    }

    // Default sendDone handler for client ids with no handler wired:
    // the completion is silently dropped.
    default event void Send.sendDone[uint8_t id](message_t* msg, error_t err) {
        // Do nothing
    }
    // Default AMSend.send for AM ids with no sender wired: the send is
    // always rejected with FAIL. The interface index uses am_id_t, as in the
    // module's AMSend declaration, and the first argument is named for what
    // it is: the destination address.
    default command error_t AMSend.send[am_id_t id](am_addr_t addr, message_t* msg, uint8_t len) {
        return FAIL;
    }
}