Add functionality to make cache limit configurable (#5991)

This commit is contained in:
Annanay Agarwal
2018-06-25 22:54:12 +05:30
committed by kannappanr
parent f46ee54194
commit 78abe5234e
20 changed files with 327 additions and 13 deletions

View File

@@ -28,10 +28,10 @@ import (
)
// Initialize cache FS objects.
func initCacheFSObjects(disk string, t *testing.T) (*cacheFSObjects, error) {
func initCacheFSObjects(disk string, cacheMaxUse int, t *testing.T) (*cacheFSObjects, error) {
newTestConfig(globalMinioDefaultRegion)
var err error
obj, err := newCacheFSObjects(disk, globalCacheExpiry, 100)
obj, err := newCacheFSObjects(disk, globalCacheExpiry, cacheMaxUse)
if err != nil {
t.Fatal(err)
}
@@ -39,10 +39,10 @@ func initCacheFSObjects(disk string, t *testing.T) (*cacheFSObjects, error) {
}
// inits diskCache struct for nDisks
func initDiskCaches(drives []string, t *testing.T) (*diskCache, error) {
func initDiskCaches(drives []string, cacheMaxUse int, t *testing.T) (*diskCache, error) {
var cfs []*cacheFSObjects
for _, d := range drives {
obj, err := initCacheFSObjects(d, t)
obj, err := initCacheFSObjects(d, cacheMaxUse, t)
if err != nil {
return nil, err
}
@@ -59,7 +59,46 @@ func TestGetCacheFS(t *testing.T) {
if err != nil {
t.Fatal(err)
}
d, err := initDiskCaches(fsDirs, t)
d, err := initDiskCaches(fsDirs, 100, t)
if err != nil {
t.Fatal(err)
}
bucketName := "testbucket"
objectName := "testobject"
ctx := context.Background()
// find cache drive where object would be hashed
index := d.hashIndex(bucketName, objectName)
// turn off drive by setting online status to false
d.cfs[index].online = false
cfs, err := d.getCacheFS(ctx, bucketName, objectName)
if n == 1 && err == errDiskNotFound {
continue
}
if err != nil {
t.Fatal(err)
}
i := -1
for j, f := range d.cfs {
if f == cfs {
i = j
break
}
}
if i != (index+1)%n {
t.Fatalf("expected next cache location to be picked")
}
}
}
// test whether a drive being offline causes
// getCacheFS to fetch next online drive
func TestGetCacheFSMaxUse(t *testing.T) {
for n := 1; n < 10; n++ {
fsDirs, err := getRandomDisks(n)
if err != nil {
t.Fatal(err)
}
d, err := initDiskCaches(fsDirs, globalCacheMaxUse, t)
if err != nil {
t.Fatal(err)
}
@@ -141,7 +180,7 @@ func TestDiskCache(t *testing.T) {
if err != nil {
t.Fatal(err)
}
d, err := initDiskCaches(fsDirs, t)
d, err := initDiskCaches(fsDirs, 100, t)
if err != nil {
t.Fatal(err)
}
@@ -209,6 +248,87 @@ func TestDiskCache(t *testing.T) {
}
}
// Test diskCache with upper bound on max cache use.
func TestDiskCacheMaxUse(t *testing.T) {
fsDirs, err := getRandomDisks(1)
if err != nil {
t.Fatal(err)
}
d, err := initDiskCaches(fsDirs, globalCacheMaxUse, t)
if err != nil {
t.Fatal(err)
}
cache := d.cfs[0]
ctx := context.Background()
bucketName := "testbucket"
objectName := "testobject"
content := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
etag := "061208c10af71a30c6dcd6cf5d89f0fe"
contentType := "application/zip"
size := len(content)
httpMeta := make(map[string]string)
httpMeta["etag"] = etag
httpMeta["content-type"] = contentType
objInfo := ObjectInfo{}
objInfo.Bucket = bucketName
objInfo.Name = objectName
objInfo.Size = int64(size)
objInfo.ContentType = contentType
objInfo.ETag = etag
objInfo.UserDefined = httpMeta
byteReader := bytes.NewReader([]byte(content))
hashReader, err := hash.NewReader(byteReader, int64(size), "", "")
if err != nil {
t.Fatal(err)
}
if !cache.diskAvailable(int64(size)) {
err = cache.Put(ctx, bucketName, objectName, hashReader, httpMeta)
if err != errDiskFull {
t.Fatal("Cache max-use limit violated.")
}
} else {
err = cache.Put(ctx, bucketName, objectName, hashReader, httpMeta)
if err != nil {
t.Fatal(err)
}
cachedObjInfo, err := cache.GetObjectInfo(ctx, bucketName, objectName)
if err != nil {
t.Fatal(err)
}
if !cache.Exists(ctx, bucketName, objectName) {
t.Fatal("Expected object to exist on cache")
}
if cachedObjInfo.ETag != objInfo.ETag {
t.Fatal("Expected ETag to match")
}
if cachedObjInfo.Size != objInfo.Size {
t.Fatal("Size mismatch")
}
if cachedObjInfo.ContentType != objInfo.ContentType {
t.Fatal("Cached content-type does not match")
}
writer := bytes.NewBuffer(nil)
err = cache.Get(ctx, bucketName, objectName, 0, int64(size), writer, "")
if err != nil {
t.Fatal(err)
}
if ccontent := writer.Bytes(); !bytes.Equal([]byte(content), ccontent) {
t.Errorf("wrong cached file content")
}
err = cache.Delete(ctx, bucketName, objectName)
if err != nil {
t.Errorf("object missing from cache")
}
online := cache.IsOnline()
if !online {
t.Errorf("expected cache drive to be online")
}
}
}
func TestIsCacheExcludeDirective(t *testing.T) {
testCases := []struct {
cacheControlOpt string