File: convert_to_pa_rand.c

package info (click to toggle)
libslow5lib 0.7.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 25,084 kB
  • sloc: ansic: 11,825; python: 1,179; sh: 547; makefile: 90; cpp: 40
file content (150 lines) | stat: -rw-r--r-- 4,309 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//loads a batch of reads where the read IDs are in genomic coordinate order (signal+information needed for pA conversion) from file, process the batch (convert to pA and sum), and write output
//only the time for loading a batch is measured
//make zstd=1
//gcc -Wall -O2 -g -I include/ -o convert_to_pa_rand test/bench/convert_to_pa_rand.c lib/libslow5.a  -lm -lz -lzstd -fopenmp
//only the time for loading a batch to memory (Disk I/O + decompression + parsing and filling the memory arrays) is measured

// to generate read id list: samtools view reads.sorted.bam | awk '{print $1}' > rid.txt

#include <stdio.h>
#include <stdlib.h>
#include <slow5/slow5.h>
#include <omp.h>
#include <sys/time.h>

static inline double realtime(void) {
    struct timeval tp;
    struct timezone tzp;
    gettimeofday(&tp, &tzp);
    return tp.tv_sec + tp.tv_usec * 1e-6;
}


int main(int argc, char *argv[]) {

    if(argc != 5) {
        fprintf(stderr, "Usage: %s in_file.blow5 rid_list.txt num_thread batch_size\n", argv[0]);
        return EXIT_FAILURE;
    }

    int batch_size = atoi(argv[4]);
    int num_thread = atoi(argv[3]);
    int ret=batch_size;
    omp_set_num_threads(num_thread);

    int read_count = 0;

    double *sums = malloc(sizeof(uint64_t)*batch_size);
    double tot_time = 0;
    double t0 = 0;

    //read id list
    FILE *fpr = fopen(argv[2],"r");
    if(fpr==NULL){
        fprintf(stderr,"Error in opening file %s for reading\n",argv[2]);
        perror("perr: ");;
        exit(EXIT_FAILURE);
    }

    char **rid = malloc(sizeof(char*)*batch_size);
    char tmp[1024];

    /**** Initialisation and opening of the file ***/
    t0 = realtime();

    slow5_file_t *sp = slow5_open(argv[1],"r");
    if(sp==NULL){
       fprintf(stderr,"Error in opening file\n");
       perror("perr: ");
       exit(EXIT_FAILURE);
    }

    ret = slow5_idx_load(sp);
    if(ret<0){
        fprintf(stderr,"Error in loading index\n");
        exit(EXIT_FAILURE);
    }

    tot_time += realtime() - t0;
    /**** End of init ***/

    while(1){

        int i=0;
        for(i=0; i<batch_size; i++){
            if (fscanf(fpr,"%s",tmp) < 1) {
                break;
            }
            rid[i] = strdup(tmp);
        }
        if(i==0) break;
        int ret = i;
        read_count += ret;

        /**** Fetching a batch (disk loading, decompression, parsing in to memory arrays) ***/

        t0 = realtime();
        slow5_rec_t **rec = (slow5_rec_t**)calloc(batch_size,sizeof(slow5_rec_t*));
        #pragma omp parallel for
        for(int i=0;i<ret;i++){
            if(slow5_get(rid[i], &rec[i], sp) < 0){
                fprintf(stderr,"Error fetching the read %s",rid[i]);
                exit(EXIT_FAILURE);
            }
        }
        tot_time += realtime() - t0;
        /**** Batch fetched ***/

        fprintf(stderr,"batch loaded with %d reads\n",ret);

        //process and print (time not measured as we want to compare to the time it takes to read the file)
        #pragma omp parallel for
        for(int i=0;i<ret;i++){
            uint64_t sum = 0;
            double scale = rec[i]->range / rec[i]->digitisation;
            for(int j=0; j<rec[i]->len_raw_signal; j++){
                sum += ((rec[i]->raw_signal[j] + rec[i]->offset) * scale);
            }
            sums[i] = sum;
        }
        fprintf(stderr,"batch processed with %d reads\n",ret);
        for(int i=0;i<ret;i++){
            fprintf(stdout,"%s\t%f\n",rec[i]->read_id,sums[i]);
        }
        fprintf(stderr,"batch printed with %d reads\n",ret);

        /**** Deinit ***/
        t0 = realtime();
        for(int i=0;i<ret;i++){
            slow5_rec_free(rec[i]);
        }
        free(rec);
        tot_time += realtime() - t0;
        /**** End of Deinit***/

        for(int i=0; i<ret; i++){
            free(rid[i]);
        }

        if(ret<batch_size){ //this indicates nothing left to read
            break;
        }

    }

    /**** Deinit ***/
    t0 = realtime();
    slow5_idx_unload(sp);
    slow5_close(sp);
    tot_time += realtime() - t0;
    /**** End of Deinit***/

    free(sums);
    free(rid);
    fclose(fpr);

    fprintf(stderr,"Reads: %d\n",read_count);
    fprintf(stderr,"Time for getting samples (disc+depress+parse) %f\n", tot_time);

    return 0;
}