File: BeeStorage.py

package info (click to toggle)
egenix-mx-base 3.2.8-1
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 8,420 kB
  • ctags: 6,208
  • sloc: ansic: 22,304; python: 18,124; sh: 137; makefile: 121
file content (1164 lines) | stat: -rw-r--r-- 36,341 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
# -*- coding: latin-1 -*-

""" BeeStorage - Flatfile data storage facility.

    Definitions:
    
      block: minimal amount of storage allocated in the file
      record: Header + content + padding

    Copyright (c) 1998-2000, Marc-Andre Lemburg; mailto:mal@lemburg.com
    Copyright (c) 2000-2014, eGenix.com Software GmbH; mailto:info@egenix.com
    See the documentation for further information on copyrights,
    or contact the author. All Rights Reserved.

"""
import cPickle,cStringIO,struct,exceptions,types,sys,marshal,re
import FileLock,Cache
from mx import Tools
freeze = Tools.freeze
from mx.Log import *

# Blocksize used to improve alignment of records (NOTE: Don't change this
# parameter, or the implementation will corrupt any existing storage file
# using a different block size !!!
BLOCKSIZE = 32

# Default minimal record size (must be multiple of BLOCKSIZE)
MINRECORDSIZE = BLOCKSIZE

# File header size to allocate. This should be large enough to
# hold data for future enhancements.
FILEHEADERSIZE = 1024

# Start of data according to the storage layout
STARTOFDATA = FILEHEADERSIZE + BLOCKSIZE

# Cache management (only used if caches are enabled)
MAXCACHESIZE = 1000
CACHELIMIT = 1000

# Codes: ID is the magic byte starting every record header; VALID/OLD
# are the record state bytes stored at offset 5 of the header (see
# read_header/write_header)
ID      = '\333'
VALID   = '\370'
OLD     = '\373'

# Special markers; these are written to the first block after the
# fileheader (position FILEHEADERSIZE) and must have equal size
COLD_MARKER =   '*cold***'
HOT_MARKER =    '*hot****'

# States: HOT = pending changes need proper shutdown, COLD = stable
HOT = 1
COLD = 0

# Output debugging info
_debug = 0

### Errors

class Error(exceptions.StandardError):

    """ Base class for all exceptions raised by this module.
    """

class RunRecoveryError(Error):

    """ Raised when the storage file is found to be damaged in a way
        that recovery could possibly repair.
    """

### Classes

def dummy_callback(old_position,new_position,raw_data):

    """ Default no-op callback used by .collect() and .recover().

        It is invoked as callback(old_position, new_position,
        raw_data) whenever a record changes its location in the
        storage file. raw_data is the record's raw content; apply
        storage.decode() to turn it back into an object.

    """
    pass
        
class BeeStorage:

    """ Baseclass for flatfile data storage facilities.

        This class implements a simple database which can store and
        restore objects. The database is designed to be easily
        reconstructable in case something goes terribly wrong.

        File layout:

         [Fileheader] (length FILEHEADERSIZE)
         [Marker block] (length BLOCKSIZE)
         STARTOFDATA: ...[datablock] (length multiple of BLOCKSIZE)...

        Datablocks layout:
        
         [ID] (1 byte)
         [length of whole block] (4 bytes, little endian)
         [CODE] (1 bytes)
         [raw data]
        
        XXX Todo:

        * Implement write cache.
        * Add more recovery tools
    
    """
    version = '1.2'                     # Version number; increase whenever
                                        # the internal layout changes

    filename = None                     # Filename of the file used
    file = None                         # Open file
    EOF = 0                             # EOF address
    filelock = None                     # FileLock.FileLock instance in case
                                        # locking is enabled
    min_recordsize = MINRECORDSIZE      # Minimal record size
    caching = 0                         # Is caching enabled ?
    readonly = 0                        # Operate in read-only mode ?
    state = None                        # State in which the file is in
    is_new = 0                          # Was the file created by the
                                        # constructor, or just reopened ?
    
    # Caches mapping record position -> header tuple / raw record data;
    # only set up when caching is enabled in the constructor
    header_cache = None
    record_cache = None

    def __init__(self,filename,lock=0,cache=0,min_recordsize=MINRECORDSIZE,
                 readonly=0,recover=0):

        """ Create an instance using filename as data file.
        
            If lock is true, a filelock will be enabled until the file
            is closed. cache controls whether to enable a cache or not
            (should only be used where OS level caches are not
            available).

            min_recordsize can be set to have all records be padded to
            at least this size (might reduce overhead due to
            reallocation of records that have become too small for
            updates).

            readonly can be set to true to have existing files opened
            in read-only mode. Writes will then cause an IOError,
            opening a non-existing file in read-only mode will too.

            Opening the storage file in recover mode will disable some
            of the checks normally done. This flag should only be used
            if a previous normal opening failed with a hint to run
            recovery.

        """#'

        self.readonly = readonly

        if _debug:
            log.call(SYSTEM_DEBUG)

        # Adjust min_recordsize: enforce the module minimum and round
        # up to the next BLOCKSIZE multiple (Python 2 integer division)
        if min_recordsize < MINRECORDSIZE:
            min_recordsize = MINRECORDSIZE
        if min_recordsize % BLOCKSIZE != 0:
            min_recordsize = (min_recordsize / BLOCKSIZE + 1) * BLOCKSIZE
        self.min_recordsize = min_recordsize
        
        # Lock the file
        if lock:
            self.filelock = filelock = FileLock.FileLock(filename)
            # This may raise a FileLock.Error
            if recover:
                # In recover mode a stale lock is removed and retried
                try:
                    filelock.lock()
                except FileLock.Error:
                    filelock.remove_lock()
                    filelock.lock()
            else:
                filelock.lock()
            
        # Open it:
        #  first try for an existing file, then revert to creating a new one
        if readonly:
            mode = 'rb'
        else:
            mode = 'r+b'
        self.filename = filename
        try:
            # Existing file
            self.file = file = open(filename,mode)
            file.seek(0,2)
            self.EOF = EOF = file.tell()

        except IOError,why:
            if readonly:
                # Cannot create a new file in read-only mode
                raise IOError,why
            # New file: write header and state marker
            if _debug:
                log(SYSTEM_INFO,'Creating a new storage file %s' % filename)
            self.file = file = open(filename,'w+b')
            self.write_fileheader(file)
            self.mark(COLD)
            EOF = file.tell()
            if EOF % BLOCKSIZE != 0:
                # pad to block boundary
                file.write(' '*(BLOCKSIZE - EOF % BLOCKSIZE))
                EOF = file.tell()
            self.EOF = EOF
            self.is_new = 1

        else:
            self.is_new = 0
            
        # Create caches
        if cache:
            self.caching = 1
            self.header_cache = header_cache = Cache.Cache(MAXCACHESIZE)
            self.record_cache = record_cache = Cache.Cache(MAXCACHESIZE,
                                                           CACHELIMIT)
            self.caches = [header_cache,record_cache]

        # Sanity check: a well-formed file always ends on a block boundary
        if EOF % BLOCKSIZE != 0 and not recover:
            raise RunRecoveryError,\
                  'storage file is damaged; run recovery ! (EOF=%i)' % EOF

        # Check mark: a HOT marker means the file was not shut down
        # properly (read-only access to a HOT file is still allowed)
        file.seek(FILEHEADERSIZE)
        marker = file.read(len(COLD_MARKER))
        if marker != COLD_MARKER and \
           not (readonly and marker == HOT_MARKER) and \
           not recover:
            raise RunRecoveryError,\
                  'storage file is damaged; run recovery ! (marker=%s)' % \
                  repr(marker)

        # Set state to COLD
        self.state = COLD

        # Header check
        self.check_fileheader(file)

    def mark(self,state=HOT,

             HOT=HOT,HOT_MARKER=HOT_MARKER,COLD_MARKER=COLD_MARKER,
             FILEHEADERSIZE=FILEHEADERSIZE):

        """ Set the storage file's state marker.

            HOT indicates that the file has pending changes which
            require a proper shutdown; an unchanged or stable file is
            marked COLD.

            Internal API: use .start_transaction()/.end_transaction()
            to set the state from the outside.

        """
        # Nothing to do if the marker is already set
        if state == self.state:
            return
        if self.readonly:
            raise Error,'storage is read-only'
        if state == HOT:
            marker = HOT_MARKER
            statename = 'HOT'
        else:
            marker = COLD_MARKER
            statename = 'COLD'
        if _debug:
            log(SYSTEM_DEBUG,
                'Marking the file "%s": %s',
                self.filename,statename)
        # The marker lives in the block right after the file header
        self.file.seek(FILEHEADERSIZE)
        self.file.write(marker)
        self.state = state

    def recover(self,callback=dummy_callback):

        """ Run a recovery pass over the storage file.

            callback is invoked for every valid record found; it has
            the same signature as the callback used for .collect().

            Note: the storage file itself must have been opened with
            the keyword 'recover=1' to enable recovery mode.

        """
        self.collect(callback,recover=1)

    def start_transaction(self,

                          HOT=HOT):

        """ Begin a sequence of storage manipulation commands by
            marking the file HOT.

            Every write/free command also starts a transaction
            implicitly.

        """
        self.mark(HOT)

    def end_transaction(self,

                        COLD=COLD):

        """ Finish a sequence of storage manipulation commands by
            marking the file COLD again.
        """
        self.mark(COLD)

    def write_fileheader(self,file):

        """ Write a new header to the open file.

            Changes the file's position: moves the file's position to
            the start of the data area.
            
        """
        # The fileheader (also see header_check below); the % applies
        # to the implicitly concatenated format string
        fileheader = ('%s version %s\n'
                      'blocksize %i\n' % (self.__class__.__name__,
                                          self.version,
                                          BLOCKSIZE))
        # Pad to FILEHEADERSIZE bytes length, terminated by a newline
        fileheader = fileheader + \
                     ' '*(FILEHEADERSIZE - len(fileheader) - 1) + '\n'

        # Make sure we start on a block boundary (Python 2 integer
        # division rounds down; +1 rounds up to the next block)
        if FILEHEADERSIZE % BLOCKSIZE != 0:
            fileheader = fileheader + '\0' * \
                         ((FILEHEADERSIZE / BLOCKSIZE + 1) * BLOCKSIZE \
                          - FILEHEADERSIZE)
        file.seek(0)
        file.write(fileheader)

    # Pattern matching the header written by write_fileheader():
    # class name, version string, blocksize
    header_check = re.compile(('(\w+) version ([\w.]+)\n'
                               'blocksize (\d+)\n'))
    
    def check_fileheader(self,file):

        """ Checks the file header and verifies that all parameters are
            the same.

            Changes the file's position.

            Raises Error if the header is damaged, has the wrong
            format, belongs to a different storage class, was written
            by a newer version, or uses a different blocksize.
        """
        file.seek(0)
        fileheader = file.read(FILEHEADERSIZE)
        if len(fileheader) != FILEHEADERSIZE:
            raise Error,'header is damaged: "%s"' % fileheader
        m = self.header_check.match(fileheader)
        if m is None:
            raise Error,'wrong header format: "%s"' % fileheader
        name,version,blocksize = m.groups()
        if name != self.__class__.__name__:
            raise Error,'wrong storage class: %s (expected %s)' % \
                  (name,self.__class__.__name__)
        # NOTE(review): this is a *string* comparison; it only orders
        # correctly for single-digit version components ('1.10' would
        # compare less than '1.2') -- confirm before bumping version
        if version > self.version:
            raise Error,'wrong version: %s (expected %s)' % \
                  (version,self.version)
        if int(blocksize) != BLOCKSIZE:
            raise Error,'blocksize mismatch: %s (expected %i)'  % \
                  (blocksize,BLOCKSIZE)

    def encode(self,object,

               StringType=types.StringType,type=type):

        """ Serialize object into a string.

            Records are usually larger than the actual data, so the
            encoding must store the string length or otherwise mark
            where the data ends.

            Abstract method: subclasses have to provide an
            implementation.

        """
        raise Error,'.encode() needs to be overridden'

    def decode(self,data):

        """ Deserialize a string into an object.

            The data string may well be longer than what .encode()
            produced; the implementation has to determine the true
            length on its own.

            Abstract method: subclasses have to provide an
            implementation.

        """
        raise Error,'.decode() needs to be overridden'

    def clear_cache(self):

        """ Empty the header and record caches.

            No-op when caching is disabled. (Flushing of not yet
            written data would happen here; the flush call is
            currently disabled.)

        """
        if not self.caching:
            return
        #self.flush()
        Tools.method_mapply(self.caches,'clear',())

    def close(self,

              COLD=COLD,method_mapply=Tools.method_mapply):

        """ Flush buffers and close the associated file.

            A still-HOT file is first marked COLD (unless the storage
            was opened read-only); the file lock, if any, is released
            last.
        """
        if self.caching:
            self.flush()
            method_mapply(self.caches,'clear',())
        if self.file:
            # Mark COLD
            if not self.readonly and self.state != COLD:
                self.mark(COLD)
            # Deleting the attribute makes self.file fall back to the
            # class default (None), closing the file via refcounting
            del self.file
        if self.filelock:
            self.filelock.unlock()
            del self.filelock

    def __del__(self):

        """ Make sure the storage is closed properly when the
            instance is garbage collected.
        """
        if _debug:
            log(SYSTEM_DEBUG,'del %s',self)
        if self.file:
            self.close()

    def flush(self):

        """ Flush all buffers.

            The base class keeps no write buffers, so this is a
            no-op; subclasses may override it.

        """
        pass

    def __repr__(self):

        """ Return a representation showing the class name, the
            storage filename and the instance address.
        """
        return '<%s instance for "%s" at 0x%x>' % (
            self.__class__.__name__,
            self.filename,
            id(self))

    def read_header(self,position,

                    unpack=struct.unpack,BLOCKSIZE=BLOCKSIZE,
                    ID=ID,headertypes=(OLD,VALID)):

        """ Read the header located at position and return
            a tuple (record size, statebyte, data area size).
            
            statebyte is one of the state constants. record size
            is the total number of bytes reserved for the record,
            data area size the number of bytes in its data area.

            self.file is positioned to point to the records data area.

            May raise errors if the position is invalid.

        """
        if self.caching:
            header = self.header_cache.get(position,None)
            if header is not None:
                # Cache hit: still seek past the 6 header bytes so the
                # file points at the data area, as documented
                self.file.seek(position+6)
                return header

        # Sanity check
        if position % BLOCKSIZE != 0 or \
           position > self.EOF:
            raise Error,'invalid position: %i' % position

        # Read and check header: byte 0 is the ID magic, bytes 1-4 the
        # little-endian record size, byte 5 the state byte
        file = self.file
        file.seek(position)
        header = file.read(6)
        if not header:
            raise EOFError,'position %i is beyond EOF' % position
        # NOTE(review): a truncated read (1-5 bytes) would raise
        # IndexError on header[5] rather than Error -- confirm intended
        if header[0] != ID or header[5] not in headertypes:
            raise Error,'invalid header at %i: %s' % \
                        (position,repr(header))
        recordsize = unpack('<l',header[1:5])[0]

        # Data area size = record size minus the 6 header bytes
        header = (recordsize, header[5], recordsize - 6)
        if self.caching:
            self.header_cache.put(position,header)
        return header

    def write_header(self,position,recordsize,rtype=VALID,

                     pack=struct.pack,
                     ID=ID,HOT=HOT):

        """ Write a bare record header to position.

            The header marks the record as having size recordsize and
            state rtype. No data part is written; the file pointer
            ends up at the start of the data part.

            The header cache is updated, but the file is *not* marked
            HOT and no sanity checks are performed.

            This method is intended for internal use only.

        """
        # Header layout: ID magic + 4-byte little-endian size + state
        header = ID + pack('<l',recordsize) + rtype
        f = self.file
        f.seek(position)
        f.write(header)
        
        if self.caching:
            self.header_cache.put(position,(recordsize,rtype,recordsize-6))

    def write_record(self,data,position,minsize=0,rtype=VALID,

                     BLOCKSIZE=BLOCKSIZE,pack=struct.pack,
                     ID=ID,HOT=HOT):

        """ Write a record of given rtype (defaults to VALID)
            containing data to position.

            data is not encoded; caches are not used.  position may be
            EOF in which case the data is appended to the storage file
            (with proper padding). minsize can be set to a value
            greater than len(data) to have the allocation mechanism
            reserve more space for data in the record.

            Returns the position at which the record was actually
            written; this differs from the given position if an
            in-place update did not fit and the data had to be
            appended instead.
            
        """
        file = self.file
        EOF = self.EOF
        # datalen is the space to allocate for the data area; it may
        # exceed len(data) when minsize reserves extra room
        datalen = len(data)
        if minsize and datalen < minsize:
            datalen = minsize

        # Mark HOT
        if self.state != HOT:
            self.mark(HOT)

        # Record is to be updated
        recordsize = None
        datasize = None
        if position < EOF:
            recordsize,rt,datasize = self.read_header(position)
            if datasize < datalen:
                # Mark valid record as old
                if rt == VALID:
                    if _debug:
                        log(SYSTEM_INFO,
                            'Could not update %i record in place: '
                            'old size = %i, datalen = %i, data = %r',
                            position,datasize,datalen,data)
                    self.free(position)
                # revert to appending...
                position = EOF
                recordsize = None
                datasize = None
                
        elif position > EOF:
            raise ValueError('cannot write to position outside file: '
                             'position=%r, EOF=%r' %
                             (position, EOF))

        # Calculate recordsize, if not available: header (6 bytes) +
        # data area, at least min_recordsize, rounded up to a full
        # block (Python 2 integer division)
        if recordsize is None:
            recordsize = datalen + 6
            if recordsize < self.min_recordsize:
                recordsize = self.min_recordsize
            if recordsize % BLOCKSIZE != 0:
                recordsize = (recordsize / BLOCKSIZE + 1) * BLOCKSIZE
            datasize = recordsize - 6

        # Write the header + data + padding.
        #
        # Bug fix: the padding must fill the data area up from the
        # *actual* data length, not from datalen -- datalen may have
        # been raised to minsize above, in which case the old code
        # ('\0' * (datasize - datalen)) wrote a record shorter than
        # recordsize, leaving EOF misaligned when appending.
        file.seek(position)
        file.write(''.join(
            (ID,pack('<l', recordsize), rtype,     # Header
             data,                                 # Data
             '\0' * (datasize - len(data))         # Padding
             )))
        if self.caching:
            self.header_cache.put(position, (recordsize, rtype, datasize))

        # Update EOF
        if position >= EOF:
            self.EOF = file.tell()
            if _debug:
                log(SYSTEM_DEBUG,'New EOF = %i',self.EOF)
        if _debug:
            log(SYSTEM_DEBUG,'Data written to position %i: %r',
                position, data)

        return position

    def read_record(self,position,rtype=VALID):

        """ Return the raw data stored in the record at position.

            The record must carry the given rtype (defaults to
            VALID); an error is raised otherwise, or if no record is
            found. The data is not decoded and the caches are
            bypassed.
            
        """
        recordsize,found_rtype,datasize = self.read_header(position)
        if found_rtype != rtype:
            raise Error(
                'record has wrong type, expected %r, found %r' %
                (rtype, found_rtype))
        
        # read_header() left the file positioned at the data area
        return self.file.read(datasize)

    def free(self,position,

             OLD=OLD,HOT=HOT):

        """ Delete the record at position by marking it OLD.

            The occupied space is not reclaimed right away: the next
            garbage collection (.collect()) makes the deletion
            permanent and frees it.

        """
        if self.state != HOT:
            self.mark(HOT)
        # The state byte sits at offset 5 inside the record header
        f = self.file
        f.seek(position + 5)
        f.write(OLD)

        if self.caching:
            # Drop any cached header/data for this record
            Tools.method_mapply(self.caches,'delete',(position,))

    # Aliases
    delete = free
    __delitem__ = free

    def write(self,obj,position=None):

        """ Store the encoded object in the file and return the
            address at which it was written.

            Passing the position of an existing record requests an
            in-place update. If the new data does not fit, the old
            record is marked OLD and a different record is used; the
            returned address then differs from the given one.

            Records marked OLD are reclaimed by the next garbage
            collection run.
            
        """
        data = self.encode(obj)

        if position is None:
            # Append a new record at the end of the file
            position = self.EOF
        position = self.write_record(data,position)

        if self.caching:
            self.record_cache.put(position,data)

        return position

    # Aliases
    append = write
    add = write

    def __setitem__(self,position,obj):

        """ Dictionary-style interface to .write().
        """
        self.write(obj,position)

    def read(self,position,

             NotCached=Cache.NotCached):

        """ Load and decode the object stored at position.

            Consults the record cache first when caching is enabled.
        """
        if self.caching:
            cached = self.record_cache.get(position,NotCached)
            if cached is not NotCached:
                return self.decode(cached)

        # read_header() positions the file at the record's data area
        recordsize,rtype,datasize = self.read_header(position)
        data = self.file.read(datasize)

        if self.caching:
            self.record_cache.put(position,data)

        return self.decode(data)

    # Alias
    __getitem__ = read
        
    def find_records(self,start=STARTOFDATA,stop=sys.maxint):

        """ Scans the data file for valid, old and invalid records
            and returns a tuple (valid, old, invalid) of lists with
            one (position, size in bytes) entry per record/block
            found.

            start and stop can be given to narrow down the search
            space.
            
        """
        EOF = self.EOF
        if start < STARTOFDATA:
            start = STARTOFDATA
        if stop > EOF:
            stop = EOF
        position = start
        valid = []
        invalid = []
        old = []
        read_header = self.read_header
        old_append = old.append
        valid_append = valid.append
        invalid_append = invalid.append
        # Adjust position to next block boundary (Python 2 integer
        # division)
        if position % BLOCKSIZE != 0:
            position = (position / BLOCKSIZE + 1) * BLOCKSIZE

        while position < stop:
            try:
                recordsize,rtype,datasize = read_header(position)
            except Error:
                # No record found at that position: record the invalid
                # block at its own position, then try the next block.
                # (Bug fix: the original appended the already
                # incremented position, pointing one block past the
                # actual unallocated space.)
                invalid_append((position,BLOCKSIZE))
                position = position + BLOCKSIZE
                continue
            if rtype == VALID:
                valid_append((position,recordsize))
            elif rtype == OLD:
                old_append((position,recordsize))
            position = position + recordsize

        return valid,old,invalid

    def statistics(self):

        """ Scan the data file and return a tuple (valid, old,
            invalid) giving the number of bytes occupied by each
            class of records/blocks.
        """
        position = STARTOFDATA
        EOF = self.EOF
        valid_bytes = 0
        old_bytes = 0
        invalid_bytes = 0
        read_header = self.read_header
        while position < EOF:
            try:
                recordsize,rtype,datasize = read_header(position)
            except Error:
                # Unallocated space: count one block and resume the
                # scan at the next block boundary
                position = position + BLOCKSIZE
                invalid_bytes = invalid_bytes + BLOCKSIZE
                continue
            if rtype == VALID:
                valid_bytes = valid_bytes + recordsize
            elif rtype == OLD:
                old_bytes = old_bytes + recordsize
            position = position + recordsize

        return valid_bytes,old_bytes,invalid_bytes

    def collect(self,callback=dummy_callback,recover=0):

        """ Collect garbage that accumulated since the last .collect()
            run.

            Garbage is collected by moving all VALID records to the
            beginning of the file and then truncating it to the new
            (reduced) size.

            Collecting will be done without using the cache. It also
            starts a new transaction (if not already marked HOT).

            For every move operation the callback is called with arguments
            (old_position,new_position,raw_data). raw_data is the raw
            data stored in the record that is being moved; use .decode[_key]
            to decode it.

            If recover is true, the callback is called for all valid
            records, not just the ones that are actually being moved.

        """
        file = self.file
        if recover:
            # Don't trust self.EOF in recover mode
            file.seek(0,2)
            EOF = file.tell()
            if EOF % BLOCKSIZE != 0:
                # Pad file with \0 bytes
                padsize = BLOCKSIZE - EOF % BLOCKSIZE
                file.write('\0' * padsize)
                EOF = EOF + padsize
        else:
            EOF = self.EOF
        read_header = self.read_header
        # source scans the old layout, dest tracks where the next
        # VALID record will be compacted to
        source = dest = STARTOFDATA

        # Temporarily disable caching
        caching = self.caching
        if caching:
            self.clear_cache()
            self.caching = 0

        # Mark HOT
        if self.state != HOT:
            self.mark(HOT)

        # First align all VALID records to the "left"
        while source < EOF:
            try:
                recordsize,rtype,datasize = read_header(source)
            except Error:
                # Unallocated space: skip
                source = source + BLOCKSIZE
                if not recover:
                    # NOTE(review): source was already incremented, so
                    # this logs the position one block *past* the
                    # skipped block -- confirm intended
                    log(SYSTEM_WARNING,
                        'Skipping unallocated/misaligned block at %i',source)
                continue

            if rtype == VALID:
                if source != dest:
                    # Move record (informing caller via callback)
                    file.seek(source)
                    record = file.read(recordsize)
                    file.seek(dest)
                    file.write(record)
                    # record[6:] strips the 6-byte header
                    callback(source,dest,record[6:])
                elif recover:
                    # Inform caller of all valid records found
                    file.seek(source)
                    record = file.read(recordsize)
                    callback(source,dest,record[6:])
                dest = dest + recordsize

            elif rtype == OLD:
                # Skip record
                pass
            
            # Process next record
            source = source + recordsize

        # Everything behind dest is now considered free space
        try:
            file.truncate(dest)
        except AttributeError:
            # Truncate is not supported: clear out the remaining
            # space to make it invalid and continue processing as if
            # the file were truncated.
            file.seek(dest)
            while dest < EOF:
                file.write('\0'*BLOCKSIZE)
                dest = dest + BLOCKSIZE
        EOF = dest
        if EOF % BLOCKSIZE != 0:
            if recover:
                # In recover mode we simply pad the file to align
                # the file's end to BLOCKSIZE
                file.seek(EOF)
                padsize = BLOCKSIZE - EOF % BLOCKSIZE
                file.write('\0' * padsize)
                EOF = EOF + padsize
            else:
                raise Error,'EOF malaligned after collect()'
        self.EOF = EOF
            
        # Reenable caching
        if caching:
            self.caching = 1

    def backup(self,archive=None,buffersize=8192):

        """ Write a backup copy of the storage file.

            The archive file is a plain byte-for-byte copy of the
            current storage file; archive defaults to
            self.filename + '.backup'.

            buffersize gives the size of the buffer used when copying
            the file.

        """
        if not archive:
            archive = self.filename + '.backup'
        archfile = open(archive,'wb')

        # Mark HOT
        if self.state != HOT:
            self.mark(HOT)

        # Copy the storage file in buffersize chunks
        source = self.file
        source.seek(0)
        while 1:
            chunk = source.read(buffersize)
            if not chunk:
                break
            archfile.write(chunk)
        archfile.close()

###

class PickleMixin:

    """ Mixin providing pickle-based serialization.

        All pickles are written using the binary pickle format.
    """
    def encode(self,object,

               dumps=cPickle.dumps):

        """ Serialize object into a string.

            Overload this method to install a different encoding
            scheme.
        """
        data = dumps(object,1)
        return data

    def decode(self,object,

               loads=cPickle.loads):

        """ Unserialize a string into an object.

            Overload this method to install a different encoding
            scheme.
        """
        data = loads(object)
        return data

class BeePickleStorage(PickleMixin,BeeStorage):

    """ Flatfile storage for objects which can be serialized using
        binary pickles.
    """

freeze(BeePickleStorage)

###

class MarshalMixin:

    """ Mixin providing marshal-based serialization.
    """

    def encode(self,object,

               dumps=marshal.dumps):

        """ Serialize object into a string.

            Overload this method to install a different encoding
            scheme.
        """
        data = dumps(object)
        return data

    def decode(self,object,

               loads=marshal.loads):

        """ Unserialize a string into an object.

            Overload this method to install a different encoding
            scheme.
        """
        data = loads(object)
        return data

class BeeMarshalStorage(MarshalMixin,BeeStorage):

    """ Flatfile storage for objects which can be serialized using
        the marshal module.
    """

freeze(BeeMarshalStorage)

###

class BeeKeyValueStorage(BeeStorage):

    """ Flatfile storage for key,value pairs.

        keys and values must be pickleable object.

        The main difference between this class and the base class
        is that keys are readable separately from the values, e.g.
        values can be multi-MB big and are only read if this is really
        requested.

        NOTE: The .en/decode methods are NOT used. Uses binary
        pickles.

    """
    # Cache mapping record position -> decoded key; only set up
    # when caching is enabled (see __init__)
    key_cache = None

    def __init__(self,*args,**kws):

        # Delegate to the base class, then install the extra key
        # cache next to the record/header caches managed there
        BeeStorage.__init__(self, *args, **kws)
        if self.caching:
            self.key_cache = key_cache = Cache.Cache(MAXCACHESIZE,CACHELIMIT)
            self.caches.append(key_cache)

    def write(self,key,value,position=None,

              dumps=cPickle.dumps):

        """ Write key and value to position. Returns the position under
            which the record was stored.

            If position is None, the implementation chooses a new
            one.
        """
        # Pack key and value into two separate pickles; this allows
        # .read_key() to unpickle just the first one later on
        data = dumps(key,1) + dumps(value,1)

        # Write the record
        if position is None:
            position = self.EOF
        position = self.write_record(data,position)

        if self.caching:
            self.record_cache.put(position,data)
            self.key_cache.put(position,key)

        return position

    # Aliases
    append = write
    add = write

    def decode_key(self,raw_data,

                   loads=cPickle.loads):

        """ Decode the key part of a raw data record field.
        """
        # Only the first pickle in the record is the key;
        # loads() stops after the first complete pickle
        return loads(raw_data)

    def read_key(self,position,

                 load=cPickle.load,NotCached=Cache.NotCached):

        """ Load the key part of an object from the file at the given
            position.
        """
        if self.caching:
            key = self.key_cache.get(position,NotCached)
            if key is not NotCached:
                return key

        # Position file reader and only read the key part
        self.read_header(position)
        key = load(self.file)

        if self.caching:
            self.key_cache.put(position,key)

        return key
        
    def read(self,position,

             load=cPickle.load,StringIO=cStringIO.StringIO,
             NotCached=Cache.NotCached):

        """ Load an object from the file at the given position and
            return it as tuple (key,value).
        """
        if self.caching:
            record = self.record_cache.get(position,NotCached)
            if record is not NotCached:
                file = StringIO(record)
                key = load(file)
                data = load(file)
                return key,data

        # Read the header and position the file over the data area
        recordsize,rtype,datasize = self.read_header(position)

        if self.caching:
            # BUGFIX: read the raw record from self.file -- the
            # original code referenced the local name 'file' before
            # assignment, causing an UnboundLocalError on every
            # cache miss
            record = self.file.read(datasize)
            file = StringIO(record)
            key = load(file)
            data = load(file)
            self.record_cache.put(position,record)
            self.key_cache.put(position,key)
        else:
            file = self.file
            key = load(file)
            data = load(file)

        return key,data

    # Alias
    __getitem__ = read
        
    def read_value(self,position):

        """ Load the value part of an object from the file at the given
            position.
        """
        return self.read(position)[1]
        
freeze(BeeKeyValueStorage)

### tests

if __name__ == '__main__':

    # --- BeePickleStorage self test -------------------------------
    # Write a list of sample objects, read them back and verify that
    # the round trip preserves the values.
    f = BeePickleStorage('test-BeePickleStorage.dat',cache=1,lock=1)
    l = [1,'blabla','Hi there',2.3434,4+7j] + range(1000)
    k = map(f.write,l)
    m = map(f.read,k)
    if l != m:
        print 'BeePickleStorage:\n'
        print 'Results differ:'
        print 'orig:',l
        print 'rest:',m
    else:
        print 'BeePickleStorage works.'
    valid,old,invalid = f.find_records()
    print ' %i valid records, %i old, %i invalid' % (len(valid),len(old),len(invalid))
    print ' r cache hits:',f.record_cache.hits,' misses:',f.record_cache.misses
    print ' h cache hits:',f.header_cache.hits,' misses:',f.header_cache.misses
    
    # Rewrite every record in place at its old position; the larger
    # replacement values force the storage to relocate some records,
    # so the returned positions in k may change.
    print ' rewrite...'
    l = [1,'blabla','Hi there',2.3434,4+7j] + ['x'*100] * 1000
    k = map(lambda value,oldpos: f.write(value,oldpos),l,k)
    m = map(f.read,k)
    if l != m:
        print ' Results differ:'
        print '  orig:',l
        print '  rest:',m
    valid,old,invalid = f.find_records()
    print ' %i valid records, %i old, %i invalid' % (len(valid),len(old),len(invalid))
    print ' r cache hits:',f.record_cache.hits,' misses:',f.record_cache.misses
    print ' h cache hits:',f.header_cache.hits,' misses:',f.header_cache.misses

    # Garbage-collect the storage file; the callback keeps the list
    # of known positions k in sync as records are moved around.
    print ' collect...'
    def callback(old,new,data,k=k):
        index = k.index(old)
        k[index] = new
    f.collect(callback)
    m = map(f.read,k)
    if l != m:
        print ' Results differ:'
        print '  orig:',l
        print '  rest:',m
    valid,old,invalid = f.find_records()
    print ' %i valid records, %i old, %i invalid' % (len(valid),len(old),len(invalid))
    print ' r cache hits:',f.record_cache.hits,' misses:',f.record_cache.misses
    print ' h cache hits:',f.header_cache.hits,' misses:',f.header_cache.misses

    print

    # --- BeeKeyValueStorage self test -----------------------------
    # Store one (key,value) pair per character code and verify that
    # reading each stored position returns the matching pair.
    g = BeeKeyValueStorage('test-BeeKeyValueStorage.dat',cache=1)
    d = {}
    for i in range(256):
        d[str(i)] = 'As char: %s, as number: %i, octal: %s' %\
                    (repr(chr(i)),i,oct(i))
    l = []
    for k,v in d.items():
        l.append(g.write(k,v))
    for addr in l:
        k,v = g.read(addr)
        if d[k] != v:
            print 'BeeKeyValueStorage:\n'
            print 'Mismatch for "%s": "%s"' % (k,v)
    print 'BeeKeyValueStorage works.'
    valid,old,invalid = g.find_records()
    print ' %i valid records, %i old, %i invalid' % (len(valid),len(old),len(invalid))
    print ' r cache hits:',g.record_cache.hits,' misses:',g.record_cache.misses
    print ' h cache hits:',g.header_cache.hits,' misses:',g.header_cache.misses
    print

    # Drop the storages so that their files get closed/unlocked
    del f,g