1
0
Fork 0
forked from External/grumble

pkg/blobstore: modernize the blobstore package.

This commit is contained in:
Mikkel Krautz 2013-02-04 23:36:45 +01:00
parent 8effbbc6b3
commit 3dc3b25f57
10 changed files with 244 additions and 409 deletions

View file

@ -2,8 +2,7 @@
// The use of this source code is goverened by a BSD-style
// license that can be found in the LICENSE-file.
// This package implements a simple disk-persisted content-addressed
// blobstore.
// This package implements a simple disk-persisted content-addressed blobstore.
package blobstore
import (
@ -13,263 +12,179 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
)
var (
// ErrNoSuchKey signals that a blob with the given key does
// not exist in the BlobStore.
ErrNoSuchKey = errors.New("blobstore: no such key")
// ErrBadKey signals that the given key is not well formed.
ErrBadKey = errors.New("blobstore: bad key")
)
// BlobStore represents a simple disk-persisted content addressible
// blob store that uses the file system for persistence.
//
// Blobs in the blobstore are indexed by their SHA1 hash.
//
// The BlobStore is backed by a directory on the filesystem. This
// directory contains subdirectories which contain keys (SHA1 hashes).
// Each subdirectory is named according to the first hex-encoded byte
// of the keys that subdirectory contains.
//
// For example, a file that has the content 'hello world' will have
// the SHA1 hash '2aae6c35c94fcfb415dbe95f408b9ce91ee846ed'. If our
// blobstore's backing directory is called 'blobstore', the blob with
// only 'hello world' in it will be stored as follows:
//
// blobstore/2a/2aae6c35c94fcfb415dbe95f408b9ce91ee846ed
//
// The BlobStore is self-synchronizing, relying on the filesystem
// operations to ensure atomicity. Thus, accessing a single BlobStore
// from multiple goroutines should have no ill side effects.
type BlobStore struct {
dir string
}
var (
ErrBadFile = errors.New("a bad file exists in the blobstore directory. unable to create container directores.")
ErrNoSuchKey = errors.New("no such key")
ErrInvalidKey = errors.New("invalid key")
)
var (
defaultStore *BlobStore
defaultMutex sync.Mutex
)
// Open an existing, or create a new BlobStore at path.
// Path must point to a directory, and must already exist.
// See NewBlobStore for more information.
func Open(path string) (err error) {
defaultMutex.Lock()
defer defaultMutex.Unlock()
if defaultStore != nil {
panic("Default BlobStore already open")
}
defaultStore, err = NewBlobStore(path)
return
// Open opens an existing BlobStore. The path parameter must
// point to a directory that already exists for correct
// operation, however, the Open function does not check that
// this is the case.
func Open(path string) BlobStore {
return BlobStore{dir: path}
}
// Close the open default BlobStore. This removes the lockfile allowing
// other processes to open the BlobStore.
func Close() (err error) {
if defaultStore == nil {
panic("DefaultStore not open")
}
err = defaultStore.Close()
if err != nil {
return
}
defaultStore = nil
return
}
// Lookup a blob by its key and return a buffer containing the contents
// of the blob.
func Get(key string) (buf []byte, err error) {
return defaultStore.Get(key)
}
// Store a blob. If the blob was successfully stored, the returned key
// can be used to retrieve the buf from the BlobStore.
func Put(buf []byte) (key string, err error) {
return defaultStore.Put(buf)
}
// Open an existing, or create a new BlobStore residing at path.
// Path must point to a directory, and must already exist.
func NewBlobStore(path string) (bs *BlobStore, err error) {
// Does the directory exist?
dir, err := os.Open(path)
if err != nil {
return
}
dir.Close()
dirStructureExists := true
// Check whether a 'blobstore' file exists in the directory.
// The existence of the file signals that the directory already
// has the correct hierarchy structure.
bsf, err := os.Open(filepath.Join(path, "blobstore"))
if err != nil {
if os.IsNotExist(err) {
dirStructureExists = false
}
} else {
bsf.Close()
}
if !dirStructureExists {
for i := 0; i < 256; i++ {
outer := filepath.Join(path, hex.EncodeToString([]byte{byte(i)}))
err = os.Mkdir(outer, 0700)
if os.IsExist(err) {
// The path already exists. Stat it to check whether it is indeed
// a directory.
fi, err := os.Stat(outer)
if err != nil {
return nil, err
}
if !fi.IsDir() {
return nil, ErrBadFile
}
} else if err != nil {
return nil, err
}
for j := 0; j < 256; j++ {
inner := filepath.Join(outer, hex.EncodeToString([]byte{byte(j)}))
err = os.Mkdir(inner, 0700)
if os.IsExist(err) {
// The path already exists. Stat it to check whether it is indeed
// a directory.
fi, err := os.Stat(outer)
if err != nil {
return nil, err
}
if !fi.IsDir() {
return nil, ErrBadFile
}
} else if err != nil {
return nil, err
}
}
}
// Add a blobstore file to signal that a correct directory
// structure exists for this blobstore.
bsf, err = os.Create(filepath.Join(path, "blobstore"))
if err != nil {
return nil, err
}
bsf.Close()
}
bs = &BlobStore{
dir: path,
}
return bs, nil
}
// Close an open BlobStore. This removes the lockfile allowing
// other processes to open the BlobStore.
func (bs *BlobStore) Close() (err error) {
return nil
}
// Checks that a given key is a valid key for the BlobStore.
// If it is, it returns the three components that make up the on-disk path
// the given key can be found or should be stored at.
func getKeyComponents(key string) (dir1, dir2, fn string, err error) {
// SHA1 digests are 40 bytes long when hex-encoded
// isValidKey checks whether key is a valid BlobStore key.
func isValidKey(key string) bool {
// SHA1 digests are 40 bytes long when hex-encoded.
if len(key) != 40 {
err = ErrInvalidKey
return
return false
}
// Check whether the string is valid hex-encoding.
_, err = hex.DecodeString(key)
_, err := hex.DecodeString(key)
if err != nil {
err = ErrInvalidKey
return
return false
}
return key[0:2], key[2:4], key[4:], nil
return true
}
// Lookup the path hat a key would have on disk.
// Returns an error if the key is not a valid BlobStore key.
func (bs *BlobStore) pathForKey(key string) (fn string, err error) {
dir1, dir2, rest, err := getKeyComponents(key)
if err != nil {
return
// extractKeyComponents returns the directory and the filename that the
// blob identified by key should be stored under in the BlobStore.
// This function also checks whether the key is valid. If not, it returns
// ErrBadKey.
func extractKeyComponents(key string) (dir string, fn string, err error) {
if !isValidKey(key) {
return "", "", ErrBadKey
}
fn = filepath.Join(bs.dir, dir1, dir2, rest)
return
return key[0:2], key, nil
}
// Lookup a blob by its key and return a buffer containing the contents
// of the blob.
func (bs *BlobStore) Get(key string) (buf []byte, err error) {
fn, err := bs.pathForKey(key)
// Get returns a byte slice containing the contents of
// the blob identified by key. If no such blob is found,
// Get returns ErrNoSuchKey.
func (bs BlobStore) Get(key string) ([]byte, error) {
dir, fn, err := extractKeyComponents(key)
if err != nil {
return
return nil, err
}
file, err := os.Open(fn)
blobfn := filepath.Join(bs.dir, dir, fn)
f, err := os.Open(blobfn)
if os.IsNotExist(err) {
err = ErrNoSuchKey
return
return nil, ErrNoSuchKey
} else if err != nil {
return
return nil, err
}
br, err := newBlobReader(file, key)
br, err := newBlobReader(f, key)
if err != nil {
file.Close()
return
f.Close()
return nil, err
}
defer br.Close()
buf, err = ioutil.ReadAll(br)
buf, err := ioutil.ReadAll(br)
if err != nil {
return
return nil, err
}
return
return buf, nil
}
// Store a blob. If the blob was successfully stored, the returned key
// can be used to retrieve the buf from the BlobStore.
func (bs *BlobStore) Put(buf []byte) (key string, err error) {
// Put puts the contents of blob into the BlobStore. If
// the blob was successfully stored, the returned key can
// be used to retrieve the buf from the BlobStore at a
// later time.
func (bs BlobStore) Put(buf []byte) (key string, err error) {
// Calculate the key for the blob. We can't really delay it more than this,
// since we need to know the key for the blob to check whether it's already on
// disk.
h := sha1.New()
h.Write(buf)
_, err = h.Write(buf)
if err != nil {
return "", err
}
key = hex.EncodeToString(h.Sum(nil))
// Get the components that make up the on-disk
// path for the blob.
dir1, dir2, rest, err := getKeyComponents(key)
if err != nil {
return
}
blobpath := filepath.Join(bs.dir, dir1, dir2, rest)
blobdir := filepath.Join(bs.dir, dir1, dir2)
// Check if the blob already exists.
file, err := os.Open(blobpath)
if err == nil {
// File exists. Job's done.
file.Close()
return
} else {
if os.IsNotExist(err) {
// No such file exists on disk. Ready to rock!
} else {
return
}
}
// Create a temporary file to write to. Once we're done, we
// can atomically rename the file to the correct key.
file, err = ioutil.TempFile(blobdir, rest)
if err != nil {
return
}
tmpfn := file.Name()
_, err = file.Write(buf)
if err != nil {
return
}
err = file.Sync()
dir, fn, err := extractKeyComponents(key)
if err != nil {
return "", err
}
err = file.Close()
blobdir := filepath.Join(bs.dir, dir)
blobpath := filepath.Join(blobdir, fn)
// Check if the blob already exists.
_, err = os.Stat(blobpath)
if err == nil {
// The file already exists. Our job is done.
return key, nil
} else if os.IsNotExist(err) {
// The blob does not exist on disk yet.
// Fallthrough.
} else if err != nil {
return "", err
}
// Ensure that blobdir exist.
err = os.Mkdir(blobdir, 0750)
if err != nil && !os.IsExist(err) {
return "", err
}
// Create a temporary file to write to.
//
// Once we're done, we can atomically rename the file
// to the correct key.
//
// This method is racy: two callers can attempt to write
// the same blob at the same time. This shouldn't affect
// the consistency of the final blob, but worst case, we've
// done some extra work.
f, err := ioutil.TempFile(blobdir, fn)
if err != nil {
return "", err
}
tmpfn := f.Name()
_, err = f.Write(buf)
if err != nil {
f.Close()
return "", err
}
err = f.Sync()
if err != nil {
f.Close()
return "", err
}
err = f.Close()
if err != nil {
return "", err
}
@ -277,7 +192,7 @@ func (bs *BlobStore) Put(buf []byte) (key string, err error) {
err = os.Rename(tmpfn, blobpath)
if err != nil {
os.Remove(tmpfn)
return
return "", err
}
return key, nil