1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
/*-
* See the file LICENSE for redistribution information.
*
* Copyright (c) 1997, 2013 Oracle and/or its affiliates. All rights reserved.
*
*/
#include "ex_sql_utils.h"
/*
* Create dozens of writer threads to insert data in parallel.
* DBSQL will not call the busy callback. It blocks instead.
*/
/* Arguments handed to one writer thread through its void* parameter. */
typedef struct {
    /* The filename of db. */
    const char *db_name;
    /* The number of records to insert. */
    int num_of_records;
    /* Serial number of thread. */
    int thread_sn;
} thread_attr;
/*
* Define the writer thread's workload.
* The writer would insert 5000 records in its thread. Commit if succeeded
* and rollback if failed.
*/
static void*
writer(arg)
void *arg;
{
const char* sql;
int txn_begin;
int num_of_records;
int thread_sn;
int i, rc;
sqlite3* db;
sqlite3_stmt* stmt;
txn_begin = 0; /* Mark that explicit txn does not begin yet. */
/* Open database. */
sqlite3_open(((thread_attr *)arg)->db_name, &db);
error_handler(db);
/* Fetch attributes. */
num_of_records = ((thread_attr *)arg)->num_of_records;
thread_sn = ((thread_attr *)arg)->thread_sn;
/* Prepare the statement for use, many times over. */
sql = "INSERT INTO university VALUES"
"(147, 'Tsinghua University China', 'tsinghua.edu.cn',"
"'cn', 'Asia', 237,63,432,303);";
rc = sqlite3_prepare_v2(db, sql, (int)strlen(sql), &stmt, NULL);
if (rc != SQLITE_OK)
goto cleanup;
/*
* When we insert data many times over, we shall use explicit
* transaction to speed up the operations.
*/
rc = sqlite3_exec(db, "BEGIN TRANSACTION", NULL, 0, NULL);
if (rc != SQLITE_OK)
goto cleanup;
txn_begin = 1; /* Mark that explicit txn began. */
for (i = 0; i < num_of_records; i++) {
rc = sqlite3_step(stmt);
/*
* Even if we encounter errors, the statement still has
* to be reset. Otherwise following rollback always
* hits SQLITE_BUSY
*/
sqlite3_reset(stmt);
if (rc != SQLITE_DONE) {
/* We can not return here. Rollback is required. */
goto cleanup;
break;
}
}
/* Commit if no errors. */
rc = sqlite3_exec(db, "COMMIT TRANSACTION", NULL, 0, NULL);
if (rc != SQLITE_OK)
goto cleanup;
cleanup:
/* Error handle. */
if (rc != SQLITE_OK && rc != SQLITE_DONE) {
fprintf(stderr, "ERROR: %s. ERRCODE: %d.\n",
sqlite3_errmsg(db), rc);
/* Rollback if explict txn had begined. */
if (txn_begin)
sqlite3_exec(db, "ROLLBACK TRANSACTION", NULL, 0, NULL);
}
/* Final cleanup. */
sqlite3_finalize(stmt);
sqlite3_close(db);
return NULL;
}
/*
 * Example body.
 *
 * Prints the row count of the university table, starts NUM_WRITERS
 * writer threads that each insert ROWS_PER_WRITER rows into the database
 * file db_name, waits via join_threads(), then prints the row count again.
 *
 * db:      handle used for the two count queries.
 * db_name: database filename handed to every writer thread.
 *
 * Always returns 0. A thread that cannot be created is reported on stderr
 * and skipped.
 */
static int
ex_sql_multi_thread(db_handle *db, const char *db_name)
{
    enum { NUM_WRITERS = 20, ROWS_PER_WRITER = 5000 };
    /*
     * One argument record per thread. Sharing a single record and
     * rewriting thread_sn in the loop would race with threads that
     * are already running and reading it.
     */
    thread_attr attrs[NUM_WRITERS];
    os_thread_t pid;
    const char *sql;
    int i;

    /* Display current status. */
    echo_info("Check existing record number of the table");
    sql = "SELECT count(*) FROM university;";
    exec_sql(db, sql);

    /*
     * Create n threads and write in parallel.
     */
    echo_info("Now we begin to insert records by multi-writers.");
    for (i = 0; i < NUM_WRITERS; i++) {
        attrs[i].db_name = db_name;
        attrs[i].num_of_records = ROWS_PER_WRITER;
        attrs[i].thread_sn = i;

        /* A non-zero result is treated as successful creation. */
        if (os_thread_create(&pid, writer, (void *)&attrs[i])) {
            register_thread_id(pid);
            printf("%02dth writer starts to write %d rows\n",
                i + 1, ROWS_PER_WRITER);
            sqlite3_sleep(20);    /* Milliseconds. */
        } else {
            fprintf(stderr, "Failed to create thread\n");
        }
    }

    /* attrs must stay in scope until this call has returned. */
    join_threads();

    /* Display result. */
    echo_info("Check existing record number of the table");
    sql = "SELECT count(*) FROM university;";
    exec_sql(db, sql);

    return 0;
}
/*
 * Program entry point.
 *
 * Exits with EXIT_FAILURE when the SQLite library reports that it is not
 * threadsafe. Otherwise sets up the database, preloads the sample table,
 * runs the multi-threaded insert example, cleans up, and returns 0.
 */
int
main()
{
    const char *db_name = "ex_sql_multi_thread.db";
    db_handle *db;

    /* The example needs a threadsafe library; stop early without one. */
    if (sqlite3_threadsafe() == 0) {
        fprintf(stderr,
            "ERROR: The libsqlite version is NOT threadsafe!\n");
        exit(EXIT_FAILURE);
    }

    /* Prepare the environment and preload the sample rows (1 = silent). */
    db = setup(db_name);
    load_table_from_file(db, university_sample_data, 1);

    /* Run the example itself. */
    ex_sql_multi_thread(db, db_name);

    /* Tear down. */
    cleanup(db);
    return 0;
}
|