File: NwsMemory.java

package info (click to toggle)
nws 2.11-3
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 2,700 kB
  • ctags: 2,820
  • sloc: ansic: 28,849; sh: 3,289; java: 1,205; makefile: 697; perl: 12
file content (258 lines) | stat: -rwxr-xr-x 8,545 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
package edu.ucsb.cs.nws.nwsprotocol;

import java.io.*;
import java.net.*;


/**
 * The NwsMemory class implements the protocol specific to memory hosts --
 * fixed-length record storage and retrieval and expired storage purging.
 */
public class NwsMemory extends NwsHost {

  protected static final int STORE_STATE = NwsMessage.MEMORY_FIRST_MESSAGE;
  protected static final int STATE_STORED = STORE_STATE + 1;

  protected static final int FETCH_STATE = STATE_STORED + 1;
  protected static final int STATE_FETCHED = FETCH_STATE + 1;

  protected static final int AUTOFETCH_BEGIN = STATE_FETCHED + 1;
  protected static final int AUTOFETCH_ACK = AUTOFETCH_BEGIN + 1;

  protected static final int MEMORY_CLEAN = AUTOFETCH_ACK + 1;
  protected static final int MEMORY_CLEANED = MEMORY_CLEAN + 1;

  protected static final int MEMORY_LOGDEST = MEMORY_CLEANED + 1;
  protected static final int MEMORY_LOGDEST_ACK = MEMORY_LOGDEST + 1;
  protected static final int STORE_AND_REGISTER = MEMORY_LOGDEST_ACK + 1;

  protected static final int MEMORY_FAILED = NwsMessage.MEMORY_LAST_MESSAGE;


  /** See the constructor for NwsHost. */
  public NwsMemory(String hostName,
                   int hostPort,
                   boolean keepOpen) {
    super(hostName, hostPort, keepOpen);
  }


  /** See the constructor for NwsHost. */
  public NwsMemory(String hostName,
                   int hostPort) {
    super(hostName, hostPort);
  }


  /** See the constructor for NwsHost. */
  public NwsMemory(Socket s) {
    super(s);
  }


  /**
   * Requests that the memory begin automatically forwarding records each time
   * one is stored under any of the names listed in <i>names</i>.  Calling this
   * method terminates any previous autofetch request, so calling the method
   * with an empty string effectively ends autofetching activity.  Forwarded
   * records can be retrieved from the autoFetchCheck() method.  NOTE: it makes
   * no sense to call this method on a host for which the keepOpen constructor
   * parameter was false, since the connection will be closed before returning.
   */
  public void autoFetchBegin(String[] names) throws Exception {
    String allNames = new String();
    messagePrelude();
    for(int i = 0; i < names.length; i++) {
      allNames = allNames + names[i] + "\t";
    }
    allNames = allNames.substring(0, allNames.length() - 2);
    NwsMessage.send
      (connection, AUTOFETCH_BEGIN, new CString(allNames).toBytes());
    NwsMessage.receive(connection, this);
    messagePostlude();
  }


  /**
   * Checks to see if any records have arrived from the host since the
   * previous call.  If so, returns a string consisting of the record appended
   * to the name under which it was stored; if not, returns null.
   */
  public String autoFetchCheck() throws Exception {
    State fetchedState;
    if(connection.getInputStream().available() == 0)
      return null;
    fetchedState = (State)NwsMessage.receive(connection, this);
    return fetchedState.id + " " + fetchedState.contents[0];
  }


  /**
   * Requests that the memory delete any stored files which have not been
   * accessed or modified in the last <i>idle</i> seconds.
   */
  public void clean(int idle) throws Exception {
    messagePrelude();
    NwsMessage.send(connection, MEMORY_CLEAN, NwsMessage.toBytes(idle));
    NwsMessage.receive(connection, this);
    messagePostlude();
  }


  /** Returns <i>howMany</i> records most recently stored under <i>name</i>. */
  public String[] retrieve(String name,
                           int howMany) throws Exception {
    State fetchedState;
    State requestState = new State(name, howMany);
    messagePrelude();
    NwsMessage.send(connection, FETCH_STATE, requestState.toBytes());
    fetchedState = (State)NwsMessage.receive(connection, this);
    messagePostlude();
    return fetchedState.contents;
  }


  /**
   * Stores <i>contents</i> under <i>name</i> with each element padded with
   * spaces to <i>recordSize</i> characters.
   */
  public void store(String name,
                    int recordSize,
                    String[] contents) throws Exception {
    State storeState = new State(name, recordSize, contents);
    messagePrelude();
    NwsMessage.send(connection, STORE_STATE, storeState.toBytes());
    NwsMessage.receive(connection, this);
    messagePostlude();
  }




  /** See NwsMessage. */
  public Object receive(int message,
                        DataInputStream stream,
                        int dataLength) throws Exception {

	/* Who wrote this abortion? FIXME for a logical control flow.
 	* Remember that NwsMessage.receive strips the header and calls this method
 	* to handle domain-specific messages.  Implement new messages here.
	*/
	
    return (message == AUTOFETCH_ACK) ? null :
           (message == MEMORY_CLEANED) ? null :
           (message == STATE_FETCHED) ? new State(stream) :
           (message == STATE_STORED) ? null :
           super.receive(message, stream, dataLength);
  }


  /**
   * The main() method for this class is a small test program that takes a
   * memory specification and a state name and prints the newest 1000 records.
   */
  public static void main(String[] args) {
    String[] contents;
    if(args.length != 2) {
      System.err.println("Usage: NwsMemory <host> <state>");
      System.exit(1);
    }
    try {
      contents = new NwsMemory(args[0], 8050).retrieve(args[1], 1000);
      for(int i = 0; i < contents.length; i++)
        System.out.println(contents[i].trim());
    } catch(Exception x) { System.err.println(x.toString()); }
  }




  /** The state record transmitted with memory store/fetch requests. */
  protected static class State {

    /** The state name. */
    public String id;
    /** The size of each (fixed-length) record. */
    public int recSize;
    /** The count of records enclosed (store) or to retrieve (fetch) */
    public int recCount;
    /**
     * The generation time of the enclosed records (store) or the oldest to
     * retrieve (fetch).  Presently unused here.
     */
    public double seqNo;
    /** How long to keep the enclosed records (store).  Unused here. */
    public double timeOut;
    /** The records.  Empty for retrieval requests. */
    public String[] contents;

    protected static final int STATE_NAME_SIZE = 128;

    /** Produces a state to get <i>recCount</i> records from state <i>id</i>. */
    public State(String id,
                 int recCount) {
      this.id = id;
      recSize = 0;
      this.recCount = recCount;
      seqNo = 0.0;
      timeOut = 0.0;
      contents = null;
    }

    /**
     * Produces a state to store <i>contents</i>, padded to <i>recSize</i>,
     * under <i>id</i>.
     */
    public State(String id,
                 int recSize,
                 String[] contents) {
      int i;
      String padding = new String();
      this.id = id;
      this.recSize = recSize;
      recCount = contents.length;
      seqNo = System.currentTimeMillis() / 1000;
      timeOut = 315360000.0; /* 10 years */
      this.contents = new String[recCount];
      for(i = 0; i < recSize; i++)
        padding = padding + " ";
      for(i = 0; i < recCount; i++) {
        this.contents[i] =
          (contents[i].length() >= recSize) ?
          contents[i].substring(0, recSize) :
          contents[i] + padding.substring(0, recSize - contents[i].length());
      }
    }

    /** Produces a state initialized by reading <i>stream</i>. */
    public State(DataInputStream stream) throws Exception {
      id = new CString(STATE_NAME_SIZE, stream).toString();
      recSize = stream.readInt();
      recCount = stream.readInt();
      seqNo = stream.readDouble();
      timeOut = stream.readDouble();
      contents = new String[recCount];
      for(int i = 0; i < recCount; i++)
        contents[i] = new CString(recSize, stream).toString();
    }

    /** Returns the fields of the State converted to a byte array. */
    public byte[] toBytes() {
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
      DataOutputStream dataStream = new DataOutputStream(byteStream);
      try {
        dataStream.write(new CString(STATE_NAME_SIZE, id).toBytes());
        dataStream.writeInt(recSize);
        dataStream.writeInt(recCount);
        dataStream.writeDouble(seqNo);
        dataStream.writeDouble(timeOut);
        for(int i = 0; i < recCount; i++)
          dataStream.writeBytes(contents[i]);
      } catch(Exception x) { }
      return byteStream.toByteArray();
    }

  }


}