package gocql
import (
"flag"
"fmt"
"log"
"net"
"strings"
"sync"
"testing"
"time"
)
// Test-harness flags shared by all integration tests in this package.
// They are registered here and parsed in init below, so every test
// binary built from this package accepts them.
var (
	flagCluster      = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
	flagProto        = flag.Int("proto", 0, "protocol version")
	flagCQL          = flag.String("cql", "3.0.0", "CQL version")
	flagRF           = flag.Int("rf", 1, "replication factor for test keyspace")
	clusterSize      = flag.Int("clusterSize", 1, "the expected size of the cluster")
	flagRetry        = flag.Int("retries", 5, "number of times to retry queries")
	flagAutoWait     = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts pool")
	flagRunSslTest   = flag.Bool("runssl", false, "Set to true to run ssl test")
	flagRunAuthTest  = flag.Bool("runauth", false, "Set to true to run authentication test")
	flagCompressTest = flag.String("compressor", "", "compressor to use")
	flagTimeout      = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
	flagCassVersion  cassVersion
	clusterHosts     []string
)
// init registers the custom cassandra-version flag, parses all test
// flags, and derives the target host list from the -cluster flag.
func init() {
	log.SetFlags(log.Lshortfile | log.LstdFlags)

	flag.Var(&flagCassVersion, "gocql.cversion", "the cassandra version being tested against")
	flag.Parse()

	clusterHosts = strings.Split(*flagCluster, ",")
}
// addSslOptions attaches the test PKI material to cluster when the
// -runssl flag is set; otherwise the config is returned untouched.
func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
	if !*flagRunSslTest {
		return cluster
	}
	cluster.SslOpts = &SslOptions{
		CertPath:               "testdata/pki/gocql.crt",
		KeyPath:                "testdata/pki/gocql.key",
		CaPath:                 "testdata/pki/ca.crt",
		EnableHostVerification: false,
	}
	return cluster
}
// initOnce guards the one-time drop/re-create of the shared test
// keyspace (see createSessionFromCluster).
var initOnce sync.Once
// createTable runs the given DDL statement (without retries) and waits
// for schema agreement both before and after, logging any failure.
func createTable(s *Session, table string) error {
	// ensure the cluster agrees on the schema before issuing DDL
	err := s.control.awaitSchemaAgreement()
	if err != nil {
		log.Printf("error waiting for schema agreement pre create table=%q err=%v\n", table, err)
		return err
	}

	err = s.Query(table).RetryPolicy(nil).Exec()
	if err != nil {
		log.Printf("error creating table table=%q err=%v\n", table, err)
		return err
	}

	// wait again so subsequent queries see the new schema
	err = s.control.awaitSchemaAgreement()
	if err != nil {
		log.Printf("error waiting for schema agreement post create table=%q err=%v\n", table, err)
	}
	return err
}
// createCluster builds a ClusterConfig from the test flags: hosts,
// protocol/CQL version, timeout, consistency, retry policy, compressor
// and (optionally) SSL options.
func createCluster() *ClusterConfig {
	cfg := NewCluster(clusterHosts...)
	cfg.ProtoVersion = *flagProto
	cfg.CQLVersion = *flagCQL
	cfg.Timeout = *flagTimeout
	cfg.Consistency = Quorum
	cfg.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow

	if *flagRetry > 0 {
		cfg.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
	}

	if name := *flagCompressTest; name == "snappy" {
		cfg.Compressor = &SnappyCompressor{}
	} else if name != "" {
		panic("invalid compressor: " + name)
	}

	return addSslOptions(cfg)
}
// createKeyspace drops keyspace (if present) and re-creates it with
// SimpleStrategy replication at the -rf factor, using a short-lived
// session against the system keyspace. Panics on failure.
func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
	// work on a copy so the caller's config is left untouched
	conf := *cluster
	conf.Keyspace = "system"
	conf.Timeout = 30 * time.Second

	session, err := conf.CreateSession()
	if err != nil {
		panic(err)
	}
	// defers run LIFO: the log line fires before the session is closed
	defer session.Close()
	defer tb.Log("closing keyspace session")

	if err = createTable(session, `DROP KEYSPACE IF EXISTS `+keyspace); err != nil {
		panic(fmt.Sprintf("unable to drop keyspace: %v", err))
	}

	stmt := fmt.Sprintf(`CREATE KEYSPACE %s
	WITH replication = {
		'class' : 'SimpleStrategy',
		'replication_factor' : %d
	}`, keyspace, *flagRF)
	if err = createTable(session, stmt); err != nil {
		panic(fmt.Sprintf("unable to create keyspace: %v", err))
	}
}
// createSessionFromCluster opens a session against the shared
// "gocql_test" keyspace, creating that keyspace exactly once per test
// binary. Different tests should use their own individual tables, but
// can assume the table does not exist beforehand.
func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
	// Drop and re-create the keyspace only on the first call.
	initOnce.Do(func() { createKeyspace(tb, cluster, "gocql_test") })

	cluster.Keyspace = "gocql_test"

	session, err := cluster.CreateSession()
	if err != nil {
		tb.Fatal("createSession:", err)
	}
	if err = session.control.awaitSchemaAgreement(); err != nil {
		tb.Fatal(err)
	}

	return session
}
// createSession opens a session against the shared test keyspace using
// the flag-derived cluster configuration.
func createSession(tb testing.TB) *Session {
	return createSessionFromCluster(createCluster(), tb)
}
// createTestSession builds an offline Session (no host lookup, one
// connection, round-robin policy) — hopefully moderately useful in
// actual unit tests.
func createTestSession() *Session {
	cfg := NewCluster()
	cfg.NumConns = 1
	cfg.Timeout = 0
	cfg.DisableInitialHostLookup = true
	cfg.IgnorePeerAddr = true
	cfg.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()

	s := &Session{
		cfg: *cfg,
		connCfg: &ConnConfig{
			Timeout:   10 * time.Millisecond,
			Keepalive: 0,
		},
		policy: cfg.PoolConfig.HostSelectionPolicy,
	}
	s.pool = cfg.PoolConfig.buildPool(s)

	return s
}
// staticAddressTranslator returns a translator that maps every address
// to the fixed newAddr:newPort pair, ignoring its input.
func staticAddressTranslator(newAddr net.IP, newPort int) AddressTranslator {
	return AddressTranslatorFunc(func(net.IP, int) (net.IP, int) {
		return newAddr, newPort
	})
}
// assertTrue reports a test error when value is false.
func assertTrue(t *testing.T, description string, value bool) {
	if value {
		return
	}
	t.Errorf("expected %s to be true", description)
}
// assertEqual reports a test error when expected and actual differ
// under interface equality (==).
func assertEqual(t *testing.T, description string, expected, actual interface{}) {
	if expected == actual {
		return
	}
	t.Errorf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
}
// assertNil reports a test error when actual is a non-nil interface
// value.
// NOTE(review): a typed nil pointer boxed into the interface compares
// non-nil here and will be reported as a failure — pass a literal nil
// or use reflection if that case matters.
func assertNil(t *testing.T, description string, actual interface{}) {
	if actual == nil {
		return
	}
	t.Errorf("expected %s to be (nil) but was (%+v) instead", description, actual)
}
// assertNotNil reports a test error when actual is a nil interface
// value.
func assertNotNil(t *testing.T, description string, actual interface{}) {
	if actual != nil {
		return
	}
	t.Errorf("expected %s not to be (nil)", description)
}