1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
#include "catch.hpp"
#include "test_helpers.hpp"
#include "duckdb/main/appender.hpp"
#include "duckdb/common/serializer/memory_stream.hpp"
#include "duckdb/parser/statement/logical_plan_statement.hpp"
#include "duckdb/common/serializer/binary_serializer.hpp"
#include "duckdb/common/serializer/binary_deserializer.hpp"
// whatever
#include <signal.h>
#include <sys/mman.h>
#include <unistd.h>
#include <stdio.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#ifdef __MVS__
#define _XOPEN_SOURCE_EXTENDED 1
#include <strings.h>
#endif
using namespace duckdb;
using namespace std;
TEST_CASE("Test using a remote optimizer pass in case thats important to someone", "[extension]") {
pid_t pid = fork();
int port = 4242;
if (pid == 0) { // child process
// sockets, man, how do they work?!
struct sockaddr_in servaddr, cli;
auto sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockfd == -1) {
printf("Failed to set up socket in child process: %s", strerror(errno));
exit(1);
}
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
servaddr.sin_port = htons(port);
auto res = ::bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
if (res != 0) {
printf("Failed to bind socket in child process: %s", strerror(errno));
exit(1);
}
res = listen(sockfd, 5);
if (res != 0) {
printf("Failed to listen to socked in child process: %s", strerror(errno));
exit(1);
}
socklen_t len = sizeof(cli);
auto connfd = accept(sockfd, (struct sockaddr *)&cli, &len);
if (connfd < 0) {
printf("Failed to set up socket in child process: %s", strerror(errno));
exit(1);
}
DBConfig config;
config.SetOptionByName("allow_unsigned_extensions", true);
DuckDB db2(nullptr, &config);
Connection con2(db2);
auto load_parquet = con2.Query("LOAD parquet");
if (load_parquet->HasError()) {
printf("Failed to load Parquet in child process: %s", load_parquet->GetError().c_str());
exit(1);
}
while (true) {
idx_t bytes;
REQUIRE(read(connfd, &bytes, sizeof(idx_t)) == sizeof(idx_t));
if (bytes == 0) {
break;
}
auto buffer = malloc(bytes);
REQUIRE(buffer);
REQUIRE(read(connfd, buffer, bytes) == ssize_t(bytes));
// Non-owning stream
MemoryStream stream(data_ptr_cast(buffer), bytes);
con2.BeginTransaction();
BinaryDeserializer deserializer(stream);
deserializer.Set<ClientContext &>(*con2.context);
deserializer.Begin();
auto plan = LogicalOperator::Deserialize(deserializer);
deserializer.End();
plan->ResolveOperatorTypes();
con2.Commit();
auto statement = make_uniq<LogicalPlanStatement>(std::move(plan));
auto result = con2.Query(std::move(statement));
auto &collection = result->Collection();
idx_t num_chunks = collection.ChunkCount();
REQUIRE(write(connfd, &num_chunks, sizeof(idx_t)) == sizeof(idx_t));
for (auto &chunk : collection.Chunks()) {
Allocator allocator;
MemoryStream target(allocator);
BinarySerializer serializer(target);
serializer.Begin();
chunk.Serialize(serializer);
serializer.End();
auto data = target.GetData();
idx_t len = target.GetPosition();
REQUIRE(write(connfd, &len, sizeof(idx_t)) == sizeof(idx_t));
REQUIRE(write(connfd, data, len) == ssize_t(len));
}
}
exit(0);
} else if (pid > 0) { // parent process
DBConfig config;
config.SetOptionByName("allow_unsigned_extensions", true);
DuckDB db1(nullptr, &config);
Connection con1(db1);
auto load_parquet = con1.Query("LOAD 'parquet'");
if (load_parquet->HasError()) {
// Do not execute the test.
if (kill(pid, SIGKILL) != 0) {
FAIL();
}
return;
}
REQUIRE_NO_FAIL(con1.Query("LOAD '" DUCKDB_BUILD_DIRECTORY
"/test/extension/loadable_extension_optimizer_demo.duckdb_extension'"));
REQUIRE_NO_FAIL(con1.Query("SET waggle_location_host='127.0.0.1'"));
REQUIRE_NO_FAIL(con1.Query("SET waggle_location_port=4242"));
usleep(10000); // need to wait a bit till socket is up
// check if the child PID is still there
if (kill(pid, 0) != 0) {
// child is gone!
printf("Failed to execute remote optimizer test - child exited unexpectedly");
FAIL();
}
REQUIRE_NO_FAIL(con1.Query(
"SELECT first_name FROM PARQUET_SCAN('data/parquet-testing/userdata1.parquet') GROUP BY first_name"));
if (kill(pid, SIGKILL) != 0) {
FAIL();
}
} else {
FAIL();
}
}
|