File: NormStreamSend.java

package info (click to toggle)
norm 1.5.9%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,680 kB
  • sloc: cpp: 123,494; xml: 7,536; tcl: 5,460; makefile: 3,442; python: 1,898; java: 1,750; ansic: 642; sh: 21; csh: 8
file content (110 lines) | stat: -rw-r--r-- 3,009 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import java.io.IOException;
import mil.navy.nrl.norm.NormEvent;
import mil.navy.nrl.norm.NormInstance;
import mil.navy.nrl.norm.NormNode;
import mil.navy.nrl.norm.NormSession;
import mil.navy.nrl.norm.NormStream;
import mil.navy.nrl.norm.enums.NormEventType;

public class NormStreamSend {
	static final long REPAIR_WINDOW_SIZE = 1024 * 1024;
	static final long SESSION_BUFFER_SIZE = 1024 * 1024;
	static final int SEGMENT_SIZE = 1400;
	static final int BLOCK_SIZE = 64;
	static final int PARITY_SEGMENTS = 16;
	static final String DEST_ADDRESS = "224.1.2.3";
	static final int DEST_PORT = 6003;

	public static void main(String[] args) {
		NormInstance instance = null;
		NormSession session = null;
		NormStream stream = null;
		String destAddress = DEST_ADDRESS;
		int destPort = DEST_PORT;

		try {
			int length = 0;
			int offset = 0;
			byte[] buf = new byte[65536];

			if (args.length > 0) {
				destAddress = args[0];
			}

			if (args.length > 1) {
				destPort = Integer.parseInt(args[1]);
			}

			instance = new NormInstance();
			session = instance.createSession(destAddress, destPort,
											 NormNode.NORM_NODE_ANY);

			String ccStr = System.getProperty("Norm.CC", "off");
			if (ccStr.equalsIgnoreCase("on")) {
				session.setCongestionControl(true, true);
				System.out.println("Set Congestion Control to " + ccStr);
			}

			session.startSender(1, SESSION_BUFFER_SIZE, 
								SEGMENT_SIZE, BLOCK_SIZE, PARITY_SEGMENTS);
			stream = session.streamOpen(REPAIR_WINDOW_SIZE);

			while (-1 != (length = System.in.read(buf, 0, buf.length))) {
				int numWritten = 0;
				offset = 0;

				while (length != (numWritten = stream.write(buf, offset, length))) {
					length -= numWritten;
					offset += numWritten;

					NormEvent event = instance.getNextEvent();
					NormEventType eventType = event.getType();

					while ((eventType != NormEventType.NORM_TX_QUEUE_EMPTY) && 
						   (eventType != NormEventType.NORM_TX_QUEUE_VACANCY)) {
						event = instance.getNextEvent();
						eventType = event.getType();
					}
				}

				stream.markEom();

				//System.err.println("Wrote " + numWritten);
				//System.err.println("... Done!");

				// TODO: Create a new buf each time I'm successful writing
				// all of it?
			}

			stream.flush();
			System.err.println("End of file!");
		}
		catch (IOException ex) {
			System.err.println(ex);
		}
		catch (NumberFormatException ex) {
			System.err.println("Usage: NormStreamSend [host-name [port]]");
			System.err.println("Default host-name: " + DEST_ADDRESS);
			System.err.println("Default port: " + DEST_PORT);
		}

		if (null != stream) {
			System.err.println("Closing stream");
			stream.close(true);
		}

		if (null != session) {
			System.err.println("Stopping sender");
			session.stopSender();
			System.err.println("Destroying session");
			session.destroySession();
		}

		if (null != instance) {
			System.err.println("Destroying instance");
			instance.destroyInstance();
		}

		System.err.println("That's all folks!");
	}
}