// Source: GitHub gist eevans/79b4622ed91bdb5b755973324add0963.
//
// Note: GitHub flagged this file as containing bidirectional Unicode text,
// which may be interpreted or compiled differently than it appears. Review it
// in an editor that reveals hidden Unicode characters.

package main | |
import ( | |
"encoding/hex" | |
"errors" | |
"fmt" | |
"github.com/fatih/color" | |
"github.com/gocql/gocql" | |
"github.com/vaughan0/go-ini" | |
"gopkg.in/alecthomas/kingpin.v2" | |
"log" | |
"math" | |
"os" | |
"os/signal" | |
"time" | |
) | |
var ( | |
// Command line arguments | |
cqlshrc = kingpin.Flag("cqlshrc", "Full path to cqlshrc file.").Default("cqlshrc").String() | |
hostname = kingpin.Flag("hostname", "Cassandra host.").Default("localhost").String() | |
port = kingpin.Flag("port", "Cassandra port.").Default("9042").Int() | |
keyspace = kingpin.Flag("keyspace", "Cassandra keyspace.").Required().String() | |
timeout = kingpin.Flag("timeout", "Cassandra timeout.").Default("3s").String() | |
pageState = kingpin.Flag("page-state", "Hex-encoded query page state token.").String() | |
dryRun = kingpin.Flag("dry-run", "Do not delete; Log only.").Bool() | |
logFile = kingpin.Flag("log-file", "Write logs to file.").String() | |
concurrency = kingpin.Flag("concurrency", "Maximum number of concurrent DELETEs.").Default("5").Int() | |
// Console colors | |
yellow = color.New(color.FgYellow).SprintFunc() | |
cyan = color.New(color.FgCyan).SprintFunc() | |
red = color.New(color.FgRed).SprintFunc() | |
// Buckets of row-age counts | |
histoBuckets = make([]int64, 0) | |
shutdownRequested = false | |
numDeletes = 0 | |
semaphore chan bool | |
) | |
// Cqlshrc is the subset of a cqlsh configuration file that this tool uses.
// Any value absent from the file is left as the empty string.
type Cqlshrc struct {
	// Username and Password come from the [authentication] section.
	Username, Password string
	// Ca is the certificate path from the [ssl] section's certfile key.
	Ca string
}
func NewCqlshrc(filename string) (*Cqlshrc, error) { | |
config, err := ini.LoadFile(filename) | |
if err != nil { | |
return nil, errors.New(fmt.Sprintf("Unable to parse cqlshrc: %s", err)) | |
} | |
result := &Cqlshrc{} | |
if username, ok := config.Get("authentication", "username"); ok { | |
result.Username = username | |
} | |
if password, ok := config.Get("authentication", "password"); ok { | |
result.Password = password | |
} | |
if cert, ok := config.Get("ssl", "certfile"); ok { | |
result.Ca = cert | |
} | |
return result, nil | |
} | |
func CreateSession(hostname string, port int, keyspace string, cqlshrc string) (*gocql.Session, error) { | |
cluster := gocql.NewCluster(hostname) | |
cluster.Port = port | |
cluster.Keyspace = keyspace | |
cluster.Consistency = gocql.LocalQuorum | |
rc, err := NewCqlshrc(cqlshrc) | |
if err != nil { | |
return nil, err | |
} | |
cluster.Authenticator = gocql.PasswordAuthenticator{ | |
Username: rc.Username, | |
Password: rc.Password, | |
} | |
cluster.SslOpts = &gocql.SslOptions{ | |
CaPath: rc.Ca, | |
} | |
if queryTimeout, err := time.ParseDuration(*timeout); err == nil { | |
cluster.Timeout = queryTimeout | |
} else { | |
log.Fatal("Cannot parse Cassandra timeout: ", err) | |
} | |
return cluster.CreateSession() | |
} | |
func rangeDelete(target *row, delTime *time.Time, client *gocql.Session, numRetries int) error { | |
var exec func(r *row, t *time.Time, c *gocql.Session, retries int, attempts int) error | |
exec = func(r *row, t *time.Time, c *gocql.Session, retries int, attempts int) error { | |
deleteTid := gocql.UUIDFromTime(*t) | |
if *dryRun { | |
log.Printf(`DELETE FROM "%s".data WHERE "_domain" = '%s' AND key = '%s' AND rev = %d AND tid < %s; -- (tid < %s)`, | |
*keyspace, | |
r.domain, | |
r.key, | |
r.rev, | |
deleteTid, | |
*t) | |
return nil | |
} else { | |
err := c.Query(fmt.Sprintf(`DELETE FROM "%s".data WHERE "_domain" = ? AND key = ? AND rev = ? AND tid < ?`, *keyspace), | |
r.domain, | |
r.key, | |
r.rev, | |
deleteTid).Exec() | |
if err != nil { | |
if (retries - attempts) > 0 { | |
attempts++ | |
log.Printf("Warning: DELETE of (%s, %s)/%d/%s failed (%s), retrying (retry #%d)...", | |
r.domain, | |
r.key, | |
r.rev, | |
deleteTid, | |
err, | |
attempts) | |
return exec(r, t, c, retries, attempts) | |
} | |
} else { | |
numDeletes++ | |
} | |
return err | |
} | |
} | |
return exec(target, delTime, client, numRetries, 0) | |
} | |
type row struct { | |
domain string | |
key string | |
rev int | |
tid gocql.UUID | |
created time.Time | |
} | |
func processRows(rows []row, client *gocql.Session) { | |
if len(rows) >= 2 { | |
// FIXME: Maybe don't hard-code the recency window values | |
if (rows[0].created.Unix() - rows[len(rows)-1].created.Unix()) > 86400 { | |
// All DELETEs can be issued 1d-ago from the most recent render of the latest revision | |
deleteFrom := rows[0].created.AddDate(0, 0, -1) | |
currentRev := -1 | |
for i := 0; i < len(rows); i++ { | |
if currentRev == rows[i].rev { | |
continue | |
} else { | |
currentRev = rows[i].rev | |
} | |
semaphore <- true | |
go func(r *row, t *time.Time, c *gocql.Session) { | |
defer func() { <-semaphore }() | |
// FIXME: Maybe don't hard-code the number of retries | |
if err := rangeDelete(r, t, c, 3); err != nil { | |
log.Printf("Error executing DELETE (%#v): %s", *r, err) | |
} | |
}(&rows[i], &deleteFrom, client) | |
} | |
} | |
// FIXME: This can probably be folded in to a single loop above | |
for i := 1; i < len(rows); i++ { | |
previous := rows[i-1] | |
current := rows[i] | |
seconds := previous.created.Unix() - current.created.Unix() | |
bucket := int(seconds / (86400 * 7)) | |
// Expand the bucket list as-needed | |
for len(histoBuckets) < (bucket + 1) { | |
histoBuckets = append(histoBuckets, 0) | |
} | |
histoBuckets[bucket]++ | |
} | |
} | |
} | |
func main() { | |
// Trap keyboard interrupts, and request shutdown | |
channel := make(chan os.Signal, 1) | |
signal.Notify(channel, os.Interrupt) | |
go func() { | |
for sig := range channel { | |
fmt.Println(red(fmt.Sprintf("Caught %s, shutting down!", sig))) | |
shutdownRequested = true | |
} | |
}() | |
kingpin.Version("1.0.0") | |
kingpin.Parse() | |
// Setup file logging if requested | |
if len(*logFile) > 0 { | |
f, err := os.OpenFile(*logFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer f.Close() | |
log.SetOutput(f) | |
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile | log.LUTC) | |
} | |
log.Println("Starting up...") | |
var started time.Time = time.Now() | |
client, err := CreateSession(*hostname, *port, *keyspace, *cqlshrc) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer client.Close() | |
log.Printf("Connected to %s:%d", *hostname, *port) | |
var domain string | |
var key string | |
var rev int | |
var tid gocql.UUID | |
var created time.Time | |
var previous, current row | |
semaphore = make(chan bool, *concurrency) | |
partitions := 0 | |
count := 0 | |
rows := make([]row, 0) | |
query := client.Query(fmt.Sprintf(`SELECT "_domain",key,rev,tid,dateOf(tid) FROM "%s".data`, *keyspace)) | |
if len(*pageState) > 0 { | |
if token, err := hex.DecodeString(*pageState); err == nil { | |
log.Println("Setting page state to:", *pageState) | |
query.PageState(token) | |
} else { | |
log.Fatal("Cannot decode query page state: ", *pageState) | |
} | |
} | |
iter := query.Iter() | |
Outer: | |
for { | |
for iter.Scan(&domain, &key, &rev, &tid, &created) { | |
current = row{domain, key, rev, tid, created} | |
if count == 0 { | |
rows = append(rows, current) | |
previous = current | |
count++ | |
continue | |
} | |
// The start of a new partition | |
if previous.domain != current.domain || previous.key != current.key { | |
processRows(rows, client) | |
rows = make([]row, 0) | |
partitions++ | |
} | |
rows = append(rows, current) | |
previous = current | |
count++ | |
if shutdownRequested { | |
break Outer | |
} | |
if math.Mod(float64(count), 10000) == 0 { | |
elapsed := float64(time.Now().Unix() - started.Unix()) | |
rowsRate := float64(count) / elapsed | |
deleteRate := float64(numDeletes) / elapsed | |
log.Printf("%d partitions scanned, %d rows (%.2f rows/sec), %d deletes issued (%.2f deletes/sec)", | |
partitions, | |
count, | |
rowsRate, | |
numDeletes, | |
deleteRate) | |
} | |
if math.Mod(float64(count), 100000) == 0 { | |
log.Println("Current page state:", hex.EncodeToString(iter.PageState())) | |
} | |
} | |
// In case auto-paging was disabled (which it will be, if we specified a pageState above, | |
// then we must resort to manual paging. | |
if len(iter.PageState()) > 0 { | |
iter = query.PageState(iter.PageState()).Iter() | |
} else { | |
break | |
} | |
} | |
// Block until all in-flight range deletes have completed | |
for i := 0; i < cap(semaphore); i++ { | |
semaphore <- true | |
} | |
if err := iter.Close(); err != nil { | |
log.Fatal("Error closing iterator: ", err) | |
} | |
fmt.Println() | |
fmt.Printf("Partitions scanned: %d\n", partitions) | |
fmt.Printf("Rows processed: %d\n", count) | |
fmt.Printf("Deletes issued: %d\n", numDeletes) | |
fmt.Printf("Page state: %s\n", hex.EncodeToString(iter.PageState())) | |
fmt.Println() | |
histoCount := 0.0 | |
for i := 0; i < len(histoBuckets); i++ { | |
histoCount += float64(histoBuckets[i]) | |
} | |
fmt.Printf("| %-6s | %-10s | %-7s |\n", "Weeks", "Count", "Percent") | |
for i := 0; i < len(histoBuckets); i++ { | |
fmt.Printf("| %-6d | %-10d | %-7.2f |\n", i, histoBuckets[i], (float64(histoBuckets[i]) / float64(histoCount) * 100)) | |
} | |
} |
// (GitHub gist page footer: sign up for free, or sign in, to comment on this gist.)