1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
|
/*
This file is part of BioD.
Copyright (C) 2018 Pjotr Prins <pjotr.prins@thebird.nl>
*/
module bio.std.decompress;
/**
Streaming line reader which can be used for gzipped files. Note the
current edition (still) uses the garbage collector. It may help to
switch it off or to use the BioD decompressor used by bgzf.
For a comparison with gzip a 2GB file decompressed with
real 0m53.701s
user 0m53.820s
sys 0m0.572s
while gzip took
real 0m11.528s
user 0m10.288s
sys 0m0.936s
So, that is something to aim for.
Conversion can happen between different encodings, provided the
line terminator is ubyte = '\n'. GzipbyLine logic is modeled on
ByLineImpl and readln function from std.stdio.
*/
import std.algorithm;
// import std.concurrency;
import std.conv;
import std.exception;
import std.file;
import std.parallelism;
import std.stdio: File;
import std.zlib: UnCompress;
/**
   Streaming, foreach-able line reader over a gzipped file.

   Template parameter R is the element type handed to the loop body
   (typically `ubyte[]` or `string`); lines are split on '\n' and the
   terminator is kept in the yielded slice.
*/
struct GzipbyLine(R) {
    File f;                 // underlying gzip file handle (closed by File's refcounting)
    UnCompress decompress;  // zlib streaming decompressor
    R line;
    uint _bufsize;          // compressed-read chunk size in bytes

    /// Open `gzipfn` for streaming decompression; throws if it is not a regular file.
    this(string gzipfn, uint bufsize = 0x4000) {
        enforce(gzipfn.isFile);
        f = File(gzipfn, "r");
        decompress = new UnCompress();
        _bufsize = bufsize;
    }

    @disable this(this); // disable copy semantics;

    /**
       foreach support: calls `dg(lineno, line)` for each '\n'-terminated
       line and once more for a trailing unterminated line.

       FIX: the delegate's return value is now honoured, so `break` (and
       early `return`) inside the caller's foreach body terminates the
       iteration, as D's opApply protocol requires. The line splitter is
       also iterative now — the previous recursive version consumed one
       stack frame per line within a chunk.
    */
    int opApply(scope int delegate(int line, R) dg) {
        int line = 0;
        int result = 0; // nonzero once the caller requests early termination

        // Split `head ~ rest` on '\n', invoking dg per complete line;
        // returns the unterminated tail to be carried into the next chunk.
        R chunk_byLine(R head, R rest) {
            while (true) {
                auto split = findSplitAfter(rest, "\n");
                auto left = split[0];  // includes the '\n' terminator
                auto right = split[1]; // remainder (the full input when no match)
                if (left.length == 0)  // no newline left in this chunk
                    return head ~ right;
                result = dg(line++, head ~ left);
                if (result)            // caller broke out of the foreach
                    return right;
                head = [];
                rest = right;
            }
        }

        R tail; // tail of previous buffer
        foreach (ubyte[] buffer; f.byChunk(_bufsize)) {
            auto buf = cast(R) decompress.uncompress(buffer);
            tail = chunk_byLine(tail, buf);
            if (result)
                return result;
        }
        // Emit a final line that lacked a trailing newline.
        if (tail.length > 0)
            result = dg(line++, tail);
        return result;
    }
}
unittest {
    import std.algorithm.comparison : equal;

    // Exercise findSplitAfter, the primitive GzipbyLine uses to split
    // decompressed chunks into lines.
    int[] sample = [1, 2, 4, 7, 7, 2, 4, 7, 3, 5];

    // A hit: result[0] is the head including the separator, result[1] the rest.
    auto first = findSplitAfter(sample, [7]);
    assert(equal(first[0], [1, 2, 4, 7]));
    assert(equal(first[1], [7, 2, 4, 7, 3, 5]));

    // Splitting again from the remainder finds the very next separator.
    auto second = findSplitAfter(first[1], [7]);
    assert(equal(second[0], [7]));
    assert(equal(second[1], [2, 4, 7, 3, 5]));

    // A miss: empty head, the untouched input as the remainder.
    auto miss = findSplitAfter([2, 4, 3], [7]);
    assert(equal(miss[0], cast(ubyte[])[]));
    assert(equal(miss[1], [2, 4, 3]));

    uint chars = 0;
    int lines = 0;
    /*
    foreach(line, ubyte[] s; GzipbyLine!(ubyte[])("test/data/BXD_geno.txt.gz")) {
      // test file contains 7320 lines 4707218 characters
      // write(cast(string)s);
      chars += s.length;
      lines = line;
    }
    */
    // These fail on recent versions of ldc
    // assert(lines == 7319,"genotype lines " ~ to!string(lines+1)); // fails with ldc2 < 1.10!
    // assert(chars == 4707218,"chars " ~ to!string(chars));
}
/**
Mmfile threaded version of streaming line reader which can be used
for gzipped files. Note the current edition is slower than
GzipbyLine above and (still) uses the garbage collector. It may
help to switch it off or to use the BioD decompressor used by bgzf.
Conversion can happen between different encodings, provided the
line terminator is ubyte = '\n'. GzipbyLine logic is modeled on
ByLineImpl and readln function from std.stdio.
*/
import std.mmfile;
import core.thread;
/**
   Mmfile-backed, double-buffered line reader over a gzipped file:
   while the caller processes one decompressed chunk, the next chunk is
   decompressed in a worker thread.
*/
struct GzipbyLineThreaded(R) {
    string fn;              // path to the gzipped input
    UnCompress decompress;  // zlib streaming decompressor (used from the worker task)
    R line;
    // Nullable!ubyte[] uncompressed_buf;
    uint _bufsize;          // compressed chunk size in bytes

    /// Record `gzipfn` for mmap-ing later; throws if it is not a regular file.
    this(string gzipfn, uint bufsize = 0x4000) {
        enforce(gzipfn.isFile);
        fn = gzipfn;
        decompress = new UnCompress();
        _bufsize = bufsize;
    }

    @disable this(this); // disable copy semantics;

    /**
       foreach support; see GzipbyLine.opApply for the line contract.

       FIXES vs. the previous revision:
       - chunk slicing used `mmf[chunknum*_bufsize..mmf_length]` (to end of
         file) instead of the computed-but-unused `pos2`, so the second
         "chunk" swallowed the whole remainder and defeated the pipelining;
       - the first slice `mmf[0.._bufsize]` over-ran files smaller than one
         buffer — the end offset is now clamped;
       - the delegate's return value is honoured (break/early-return works),
         and the in-flight worker task is drained before returning early;
       - the line splitter is iterative, not one stack frame per line.
    */
    int opApply(scope int delegate(int line, R) dg) {
        int line = 0;
        int result = 0; // nonzero once the caller requests early termination

        // Split `head ~ rest` on '\n', invoking dg per complete line;
        // returns the unterminated tail for the next chunk.
        R chunk_byLine(R head, R rest) {
            while (true) {
                auto split = findSplitAfter(rest, "\n");
                auto left = split[0];  // includes the '\n' terminator
                auto right = split[1];
                if (left.length == 0)  // no newline left in this chunk
                    return head ~ right;
                result = dg(line++, head ~ left);
                if (result)
                    return right;
                head = [];
                rest = right;
            }
        }

        // Runs in the worker task; UnCompress keeps inter-chunk zlib state,
        // which is safe here because at most one task uses it at a time.
        R decompressor(ubyte[] buffer) {
            return cast(R) decompress.uncompress(buffer);
        }

        auto mmf = new MmFile(fn);
        immutable mmf_length = mmf.length();
        long rest = mmf_length;
        R tail; // tail of previous buffer

        // Decompress the first chunk synchronously (clamped for small files).
        ulong first_end = _bufsize;
        if (first_end > mmf_length)
            first_end = cast(ulong) mmf_length;
        auto buffer1 = cast(ubyte[]) mmf[0 .. first_end];
        rest -= buffer1.length;
        auto buf = decompressor(buffer1);

        uint chunknum = 1;
        while (rest > 0) {
            // Compute the next chunk's [pos1, pos2) window, clamped at EOF.
            ulong pos1 = cast(ulong) chunknum * _bufsize;
            ulong pos2 = pos1 + _bufsize;
            if (pos2 > mmf_length)
                pos2 = cast(ulong) mmf_length;
            auto buffer2 = cast(ubyte[]) mmf[pos1 .. pos2]; // was sliced to mmf_length (bug)
            rest -= buffer2.length;

            // Kick off decompression of the next chunk in a worker thread...
            auto t = task(&decompressor, buffer2);
            t.executeInNewThread();

            // ...while the caller consumes the current one.
            tail = chunk_byLine(tail, buf);
            if (result) {
                t.yieldForce(); // drain the in-flight task before bailing out
                return result;
            }
            buf = t.yieldForce();
            chunknum += 1;
        }

        // Consume the final chunk and any unterminated trailing line.
        tail = chunk_byLine(tail, buf);
        if (result)
            return result;
        if (tail.length > 0)
            result = dg(line++, tail);
        return result;
    }
}
unittest {
    // Placeholder test for GzipbyLineThreaded. The real file-based check is
    // disabled because the fixture path ("test/data/BXD_geno.txt.gz") is only
    // valid inside the source tree; `lines`/`chars` would accumulate the line
    // count and total byte count if re-enabled.
    int lines = 0;
    uint chars = 0;
    /*
    foreach(line, ubyte[] s; GzipbyLineThreaded!(ubyte[])("test/data/BXD_geno.txt.gz")) {
      // test file contains 7320 lines 4707218 characters
      // write(cast(string)s);
      chars += s.length;
      lines = line;
    }
    */
    /*
    These fail on recent versions of ldc
    assert(lines == 7319,"genotype lines " ~ to!string(lines+1));
    assert(chars == 4707218,"chars " ~ to!string(chars));
    */
}
|