File: NwsSensor.java

package info (click to toggle)
nws 2.11-3
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 2,700 kB
  • ctags: 2,820
  • sloc: ansic: 28,849; sh: 3,289; java: 1,205; makefile: 697; perl: 12
file content (253 lines) | stat: -rwxr-xr-x 8,538 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package edu.ucsb.cs.nws.nwsprotocol;

import java.io.*;
import java.net.*;


/**
 * The NwsSensor class implements the protocol specific to NWS sensors --
 * activity starting and stopping and experiment requests.  It does not
 * presently support clique protocol requests.
 */
public class NwsSensor extends NwsHost {


  /**
   * Creates a sensor client by handing all three arguments, unchanged, to the
   * NwsHost constructor that takes the same parameter list.  See that
   * constructor for the exact meaning of each argument.
   *
   * @param hostName forwarded to NwsHost (presumably the sensor's host name)
   * @param hostPort forwarded to NwsHost (presumably the sensor's port)
   * @param keepOpen forwarded to NwsHost (presumably whether the connection
   *                 stays open between requests -- confirm in NwsHost)
   */
  public NwsSensor(String hostName,
                   int hostPort,
                   boolean keepOpen) {
    super(hostName, hostPort, keepOpen);
  }


  /**
   * Creates a sensor client by handing both arguments, unchanged, to the
   * two-argument NwsHost constructor.  See that constructor for the exact
   * meaning of each argument and for the connection policy it selects.
   *
   * @param hostName forwarded to NwsHost (presumably the sensor's host name)
   * @param hostPort forwarded to NwsHost (presumably the sensor's port)
   */
  public NwsSensor(String hostName,
                   int hostPort) {
    super(hostName, hostPort);
  }


  /**
   * Creates a sensor client around a caller-supplied socket by handing it,
   * unchanged, to the NwsHost constructor that takes a Socket.  See that
   * constructor for how the socket is used.
   *
   * @param s forwarded to NwsHost (presumably an already connected socket --
   *          confirm in NwsHost)
   */
  public NwsSensor(Socket s) {
    super(s);
  }


  /**
   * Value holder for the two measurements produced by the tcpMessageTest()
   * method.
   */
  public class TcpMessageTestResult {

    /** Measured bandwidth to the sensor, expressed in megabits per second. */
    public double bandwidth;

    /**
     * Measured latency, expressed in milliseconds.  The measurement is taken
     * with a millisecond-resolution clock, so the value is always a whole
     * number even though it is stored as a double.
     */
    public double latency;

    /**
     * Builds a result whose two fields are set to the given values.
     */
    public TcpMessageTestResult(double bandwidth,
                                double latency) {
      this.latency = latency;
      this.bandwidth = bandwidth;
    }

  }


  /**
   * Asks the sensor to stop the activity that is registered as <i>name</i>.
   * An ACTIVITY_STOP message carrying the name is sent, and the sensor's
   * reply is read before the method returns.
   *
   * @param name the registration name of the activity to stop
   * @throws Exception if any step of the message exchange fails
   */
  public void haltActivity(String name) throws Exception {
    messagePrelude();
    byte[] encodedName = new CString(name).toBytes();
    NwsMessage.send(connection, ACTIVITY_STOP, encodedName);
    NwsMessage.receive(connection, this);
    messagePostlude();
  }


  /**
   * Asks the sensor to start a new activity and register it as <i>name</i>.
   * The activity applies <i>skill</i> under <i>control</i>.  Each element of
   * <i>options</i> is rendered with toString() and followed by a tab; the
   * resulting string is sent as the fourth field of the ACTIVITY_START
   * message.  The sensor's reply is read before the method returns.
   *
   * @param name    the name to register the activity under
   * @param control the control the activity runs under
   * @param skill   the skill the activity applies
   * @param options the attributes that parameterize the activity
   * @throws Exception if any step of the message exchange fails
   */
  public void startActivity(String name,
                            String control,
                            String skill,
                            NwsNameServer.Attribute[] options) throws Exception{
    byte[] nameField = new CString(name).toBytes();
    byte[] controlField = new CString(control).toBytes();
    byte[] skillField = new CString(skill).toBytes();

    // Tab-terminate every option so the sensor can split them apart again.
    StringBuffer joined = new StringBuffer();
    for(int idx = 0; idx < options.length; idx++) {
      joined.append(options[idx].toString()).append("\t");
    }
    byte[] optionsField = new CString(joined.toString()).toBytes();

    byte[][] fields = {nameField, controlField, skillField, optionsField};
    messagePrelude();
    byte[] body = NwsMessage.concatenateBytes(fields);
    NwsMessage.send(connection, ACTIVITY_START, body);
    NwsMessage.receive(connection, this);
    messagePostlude();
  }


  /**
   * Performs a TCP connection test with the sensor.  The time spent inside
   * messagePrelude() is measured with the millisecond clock; messagePostlude()
   * runs afterwards and is not part of the measurement.
   *
   * @return the number of milliseconds that messagePrelude() took
   * @throws Exception if messagePrelude() or messagePostlude() fails
   */
  public long tcpConnectionTest() throws Exception {
    final long before = System.currentTimeMillis();
    messagePrelude();
    final long elapsed = System.currentTimeMillis() - before;
    messagePostlude();
    return elapsed;
  }


  /**
   * Runs a TCP message test against the sensor and returns the measured
   * latency and bandwidth.
   * <p>
   * The three parameters are sent to the sensor in a TCP_BW_REQ message.  The
   * sensor's reply names a port, and a second, short-lived connection to that
   * port carries the experiment.  Latency is the time taken to write one byte
   * and read one byte back.  Bandwidth is derived from the time taken to write
   * <i>experimentSize</i> bytes, in chunks of at most <i>messageSize</i>
   * bytes, and then read a one-byte reply.
   * <p>
   * Both timings use the millisecond clock.  If the transfer finishes within a
   * single clock tick the elapsed time is zero and the reported bandwidth is
   * infinite (or NaN when <i>experimentSize</i> is zero).
   *
   * @param experimentSize total number of payload bytes to send; must not be
   *                       negative
   * @param bufferSize     forwarded to the sensor in the request; it is not
   *                       applied to the local socket, which keeps the
   *                       platform's default send buffer
   * @param messageSize    largest number of bytes passed to a single write;
   *                       must be positive
   * @return the bandwidth in megabits/second and the latency in milliseconds
   * @throws IllegalArgumentException if <i>experimentSize</i> is negative or
   *                                  <i>messageSize</i> is not positive
   * @throws Exception if the exchange with the sensor fails
   */
  public TcpMessageTestResult tcpMessageTest(int experimentSize,
                                             int bufferSize,
                                             int messageSize) throws Exception {
    // Check the sizes before any network traffic.  A zero messageSize would
    // make the send loop below spin forever, and a negative experimentSize
    // would only fail after the sensor had already set up the experiment.
    if(messageSize <= 0) {
      throw new IllegalArgumentException
        ("messageSize must be positive: " + messageSize);
    }
    if(experimentSize < 0) {
      throw new IllegalArgumentException
        ("experimentSize must not be negative: " + experimentSize);
    }

    byte[][] allBytes = {NwsMessage.toBytes(experimentSize),
                         NwsMessage.toBytes(bufferSize),
                         NwsMessage.toBytes(messageSize)};

    Handshake handshake;
    int leftToSend = experimentSize;
    byte[] payload = new byte[messageSize];
    TcpMessageTestResult returnValue = new TcpMessageTestResult(0.0, 0.0);

    messagePrelude();

    NwsMessage.send
      (connection, TCP_BW_REQ, NwsMessage.concatenateBytes(allBytes));
    handshake = (Handshake)NwsMessage.receive(connection, this);

    // The handshake stores the port as a signed 16-bit value, so ports above
    // 32767 arrive negative.  Mask to recover the unsigned port number that
    // the Socket constructor expects.
    Socket expConnection =
      new Socket(connection.getInetAddress(), handshake.port & 0xFFFF);
    try {
      OutputStream toSensor = expConnection.getOutputStream();
      InputStream fromSensor = expConnection.getInputStream();

      // Latency: one byte out, one byte back.
      long latencyStart = System.currentTimeMillis();
      toSensor.write(0);
      fromSensor.read();
      long latencyFinish = System.currentTimeMillis();
      returnValue.latency = (double)(latencyFinish - latencyStart);

      // Bandwidth: full-sized messages first, then whatever remains, then
      // wait for the sensor's one-byte reply.
      long bandwidthStart = System.currentTimeMillis();
      while(leftToSend > messageSize) {
        toSensor.write(payload);
        leftToSend -= messageSize;
      }
      toSensor.write(payload, 0, leftToSend);
      fromSensor.read();
      long bandwidthFinish = System.currentTimeMillis();

      // bytes * 8 = bits; bits per millisecond / 1000 = megabits per second.
      returnValue.bandwidth = ((double)experimentSize * 8.0) /
                              (double)(bandwidthFinish-bandwidthStart) / 1000.0;
    } finally {
      // Release the experiment socket even when a read or write fails.
      expConnection.close();
    }

    messagePostlude();
    return returnValue;

  }


  /**
   * The record a sensor transmits in response to a TCP_BW_REQ.  It can be
   * built from explicit values or decoded from a stream.
   */
  protected static class Handshake {

    /** Reserved for future use. */
    public int address;

    /**
     * The port to connect to.  It is held as a signed 16-bit value exactly as
     * read from the stream, so port numbers above 32767 appear here as
     * negative values; mask with 0xFFFF to obtain the numeric port.
     */
    public short port;

    /** Produces a Handshake holding the given address and port. */
    public Handshake(int address,
                     short port) {
      this.port = port;
      this.address = address;
    }

    /**
     * Produces a Handshake by reading a four-byte address followed by a
     * two-byte port from <i>stream</i>.
     */
    public Handshake(DataInputStream stream) throws Exception {
      // Arguments are evaluated left to right: address first, then port.
      this(stream.readInt(), stream.readShort());
    }

  }


  /*
   * Message type codes.  Each group is numbered consecutively from a base
   * value defined in NwsMessage, so the declaration order within a group
   * determines the numeric value of every code in it.
   */

  /* Activity start request, followed by the reply code receive() accepts. */
  protected static final int ACTIVITY_START = NwsMessage.SENSOR_FIRST_MESSAGE;
  protected static final int ACTIVITY_STARTED = ACTIVITY_START + 1;

  /* Activity stop request, followed by the reply code receive() accepts. */
  protected static final int ACTIVITY_STOP =  ACTIVITY_STARTED + 1;
  protected static final int ACTIVITY_STOPPED = ACTIVITY_STOP + 1;

  /* Pinned to the last code of the sensor message range. */
  protected static final int SENSOR_FAILED = NwsMessage.SENSOR_LAST_MESSAGE;

  /* Clique message codes, counted up from NwsMessage.CLIQUE_FIRST_MESSAGE. */
  protected static final int CLIQUE_ACTIVATE = NwsMessage.CLIQUE_FIRST_MESSAGE;
  protected static final int CLIQUE_DIE = CLIQUE_ACTIVATE + 1;
  protected static final int CLIQUE_ACK = CLIQUE_DIE + 1;
  protected static final int CLIQUE_TOKEN_FWD = CLIQUE_ACK + 1;
  protected static final int CLIQUE_SERIES = CLIQUE_TOKEN_FWD + 1;
  protected static final int CLIQUE_EXPERIMENT = CLIQUE_SERIES + 1;
  protected static final int CLIQUE_NEW_MEMBER = CLIQUE_EXPERIMENT + 1;
  protected static final int CLIQUE_REMOVE_MEMBER = CLIQUE_NEW_MEMBER + 1;


  /*
   * Skill message codes, counted up from NwsMessage.SKILLS_FIRST_MESSAGE.
   * TCP_BW_REQ is the request sent by tcpMessageTest(); SKILL_FAILED is
   * pinned to the last code of the skills range.
   */
  protected static final int TCP_BW_REQ = NwsMessage.SKILLS_FIRST_MESSAGE;
  protected static final int SKILL_EXPERIMENT = TCP_BW_REQ + 1;
  protected static final int SKILL_RESULT = SKILL_EXPERIMENT + 1;
  protected static final int SKILL_FAILED = NwsMessage.SKILLS_LAST_MESSAGE;
  /*
   * Fixed code that receive() decodes into a Handshake.
   * NOTE(review): presumably 411 must match the value the sensor uses --
   * confirm against the sensor implementation.
   */
  protected static final int TCP_HANDSHAKE = 411;


  /**
   * See NwsMessage.  Decodes the message types specific to sensors:
   * ACTIVITY_STARTED and ACTIVITY_STOPPED yield null without reading from
   * <i>stream</i>, TCP_HANDSHAKE yields a Handshake read from <i>stream</i>,
   * and every other message type is passed on to the superclass.
   */
  public Object receive(int message,
                        DataInputStream stream,
                        int dataLength) throws Exception {
    if(message == ACTIVITY_STARTED || message == ACTIVITY_STOPPED) {
      return null;
    }
    if(message == TCP_HANDSHAKE) {
      return new Handshake(stream);
    }
    return super.receive(message, stream, dataLength);
  }


  /**
   * The main() method for this class is a small test program.  Each command
   * line argument is used as the host name of a sensor on port 8060.  For
   * every sensor a TCP message test with a 64k experiment, 32k buffer and 16k
   * message size is run, and the resulting bandwidth and latency are printed
   * to standard output.  When a test fails, " failed" is printed in place of
   * the measurements and the cause is reported on standard error.
   */
  public static void main(String[] args) {

    final int ONE_K = 1024;
    final int SENSOR_PORT = 8060;
    int bufferSize = 32 * ONE_K;
    int experimentSize = 64 * ONE_K;
    int messageSize = 16 * ONE_K;

    for(int i = 0; i < args.length; i++) {
      NwsSensor sensor = new NwsSensor(args[i], SENSOR_PORT);
      System.out.print("(" + experimentSize / ONE_K + "k" +
                       "," + bufferSize / ONE_K + "k" +
                       "," + messageSize / ONE_K + "k" +
                       ") to " + sensor.toString() + ":");
      try {
        TcpMessageTestResult result =
          sensor.tcpMessageTest(experimentSize, bufferSize, messageSize);
        System.out.println(" bandwidth: " + result.bandwidth +
                           " latency: " + result.latency);
      } catch(Exception x) {
        // Keep going with the remaining sensors.  The standard output line is
        // unchanged; the cause goes to standard error so it is not lost.
        System.out.println(" failed");
        System.err.println(sensor.toString() + ": " + x);
      }
    }

  }


}