File: test_caching_file_system_wrapper.cpp

package info (click to toggle)
duckdb 1.5.1-2
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 299,196 kB
  • sloc: cpp: 865,414; ansic: 57,292; python: 18,871; sql: 12,663; lisp: 11,751; yacc: 7,412; lex: 1,682; sh: 747; makefile: 558
file content (494 lines) | stat: -rw-r--r-- 18,775 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
#include "catch.hpp"
#include "duckdb/common/file_system.hpp"
#include "duckdb/common/local_file_system.hpp"
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/opener_file_system.hpp"
#include "duckdb/common/string.hpp"
#include "duckdb/common/vector.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/storage/caching_file_system_wrapper.hpp"
#include "test_helpers.hpp"

#include <algorithm>
#include <cstring>
#include <thread>

namespace {
// Size, in bytes, of the zero-filled scratch buffers the tests read file contents into.
constexpr idx_t TEST_BUFFER_SIZE {200};
} // namespace

namespace duckdb {

// Scoped guard for a test file: the constructor creates the file and writes `content` into it, the destructor
// removes the file again.
class TestFileGuard {
public:
	// Resolves `filename` through TestCreatePath, opens it with the write + create flags, writes `content` at
	// offset 0 and syncs the handle.
	TestFileGuard(const string &filename, const string &content) : file_path(TestCreatePath(filename)) {
		auto local_fs = FileSystem::CreateLocal();
		auto handle = local_fs->OpenFile(file_path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE);
		// Write takes a mutable buffer pointer; the content is only read from.
		handle->Write(QueryContext(), const_cast<char *>(content.data()), content.size(), 0);
		handle->Sync();
	}

	// The guard owns the lifetime of the file at `file_path`: copying it would leave two guards removing the
	// same path, so copying (and with it implicit moving) is disabled.
	TestFileGuard(const TestFileGuard &) = delete;
	TestFileGuard &operator=(const TestFileGuard &) = delete;

	// Removes the file; TryRemoveFile's result is intentionally ignored.
	~TestFileGuard() {
		auto local_fs = FileSystem::CreateLocal();
		local_fs->TryRemoveFile(file_path);
	}

	// Path of the guarded file, as returned by TestCreatePath.
	const string &GetPath() const {
		return file_path;
	}

private:
	string file_path;
};

// A test filesystem that records every positional read, in the order the reads are registered, before
// forwarding it to LocalFileSystem.
class TrackingFileSystem : public LocalFileSystem {
public:
	// One recorded positional read request.
	struct ReadCall {
		string path;
		idx_t location;
		idx_t size;
	};

	// Guards `read_calls`; mutable so that const accessors can lock it.
	mutable std::mutex read_calls_mutex;
	vector<ReadCall> read_calls;

	string GetName() const override {
		return "TrackingFileSystem";
	}

	// Records the request and then delegates to LocalFileSystem::Read.
	void Read(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) override {
		{
			// Only the bookkeeping is protected by the lock. Holding it across the actual read would
			// serialize all reads going through this filesystem, including deliberately parallel ones.
			const lock_guard<mutex> lock(read_calls_mutex);
			read_calls.push_back({handle.GetPath(), location, UnsafeNumericCast<idx_t>(nr_bytes)});
		}
		LocalFileSystem::Read(handle, buffer, nr_bytes, location);
	}

	// Forget all recorded read calls.
	void Clear() {
		const lock_guard<mutex> lock(read_calls_mutex);
		read_calls.clear();
	}

	// Number of recorded reads that match exactly the given path, location and size.
	size_t GetReadCount(const string &path, idx_t location, idx_t size) const {
		const lock_guard<mutex> lock(read_calls_mutex);
		size_t count = 0;
		for (const auto &call : read_calls) {
			if (call.path == path && call.location == location && call.size == size) {
				++count;
			}
		}
		return count;
	}

	// The tracking filesystem only handles files whose path starts with the test directory path.
	bool CanHandleFile(const string &path) override {
		return StringUtil::StartsWith(path, TestDirectoryPath());
	}

	// The tracking filesystem derives from the local filesystem and reports itself as seekable.
	bool CanSeek() override {
		return true;
	}
};

// Verifies that CachingFileSystemWrapper rejects every mutating operation (Write, Truncate, FileSync, Trim)
// and refuses to open a file with write flags, each with a NotImplementedException.
TEST_CASE("CachingFileSystemWrapper write operations not allowed", "[file_system][caching]") {
	DuckDB db(":memory:");
	auto &db_instance = *db.instance;
	auto tracking_fs = make_uniq<TrackingFileSystem>();
	auto caching_wrapper =
	    make_shared_ptr<CachingFileSystemWrapper>(*tracking_fs, db_instance, CachingMode::ALWAYS_CACHE);

	const string test_content = "This is test content for write testing.";
	TestFileGuard test_file("test_caching_write.txt", test_content);

	// Open file for reading through caching wrapper
	OpenFileInfo file_info(test_file.GetPath());
	file_info.extended_info = make_shared_ptr<ExtendedOpenFileInfo>();
	file_info.extended_info->options["validate_external_file_cache"] = Value::BOOLEAN(false);
	auto handle = caching_wrapper->OpenFile(file_info, FileFlags::FILE_FLAGS_READ);

	// Test that write operations are not allowed - CachingFileSystemWrapper is read-only.
	// The buffer starts with the payload and is zero-padded to 100 bytes; it is built with plain string
	// operations so no const_cast / memcpy into the string's storage is needed.
	const string write_data = "Attempted write data";
	string write_buffer = write_data;
	write_buffer.resize(100, '\0');

	// Try to write at a location, which should throw NotImplementedException
	REQUIRE_THROWS_AS(caching_wrapper->Write(*handle, &write_buffer[0], write_data.size(), /*location=*/0),
	                  NotImplementedException);

	// Try truncate, which should also throw NotImplementedException
	REQUIRE_THROWS_AS(caching_wrapper->Truncate(*handle, 0), NotImplementedException);

	// Try FileSync, which should also throw NotImplementedException
	REQUIRE_THROWS_AS(caching_wrapper->FileSync(*handle), NotImplementedException);

	// Try Trim, which should also throw NotImplementedException
	REQUIRE_THROWS_AS(caching_wrapper->Trim(*handle, 0, 10), NotImplementedException);

	handle.reset();

	// Test that opening file with write flags is rejected
	REQUIRE_THROWS_AS(caching_wrapper->OpenFile(test_file.GetPath(), FileFlags::FILE_FLAGS_WRITE),
	                  NotImplementedException);
	REQUIRE_THROWS_AS(
	    caching_wrapper->OpenFile(test_file.GetPath(), FileFlags::FILE_FLAGS_READ | FileFlags::FILE_FLAGS_WRITE),
	    NotImplementedException);
}

// Verifies that repeated reads of the same range through the caching wrapper reach the underlying
// (tracking) filesystem only once, and that the bytes handed back are correct.
TEST_CASE("CachingFileSystemWrapper caches reads", "[file_system][caching]") {
	DuckDB db(":memory:");
	auto &db_instance = *db.instance;
	auto tracking_fs = make_uniq<TrackingFileSystem>();
	auto tracking_fs_ptr = tracking_fs.get();
	auto caching_wrapper =
	    make_shared_ptr<CachingFileSystemWrapper>(*tracking_fs, db_instance, CachingMode::ALWAYS_CACHE);

	const string test_content = "This is test content for caching verification. It should only be read once.";
	TestFileGuard test_file("test_caching_file.txt", test_content);

	// Opens a fresh handle through the caching wrapper, reads `nr_bytes` at `location`, closes the handle and
	// returns the TEST_BUFFER_SIZE-byte buffer the data was read into (zero-filled where nothing was written).
	auto read_range = [&caching_wrapper](const OpenFileInfo &file_info, idx_t nr_bytes, idx_t location) {
		auto handle = caching_wrapper->OpenFile(file_info, FileFlags::FILE_FLAGS_READ);
		string buffer(TEST_BUFFER_SIZE, '\0');
		handle->Read(QueryContext(), &buffer[0], nr_bytes, location);
		handle.reset();
		return buffer;
	};

	// Test 1: Read the same content multiple times, which should only hit underlying FS once
	{
		tracking_fs_ptr->Clear();

		// Create OpenFileInfo with validation disabled to allow caching to work
		OpenFileInfo file_info(test_file.GetPath());
		file_info.extended_info = make_shared_ptr<ExtendedOpenFileInfo>();
		file_info.extended_info->options["validate_external_file_cache"] = Value::BOOLEAN(false);

		// Three reads of the same range, each through its own handle
		const string buffer1 = read_range(file_info, test_content.size(), /*location=*/0);
		const string buffer2 = read_range(file_info, test_content.size(), /*location=*/0);
		const string buffer3 = read_range(file_info, test_content.size(), /*location=*/0);

		// Verify content is correct
		REQUIRE(buffer1.substr(0, test_content.size()) == test_content);
		REQUIRE(buffer2.substr(0, test_content.size()) == test_content);
		REQUIRE(buffer3.substr(0, test_content.size()) == test_content);

		// Verify the underlying filesystem was only called once for this read
		auto read_count = tracking_fs_ptr->GetReadCount(test_file.GetPath(), /*location=*/0, test_content.size());
		REQUIRE(read_count == 1);
	}

	// Test 2: Read different locations, with each request hitting underlying FS once
	{
		// Use a different file for this test to avoid interference from Test 1's cache
		const string test_content2 = "This is test content for chunked read testing. It has enough content.";
		TestFileGuard test_file2("test_caching_file2.txt", test_content2);

		tracking_fs_ptr->Clear();

		// Create OpenFileInfo with validation disabled
		OpenFileInfo file_info(test_file2.GetPath());
		file_info.extended_info = make_shared_ptr<ExtendedOpenFileInfo>();
		file_info.extended_info->options["validate_external_file_cache"] = Value::BOOLEAN(false);

		const idx_t chunk_size = 20;
		const string first_chunk = read_range(file_info, chunk_size, /*location=*/0);
		const string second_chunk = read_range(file_info, chunk_size, /*location=*/chunk_size);
		// Read first chunk again - should use cache
		const string first_chunk_again = read_range(file_info, chunk_size, /*location=*/0);

		// Verify the chunks carry the right bytes, whether they were served from the FS or from the cache
		REQUIRE(first_chunk.substr(0, chunk_size) == test_content2.substr(0, chunk_size));
		REQUIRE(second_chunk.substr(0, chunk_size) == test_content2.substr(chunk_size, chunk_size));
		REQUIRE(first_chunk_again.substr(0, chunk_size) == test_content2.substr(0, chunk_size));

		// Verify underlying FS access
		REQUIRE(tracking_fs_ptr->GetReadCount(test_file2.GetPath(), 0, chunk_size) == 1);
		REQUIRE(tracking_fs_ptr->GetReadCount(test_file2.GetPath(), chunk_size, chunk_size) == 1);
	}
}

// Reads two adjacent 10-byte ranges through one cached handle and checks that, stitched together, they
// match the first 20 bytes of the file.
TEST_CASE("CachingFileSystemWrapper sequential reads", "[file_system][caching]") {
	DuckDB db(":memory:");
	auto &database = *db.instance;
	auto tracking_fs = make_uniq<TrackingFileSystem>();
	auto caching_wrapper = make_shared_ptr<CachingFileSystemWrapper>(*tracking_fs, database, CachingMode::ALWAYS_CACHE);

	const string test_content = "This is test content for sequential read testing.";
	TestFileGuard test_file("test_caching_sequential.txt", test_content);

	// Start from an empty read log
	tracking_fs->Clear();

	auto file_handle = caching_wrapper->OpenFile(test_file.GetPath(), FileFlags::FILE_FLAGS_READ);
	string read_buffer(TEST_BUFFER_SIZE, '\0');

	// Two back-to-back positional reads: [0, 10) followed by [10, 20)
	const idx_t part_size = 10;
	file_handle->Read(QueryContext(), &read_buffer[0], part_size, /*location=*/0);
	file_handle->Read(QueryContext(), &read_buffer[part_size], part_size, /*location=*/part_size);

	// Both parts together must equal the head of the file
	const idx_t total_size = 2 * part_size;
	REQUIRE(read_buffer.substr(0, total_size) == test_content.substr(0, total_size));

	file_handle.reset();
}

// Exercises Seek / SeekPosition / Reset on a handle opened through the caching wrapper, interleaved with
// positional reads whose content is checked against the known file layout.
TEST_CASE("CachingFileSystemWrapper seek operations", "[file_system][caching]") {
	DuckDB db(":memory:");
	auto &db_instance = *db.instance;
	auto tracking_fs = make_uniq<TrackingFileSystem>();
	auto caching_wrapper =
	    make_shared_ptr<CachingFileSystemWrapper>(*tracking_fs, db_instance, CachingMode::ALWAYS_CACHE);

	const string test_content = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
	TestFileGuard test_file("test_caching_seek.txt", test_content);

	// Open the file read-only through the caching wrapper, with cache validation switched off
	OpenFileInfo file_info(test_file.GetPath());
	file_info.extended_info = make_shared_ptr<ExtendedOpenFileInfo>();
	file_info.extended_info->options["validate_external_file_cache"] = Value::BOOLEAN(false);
	auto handle = caching_wrapper->OpenFile(file_info, FileFlags::FILE_FLAGS_READ);

	string buffer(100, '\0');

	// Reads `nr_bytes` at `location` into the front of `buffer` and checks them against `expected`.
	auto expect_read = [&](idx_t location, idx_t nr_bytes, const string &expected) {
		handle->Read(QueryContext(), &buffer[0], nr_bytes, location);
		REQUIRE(buffer.substr(0, nr_bytes) == expected);
	};
	// Seeks via the wrapper and checks that the wrapper reports the new position.
	auto expect_seek = [&](idx_t position) {
		caching_wrapper->Seek(*handle, position);
		REQUIRE(caching_wrapper->SeekPosition(*handle) == position);
	};

	// Part 1: basic seek and read
	expect_read(/*location=*/0, /*nr_bytes=*/5, "01234");

	expect_seek(10);
	expect_read(/*location=*/10, /*nr_bytes=*/5, "ABCDE");

	expect_seek(5);
	expect_read(/*location=*/5, /*nr_bytes=*/5, "56789");

	// Part 2: Reset brings the position back to the start
	caching_wrapper->Reset(*handle);
	REQUIRE(caching_wrapper->SeekPosition(*handle) == 0);
	expect_read(/*location=*/0, /*nr_bytes=*/5, "01234");

	// Part 3: a table of seek + position check + read steps
	struct SeekReadStep {
		idx_t seek_pos;
		idx_t read_size;
		string expected;
	};
	const vector<SeekReadStep> steps = {
	    {0, 3, "012"}, {5, 4, "5678"}, {10, 5, "ABCDE"}, {15, 4, "FGHI"}, {20, 3, "KLM"}, {25, 4, "PQRS"},
	};
	for (const auto &step : steps) {
		expect_seek(step.seek_pos);
		expect_read(step.seek_pos, step.read_size, step.expected);
	}

	// Part 4: read the tail of the file after seeking to it
	expect_seek(30);
	expect_read(/*location=*/30, /*nr_bytes=*/6, "UVWXYZ");

	// Part 5: seek backwards and verify position and content
	expect_seek(12);
	expect_read(/*location=*/12, /*nr_bytes=*/3, "CDE");

	handle.reset();
}

// Verifies that ListFiles on the caching wrapper reports exactly the (non-directory) entries that were
// created in a scratch directory.
TEST_CASE("CachingFileSystemWrapper list operations", "[file_system][caching]") {
	DuckDB db(":memory:");
	auto &db_instance = *db.instance;
	auto tracking_fs = make_uniq<TrackingFileSystem>();
	auto caching_wrapper =
	    make_shared_ptr<CachingFileSystemWrapper>(*tracking_fs, db_instance, CachingMode::ALWAYS_CACHE);

	// Create a test directory
	auto test_dir = TestCreatePath("test_list_dir");
	auto local_fs = FileSystem::CreateLocal();
	local_fs->CreateDirectory(test_dir);

	// Create several test files in the directory.
	vector<string> expected_files = {"file1.txt", "file2.txt", "file3.txt", "file4.txt"};
	vector<string> file_paths;
	for (const auto &filename : expected_files) {
		auto file_path = local_fs->JoinPath(test_dir, filename);
		file_paths.push_back(file_path);
		auto handle = local_fs->OpenFile(file_path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE);
		const string content = "Test content for " + filename;
		handle->Write(QueryContext(), const_cast<char *>(content.data()), content.size(), /*location=*/0);
		handle->Sync();
		handle.reset();
	}

	// List files using the caching wrapper
	vector<string> actual_files;
	caching_wrapper->ListFiles(test_dir, [&actual_files](const string &path, bool is_dir) {
		if (!is_dir) {
			actual_files.emplace_back(path);
		}
	});

	// Listing order is not relied upon, so compare sorted sequences.
	std::sort(expected_files.begin(), expected_files.end());
	std::sort(actual_files.begin(), actual_files.end());

	// Clean up BEFORE asserting: a failing REQUIRE aborts the test case, and cleanup placed after it would
	// be skipped, leaving the directory and its files behind for subsequent runs.
	for (const auto &file_path : file_paths) {
		local_fs->TryRemoveFile(file_path);
	}
	local_fs->RemoveDirectory(test_dir);

	REQUIRE(actual_files.size() == expected_files.size());
	for (size_t idx = 0; idx < expected_files.size(); ++idx) {
		REQUIRE(actual_files[idx] == expected_files[idx]);
	}
}

// Several threads read disjoint chunks through ONE shared handle opened with the parallel-access flag; every
// thread must see the bytes that belong to its own chunk.
TEST_CASE("CachingFileSystemWrapper read with parallel accesses", "[file_system][caching]") {
	DuckDB db(":memory:");
	auto &db_instance = *db.instance;
	auto tracking_fs = make_uniq<TrackingFileSystem>();
	auto caching_wrapper =
	    make_shared_ptr<CachingFileSystemWrapper>(*tracking_fs, db_instance, CachingMode::ALWAYS_CACHE);

	const string test_content =
	    "Test content for parallel read access. This is a longer string to allow multiple reads.";
	TestFileGuard test_file("test_caching_parallel.txt", test_content);
	constexpr idx_t THREAD_COUNT = 2;

	// Open file with parallel access flag - single handle shared by all threads
	OpenFileInfo file_info(test_file.GetPath());
	file_info.extended_info = make_shared_ptr<ExtendedOpenFileInfo>();
	file_info.extended_info->options["validate_external_file_cache"] = Value::BOOLEAN(false);
	auto shared_handle =
	    caching_wrapper->OpenFile(file_info, FileFlags::FILE_FLAGS_READ | FileFlags::FILE_FLAGS_PARALLEL_ACCESS);

	// Each thread reads its own chunk from the same file handle using positional reads
	vector<std::thread> threads;
	std::mutex results_mutex;
	vector<bool> results(THREAD_COUNT, false);

	const idx_t chunk_size = 20;
	for (idx_t idx = 0; idx < THREAD_COUNT; ++idx) {
		threads.emplace_back([&, idx]() {
			try {
				const idx_t read_location = idx * chunk_size;
				string buffer(TEST_BUFFER_SIZE, '\0');
				shared_handle->Read(QueryContext(), &buffer[0], chunk_size, read_location);
				const bool matches = (buffer.substr(0, chunk_size) == test_content.substr(read_location, chunk_size));
				// vector<bool> packs its elements, so writes to different indices must be serialized.
				const lock_guard<mutex> lock(results_mutex);
				results[idx] = matches;
			} catch (...) {
				// An exception escaping a thread entry point calls std::terminate and would take down the
				// whole test binary. Leaving results[idx] at false instead makes the REQUIRE below fail.
			}
		});
	}
	for (auto &thd : threads) {
		REQUIRE(thd.joinable());
		thd.join();
	}

	// Verify every thread read correctly from the same handle
	for (idx_t idx = 0; idx < THREAD_COUNT; ++idx) {
		REQUIRE(results[idx]);
	}

	shared_handle.reset();
}

// Testing scenario: mimic how a file is opened from a DuckDB instance, i.e. the open request goes through the
// opener filesystem, with the caching mode selected through the open flags.
//
// Example usage in production:
// auto &fs = FileSystem::GetFileSystem(context);
// auto file_handle = fs.OpenFile(path, flag);
TEST_CASE("Open file in opener filesystem cache modes", "[file_system][caching]") {
	const string test_content = "File used for caching enabled testing";
	// Dedicated file name: sharing one with another test case would make the two cases depend on each other
	// whenever their files exist at the same time.
	TestFileGuard test_file("test_caching_opener_modes.txt", test_content);

	DuckDB db(":memory:");
	auto &db_instance = *db.instance;
	auto &opener_filesystem = db_instance.GetFileSystem().Cast<OpenerFileSystem>();
	auto &vfs = opener_filesystem.GetFileSystem();
	vfs.RegisterSubSystem(make_uniq<TrackingFileSystem>());

	// Variables shared by all caching modes.
	string buffer(TEST_BUFFER_SIZE, '\0');
	const auto &external_file_cache = db_instance.GetExternalFileCache();

	// Opens the test file with the given caching mode, reads it in full and checks content and seekability.
	auto run_case = [&](CachingMode mode) {
		FileOpenFlags flags {FileFlags::FILE_FLAGS_READ};
		flags.SetCachingMode(mode);

		// Perform read operation and check correctness.
		auto handle = opener_filesystem.OpenFile(test_file.GetPath(), flags);
		handle->Read(QueryContext(), &buffer[0], test_content.length(), /*location=*/0);
		REQUIRE(buffer.substr(0, test_content.length()) == test_content);

		// Check seekability.
		REQUIRE(handle->CanSeek());
	};

	SECTION("cache enabled") {
		run_case(CachingMode::ALWAYS_CACHE);
		// Check the external file cache has something cached.
		REQUIRE(!external_file_cache.GetCachedFileInformation().empty());
	}

	SECTION("cache disabled") {
		run_case(CachingMode::NO_CACHING);
		// Check the external file cache has nothing cached.
		REQUIRE(external_file_cache.GetCachedFileInformation().empty());
	}
}

// Testing scenario: the requested read extends past the end of the file; the read is expected to report
// only the bytes that actually exist.
TEST_CASE("Request over-sized range read", "[file_system][caching]") {
	const string test_content = "File used for over-sized read testing";
	TestFileGuard test_file("test_oversized_read.txt", test_content);

	DuckDB db(":memory:");
	auto &opener_fs = db.instance->GetFileSystem().Cast<OpenerFileSystem>();
	// Route files under the test directory through the tracking filesystem.
	opener_fs.GetFileSystem().RegisterSubSystem(make_uniq<TrackingFileSystem>());

	FileOpenFlags open_flags {FileFlags::FILE_FLAGS_READ};
	open_flags.SetCachingMode(CachingMode::ALWAYS_CACHE);

	// Ask for one byte more than the file holds.
	auto file_handle = opener_fs.OpenFile(test_file.GetPath(), open_flags);
	string read_buffer(TEST_BUFFER_SIZE, '\0');
	const idx_t content_size = test_content.length();
	const idx_t actual_read = file_handle->Read(QueryContext(), &read_buffer[0], content_size + 1);

	// Only the existing bytes are reported, and they match the file content.
	REQUIRE(actual_read == content_size);
	REQUIRE(read_buffer.substr(0, content_size) == test_content);
}

} // namespace duckdb