File: chunk.c

package info (click to toggle)
zsv 1.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 49,160 kB
  • sloc: ansic: 175,811; cpp: 56,301; sh: 3,623; makefile: 3,048; javascript: 577; cs: 90; awk: 70; python: 41; sql: 15
file content (240 lines) | stat: -rw-r--r-- 7,583 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// /src/app/utils/chunk.c: implements /src/app/utils/chunk.h

#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "chunk.h"

/**
 * @brief Checks if a character is a newline character ('\n' or '\r').
 * @param c The character to check.
 * @return int 1 if newline, 0 otherwise.
 */
static int zsv_is_newline(char c) {
  return (c == '\n' || c == '\r');
}

/**
 * @brief Scans forward from an initial offset to find the first position after a newline sequence.
 *
 * @param fp The open file pointer.
 * @param initial_offset The starting point of the search (nominal boundary).
 * @param boundary The absolute maximum file size (total_size).
 * @param only_crlf If non-zero, only treat \r\n as a newline.
 * @return zsv_file_pos The position after the newline sequence, or -1 if not found.
 */
static zsv_file_pos zsv_find_chunk_start(FILE *fp, zsv_file_pos initial_offset, zsv_file_pos boundary, int only_crlf) {
  char c;
  // Seek to the initial offset.
  if (fseek(fp, initial_offset, SEEK_SET) != 0) {
    return -1; // Seek error
  }

  // Scan forward for the start of a newline sequence
  while (ftell(fp) < boundary && fread(&c, 1, 1, fp) == 1) {
    if (only_crlf) {
      if (c == '\r') {
        // We found a CR. Check immediately if the next char is LF.
        char next;
        if (ftell(fp) < boundary && fread(&next, 1, 1, fp) == 1) {
          if (next == '\n') {
            // Found \r\n sequence. The chunk starts immediately after.
            return ftell(fp);
          }
          // The next char was NOT \n.
          // We must rewind one byte so the loop processes 'next' correctly
          // (in case 'next' is itself a \r starting a valid sequence).
          fseek(fp, -1, SEEK_CUR);
        }
      }
    } else {
      if (zsv_is_newline(c)) {
        // Found the start of a sequence. Scan past all consecutive newline characters.
        zsv_file_pos position_after_newline = ftell(fp);

        while (position_after_newline < boundary && fread(&c, 1, 1, fp) == 1) {
          if (zsv_is_newline(c)) {
            position_after_newline = ftell(fp); // Keep tracking position past the sequence
          } else {
            // Found the first non-newline character.
            // The new start is at the current position (one byte past the last read)
            // so we return the start of that character (ftell - 1).
            return ftell(fp) - 1;
          }
        }
        // If inner loop breaks due to EOF, return -1
        return -1;
      }
    }
  }

  // Reached EOF/boundary without finding a valid split point
  return -1;
}

static int zsv_read_first_line_at_offset(const char *filename, zsv_file_pos offset, char *buffer, size_t buf_size) {
  FILE *fp = fopen(filename, "rb");
  if (fp == NULL) {
    perror("zsv_read_first_line_at_offset: Failed to open file");
    return -1;
  }

  if (offset < 0 || fseek(fp, offset, SEEK_SET) != 0) {
    fprintf(stderr, "zsv_read_first_line_at_offset: Error: Invalid offset or fseek failed at %lld\n",
            (long long)offset);
    fclose(fp);
    return -1;
  }

  // Use fgets. It handles both \n and \r\n line endings appropriately.
  if (fgets(buffer, (int)buf_size, fp) == NULL) {
    if (feof(fp)) {
      buffer[0] = '\0'; // Empty chunk
    } else {
      perror("zsv_read_first_line_at_offset: fgets failed");
      fclose(fp);
      return -1;
    }
  }

  // Remove the trailing newline sequence (CRLF or LF) for clean output (DRY cleanup logic)
  size_t len = strlen(buffer);
  if (len > 0) {
    // Check for LF
    if (buffer[len - 1] == '\n') {
      buffer[--len] = '\0';
    }
    // Check for CR (handles both bare CR and the CR in CRLF)
    if (len > 0 && buffer[len - 1] == '\r') {
      buffer[len - 1] = '\0';
    }
  }

  fclose(fp);
  return 0;
}

// --- Public Library Implementations ---

struct zsv_chunk_position *zsv_guess_file_chunks(const char *filename, uint64_t N, uint64_t min_size,
                                                 zsv_file_pos initial_offset
#ifndef ZSV_NO_ONLY_CRLF
                                                 ,
                                                 int only_crlf
#endif
) {

#ifdef ZSV_NO_ONLY_CRLF
  int only_crlf = 0;
#endif
  if (N == 0)
    return NULL;

  // Open in binary mode ('rb') is crucial for accurate byte counts.
  FILE *fp = fopen(filename, "rb");
  if (fp == NULL) {
    perror("zsv_guess_file_chunks: Failed to open file");
    return NULL;
  }

  // 1. Get total file size using fstat()
  struct stat st;
  if (fstat(fileno(fp), &st) == -1) {
    perror("zsv_guess_file_chunks: fstat failed");
    fclose(fp);
    return NULL;
  }
  zsv_file_pos total_size = (zsv_file_pos)st.st_size;
  if (total_size < initial_offset) {
    perror("zsv_guess_file_chunks: initial_offset exceeds file size");
    fclose(fp);
    return NULL;
  }
  total_size -= initial_offset;

  if (total_size < (zsv_file_pos)min_size) {
    fprintf(stderr, "file size too small for parallelization\n");
    fclose(fp);
    return NULL;
  }

  // Allocate memory for the N chunk positions
  struct zsv_chunk_position *chunks = (struct zsv_chunk_position *)malloc(N * sizeof(*chunks));
  if (chunks == NULL) {
    perror("zsv_guess_file_chunks: malloc failed");
    fclose(fp);
    return NULL;
  }

  if (initial_offset)
    fseek(fp, initial_offset, SEEK_SET);

  zsv_file_pos base_size = total_size / N;
  zsv_file_pos current_offset = initial_offset;

  for (uint64_t i = 0; i < N; ++i) {
    chunks[i].start = current_offset;

    // Calculate the initial nominal boundary for this chunk
    zsv_file_pos nominal_boundary = (i == N - 1) ? total_size : (zsv_file_pos)((i + 1) * base_size);

    if (i < N - 1) {
      // Adjust the boundary for all but the last chunk
      // Pass the only_crlf flag down to the helper
      zsv_file_pos new_start_offset = zsv_find_chunk_start(fp, nominal_boundary, total_size, only_crlf);

      if (new_start_offset < 0) {
        // Warning: Could not find a valid split after nominal boundary
        // We use the nominal boundary, which might break a line
        chunks[i].end = nominal_boundary - 1;
        current_offset = nominal_boundary;
      } else {
        chunks[i].end = new_start_offset - 1;
        current_offset = new_start_offset;
      }
    } else {
      // The last chunk always ends at the total_size - 1 byte
      chunks[i].end = total_size + initial_offset > 0 ? total_size + initial_offset - 1 : 0;
    }

    // Defensive check for inverted start/end
    if (chunks[i].start > chunks[i].end && total_size > 0)
      chunks[i].end = chunks[i].start;
  }

  fclose(fp);
  return chunks;
}

void zsv_free_chunks(struct zsv_chunk_position *chunks) {
  if (chunks) {
    free(chunks);
  }
}

const char *zsv_chunk_status_str(enum zsv_chunk_status stat) {
  switch (stat) {
  case zsv_chunk_status_ok:
    return NULL;
  case zsv_chunk_status_no_file_input:
    return "Parallelization requires a file input";
  case zsv_chunk_status_overwrite:
    return "Parallelization cannot be used with overwrite";
  case zsv_chunk_status_max_rows:
    return "Parallelization cannot be used with -L,--limit-rows";
  }
  return NULL;
}

enum zsv_chunk_status zsv_chunkable(const char *inputpath, struct zsv_opts *opts) {
  if (!inputpath)
    return zsv_chunk_status_no_file_input;
  struct zsv_opt_overwrite o = {0};
  if (memcmp(&opts->overwrite, &o, sizeof(o)) || opts->overwrite_auto)
    return zsv_chunk_status_overwrite;
  if (opts->max_rows)
    return zsv_chunk_status_max_rows;
  return zsv_chunk_status_ok;
}