1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
#include "SoloFeature.h"
#include "streamFuns.h"
//#include "TimeFunctions.h"
//#include "SequenceFuns.h"
//#include "Stats.h"
//#include "GlobalVariables.h"
void SoloFeature::redistributeReadsByCB()
{//redistribute reads in files by CB - each file with the approximately the same number of reads, each CB is on one file only
/* SoloFeature vars that have to be setup:
* nCB
* readFeatSum->cbReadCount[]
*/
//find boundaries for cells
uint64 nReadRec=std::accumulate(readFeatSum->cbReadCount.begin(), readFeatSum->cbReadCount.end(), 0LLU);
//for ( auto &cbrc : readFeatSum->cbReadCount )
// nReadRec += cbrc;
uint64 nReadRecBin=nReadRec/pSolo.redistrReadsNfiles;
P.inOut->logMain << " Redistributing reads into "<< pSolo.redistrReadsNfiles <<"files; nReadRec="<< nReadRec <<"; nReadRecBin="<< nReadRecBin <<endl;
redistrFilesCBfirst.push_back(0);
redistrFilesCBindex.resize(nCB);
uint64 nreads=0;
uint32 ind=0;
for (uint32 icb=0; icb<nCB; icb++){
redistrFilesCBindex[icb]=ind;
nreads += readFeatSum->cbReadCount[indCB[icb]];
if (nreads>=nReadRecBin) {
ind++;
redistrFilesCBfirst.push_back(icb+1);
redistrFilesNreads.push_back(nreads);
nreads=0;
};
};
if (nreads>0) {
redistrFilesCBfirst.push_back(nCB);
redistrFilesNreads.push_back(nreads);
};
//open output files
redistrFilesStreams.resize(redistrFilesNreads.size());
for (uint32 ii=0; ii<redistrFilesNreads.size(); ii++) {
//open file with flagDelete=true
redistrFilesStreams[ii] = &fstrOpen(P.outFileTmp + "solo"+SoloFeatureTypes::Names[featureType]+"_redistr_"+std::to_string(ii), ERROR_OUT, P, true);
};
//main cycle
for (int ii=0; ii<P.runThreadN; ii++) {
readFeatAll[ii]->streamReads->clear();//this is needed if eof was reached before
readFeatAll[ii]->streamReads->seekg(0,ios::beg);
while ( true ) {
string line1;
getline(*readFeatAll[ii]->streamReads,line1);
if (line1.empty()) {
break;
};
istringstream line1stream(line1);
uint64 cb1, umi;
line1stream >> umi >> cb1 >> cb1;
if (featureType==SoloFeatureTypes::SJ)
line1stream >> cb1;
line1stream >> cb1;
*redistrFilesStreams[redistrFilesCBindex[indCBwl[cb1]]] << line1 <<'\n';
};
//TODO: delete streamReads files one by one to save disk space
};
//close files
//for (uint32 ii=0; ii<pSolo.redistrReadsNfiles; ii++)
// redistrFilesStreams[ii]->flush();
};
|