File: decompress.d

package info (click to toggle)
sambamba 1.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 3,528 kB
  • sloc: sh: 220; python: 166; ruby: 147; makefile: 103
file content (218 lines) | stat: -rw-r--r-- 5,840 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
/*
    This file is part of BioD.

    Copyright (C) 2018 Pjotr Prins <pjotr.prins@thebird.nl>
*/

module bio.std.decompress;

/**
   Streaming line reader which can be used for gzipped files. Note the
   current edition (still) uses the garbage collector. It may help to
   switch it off or to use the BioD decompressor used by bgzf.

   For comparison, decompressing a 2GB file with this reader took

   real    0m53.701s
   user    0m53.820s
   sys     0m0.572s

   while gzip took

   real    0m11.528s
   user    0m10.288s
   sys     0m0.936s

   So, that is something to aim for.

   Conversion can happen between different encodings, provided the
   line terminator is ubyte = '\n'. GzipbyLine logic is modeled on
   ByLineImpl and readln function from std.stdio.
*/

import std.algorithm;
// import std.concurrency;
import std.conv;
import std.exception;
import std.file;
import std.parallelism;
import std.stdio: File;
import std.zlib: UnCompress;

/**
   Line-by-line reader for a compressed file that std.zlib.UnCompress
   can inflate.

   The file is read in chunks of `bufsize` compressed bytes. Every chunk
   is inflated and split after each '\n'; a partial line at the end of a
   chunk is carried over and prepended to the next chunk.

   Use it with foreach:
   ---
   foreach (int idx, ubyte[] s; GzipbyLine!(ubyte[])("file.gz")) { ... }
   ---
   `idx` is the zero-based line number and `s` is the line including its
   trailing '\n' (the final line has none if the data does not end in one).

   Params:
     R = array type handed to the foreach body, e.g. ubyte[]
*/
struct GzipbyLine(R) {

  File f;
  UnCompress decompress;
  R line;
  uint _bufsize;

  /**
     Opens `gzipfn` for reading.

     Params:
       gzipfn  = path of the compressed file; must be an existing regular file
       bufsize = number of compressed bytes read per chunk

     Throws: an exception from `enforce` when `gzipfn` is not a file.
  */
  this(string gzipfn, uint bufsize=0x4000) {
    enforce(gzipfn.isFile);
    // Compressed data is binary; "rb" avoids text-mode translation on
    // platforms where the C runtime distinguishes the two modes.
    f = File(gzipfn,"rb");
    decompress = new UnCompress();
    _bufsize = bufsize;
  }

  @disable this(this); // disable copy semantics;

  /**
     foreach support. Calls `dg(lineNumber, lineData)` for every line.

     Returns: 0 when all lines were delivered, otherwise the non-zero
     value returned by `dg` (this is how `break`, `return` and `goto`
     inside the foreach body are propagated).
  */
  int opApply(scope int delegate(int line, R) dg) {

    int line = 0;
    R tail; // unterminated remainder of the data seen so far

    // Splits `data` after each '\n' and hands every completed line to dg.
    // Whatever follows the last '\n' is appended to `tail`.
    // Iterative on purpose: the stack depth must not depend on the
    // number of lines inside one chunk.
    int emitLines(R data) {
      for (;;) {
        auto split = findSplitAfter(data,"\n");
        if (split[0].length == 0) {
          // no terminator left in this chunk: keep it for the next round
          tail = tail ~ split[1];
          return 0;
        }
        // split[0] includes the eol splitter
        if (auto stop = dg(line++, tail ~ split[0]))
          return stop;
        tail = R.init;
        data = split[1];
      }
    }

    foreach (ubyte[] buffer; f.byChunk(_bufsize))
    {
      auto buf = cast(R)decompress.uncompress(buffer);
      if (auto stop = emitLines(buf))
        return stop;
    }
    // Deliver a final line that has no trailing '\n'.
    if (tail.length > 0) return dg(line++, tail);
    return 0;
  }
}


unittest {

  import std.algorithm.comparison : equal;

  // Sanity checks for findSplitAfter, the primitive the line splitter
  // is built on.
  // writeln("Testing GzipbyLine");
  int[] numbers = [ 1, 2, 4, 7, 7, 2, 4, 7, 3, 5];

  // The left part ends with the first match, the right part is the rest.
  auto first = findSplitAfter(numbers, [7]);
  assert(equal(first[0], [1, 2, 4, 7]));
  assert(equal(first[1], [7, 2, 4, 7, 3, 5]));

  // A match at the very start gives a left part holding only the needle.
  auto second = findSplitAfter(first[1], [7]);
  assert(equal(second[0], [7]));
  assert(equal(second[1], [2, 4, 7, 3, 5]));

  // Without a match the left part is empty and the right part is the input.
  auto nomatch = findSplitAfter([2, 4, 3], [7]);
  assert(equal(nomatch[0], cast(ubyte[])[]));
  assert(equal(nomatch[1], [2,4,3]));

  int lines = 0;
  uint chars = 0;
  /*
  foreach(line, ubyte[] s; GzipbyLine!(ubyte[])("test/data/BXD_geno.txt.gz")) {
    // test file contains 7320 lines 4707218 characters
    // write(cast(string)s);
    chars += s.length;
    lines = line;
  }
  */
  // These fail on recent versions of ldc
  // assert(lines == 7319,"genotype lines " ~ to!string(lines+1)); // fails with ldc2 < 1.10!
  // assert(chars == 4707218,"chars " ~ to!string(chars));
}

/**
   Mmfile threaded version of streaming line reader which can be used
   for gzipped files. Note the current edition is slower than
   GzipbyLine above and (still) uses the garbage collector. It may
   help to switch it off or to use the BioD decompressor used by bgzf.

   Conversion can happen between different encodings, provided the
   line terminator is ubyte = '\n'. GzipbyLine logic is modeled on
   ByLineImpl and readln function from std.stdio.
*/

import std.mmfile;
import core.thread;

/**
   Memory-mapped, threaded variant of the compressed line reader.

   The file is mapped with MmFile and processed in chunks of `bufsize`
   compressed bytes. While the lines of one inflated chunk are handed to
   the foreach body, the next chunk is inflated in a separate thread.

   Use it with foreach:
   ---
   foreach (int idx, ubyte[] s; GzipbyLineThreaded!(ubyte[])("file.gz")) { ... }
   ---
   `idx` is the zero-based line number and `s` is the line including its
   trailing '\n' (the final line has none if the data does not end in one).

   Params:
     R = array type handed to the foreach body, e.g. ubyte[]
*/
struct GzipbyLineThreaded(R) {

  string fn;
  UnCompress decompress;
  R line;
  // Nullable!ubyte[] uncompressed_buf;
  uint _bufsize;

  /**
     Remembers `gzipfn`; the file is mapped when iteration starts.

     Params:
       gzipfn  = path of the compressed file; must be an existing regular file
       bufsize = number of compressed bytes inflated per chunk

     Throws: an exception from `enforce` when `gzipfn` is not a file.
  */
  this(string gzipfn, uint bufsize=0x4000) {
    enforce(gzipfn.isFile);
    fn = gzipfn;
    decompress = new UnCompress();
    _bufsize = bufsize;
  }

  @disable this(this); // disable copy semantics;

  /**
     foreach support. Calls `dg(lineNumber, lineData)` for every line.

     Returns: 0 when all lines were delivered, otherwise the non-zero
     value returned by `dg` (this is how `break`, `return` and `goto`
     inside the foreach body are propagated).
  */
  int opApply(scope int delegate(int line, R) dg) {

    int line = 0;
    R tail; // unterminated remainder of the data seen so far

    // Splits `data` after each '\n' and hands every completed line to dg.
    // Whatever follows the last '\n' is appended to `tail`.
    // Iterative on purpose: the stack depth must not depend on the
    // number of lines inside one chunk.
    int emitLines(R data) {
      for (;;) {
        auto split = findSplitAfter(data,"\n");
        if (split[0].length == 0) {
          // no terminator left in this chunk: keep it for the next round
          tail = tail ~ split[1];
          return 0;
        }
        // split[0] includes the eol splitter
        if (auto stop = dg(line++, tail ~ split[0]))
          return stop;
        tail = R.init;
        data = split[1];
      }
    }

    R decompressor(ubyte[] buffer) {
      return cast(R)decompress.uncompress(buffer);
    }

    auto mmf = new MmFile(fn);
    immutable ulong total = mmf.length();
    // 64-bit chunk size so offsets cannot wrap on large files. A chunk
    // size of 0 would never advance, so the whole file is used instead.
    immutable ulong step = _bufsize > 0 ? _bufsize : total;

    // Decompress the first chunk; it is clamped to the file length so a
    // file shorter than one chunk is not sliced out of range.
    ulong pos = min(step, total);
    auto buf = decompressor(cast(ubyte[])mmf[0..pos]);

    while (pos < total) {
      // Set up decompressing the next chunk in its own thread
      immutable ulong next = min(pos + step, total);
      auto t = task(&decompressor, cast(ubyte[])mmf[pos..next]);
      // auto t = task!decompressor(buffer2);
      t.executeInNewThread();
      // now invoke the delegate on the chunk that is already inflated
      immutable stop = emitLines(buf);
      // Always wait for the worker, also when stopping early, so that it
      // is no longer using `decompress` once this function returns.
      buf = t.yieldForce();
      if (stop) return stop;
      pos = next;
    }
    if (auto stop = emitLines(buf)) return stop;
    // Deliver a final line that has no trailing '\n'.
    if (tail.length > 0) return dg(line++, tail);
    return 0;
  }
}

unittest {
  // Counters for the file-based GzipbyLineThreaded test below, which is
  // currently disabled.
  uint chars = 0;
  int lines = 0;
  /*
  foreach(line, ubyte[] s; GzipbyLineThreaded!(ubyte[])("test/data/BXD_geno.txt.gz")) {
    // test file contains 7320 lines 4707218 characters
    // write(cast(string)s);
    chars += s.length;
    lines = line;
  }
  */
  /*
  These fail on recent versions of ldc
  assert(lines == 7319,"genotype lines " ~ to!string(lines+1));
  assert(chars == 4707218,"chars " ~ to!string(chars));
  */
}