"""
Common helpers for working with ggml + numpy
"""
from ggml import ffi, lib
from typing import Union, Optional
import numpy as np
def init(mem_size: int, mem_buffer: ffi.CData = ffi.NULL, no_alloc: bool = False) -> ffi.CData:
    """
    Initialize a ggml context, which will be freed automatically when the pointer is garbage collected.
    """
    params = ffi.new('struct ggml_init_params*')
    params.mem_size = mem_size
    params.mem_buffer = mem_buffer
    params.no_alloc = no_alloc
    return ffi.gc(lib.ggml_init(params[0]), lib.ggml_free)
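
# Example: creating a context and a tensor inside it. A minimal sketch, assuming
# the native ggml library is built and loadable; the 16 MiB arena size is
# illustrative, not a requirement of this API.
#
#   ctx = init(mem_size=16 * 1024 * 1024)
#   t = lib.ggml_new_tensor_2d(ctx, lib.GGML_TYPE_F32, 4, 4)
#   # No explicit cleanup is needed: ggml_free runs when ctx is garbage collected.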
TensorLike = Union[ffi.CData, np.ndarray]
def copy(from_tensor: TensorLike, to_tensor: TensorLike, allow_requantize: bool = True):
    """
    Copy the contents of one tensor to another, doing any necessary (de/re)quantization transparently.
    Works across numpy & ggml tensors, but they must have the same shape (and be contiguous).

    Parameters
    ----------
    from_tensor : TensorLike
        The tensor to copy from (a numpy array or a possibly-quantized ggml tensor)
    to_tensor : TensorLike
        The tensor to copy to (a numpy array or a possibly-quantized ggml tensor)
    allow_requantize : bool
        If False, an error is raised when requantization would be needed (i.e. when
        from_tensor and to_tensor are quantized with different quantization types)
    """
    if from_tensor is to_tensor:
        return

    __expect_same_layout("source", from_tensor, "destination", to_tensor)
    __check_shape_consistent_with_type(from_tensor)
    __check_shape_consistent_with_type(to_tensor)

    from_type = __get_type(from_tensor)
    to_type = __get_type(to_tensor)

    if from_type == to_type:
        # Identical types on both sides: a raw byte copy suffices.
        ffi.memmove(__get_data(to_tensor), __get_data(from_tensor), __get_nbytes(from_tensor))
    else:
        assert allow_requantize or not lib.ggml_is_quantized(from_type) or not lib.ggml_is_quantized(to_type), \
            f"Requantizing from {__type_name(from_type)} to {__type_name(to_type)} is disabled. Force with allow_requantize=True"
        # Different types: round-trip through an intermediate float32 buffer.
        __set_floats(to_tensor, __get_floats(from_tensor))
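
# Example: quantizing a numpy array into a ggml tensor and reading it back.
# A sketch assuming the hypothetical `ctx` from the init() example above; Q4_0
# has a block size of 32, and this module requires every dimension to be a
# multiple of the block size.
#
#   q = lib.ggml_new_tensor_2d(ctx, lib.GGML_TYPE_Q4_0, 64, 64)
#   a = np.random.rand(64, 64).astype(np.float32)
#   copy(a, q)   # quantizes a into q
#   copy(q, a)   # dequantizes q back into a (a lossy round trip)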
def numpy(tensor: ffi.CData, allow_copy: Union[bool, np.ndarray] = False, allow_requantize: bool = False) -> np.ndarray:
    """
    Convert a ggml tensor to a numpy array.
    If the tensor isn't quantized, the returned numpy array is a view over its data.

    If it is quantized (and allow_copy is truthy), the conversion involves a dequantizing copy and the
    returned array owns its own data (changes to the numpy array won't be reflected back in the tensor).

    Parameters
    ----------
    tensor : ffi.CData
        The tensor to convert to a numpy array
    allow_copy : bool or np.ndarray
        If False, an error is raised when the tensor is quantized (since dequantization requires extra memory).
        If True, the tensor is dequantized into a new float32 numpy array that is returned.
        If an np.ndarray, the data is dequantized into that array (which must have the same shape as the tensor)
    allow_requantize : bool
        If allow_copy is an array with a different quantization type than the source tensor,
        an error is raised unless allow_requantize is True
    """
    shape = __get_shape(tensor)

    if lib.ggml_is_quantized(tensor.type):
        # Note: `is False` rather than `== False`, since allow_copy may be an
        # ndarray (for which == would broadcast element-wise).
        if allow_copy is False:
            raise ValueError(f"{__describe(tensor)} is quantized, conversion to numpy requires a copy (pass allow_copy=True; changes to the numpy array won't affect the original).")
        elif isinstance(allow_copy, np.ndarray):
            __expect_same_layout("source tensor", tensor, "dequantization output tensor", allow_copy)
            destination = allow_copy
        else:
            destination = np.empty(shape, dtype=np.float32)

        copy(tensor, destination, allow_requantize=allow_requantize)
        return destination
    else:
        dtype = __type_to_dtype(tensor.type)
        if not dtype:
            raise NotImplementedError(f'Cannot convert {__describe(tensor)} to numpy')
        assert __is_contiguous(tensor), f"Cannot convert {__describe(tensor)} to numpy (only contiguous tensors are supported)"

        # Wrap the tensor's memory in a zero-copy numpy view.
        nbytes = lib.ggml_nelements(tensor) * lib.ggml_type_size(tensor.type)
        array = np.frombuffer(ffi.buffer(lib.ggml_get_data(tensor), nbytes), dtype=dtype)
        array.shape = shape
        return array
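
# Example: a sketch reusing the hypothetical `t` (float32) and `q` (Q4_0)
# tensors from the sketches above.
#
#   view = numpy(t)                  # zero-copy view over the tensor's data
#   arr = numpy(q, allow_copy=True)  # dequantized float32 copy of the quantized tensor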
def __type_name(type: int) -> Optional[str]:
    name = lib.ggml_type_name(type)
    return ffi.string(name).decode('utf-8') if name else None
__k_quant_types = set([
    lib.GGML_TYPE_Q2_K,
    lib.GGML_TYPE_Q3_K,
    lib.GGML_TYPE_Q4_K,
    lib.GGML_TYPE_Q5_K,
    lib.GGML_TYPE_Q6_K,
    lib.GGML_TYPE_Q8_K,
])
__type_to_dtype_dict = {
    lib.GGML_TYPE_I8: np.int8,
    lib.GGML_TYPE_I16: np.int16,
    lib.GGML_TYPE_I32: np.int32,
    lib.GGML_TYPE_F16: np.float16,
    lib.GGML_TYPE_F32: np.float32,
}
def __type_to_dtype(type: int) -> Optional[np.dtype]: return __type_to_dtype_dict.get(type)
def __dtype_to_type(dtype: np.dtype):
    if dtype == np.float32: return lib.GGML_TYPE_F32
    elif dtype == np.float16: return lib.GGML_TYPE_F16
    elif dtype == np.int32: return lib.GGML_TYPE_I32
    elif dtype == np.int16: return lib.GGML_TYPE_I16
    elif dtype == np.int8: return lib.GGML_TYPE_I8
    else: raise ValueError(f"Unsupported dtype: {dtype}")
def __describe(tensor: TensorLike): return f'Tensor[{__type_name(__get_type(tensor))}, {__get_shape(tensor)}]'
def __get_type(tensor: TensorLike): return __dtype_to_type(tensor.dtype) if isinstance(tensor, np.ndarray) else tensor.type
def __get_shape(x: TensorLike): return x.shape if isinstance(x, np.ndarray) else tuple([x.ne[i] for i in range(x.n_dims)])
def __get_strides(x: TensorLike): return x.strides if isinstance(x, np.ndarray) else tuple([x.nb[i] for i in range(x.n_dims)])
def __get_data(x: TensorLike) -> ffi.CData: return ffi.from_buffer(x) if isinstance(x, np.ndarray) else lib.ggml_get_data(x)
def __get_nbytes(tensor: TensorLike): return tensor.nbytes if isinstance(tensor, np.ndarray) else lib.ggml_nbytes(tensor)
def __get_nelements(tensor: TensorLike): return tensor.size if isinstance(tensor, np.ndarray) else lib.ggml_nelements(tensor)
def __is_contiguous(tensor: TensorLike): return tensor.flags['C_CONTIGUOUS'] if isinstance(tensor, np.ndarray) else lib.ggml_is_contiguous(tensor)
def __get_floats(tensor: TensorLike) -> ffi.CData:
    data, type = __get_data(tensor), __get_type(tensor)
    if type == lib.GGML_TYPE_F32:
        # Already float32: reuse the existing buffer, no copy needed.
        return ffi.cast('float*', data)
    else:
        nelements = __get_nelements(tensor)
        floats = ffi.new('float[]', nelements)
        if type == lib.GGML_TYPE_F16:
            lib.ggml_fp16_to_fp32_row(ffi.cast('uint16_t*', data), floats, nelements)
        elif lib.ggml_is_quantized(type):
            qtype = lib.ggml_internal_get_type_traits(type)
            assert qtype.to_float, f"Type {__type_name(type)} is not supported by ggml"
            qtype.to_float(data, floats, nelements)
        else:
            raise NotImplementedError(f'Cannot read floats from {__describe(tensor)}')
        return floats
def __set_floats(tensor: TensorLike, f32_data: ffi.CData) -> None:
    data, type, nbytes = __get_data(tensor), __get_type(tensor), __get_nbytes(tensor)
    if type == lib.GGML_TYPE_F32:
        ffi.memmove(data, f32_data, nbytes)
    else:
        nelements = __get_nelements(tensor)
        if type == lib.GGML_TYPE_F16:
            lib.ggml_fp32_to_fp16_row(f32_data, ffi.cast('uint16_t*', data), nelements)
        elif lib.ggml_is_quantized(type):
            qtype = lib.ggml_internal_get_type_traits(type)
            assert qtype.from_float, f"Type {__type_name(type)} is not supported by ggml"
            qtype.from_float(f32_data, data, nelements)
        else:
            raise NotImplementedError(f'Cannot write floats to {__describe(tensor)}')
def __expect_same_layout(name1: str, tensor1: TensorLike, name2: str, tensor2: TensorLike):
    shape1, shape2 = __get_shape(tensor1), __get_shape(tensor2)
    assert shape1 == shape2, f"Shape mismatch: {name1} has {shape1} but {name2} has {shape2}"
    assert __is_contiguous(tensor1) and __is_contiguous(tensor2), f"Only contiguous tensors are supported (got {name1} with strides {__get_strides(tensor1)} and {name2} with strides {__get_strides(tensor2)})"
def __check_shape_consistent_with_type(tensor: TensorLike):
    type = __get_type(tensor)
    if not lib.ggml_is_quantized(type):
        return
    shape = __get_shape(tensor)
    block_size = lib.ggml_blck_size(type)
    assert not (block_size == 0 and type in __k_quant_types), "Can't quantize, native library was not compiled with USE_K_QUANTS!"
    assert block_size > 0, f"Invalid block size {block_size} for type {__type_name(type)}"
    for i, d in enumerate(shape):
        assert d % block_size == 0, f"Dimension {i} of {__describe(tensor)} is not divisible by {block_size}, required for quantization."
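
if __name__ == '__main__':
    # Smoke-test sketch (an illustration, not part of the library API): assumes
    # the native ggml library is available; the arena size and tensor shape are
    # arbitrary, chosen as multiples of Q4_0's block size of 32.
    ctx = init(mem_size=16 * 1024 * 1024)
    q = lib.ggml_new_tensor_2d(ctx, lib.GGML_TYPE_Q4_0, 64, 64)
    a = np.random.rand(64, 64).astype(np.float32)
    copy(a, q)                      # quantize the numpy data into the tensor
    b = numpy(q, allow_copy=True)   # dequantize back into a fresh float32 array
    print('max abs quantization error:', np.max(np.abs(a - b)))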