1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from tests.integration import use_singledc, TestCluster
from cassandra.policies import ColDesc
from cassandra.column_encryption.policies import AES256ColumnEncryptionPolicy, \
AES256_KEY_SIZE_BYTES, AES256_BLOCK_SIZE_BYTES
def setup_module():
use_singledc()
class ColumnEncryptionPolicyTest(unittest.TestCase):
def _recreate_keyspace(self, session):
session.execute("drop keyspace if exists foo")
session.execute("CREATE KEYSPACE foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")
session.execute("CREATE TABLE foo.bar(encrypted blob, unencrypted int, primary key(unencrypted))")
def _create_policy(self, key, iv = None):
cl_policy = AES256ColumnEncryptionPolicy()
col_desc = ColDesc('foo','bar','encrypted')
cl_policy.add_column(col_desc, key, "int")
return (col_desc, cl_policy)
def test_end_to_end_prepared(self):
# We only currently perform testing on a single type/expected value pair since CLE functionality is essentially
# independent of the underlying type. We intercept data after it's been encoded when it's going out and before it's
# encoded when coming back; the actual types of the data involved don't impact us.
expected = 0
key = os.urandom(AES256_KEY_SIZE_BYTES)
(_, cl_policy) = self._create_policy(key)
cluster = TestCluster(column_encryption_policy=cl_policy)
session = cluster.connect()
self._recreate_keyspace(session)
prepared = session.prepare("insert into foo.bar (encrypted, unencrypted) values (?,?)")
for i in range(100):
session.execute(prepared, (i, i))
# A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted
# values here to confirm that we don't interfere with regular processing of unencrypted vals.
(encrypted,unencrypted) = session.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one()
self.assertEqual(expected, encrypted)
self.assertEqual(expected, unencrypted)
# Confirm the same behaviour from a subsequent prepared statement as well
prepared = session.prepare("select encrypted, unencrypted from foo.bar where unencrypted = ? allow filtering")
(encrypted,unencrypted) = session.execute(prepared, [expected]).one()
self.assertEqual(expected, encrypted)
self.assertEqual(expected, unencrypted)
def test_end_to_end_simple(self):
expected = 1
key = os.urandom(AES256_KEY_SIZE_BYTES)
(col_desc, cl_policy) = self._create_policy(key)
cluster = TestCluster(column_encryption_policy=cl_policy)
session = cluster.connect()
self._recreate_keyspace(session)
# Use encode_and_encrypt helper function to populate date
for i in range(1,100):
self.assertIsNotNone(i)
encrypted = cl_policy.encode_and_encrypt(col_desc, i)
session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i))
# A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted
# values here to confirm that we don't interfere with regular processing of unencrypted vals.
(encrypted,unencrypted) = session.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one()
self.assertEqual(expected, encrypted)
self.assertEqual(expected, unencrypted)
# Confirm the same behaviour from a subsequent prepared statement as well
prepared = session.prepare("select encrypted, unencrypted from foo.bar where unencrypted = ? allow filtering")
(encrypted,unencrypted) = session.execute(prepared, [expected]).one()
self.assertEqual(expected, encrypted)
self.assertEqual(expected, unencrypted)
def test_end_to_end_different_cle_contexts_different_ivs(self):
"""
Test to validate PYTHON-1350. We should be able to decode the data from two different contexts (with two different IVs)
since the IV used to decrypt the data is actually now stored with the data.
"""
expected = 2
key = os.urandom(AES256_KEY_SIZE_BYTES)
# Simulate the creation of two AES256 policies at two different times. Python caches
# default param args at function definition time so a single value will be used any time
# the default val is used. Upshot is that within the same test we'll always have the same
# IV if we rely on the default args, so manually introduce some variation here to simulate
# what actually happens if you have two distinct sessions created at two different times.
iv1 = os.urandom(AES256_BLOCK_SIZE_BYTES)
(col_desc1, cl_policy1) = self._create_policy(key, iv=iv1)
cluster1 = TestCluster(column_encryption_policy=cl_policy1)
session1 = cluster1.connect()
self._recreate_keyspace(session1)
# Use encode_and_encrypt helper function to populate date
for i in range(1,100):
self.assertIsNotNone(i)
encrypted = cl_policy1.encode_and_encrypt(col_desc1, i)
session1.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i))
session1.shutdown()
cluster1.shutdown()
# Explicitly clear the class-level cache here; we're trying to simulate a second connection from a completely new process and
# that would entail not re-using any cached ciphers
AES256ColumnEncryptionPolicy._build_cipher.cache_clear()
cache_info = cl_policy1.cache_info()
self.assertEqual(cache_info.currsize, 0)
iv2 = os.urandom(AES256_BLOCK_SIZE_BYTES)
(_, cl_policy2) = self._create_policy(key, iv=iv2)
cluster2 = TestCluster(column_encryption_policy=cl_policy2)
session2 = cluster2.connect()
(encrypted,unencrypted) = session2.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one()
self.assertEqual(expected, encrypted)
self.assertEqual(expected, unencrypted)
def test_end_to_end_different_cle_contexts_different_policies(self):
"""
Test to validate PYTHON-1356. Class variables used to pass CLE policy down to protocol handler shouldn't persist.
"""
expected = 3
key = os.urandom(AES256_KEY_SIZE_BYTES)
(col_desc, cl_policy) = self._create_policy(key)
cluster = TestCluster(column_encryption_policy=cl_policy)
session = cluster.connect()
self._recreate_keyspace(session)
# Use encode_and_encrypt helper function to populate date
session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)",(cl_policy.encode_and_encrypt(col_desc, expected), expected))
# We now open a new session _without_ the CLE policy specified. We should _not_ be able to read decrypted bits from this session.
cluster2 = TestCluster()
session2 = cluster2.connect()
# A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted
# values here to confirm that we don't interfere with regular processing of unencrypted vals.
(encrypted,unencrypted) = session2.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one()
self.assertEqual(cl_policy.encode_and_encrypt(col_desc, expected), encrypted)
self.assertEqual(expected, unencrypted)
# Confirm the same behaviour from a subsequent prepared statement as well
prepared = session2.prepare("select encrypted, unencrypted from foo.bar where unencrypted = ? allow filtering")
(encrypted,unencrypted) = session2.execute(prepared, [expected]).one()
self.assertEqual(cl_policy.encode_and_encrypt(col_desc, expected), encrypted)
|