File: GenericFile_HDFS.h

package info (click to toggle)
snap-aligner 2.0.3+dfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,652 kB
  • sloc: cpp: 41,051; ansic: 5,239; python: 227; makefile: 85; sh: 28
file content (106 lines) | stat: -rwxr-xr-x 2,329 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/*++

Module Name:

    GenericFile_HDFS.h

Abstract:

    Generic IO class for SNAP that can read from HDFS.

Authors:

    Jeremy Elson, February 2014

Environment:

    User mode service.

Revision History:


--*/

#pragma once

#include "GenericFile.h"
#include "Util.h"

#include "hdfs.h"

#include <vector>

class GenericFile_HDFS : public GenericFile
{
public:
	static GenericFile_HDFS *open(const char *filename, Mode mode);

	virtual size_t read(void *ptr, size_t count);
	virtual int getchar();
	virtual char *gets(char *buf, size_t count);
	virtual int advance(long long offset);
    int seek(long long offset);
	virtual void close();
	virtual ~GenericFile_HDFS();

private:
	// private constructor -- must use factory
	GenericFile_HDFS();

	// private methods and data	
	static int _initFlag;
	static int _staticInit();
	static ExclusiveLock _staticLock;
	size_t _readMultiThreaded(void *ptr, size_t count);
	size_t _readSingleThreaded(void *ptr, size_t count);

	// this is static because of an apparent bug in the HDFS library
	// that prevents clients from holding more than one handle. If you
	// open two connections to the same filesystem, then close one,
	// the other is also closed. Ugh. As a work-around we'll just keep
	// one global.
	// See: https://issues.apache.org/jira/browse/HDFS-925
	static hdfsFS _fs; 
	
	hdfsFile _file;

	class _HdfsWorkItem {
	public:
		void *ptr;
		tOffset startOffset;
		size_t count;

		_HdfsWorkItem(void *ptrArg, tOffset startOffsetArg, size_t countArg)
		{
			this->ptr = ptrArg;
			this->startOffset = startOffsetArg;
			this->count = countArg;
		}
	};

	class _HdfsWorkQueue {
	public:
		_HdfsWorkQueue(GenericFile_HDFS *gFile, void *ptr, tOffset startOffset, size_t count);
		size_t size();
		_HdfsWorkItem *getOne();
		GenericFile_HDFS *getFile() { return _gFile; }
		void createNWaiter(size_t n);
		void wait();
		void signalThreadDone();
		void signalError() { _error = true; }
		bool isErrorThrown() { return _error; }
		~_HdfsWorkQueue();

	private:
		GenericFile_HDFS *_gFile;
		ExclusiveLock _workQueueLock;
		NWaiter *_nWaiter;
		std::vector<_HdfsWorkItem *> _workItemList;
		bool _error;
	};

	static void _hdfsReaderThread(void *workQueueObject);
	static const size_t _MAX_READ_THREADS = 32;
	static const size_t _READ_CHUNK_SIZE = 16*1024*1024; // 16MB
};