Fix backend format for disk-cache - not to use FS format.json (#5732)

This commit is contained in:
Krishna Srinivas
2018-03-29 14:38:26 -07:00
committed by Dee Koder
parent 328076f773
commit 804a4f9c15
12 changed files with 204 additions and 166 deletions

View File

@@ -18,15 +18,12 @@ package cmd
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"sort"
"strconv"
"strings"
@@ -115,7 +112,6 @@ func backendDownError(err error) bool {
// until an online drive is found. If object is not found, fall back to the first online cache drive
// closest to the hash index, so that object can be recached.
func (c diskCache) getCachedFSLoc(ctx context.Context, bucket, object string) (*cacheFSObjects, error) {
index := c.hashIndex(bucket, object)
numDisks := len(c.cfs)
// save first online cache disk closest to the hint index
@@ -145,7 +141,6 @@ func (c diskCache) getCachedFSLoc(ctx context.Context, bucket, object string) (*
// a hint. In the event that the cache drive at hash index is offline, treat the list of cache drives
// as a circular buffer and walk through them starting at hash index until an online drive is found.
func (c diskCache) getCacheFS(ctx context.Context, bucket, object string) (*cacheFSObjects, error) {
index := c.hashIndex(bucket, object)
numDisks := len(c.cfs)
for k := 0; k < numDisks; k++ {
@@ -162,8 +157,7 @@ func (c diskCache) getCacheFS(ctx context.Context, bucket, object string) (*cach
// hashIndex computes a deterministic index into the cache-drive slice for the
// given bucket/object pair, so the same object always maps to the same drive.
// The CRC32 of the joined path is taken modulo the number of configured cache
// drives (delegated to the shared crcHashMod helper so all callers agree on
// the hashing scheme).
func (c diskCache) hashIndex(bucket, object string) int {
	return crcHashMod(pathJoin(bucket, object), len(c.cfs))
}
// construct a metadata k-v map
@@ -496,7 +490,6 @@ func (c cacheObjects) listBuckets(ctx context.Context) (buckets []BucketInfo, er
// Returns list of buckets from cache or the backend. If the backend is down, buckets
// available on cache are served.
func (c cacheObjects) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
listBucketsFn := c.ListBucketsFn
buckets, err = listBucketsFn(ctx)
if err != nil {
@@ -510,7 +503,6 @@ func (c cacheObjects) ListBuckets(ctx context.Context) (buckets []BucketInfo, er
// Returns bucket info from cache if backend is down.
func (c cacheObjects) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
getBucketInfoFn := c.GetBucketInfoFn
bucketInfo, err = getBucketInfoFn(ctx, bucket)
if backendDownError(err) {
@@ -525,7 +517,6 @@ func (c cacheObjects) GetBucketInfo(ctx context.Context, bucket string) (bucketI
// Delete Object deletes from cache as well if backend operation succeeds
func (c cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) {
if err = c.DeleteObjectFn(ctx, bucket, object); err != nil {
return
}
@@ -619,7 +610,6 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *h
// NewMultipartUpload - Starts a new multipart upload operation to backend and cache.
func (c cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
newMultipartUploadFn := c.NewMultipartUploadFn
if c.isCacheExclude(bucket, object) || filterFromCache(metadata) {
@@ -643,7 +633,6 @@ func (c cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object str
// PutObjectPart - uploads part to backend and cache simultaneously.
func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error) {
putObjectPartFn := c.PutObjectPartFn
dcache, err := c.cache.getCacheFS(ctx, bucket, object)
if err != nil {
@@ -713,7 +702,6 @@ func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadI
// AbortMultipartUpload - aborts multipart upload on backend and cache.
func (c cacheObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error {
abortMultipartUploadFn := c.AbortMultipartUploadFn
if c.isCacheExclude(bucket, object) {
@@ -737,7 +725,6 @@ func (c cacheObjects) AbortMultipartUpload(ctx context.Context, bucket, object,
// CompleteMultipartUpload - completes multipart upload operation on backend and cache.
func (c cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error) {
completeMultipartUploadFn := c.CompleteMultipartUploadFn
if c.isCacheExclude(bucket, object) {
@@ -802,33 +789,32 @@ func (c cacheObjects) DeleteBucket(ctx context.Context, bucket string) (err erro
// newCache initializes the cacheFSObjects for the "drives" specified in config.json
// or the global env overrides.
func newCache(c CacheConfig) (*diskCache, error) {
func newCache(config CacheConfig) (*diskCache, error) {
var cfsObjects []*cacheFSObjects
formats, err := loadAndValidateCacheFormat(c.Drives)
formats, err := loadAndValidateCacheFormat(config.Drives)
if err != nil {
errorIf(err, "Cache drives validation error")
return nil, err
}
if len(formats) == 0 {
return nil, errors.New("Cache drives validation error")
}
for i, dir := range c.Drives {
for i, dir := range config.Drives {
// skip cacheFSObjects creation for cache drives missing a format.json
if formats[i] == nil {
cfsObjects = append(cfsObjects, nil)
continue
}
c, err := newCacheFSObjects(dir, c.Expiry, cacheMaxDiskUsagePct)
if err != nil {
return nil, err
}
if err := checkAtimeSupport(dir); err != nil {
return nil, errors.New("Atime support required for disk caching")
}
cache, err := newCacheFSObjects(dir, config.Expiry, cacheMaxDiskUsagePct)
if err != nil {
return nil, err
}
// Start the purging go-routine for entries that have expired
go c.purge()
go cache.purge()
// Start trash purge routine for deleted buckets.
go c.purgeTrash()
cfsObjects = append(cfsObjects, c)
go cache.purgeTrash()
cfsObjects = append(cfsObjects, cache)
}
return &diskCache{cfs: cfsObjects}, nil
}
@@ -857,17 +843,16 @@ func checkAtimeSupport(dir string) (err error) {
}
// Returns cacheObjects for use by Server.
func newServerCacheObjects(c CacheConfig) (CacheObjectLayer, error) {
func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) {
// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
dcache, err := newCache(c)
dcache, err := newCache(config)
if err != nil {
return nil, err
}
return &cacheObjects{
cache: dcache,
exclude: c.Exclude,
exclude: config.Exclude,
listPool: newTreeWalkPool(globalLookupTimeout),
GetObjectFn: func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error {
return newObjectLayerFn().GetObject(ctx, bucket, object, startOffset, length, writer, etag)