diff --git a/archiver.go b/archiver.go index 85bddc2af..e85f8c321 100644 --- a/archiver.go +++ b/archiver.go @@ -1,6 +1,8 @@ package khepri import ( + "errors" + "fmt" "io" "os" "path/filepath" @@ -24,7 +26,8 @@ type Archiver struct { key *Key ch *ContentHandler - bl *BlobList // blobs used for the current snapshot + bl *BlobList // blobs used for the current snapshot + parentBl *BlobList // blobs from the parent snapshot fileToken chan struct{} blobToken chan struct{} @@ -171,20 +174,27 @@ func (arch *Archiver) SaveFile(node *Node) error { return arrar.Annotate(err, "SaveFile() read small file") } - blob, err := arch.ch.Save(backend.Data, buf[:n]) - if err != nil { - return arrar.Annotate(err, "SaveFile() save chunk") + if err == io.EOF { + // use empty blob list for empty files + blobs = Blobs{} + } else { + blob, err := arch.ch.Save(backend.Data, buf[:n]) + if err != nil { + return arrar.Annotate(err, "SaveFile() save chunk") + } + + arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) + + blobs = Blobs{blob} } - - arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) - - blobs = Blobs{blob} } else { // else store all chunks chnker := chunker.New(file) chans := [](<-chan Blob){} defer chnker.Free() + chunks := 0 + for { buf := GetChunkBuf("blob chunker") chunk, err := chnker.Next(buf) @@ -198,6 +208,8 @@ func (arch *Archiver) SaveFile(node *Node) error { return arrar.Annotate(err, "SaveFile() chunker.Next()") } + chunks++ + // acquire token, start goroutine to save chunk token := <-arch.blobToken resCh := make(chan Blob, 1) @@ -223,6 +235,10 @@ func (arch *Archiver) SaveFile(node *Node) error { for _, ch := range chans { blobs = append(blobs, <-ch) } + + if len(blobs) != chunks { + return fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs)) + } } node.Content = make([]backend.ID, len(blobs)) @@ -234,7 +250,46 @@ func (arch *Archiver) SaveFile(node *Node) error { return nil } -func (arch *Archiver) loadTree(dir string) (*Tree, error) { +func (arch *Archiver) populateFromOldTree(tree, oldTree Tree) error { + // update content from old tree + err := tree.PopulateFrom(oldTree) + if err != nil { + return err + } + + // add blobs to bloblist + for _, node := range tree { + if node.Content != nil { + for _, blobID := range node.Content { + blob, err := arch.parentBl.Find(Blob{ID: blobID}) + if err != nil { + return err + } + + arch.bl.Insert(blob) + } + } + } + + return nil +} + +func (arch *Archiver) loadTree(dir string, oldTreeID backend.ID) (*Tree, error) { + var ( + oldTree Tree + err error + ) + + if oldTreeID != nil { + // load old tree + oldTree, err = LoadTree(arch.ch, oldTreeID) + if err != nil { + return nil, arrar.Annotate(err, "load old tree") + } + + debug("old tree: %v\n", oldTree) + } + // open and list path fd, err := os.Open(dir) defer fd.Close() @@ -247,8 +302,8 @@ func (arch *Archiver) loadTree(dir string) (*Tree, error) { return nil, err } + // build new tree tree := Tree{} - for _, entry := range entries { path := filepath.Join(dir, entry.Name()) @@ -262,14 +317,39 @@ func (arch *Archiver) loadTree(dir string) (*Tree, error) { return nil, err } - tree = append(tree, node) + err = tree.Insert(node) + if err != nil { + return nil, err + } if entry.IsDir() { - node.Tree, err = arch.loadTree(path) + oldSubtree, err := oldTree.Find(node.Name) + if err != nil && err != ErrNodeNotFound { + return nil, err + } + + var oldSubtreeID backend.ID + if err == nil { + oldSubtreeID = oldSubtree.Subtree + } + + node.Tree, err = arch.loadTree(path, oldSubtreeID) if err != nil { return nil, err } } + } + + // populate with content from oldTree + err = arch.populateFromOldTree(tree, oldTree) + if err != nil { + return nil, err + } + + for _, node := range tree { + if node.Type == "file" && node.Content != nil { + continue + } switch node.Type { case "file": @@ -287,7 +367,34 @@ func (arch *Archiver) loadTree(dir string) (*Tree, error) { return &tree, nil } -func (arch *Archiver) LoadTree(path string) (*Tree, error) { +func (arch *Archiver) LoadTree(path string, parentSnapshot backend.ID) (*Tree, error) { + var oldTree Tree + + if parentSnapshot != nil { + // load old tree from snapshot + snapshot, err := LoadSnapshot(arch.ch, parentSnapshot) + if err != nil { + return nil, arrar.Annotate(err, "load old snapshot") + } + + if snapshot.Content == nil { + return nil, errors.New("snapshot without tree!") + } + + // load old bloblist from snapshot + arch.parentBl, err = LoadBlobList(arch.ch, snapshot.Map) + if err != nil { + return nil, err + } + + oldTree, err = LoadTree(arch.ch, snapshot.Content) + if err != nil { + return nil, arrar.Annotate(err, "load old tree") + } + + debug("old tree: %v\n", oldTree) + } + // reset global stats arch.updateStats = Stats{} @@ -302,14 +409,38 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) { } if node.Type != "dir" { - arch.Stats.Files = 1 - arch.Stats.Bytes = node.Size + t := &Tree{node} + + // populate with content from oldTree + err = arch.populateFromOldTree(*t, oldTree) + if err != nil { + return nil, err + } + + // if no old node has been found, update stats + if node.Content == nil && node.Subtree == nil { + arch.Stats.Files = 1 + arch.Stats.Bytes = node.Size + } + arch.update(arch.ScannerStats, arch.Stats) - return &Tree{node}, nil + + return t, nil } arch.Stats.Directories = 1 - node.Tree, err = arch.loadTree(path) + + var oldSubtreeID backend.ID + oldSubtree, err := oldTree.Find(node.Name) + if err != nil && err != ErrNodeNotFound { + return nil, arrar.Annotate(err, "search node in old tree") + } + + if err == nil { + oldSubtreeID = oldSubtree.Subtree + } + + node.Tree, err = arch.loadTree(path, oldSubtreeID) if err != nil { return nil, arrar.Annotate(err, "loadTree()") } @@ -356,6 +487,13 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { wg.Wait() + // check for invalid file nodes + for _, node := range *t { + if node.Type == "file" && node.Content == nil { + return Blob{}, fmt.Errorf("node %v has empty content", node.Name) + } + } + blob, err := arch.SaveJSON(backend.Tree, t) if err != nil { return Blob{}, err @@ -364,11 +502,12 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { return blob, nil } -func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) { +func (arch *Archiver) Snapshot(dir string, t *Tree, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { // reset global stats arch.updateStats = Stats{} sn := NewSnapshot(dir) + sn.Parent = parentSnapshot blob, err := arch.saveTree(t) if err != nil { diff --git a/backend/local.go b/backend/local.go index dc7752cc1..d266c506c 100644 --- a/backend/local.go +++ b/backend/local.go @@ -236,6 +236,10 @@ func (b *Local) filename(t Type, id ID) string { // Get returns the content stored under the given ID. If the data doesn't match // the requested ID, ErrWrongData is returned. func (b *Local) Get(t Type, id ID) ([]byte, error) { + if id == nil { + return nil, errors.New("unable to load nil ID") + } + // try to open file file, err := os.Open(b.filename(t, id)) defer file.Close() diff --git a/backend/sftp.go b/backend/sftp.go index 704f7c450..a189076b7 100644 --- a/backend/sftp.go +++ b/backend/sftp.go @@ -344,6 +344,10 @@ func (r *SFTP) filename(t Type, id ID) string { // Get returns the content stored under the given ID. If the data doesn't match // the requested ID, ErrWrongData is returned. func (r *SFTP) Get(t Type, id ID) ([]byte, error) { + if id == nil { + return nil, errors.New("unable to load nil ID") + } + // try to open file file, err := r.c.Open(r.filename(t, id)) defer file.Close() diff --git a/cmd/khepri/cmd_backup.go b/cmd/khepri/cmd_backup.go index a7f329dda..964e8c439 100644 --- a/cmd/khepri/cmd_backup.go +++ b/cmd/khepri/cmd_backup.go @@ -12,6 +12,10 @@ import ( "golang.org/x/crypto/ssh/terminal" ) +func init() { + commands["backup"] = commandBackup +} + func format_bytes(c uint64) string { b := float64(c) @@ -53,11 +57,22 @@ func print_tree2(indent int, t *khepri.Tree) { } func commandBackup(be backend.Server, key *khepri.Key, args []string) error { - if len(args) != 1 { - return errors.New("usage: backup [dir|file]") + if len(args) < 1 || len(args) > 2 { + return errors.New("usage: backup [dir|file] [snapshot-id]") } + var parentSnapshotID backend.ID + var err error + target := args[0] + if len(args) > 1 { + parentSnapshotID, err = backend.FindSnapshot(be, args[1]) + if err != nil { + return fmt.Errorf("invalid id %q: %v", args[1], err) + } + + fmt.Printf("found parent snapshot %v\n", parentSnapshotID) + } arch, err := khepri.NewArchiver(be, key) if err != nil { @@ -89,7 +104,7 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { // return true // } - t, err := arch.LoadTree(target) + t, err := arch.LoadTree(target, parentSnapshotID) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) return err @@ -144,7 +159,7 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { }(ch) } - sn, id, err := arch.Snapshot(target, t) + sn, id, err := arch.Snapshot(target, t, parentSnapshotID) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) } diff --git a/cmd/khepri/cmd_cat.go b/cmd/khepri/cmd_cat.go index 47befd91a..7e892dddf 100644 --- a/cmd/khepri/cmd_cat.go +++ b/cmd/khepri/cmd_cat.go @@ -10,6 +10,10 @@ import ( "github.com/fd0/khepri/backend" ) +func init() { + commands["cat"] = commandCat +} + func commandCat(be backend.Server, key *khepri.Key, args []string) error { if len(args) != 2 { return errors.New("usage: cat [blob|tree|snapshot|key|lock] ID") diff --git a/cmd/khepri/cmd_fsck.go b/cmd/khepri/cmd_fsck.go index 63ea29680..b8a2ce716 100644 --- a/cmd/khepri/cmd_fsck.go +++ b/cmd/khepri/cmd_fsck.go @@ -1,76 +1,123 @@ package main -import "github.com/fd0/khepri/backend" +import ( + "errors" + "fmt" -// func fsck_tree(be backend.Server, id backend.ID) (bool, error) { -// log.Printf(" checking dir %s", id) + "github.com/fd0/khepri" + "github.com/fd0/khepri/backend" +) -// buf, err := be.GetBlob(id) -// if err != nil { -// return false, err -// } +func init() { + commands["fsck"] = commandFsck +} -// tree := &khepri.Tree{} -// err = json.Unmarshal(buf, tree) -// if err != nil { -// return false, err -// } +func fsckFile(ch *khepri.ContentHandler, IDs []backend.ID) error { + for _, id := range IDs { + debug("checking data blob %v\n", id) -// if !id.Equal(backend.IDFromData(buf)) { -// return false, nil -// } - -// return true, nil -// } - -// func fsck_snapshot(be backend.Server, id backend.ID) (bool, error) { -// log.Printf("checking snapshot %s", id) - -// sn, err := khepri.LoadSnapshot(be, id) -// if err != nil { -// return false, err -// } - -// return fsck_tree(be, sn.Content) -// } - -func commandFsck(be backend.Server, args []string) error { - // var snapshots backend.IDs - // var err error - - // if len(args) != 0 { - // snapshots = make(backend.IDs, 0, len(args)) - - // for _, arg := range args { - // id, err := backend.ParseID(arg) - // if err != nil { - // log.Fatal(err) - // } - - // snapshots = append(snapshots, id) - // } - // } else { - // snapshots, err = be.ListRefs() - - // if err != nil { - // log.Fatalf("error reading list of snapshot IDs: %v", err) - // } - // } - - // log.Printf("checking %d snapshots", len(snapshots)) - - // for _, id := range snapshots { - // ok, err := fsck_snapshot(be, id) - - // if err != nil { - // log.Printf("error checking snapshot %s: %v", id, err) - // continue - // } - - // if !ok { - // log.Printf("snapshot %s failed", id) - // } - // } + // load content + _, err := ch.Load(backend.Data, id) + if err != nil { + return err + } + } + + return nil +} + +func fsckTree(ch *khepri.ContentHandler, id backend.ID) error { + debug("checking tree %v\n", id) + + tree, err := khepri.LoadTree(ch, id) + if err != nil { + return err + } + + for i, node := range tree { + if node.Name == "" { + return fmt.Errorf("node %v of tree %v has no name", i, id) + } + + if node.Type == "" { + return fmt.Errorf("node %q of tree %v has no type", node.Name, id) + } + + switch node.Type { + case "file": + if node.Content == nil { + return fmt.Errorf("file node %q of tree %v has no content", node.Name, id) + } + + err := fsckFile(ch, node.Content) + if err != nil { + return err + } + case "dir": + if node.Subtree == nil { + return fmt.Errorf("dir node %q of tree %v has no subtree", node.Name, id) + } + + err := fsckTree(ch, node.Subtree) + if err != nil { + return err + } + } + } + + return nil +} + +func fsck_snapshot(be backend.Server, key *khepri.Key, id backend.ID) error { + debug("checking snapshot %v\n", id) + + ch, err := khepri.NewContentHandler(be, key) + if err != nil { + return err + } + + sn, err := ch.LoadSnapshot(id) + if err != nil { + return err + } + + if sn.Content == nil { + return fmt.Errorf("snapshot %v has no content", sn.ID) + } + + if sn.Map == nil { + return fmt.Errorf("snapshot %v has no map", sn.ID) + } + + return fsckTree(ch, sn.Content) +} + +func commandFsck(be backend.Server, key *khepri.Key, args []string) error { + if len(args) == 0 { + return errors.New("usage: fsck [all|snapshot-id]") + } + + if len(args) == 1 && args[0] != "all" { + snapshotID, err := backend.FindSnapshot(be, args[0]) + if err != nil { + return fmt.Errorf("invalid id %q: %v", args[0], err) + } + + return fsck_snapshot(be, key, snapshotID) + } + + list, err := be.List(backend.Snapshot) + if err != nil { + return err + } + + for _, snapshotID := range list { + err := fsck_snapshot(be, key, snapshotID) + + if err != nil { + return err + } + } return nil } diff --git a/cmd/khepri/cmd_key.go b/cmd/khepri/cmd_key.go index ad4814c2b..4aab50c4f 100644 --- a/cmd/khepri/cmd_key.go +++ b/cmd/khepri/cmd_key.go @@ -10,6 +10,10 @@ import ( "github.com/fd0/khepri/backend" ) +func init() { + commands["key"] = commandKey +} + func list_keys(be backend.Server, key *khepri.Key) error { tab := NewTable() tab.Header = fmt.Sprintf(" %-10s %-10s %-10s %s", "ID", "User", "Host", "Created") diff --git a/cmd/khepri/cmd_list.go b/cmd/khepri/cmd_list.go index f8fe4c132..6965cd022 100644 --- a/cmd/khepri/cmd_list.go +++ b/cmd/khepri/cmd_list.go @@ -8,6 +8,10 @@ import ( "github.com/fd0/khepri/backend" ) +func init() { + commands["list"] = commandList +} + func commandList(be backend.Server, key *khepri.Key, args []string) error { if len(args) != 1 { return errors.New("usage: list [data|trees|snapshots|keys|locks]") diff --git a/cmd/khepri/cmd_ls.go b/cmd/khepri/cmd_ls.go index 5ab98ace7..89b5adfa5 100644 --- a/cmd/khepri/cmd_ls.go +++ b/cmd/khepri/cmd_ls.go @@ -10,6 +10,10 @@ import ( "github.com/fd0/khepri/backend" ) +func init() { + commands["ls"] = commandLs +} + func print_node(prefix string, n *khepri.Node) string { switch n.Type { case "file": diff --git a/cmd/khepri/cmd_restore.go b/cmd/khepri/cmd_restore.go index de747c8e2..921fdcf70 100644 --- a/cmd/khepri/cmd_restore.go +++ b/cmd/khepri/cmd_restore.go @@ -9,6 +9,10 @@ import ( "github.com/fd0/khepri/backend" ) +func init() { + commands["restore"] = commandRestore +} + func commandRestore(be backend.Server, key *khepri.Key, args []string) error { if len(args) != 2 { return errors.New("usage: restore ID dir") diff --git a/cmd/khepri/cmd_snapshots.go b/cmd/khepri/cmd_snapshots.go index 5cec0e195..98db23021 100644 --- a/cmd/khepri/cmd_snapshots.go +++ b/cmd/khepri/cmd_snapshots.go @@ -72,6 +72,10 @@ func reltime(t time.Time) string { } } +func init() { + commands["snapshots"] = commandSnapshots +} + func commandSnapshots(be backend.Server, key *khepri.Key, args []string) error { if len(args) != 0 { return errors.New("usage: snapshots") diff --git a/cmd/khepri/main.go b/cmd/khepri/main.go index 5df73b881..a774f5205 100644 --- a/cmd/khepri/main.go +++ b/cmd/khepri/main.go @@ -32,7 +32,7 @@ func errx(code int, format string, data ...interface{}) { type commandFunc func(backend.Server, *khepri.Key, []string) error -var commands map[string]commandFunc +var commands = make(map[string]commandFunc) func readPassword(env string, prompt string) string { @@ -126,15 +126,6 @@ func create(u string) (backend.Server, error) { } func init() { - commands = make(map[string]commandFunc) - commands["backup"] = commandBackup - commands["restore"] = commandRestore - commands["list"] = commandList - commands["snapshots"] = commandSnapshots - commands["cat"] = commandCat - commands["ls"] = commandLs - commands["key"] = commandKey - // set GOMAXPROCS to number of CPUs runtime.GOMAXPROCS(runtime.NumCPU()) } diff --git a/contenthandler.go b/contenthandler.go index e1ab13749..9661a44af 100644 --- a/contenthandler.go +++ b/contenthandler.go @@ -8,6 +8,8 @@ import ( "github.com/fd0/khepri/backend" ) +var ErrWrongData = errors.New("wrong data decrypt, checksum does not match") + type ContentHandler struct { be backend.Server key *Key @@ -173,6 +175,11 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) { return nil, errors.New("Invalid length") } + // check SHA256 sum + if !id.Equal(backend.Hash(buf)) { + return nil, ErrWrongData + } + return buf, nil } diff --git a/snapshot.go b/snapshot.go index db6f81f7d..1e592a178 100644 --- a/snapshot.go +++ b/snapshot.go @@ -12,6 +12,7 @@ import ( type Snapshot struct { Time time.Time `json:"time"` + Parent backend.ID `json:"parent,omitempty"` Content backend.ID `json:"content"` Map backend.ID `json:"map"` Dir string `json:"dir"` diff --git a/test/test-backup.sh b/test/test-backup.sh index d42dbe85a..b208eaab2 100755 --- a/test/test-backup.sh +++ b/test/test-backup.sh @@ -5,4 +5,11 @@ run khepri init run khepri backup "${BASE}/fake-data" run khepri restore "$(basename "$KHEPRI_REPOSITORY"/snapshots/*)" "${BASE}/fake-data-restore" dirdiff "${BASE}/fake-data" "${BASE}/fake-data-restore/fake-data" + +SNAPSHOT=$(run khepri list snapshots) +run khepri backup "${BASE}/fake-data" $SNAPSHOT +run khepri restore "$(basename "$KHEPRI_REPOSITORY"/snapshots/*)" "${BASE}/fake-data-restore-incremental" +dirdiff "${BASE}/fake-data" "${BASE}/fake-data-restore-incremental/fake-data" + +run khepri fsck all cleanup diff --git a/test/test-key-add-remove.sh b/test/test-key-add-remove.sh index 9d691c2ed..1fa5fa927 100755 --- a/test/test-key-add-remove.sh +++ b/test/test-key-add-remove.sh @@ -35,6 +35,8 @@ for i in {1..3}; do OLD_PWD=bar$i done +KHEPRI_PASSWORD=$OLD_PWD run khepri fsck all + cleanup FAILED=0 diff --git a/tree.go b/tree.go index 1635db19a..b5b92e885 100644 --- a/tree.go +++ b/tree.go @@ -1,9 +1,11 @@ package khepri import ( + "errors" "fmt" "os" "os/user" + "sort" "strconv" "strings" "syscall" @@ -31,7 +33,7 @@ type Node struct { Links uint64 `json:"links,omitempty"` LinkTarget string `json:"linktarget,omitempty"` Device uint64 `json:"device,omitempty"` - Content []backend.ID `json:"content,omitempty"` + Content []backend.ID `json:"content"` Subtree backend.ID `json:"subtree,omitempty"` Tree *Tree `json:"-"` @@ -39,6 +41,11 @@ type Node struct { path string } +var ( + ErrNodeNotFound = errors.New("named node not found") + ErrNodeAlreadyInTree = errors.New("node already present") +) + type Blob struct { ID backend.ID `json:"id,omitempty"` Size uint64 `json:"size,omitempty"` @@ -69,6 +76,79 @@ func (t Tree) String() string { return strings.Join(s, "\n") } +func LoadTree(ch *ContentHandler, id backend.ID) (Tree, error) { + if id == nil { + return nil, nil + } + + tree := Tree{} + err := ch.LoadJSON(backend.Tree, id, &tree) + if err != nil { + return nil, err + } + + return tree, nil +} + +// PopulateFrom copies subtrees and content from other when it hasn't changed. +func (t Tree) PopulateFrom(other Tree) error { + for _, node := range t { + // only copy entries for files + if node.Type != "file" { + continue + } + + // find entry in other tree + oldNode, err := other.Find(node.Name) + + // if the node could not be found, proceed to the next + if err == ErrNodeNotFound { + continue + } + + // compare content + if node.SameContent(oldNode) { + // copy Content + node.Content = oldNode.Content + } + } + + return nil +} + +func (t *Tree) Insert(node *Node) error { + pos, _, err := t.find(node.Name) + if err == nil { + // already present + return ErrNodeAlreadyInTree + } + + // insert blob + // https://code.google.com/p/go-wiki/wiki/bliceTricks + *t = append(*t, &Node{}) + copy((*t)[pos+1:], (*t)[pos:]) + (*t)[pos] = node + + return nil +} + +func (t Tree) find(name string) (int, *Node, error) { + pos := sort.Search(len(t), func(i int) bool { + return t[i].Name >= name + }) + + if pos < len(t) && t[pos].Name == name { + return pos, t[pos], nil + } + + return pos, nil, ErrNodeNotFound +} + +func (t Tree) Find(name string) (*Node, error) { + _, node, err := t.find(name) + return node, err +} + func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) { stat, ok := fi.Sys().(*syscall.Stat_t) if !ok { @@ -265,6 +345,28 @@ func (node *Node) CreateAt(ch *ContentHandler, path string) error { return nil } +func (node Node) SameContent(olderNode *Node) bool { + // if this node has a type other than "file", treat as if content has changed + if node.Type != "file" { + return false + } + + // if the name or type has changed, this is surely something different + if node.Name != olderNode.Name || node.Type != olderNode.Type { + return false + } + + // if timestamps or inodes differ, content has changed + if node.ModTime != olderNode.ModTime || + node.ChangeTime != olderNode.ChangeTime || + node.Inode != olderNode.Inode { + return false + } + + // otherwise the node is assumed to have the same content + return true +} + func (b Blob) Free() { if b.ID != nil { b.ID.Free()