Skip to content

Instantly share code, notes, and snippets.

@eevans

eevans/main.go Secret

Created May 9, 2018 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eevans/79b4622ed91bdb5b755973324add0963 to your computer and use it in GitHub Desktop.
Save eevans/79b4622ed91bdb5b755973324add0963 to your computer and use it in GitHub Desktop.
package main
import (
"encoding/hex"
"errors"
"fmt"
"github.com/fatih/color"
"github.com/gocql/gocql"
"github.com/vaughan0/go-ini"
"gopkg.in/alecthomas/kingpin.v2"
"log"
"math"
"os"
"os/signal"
"time"
)
var (
	// Command line arguments, registered with kingpin and populated by
	// kingpin.Parse() in main.
	cqlshrc  = kingpin.Flag("cqlshrc", "Full path to cqlshrc file.").Default("cqlshrc").String()
	hostname = kingpin.Flag("hostname", "Cassandra host.").Default("localhost").String()
	port     = kingpin.Flag("port", "Cassandra port.").Default("9042").Int()
	keyspace = kingpin.Flag("keyspace", "Cassandra keyspace.").Required().String()
	// timeout is parsed with time.ParseDuration (e.g. "3s") when the session
	// is created.
	timeout = kingpin.Flag("timeout", "Cassandra timeout.").Default("3s").String()
	// pageState, when non-empty, is hex-decoded and used to resume the scan
	// from a previously printed page state.
	pageState = kingpin.Flag("page-state", "Hex-encoded query page state token.").String()
	// dryRun makes rangeDelete log the DELETE statement instead of executing it.
	dryRun  = kingpin.Flag("dry-run", "Do not delete; Log only.").Bool()
	logFile = kingpin.Flag("log-file", "Write logs to file.").String()
	// concurrency is the capacity of semaphore, i.e. the maximum number of
	// DELETE goroutines in flight at once.
	concurrency = kingpin.Flag("concurrency", "Maximum number of concurrent DELETEs.").Default("5").Int()
	// Console colors
	yellow = color.New(color.FgYellow).SprintFunc()
	cyan   = color.New(color.FgCyan).SprintFunc()
	red    = color.New(color.FgRed).SprintFunc()
	// Buckets of row-age counts: histoBuckets[i] counts the gaps of i whole
	// weeks between consecutive rows of a partition. The slice is grown on
	// demand by processRows.
	histoBuckets = make([]int64, 0)
	// shutdownRequested becomes true after a keyboard interrupt; the scan loop
	// in main stops when it is set.
	shutdownRequested = false
	// numDeletes counts DELETEs that executed successfully (dry-run statements
	// are not counted). rangeDelete, which runs on concurrent goroutines
	// started by processRows, increments it.
	numDeletes = 0
	// semaphore bounds the number of in-flight DELETE goroutines. main creates
	// it with capacity *concurrency. processRows sends before starting a
	// goroutine, and the goroutine receives when it finishes.
	semaphore chan bool
)
// Cqlshrc holds the connection settings that NewCqlshrc extracts from a
// cqlshrc file. Fields missing from the file are left as empty strings.
type Cqlshrc struct {
	// Username and Password are the [authentication] credentials.
	Username, Password string
	// Ca is the [ssl] certfile path.
	Ca string
}
// NewCqlshrc loads the INI-format cqlshrc file at filename. It copies the
// [authentication] username/password and the [ssl] certfile values into a new
// Cqlshrc. Keys absent from the file leave the matching field empty. An error
// is returned only when the file cannot be loaded.
func NewCqlshrc(filename string) (*Cqlshrc, error) {
	file, loadErr := ini.LoadFile(filename)
	if loadErr != nil {
		return nil, errors.New(fmt.Sprintf("Unable to parse cqlshrc: %s", loadErr))
	}

	rc := new(Cqlshrc)
	// Each entry maps one (section, key) pair of the file onto a field of rc.
	lookups := []struct {
		section, key string
		dst          *string
	}{
		{"authentication", "username", &rc.Username},
		{"authentication", "password", &rc.Password},
		{"ssl", "certfile", &rc.Ca},
	}
	for _, l := range lookups {
		if value, found := file.Get(l.section, l.key); found {
			*l.dst = value
		}
	}
	return rc, nil
}
// CreateSession opens a gocql session to hostname:port with keyspace as the
// default keyspace and LOCAL_QUORUM consistency.
//
// Credentials and the CA certificate path are read from the cqlshrc file at
// path cqlshrc. The per-query timeout comes from the --timeout flag.
//
// It returns an error when the cqlshrc file cannot be parsed, when --timeout
// is not a valid duration, or when the session cannot be created. Previously
// an invalid timeout terminated the process from inside this function; it is
// now reported to the caller with the same message text.
func CreateSession(hostname string, port int, keyspace string, cqlshrc string) (*gocql.Session, error) {
	rc, err := NewCqlshrc(cqlshrc)
	if err != nil {
		return nil, err
	}
	queryTimeout, err := time.ParseDuration(*timeout)
	if err != nil {
		return nil, fmt.Errorf("Cannot parse Cassandra timeout: %v", err)
	}

	cluster := gocql.NewCluster(hostname)
	cluster.Port = port
	cluster.Keyspace = keyspace
	cluster.Consistency = gocql.LocalQuorum
	cluster.Timeout = queryTimeout
	cluster.Authenticator = gocql.PasswordAuthenticator{
		Username: rc.Username,
		Password: rc.Password,
	}
	// NOTE(review): SslOpts is set even when the cqlshrc has no [ssl] certfile
	// (CaPath is then ""). gocql presumably enables TLS whenever SslOpts is
	// non-nil, so confirm this is intended for plaintext clusters.
	cluster.SslOpts = &gocql.SslOptions{
		CaPath: rc.Ca,
	}
	return cluster.CreateSession()
}
func rangeDelete(target *row, delTime *time.Time, client *gocql.Session, numRetries int) error {
var exec func(r *row, t *time.Time, c *gocql.Session, retries int, attempts int) error
exec = func(r *row, t *time.Time, c *gocql.Session, retries int, attempts int) error {
deleteTid := gocql.UUIDFromTime(*t)
if *dryRun {
log.Printf(`DELETE FROM "%s".data WHERE "_domain" = '%s' AND key = '%s' AND rev = %d AND tid < %s; -- (tid < %s)`,
*keyspace,
r.domain,
r.key,
r.rev,
deleteTid,
*t)
return nil
} else {
err := c.Query(fmt.Sprintf(`DELETE FROM "%s".data WHERE "_domain" = ? AND key = ? AND rev = ? AND tid < ?`, *keyspace),
r.domain,
r.key,
r.rev,
deleteTid).Exec()
if err != nil {
if (retries - attempts) > 0 {
attempts++
log.Printf("Warning: DELETE of (%s, %s)/%d/%s failed (%s), retrying (retry #%d)...",
r.domain,
r.key,
r.rev,
deleteTid,
err,
attempts)
return exec(r, t, c, retries, attempts)
}
} else {
numDeletes++
}
return err
}
}
return exec(target, delTime, client, numRetries, 0)
}
// row is one scanned row of the data table. It holds the partition key
// (domain, key), the revision, the render's timeuuid, and the timestamp that
// main derives from that timeuuid with dateOf(tid).
//
// Field order matters: main builds rows with a positional struct literal.
type row struct {
	domain, key string
	rev         int
	tid         gocql.UUID
	created     time.Time
}
const (
	// recencyWindowSeconds is the minimum span (newest minus oldest created,
	// in whole seconds) a partition must cover before DELETEs are issued.
	recencyWindowSeconds = 86400
	// histoBucketSeconds is the width of one histoBuckets bucket (one week).
	histoBucketSeconds = 86400 * 7
	// deleteRetries is how many times a failed range DELETE is retried.
	deleteRetries = 3
)

// processRows handles all rows of one (_domain, key) partition. Partitions
// with fewer than two rows are ignored. rows are assumed newest-first, with
// rows[0] the most recent render of the latest revision (TODO: confirm
// against the table's clustering order).
//
// If the partition spans more than recencyWindowSeconds, one range DELETE is
// started per run of equal revisions. Each DELETE runs on its own goroutine,
// bounded by semaphore, and removes renders older than one day before
// rows[0].created.
//
// Every gap between consecutive rows is also counted into histoBuckets,
// bucketed by whole weeks.
func processRows(rows []row, client *gocql.Session) {
	if len(rows) < 2 {
		return
	}

	if rows[0].created.Unix()-rows[len(rows)-1].created.Unix() > recencyWindowSeconds {
		// All DELETEs can be issued 1d-ago from the most recent render of the latest revision
		deleteFrom := rows[0].created.AddDate(0, 0, -1)
		for i := range rows {
			// One DELETE per run of equal revisions. Comparing with the
			// previous row (rather than a -1 sentinel) also handles rev == -1.
			if i > 0 && rows[i].rev == rows[i-1].rev {
				continue
			}
			semaphore <- true
			go func(r *row, t *time.Time, c *gocql.Session) {
				defer func() { <-semaphore }()
				if err := rangeDelete(r, t, c, deleteRetries); err != nil {
					log.Printf("Error executing DELETE (%#v): %s", *r, err)
				}
			}(&rows[i], &deleteFrom, client)
		}
	}

	for i := 1; i < len(rows); i++ {
		seconds := rows[i-1].created.Unix() - rows[i].created.Unix()
		// Rows may be out of time order, e.g. where an older revision has a
		// newer render. A negative gap of a week or more would give a negative
		// bucket index and panic, so bucket the gap's magnitude.
		if seconds < 0 {
			seconds = -seconds
		}
		bucket := int(seconds / histoBucketSeconds)
		// Expand the bucket list as needed
		for len(histoBuckets) <= bucket {
			histoBuckets = append(histoBuckets, 0)
		}
		histoBuckets[bucket]++
	}
}
// main scans every row of "<keyspace>".data and groups consecutive rows into
// partitions by (_domain, key). Each complete partition goes to processRows,
// which may dispatch range DELETEs. After the scan, main waits for in-flight
// DELETEs and prints:
//   - the scan totals,
//   - the final page state (usable with --page-state to resume),
//   - a histogram of gaps between consecutive rows in whole weeks.
//
// A keyboard interrupt stops the scan at the next row. The partition being
// read at that moment is not processed, because its rows may be incomplete.
func main() {
	// Trap keyboard interrupts. The signal goroutine only reports each one and
	// closes interrupted on the first. shutdownRequested is written by the
	// main goroutine alone, so that flag is never shared across goroutines.
	channel := make(chan os.Signal, 1)
	signal.Notify(channel, os.Interrupt)
	interrupted := make(chan struct{})
	go func() {
		notified := false
		for sig := range channel {
			fmt.Println(red(fmt.Sprintf("Caught %s, shutting down!", sig)))
			if !notified {
				close(interrupted)
				notified = true
			}
		}
	}()

	kingpin.Version("1.0.0")
	kingpin.Parse()

	// Setup file logging if requested
	if len(*logFile) > 0 {
		f, err := os.OpenFile(*logFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()
		log.SetOutput(f)
		log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile | log.LUTC)
	}

	log.Println("Starting up...")
	started := time.Now()

	client, err := CreateSession(*hostname, *port, *keyspace, *cqlshrc)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	log.Printf("Connected to %s:%d", *hostname, *port)

	var domain string
	var key string
	var rev int
	var tid gocql.UUID
	var created time.Time
	var previous, current row

	semaphore = make(chan bool, *concurrency)
	partitions := 0
	count := 0
	rows := make([]row, 0)

	query := client.Query(fmt.Sprintf(`SELECT "_domain",key,rev,tid,dateOf(tid) FROM "%s".data`, *keyspace))
	if len(*pageState) > 0 {
		if token, err := hex.DecodeString(*pageState); err == nil {
			log.Println("Setting page state to:", *pageState)
			query.PageState(token)
		} else {
			log.Fatal("Cannot decode query page state: ", *pageState)
		}
	}

	var lastPageState []byte // page state of the last iterator; printed for resumption
	var scanErr error        // error reported by the last iterator's Close
	iter := query.Iter()
	for {
		for iter.Scan(&domain, &key, &rev, &tid, &created) {
			current = row{domain, key, rev, tid, created}
			if count == 0 {
				rows = append(rows, current)
				previous = current
				count++
				continue
			}
			// The start of a new partition
			if previous.domain != current.domain || previous.key != current.key {
				processRows(rows, client)
				rows = make([]row, 0)
				partitions++
			}
			rows = append(rows, current)
			previous = current
			count++

			// Non-blocking check for an interrupt.
			select {
			case <-interrupted:
				shutdownRequested = true
			default:
			}
			if shutdownRequested {
				break
			}

			if math.Mod(float64(count), 10000) == 0 {
				// time.Since avoids the zero divisor (and +Inf rates) that
				// whole-second Unix() arithmetic gives in the first second.
				elapsed := time.Since(started).Seconds()
				rowsRate := float64(count) / elapsed
				// NOTE(review): numDeletes is updated by DELETE goroutines;
				// this progress read is not synchronized with them.
				deleteRate := float64(numDeletes) / elapsed
				log.Printf("%d partitions scanned, %d rows (%.2f rows/sec), %d deletes issued (%.2f deletes/sec)",
					partitions,
					count,
					rowsRate,
					numDeletes,
					deleteRate)
			}
			if math.Mod(float64(count), 100000) == 0 {
				log.Println("Current page state:", hex.EncodeToString(iter.PageState()))
			}
		}

		// Close every page's iterator before moving on, so that an error on
		// an intermediate page is reported rather than silently dropped.
		lastPageState = iter.PageState()
		scanErr = iter.Close()
		if shutdownRequested || scanErr != nil || len(lastPageState) == 0 {
			break
		}
		// Setting a page state (as with --page-state above) disables gocql's
		// auto-paging, so fetch the next page manually.
		iter = query.PageState(lastPageState).Iter()
	}

	// processRows only runs when the next partition begins, so the final
	// partition is still pending here. Process it only if the scan ran to
	// completion; after an interrupt or error its rows may be incomplete.
	if !shutdownRequested && scanErr == nil && len(rows) > 0 {
		processRows(rows, client)
		partitions++
	}

	// Block until all in-flight range deletes have completed
	for i := 0; i < cap(semaphore); i++ {
		semaphore <- true
	}
	if scanErr != nil {
		log.Fatal("Error closing iterator: ", scanErr)
	}

	fmt.Println()
	fmt.Printf("Partitions scanned: %d\n", partitions)
	fmt.Printf("Rows processed: %d\n", count)
	fmt.Printf("Deletes issued: %d\n", numDeletes)
	fmt.Printf("Page state: %s\n", hex.EncodeToString(lastPageState))
	fmt.Println()

	histoCount := 0.0
	for _, n := range histoBuckets {
		histoCount += float64(n)
	}
	fmt.Printf("| %-6s | %-10s | %-7s |\n", "Weeks", "Count", "Percent")
	for weeks, n := range histoBuckets {
		fmt.Printf("| %-6d | %-10d | %-7.2f |\n", weeks, n, float64(n)/histoCount*100)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment