File: tpdkg_test.py

package info (click to toggle)
liboprf 0.9.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,720 kB
  • sloc: ansic: 19,331; python: 1,920; makefile: 418
file content (114 lines) | stat: -rwxr-xr-x 3,996 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python

"""
Test for TP DKG wrapper of pyoprf/liboprf

  SPDX-FileCopyrightText: 2024, Marsiske Stefan
  SPDX-License-Identifier: LGPL-3.0-or-later

  Copyright (c) 2024, Marsiske Stefan.
  All rights reserved.

  This file is part of liboprf.

  liboprf is free software: you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public License
  as published by the Free Software Foundation, either version 3 of
  the License, or (at your option) any later version.

  liboprf is distributed in the hope that it will be
  useful, but WITHOUT ANY WARRANTY; without even the implied
  warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  See the GNU Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with liboprf. If not, see <http://www.gnu.org/licenses/>.

"""

import pyoprf, pysodium, ctypes

# number of peers taking part in the DKG run; one long-term keypair and one
# peer state is created per peer
n = 5
# threshold handed to the TP; also the number of shares used to reconstruct
# the secret at the end of the test
t = 3
# passed unchanged to both the TP and every peer when they are started
# NOTE(review): presumably the tolerated message-timestamp drift in seconds --
# confirm against the liboprf tp-dkg documentation
ts_epsilon = 5

# Turn on verbose tp-dkg logging: liboprf exports a global named `log_file`,
# which is set here to the value of the C library's `stderr` symbol.
# NOTE(review): 'libc.so.6' is the glibc soname, so this only works where
# that library name resolves.
libc = ctypes.CDLL('libc.so.6')
cstderr = ctypes.c_void_p.in_dll(libc, 'stderr')
log_file = ctypes.c_void_p.in_dll(pyoprf.liboprf, 'log_file')
log_file.value = cstderr.value

# Generate one long-term signing keypair per peer and keep the public and
# secret halves in two parallel lists (same index == same peer).
lt_keypairs = [pysodium.crypto_sign_keypair() for _ in range(n)]
peer_lt_pks = [public for public, _ in lt_keypairs]
peer_lt_sks = [secret_key for _, secret_key in lt_keypairs]

# Start the TP with the protocol parameters and the peers' long-term public
# keys; besides the TP state this returns the first protocol message.
tp, msg0 = pyoprf.tpdkg_start_tp(n, t, ts_epsilon, "pyoprf tpdkg test", peer_lt_pks)

# Report what the TP state itself says about n, t and the session id.
tp_n = pyoprf.tpdkg_tpstate_n(tp)
tp_t = pyoprf.tpdkg_tpstate_t(tp)
tp_sid = bytes(pyoprf.tpdkg_tpstate_sessionid(tp))
print(f"n: {tp_n}, t: {tp_t}, sid: {tp_sid.hex()}")

# Start every peer from the TP's first message, each with its own long-term
# secret key.
peers = [pyoprf.tpdkg_peer_start(ts_epsilon, peer_lt_sks[i], msg0) for i in range(n)]

# Sanity checks: each peer state must carry the TP's session id and the
# long-term secret key it was started with.
for peer, lt_sk in zip(peers, peer_lt_sks):
    assert pyoprf.tpdkg_peerstate_sessionid(peer) == pyoprf.tpdkg_tpstate_sessionid(tp)
    assert lt_sk == pyoprf.tpdkg_peerstate_lt_sk(peer)

# Drive the protocol: the TP consumes the concatenated peer messages of the
# previous round, then the peers consume the TP's output, until the TP
# reports that it is done.
peer_msgs = []
while pyoprf.tpdkg_tp_not_done(tp):
    # In a networked setup these sizes would tell the TP how much to read
    # from each peer, e.g.:
    #   peer_msgs = (recv(size) for size in sizes)
    # Here all parties live in this process, so the result is not used.
    ret, sizes = pyoprf.tpdkg_tp_input_sizes(tp)
    msgs = b''.join(peer_msgs)

    # remember the step before advancing so a failure can be attributed to it
    cur_step = pyoprf.tpdkg_tpstate_step(tp)
    try:
        tp_out = pyoprf.tpdkg_tp_next(tp, msgs)
        #print(f"tp: msg[{tp[0].step}]: {tp_out.raw.hex()}")
    except Exception as e:
        # report which peers the TP flagged before failing the test
        cheaters, cheats = pyoprf.tpdkg_get_cheaters(tp)
        print(f"Warning during the distributed key generation the peers misbehaved: {sorted(cheaters)}")
        for k, v in cheats:
            print(f"\tmisbehaving peer: {k} was caught: {v}")
        # chain explicitly so the original error is kept as the cause
        raise ValueError(f"{e} | tp step {cur_step}") from e

    # Step the peers until at least one of them produced output for the TP,
    # or until the first peer reports that it is done. Only non-empty outputs
    # are collected, so an empty list means "nothing to send yet".
    peer_msgs = []
    while not peer_msgs and pyoprf.tpdkg_peer_not_done(peers[0]):
        for i in range(n):
            if len(tp_out) > 0:
                msg = pyoprf.tpdkg_tp_peer_msg(tp, tp_out, i)
                #print(f"tp -> peer[{i+1}] {msg.hex()}")
            else:
                # no TP output for this step: feed an empty *bytes* message,
                # keeping the message flow consistently bytes-typed
                msg = b''
            out = pyoprf.tpdkg_peer_next(peers[i], msg)
            if len(out) > 0:
                peer_msgs.append(out)
                #print(f"peer[{i+1}] -> tp {peer_msgs[-1].hex()}")
        # the TP output is delivered only once; any further peer-only steps
        # in this round get empty input
        tp_out = b''

# The protocol has finished: collect and print every peer's share, then
# check that different subsets of shares agree on the same result.

shares = [pyoprf.tpdkg_peerstate_share(peer) for peer in peers]
for number, share in enumerate(shares, start=1):
    print(f"share[{number}] {share.hex()}")


def _thresholdmult_base(indexes):
    """Combine the base-point multiples of the shares at *indexes*.

    For every 0-based index the share (minus its first byte) is multiplied
    with the ristretto255 base point and prefixed with the 1-based index
    byte; the resulting list is passed to ``pyoprf.thresholdmult``.
    """
    return pyoprf.thresholdmult([
        bytes([i + 1]) + pysodium.crypto_scalarmult_ristretto255_base(shares[i][1:])
        for i in indexes
    ])


# three different share subsets must all yield the same value
v0 = _thresholdmult_base((0, 1, 2))
v1 = _thresholdmult_base((2, 0, 3))
assert v0 == v1
v2 = _thresholdmult_base((2, 1, 4))
assert v0 == v2

# reconstructing the secret from the first t shares and multiplying it with
# the base point must give that same value
secret = pyoprf.dkg_reconstruct(shares[:t])
#print("secret", secret.hex())
assert v0 == pysodium.crypto_scalarmult_ristretto255_base(secret)

# Release the buffers held by each peer state now that the test is over.
for peer in peers:
    pyoprf.tpdkg_peer_free(peer)