File: test_capi_streaming.cpp

package info (click to toggle)
duckdb 1.5.1-2
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 299,196 kB
  • sloc: cpp: 865,414; ansic: 57,292; python: 18,871; sql: 12,663; lisp: 11,751; yacc: 7,412; lex: 1,682; sh: 747; makefile: 558
file content (184 lines) | stat: -rw-r--r-- 5,729 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#include "capi_tester.hpp"
#include "duckdb.h"

using namespace duckdb;
using namespace std;

// Streams the rows of a one-million-row range query through the C API chunk by chunk and verifies that
// every chunk is non-empty, that the first value of each chunk is larger than the first value of the
// previous chunk, and that the chunks add up to exactly one million rows.
TEST_CASE("Test streaming results in C API", "[capi]") {
	CAPITester tester;
	CAPIPrepared prepared;
	CAPIPending pending;
	duckdb::unique_ptr<CAPIResult> result;

	// open the database in in-memory mode
	REQUIRE(tester.OpenDatabase(nullptr));
	REQUIRE(prepared.Prepare(tester, "SELECT i::UINT32 FROM range(1000000) tbl(i)"));
	REQUIRE(pending.PendingStreaming(prepared));

	// keep executing tasks until the pending query reports that its result is ready; an error state fails the test
	while (true) {
		auto state = pending.ExecuteTask();
		REQUIRE(state != DUCKDB_PENDING_ERROR);
		if (state == DUCKDB_PENDING_RESULT_READY) {
			break;
		}
	}

	result = pending.Execute();
	REQUIRE(result);
	REQUIRE(!result->HasError());

	// INVALID_INDEX marks "no chunk seen yet", so the ordering check is skipped for the first chunk
	idx_t value = duckdb::DConstants::INVALID_INDEX;
	idx_t result_count = 0;
	for (auto chunk = result->StreamChunk(); chunk; chunk = result->StreamChunk()) {
		// verify the chunk holds at least one row before reading its first element
		REQUIRE(chunk->size() > 0);

		auto vector = chunk->GetVector(0);
		auto data = static_cast<uint32_t *>(duckdb_vector_get_data(vector));
		REQUIRE(data != nullptr);

		const auto old_value = value;
		value = data[0];
		if (old_value != duckdb::DConstants::INVALID_INDEX) {
			// We select from a range, so we can expect every starting value of a new chunk to be higher than the last
			// one.
			REQUIRE(value > old_value);
		}

		result_count += chunk->size();
		// fail as soon as the stream yields more rows than the query can produce
		REQUIRE(result_count <= 1000000);
	}
	// the stream is exhausted: every row of the range must have been delivered
	REQUIRE(result_count == 1000000);
}

// Exercises the non-streaming accessors of CAPIResult on a result that was created as a streaming result,
// before and after the point where the result gets materialized.
TEST_CASE("Test other methods on streaming results in C API", "[capi]") {
	CAPITester tester;
	CAPIPrepared prepared;
	CAPIPending pending;
	duckdb::unique_ptr<CAPIResult> result;

	// in-memory database with a pending streaming query over one million rows
	REQUIRE(tester.OpenDatabase(nullptr));
	REQUIRE(prepared.Prepare(tester, "SELECT i::UINT32 FROM range(1000000) tbl(i)"));
	REQUIRE(pending.PendingStreaming(prepared));

	// run tasks until the result is reported ready; an error state fails the test
	bool ready = false;
	while (!ready) {
		const auto task_state = pending.ExecuteTask();
		REQUIRE(task_state != DUCKDB_PENDING_ERROR);
		ready = (task_state == DUCKDB_PENDING_RESULT_READY);
	}

	// executing the pending query hands back the result, which must still be a streaming one
	result = pending.Execute();
	REQUIRE(result);
	REQUIRE(!result->HasError());
	REQUIRE(result->IsStreaming());

	// nothing has been fetched yet, so no chunks are reported
	REQUIRE(result->ChunkCount() == 0);

	// the metadata accessors are only called for their side effects; their values are not checked
	static_cast<void>(result->ColumnCount());
	static_cast<void>(result->ColumnName(0));
	static_cast<void>(result->ColumnType(0));

	const auto message = result->ErrorMessage();
	REQUIRE(message == nullptr);
	const auto first_chunk = result->FetchChunk(0);
	REQUIRE(first_chunk == nullptr);
	const auto failed = result->HasError();
	REQUIRE(failed == false);
	const auto total_rows = result->row_count();
	REQUIRE(total_rows == 0);
	const auto changed_rows = result->rows_changed();
	REQUIRE(changed_rows == 0);

	// this succeeds because the result is materialized if a stream-result method hasn't been used yet
	const auto first_column = result->ColumnData<uint32_t>(0);
	REQUIRE(first_column != nullptr);

	// this materializes the result
	const auto first_is_null = result->IsNull(0, 0);
	REQUIRE(first_is_null == false);
}

// Streams a one-million-row query through the C API, converts every streamed chunk into an Arrow array with
// duckdb_result_arrow_array, and verifies that the Arrow array lengths add up to the full row count.
TEST_CASE("Test streaming arrow results in C API", "[capi][arrow]") {
	CAPITester tester;
	CAPIPrepared prepared;
	CAPIPending pending;
	duckdb::unique_ptr<CAPIResult> result;

	// open the database in in-memory mode
	REQUIRE(tester.OpenDatabase(nullptr));
	REQUIRE(prepared.Prepare(tester, "SELECT i::UINT32 FROM range(1000000) tbl(i)"));
	REQUIRE(pending.PendingStreaming(prepared));

	// keep executing tasks until the pending query reports that its result is ready; an error state fails the test
	while (true) {
		auto state = pending.ExecuteTask();
		REQUIRE(state != DUCKDB_PENDING_ERROR);
		if (state == DUCKDB_PENDING_RESULT_READY) {
			break;
		}
	}

	result = pending.Execute();
	REQUIRE(result);
	REQUIRE(!result->HasError());
	auto chunk = result->StreamChunk();
	// the null out_array probe below dereferences the first chunk, so fail cleanly instead of crashing if the
	// stream produced nothing
	REQUIRE(chunk);

	// Check handle null out_array
	duckdb_result_arrow_array(result->InternalResult(), chunk->GetChunk(), nullptr);

	// ArrowArray::length is 64-bit, so accumulate in a 64-bit counter to avoid narrowing
	int64_t nb_row = 0;
	while (chunk) {
		// the scratch array lives on the stack; the API receives a pointer to a pointer to it, exactly as it
		// did with the heap-allocated array this replaces
		ArrowArray arrow_array {};
		ArrowArray *arrow_array_ptr = &arrow_array;
		duckdb_result_arrow_array(result->InternalResult(), chunk->GetChunk(),
		                          reinterpret_cast<duckdb_arrow_array *>(&arrow_array_ptr));
		// a populated Arrow array must carry a release callback; check it before calling through it
		const bool has_release = arrow_array.release != nullptr;
		REQUIRE(has_release);
		nb_row += arrow_array.length;
		// the next chunk is fetched before the Arrow array is released, preserving the original call order
		chunk = result->StreamChunk();
		arrow_array.release(&arrow_array);
	}
	REQUIRE(nb_row == 1000000);
}

// Checks duckdb_query_progress and duckdb_interrupt: null handles, the value reported before and during a
// pending query, and that an interrupted pending query ends in the error state rather than producing a result.
TEST_CASE("Test query progress and interrupt in C API", "[capi]") {
	CAPITester tester;
	CAPIPrepared prepared;
	CAPIPending pending;
	duckdb::unique_ptr<CAPIResult> result;

	// a null connection must be tolerated by both entry points; progress then reports -1
	REQUIRE(duckdb_query_progress(nullptr).percentage == -1.0);
	duckdb_interrupt(nullptr);

	// in-memory database, single-threaded, with two tables and the progress bar enabled but not printed
	REQUIRE(tester.OpenDatabase(nullptr));
	const char *const setup_statements[] = {
	    "SET threads=1",
	    "create table tbl as select range a, mod(range,10) b from range(10000);",
	    "create table tbl_2 as select range a from range(10000);",
	    "set enable_progress_bar=true;",
	    "set enable_progress_bar_print=false;",
	};
	for (const char *statement : setup_statements) {
		REQUIRE_NO_FAIL(tester.Query(statement));
	}

	// current progress percentage of the test connection
	const auto progress = [&tester]() {
		return duckdb_query_progress(tester.connection).percentage;
	};

	// no query is running yet, so progress reports -1
	REQUIRE(progress() == -1.0);

	// a freshly created pending query starts at zero progress
	REQUIRE(prepared.Prepare(tester, "select count(*) from tbl where a = (select min(a) from tbl_2)"));
	REQUIRE(pending.PendingStreaming(prepared));
	REQUIRE(progress() == 0.0);

	// execute tasks until the reported progress moves away from zero; the query must not finish before that
	while (progress() == 0.0) {
		REQUIRE(pending.ExecuteTask() == DUCKDB_PENDING_RESULT_NOT_READY);
	}
	REQUIRE(progress() >= 0.0);

	// after an interrupt the pending query must end in the error state without ever becoming ready
	duckdb_interrupt(tester.connection);
	bool interrupted = false;
	while (!interrupted) {
		const auto task_state = pending.ExecuteTask();
		REQUIRE(task_state != DUCKDB_PENDING_RESULT_READY);
		interrupted = (task_state == DUCKDB_PENDING_ERROR);
	}
}