working commit

This commit is contained in:
2026-02-24 15:52:31 +02:00
parent e2e4ec1776
commit d6d2721c88
6 changed files with 178 additions and 16 deletions
+66
View File
@@ -0,0 +1,66 @@
package locker
import (
"sync"
)
type Elem struct {
Pipe chan bool
Usage int
}
func NewElem() *Elem {
return &Elem{
Pipe: make(chan bool, 1),
}
}
type Locker struct {
mtx sync.Mutex
lMap map[string]*Elem
}
func NewLocker() *Locker {
lock := &Locker{
lMap: make(map[string]*Elem),
}
return lock
}
func (lock *Locker) WaitAndLock(name string) {
lock.mtx.Lock()
p, exist := lock.lMap[name]
if !exist {
p = NewElem()
lock.lMap[name] = p
p.Pipe <- true
}
p.Usage += 1
lock.mtx.Unlock()
select {
case <-p.Pipe:
// NOP
}
}
func (lock *Locker) Done(name string) {
lock.mtx.Lock()
p, exist := lock.lMap[name]
if exist {
p.Pipe <- true
if p.Usage > 0 {
p.Usage -= 1
}
}
garbageKeys := make([]string, 0)
for key, _ := range lock.lMap {
elem := lock.lMap[key]
if elem.Usage == 0 && key != name {
garbageKeys = append(garbageKeys, key)
}
}
for _, key := range garbageKeys {
delete(lock.lMap, key)
}
lock.mtx.Unlock()
}
+62
View File
@@ -0,0 +1,62 @@
package locker
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
)
type Runner struct {
lock *Locker
res sync.Map
}
func NewRunner() *Runner {
return &Runner{
lock: NewLocker(),
}
}
func (r *Runner) Run(wg *sync.WaitGroup, resName string, t *testing.T) {
for n := 2; n < 1000; n++ {
r.lock.WaitAndLock(resName)
val := fmt.Sprintf("%d", n)
r.res.Store(resName, val)
td := time.Duration(rand.Uint64()%1000 + 1)
time.Sleep(td * time.Nanosecond)
foo, exist := r.res.Load(resName)
if !exist {
t.Errorf("not exist!\n")
}
if foo != val {
t.Errorf("not val!\n")
}
r.res.Delete(resName)
r.lock.Done(resName)
time.Sleep(1 * time.Millisecond)
}
wg.Done()
}
func TestLocker(t *testing.T) {
run := NewRunner()
var wg sync.WaitGroup
for n := 1; n < 200; n++ {
go run.Run(&wg, "foo", t)
wg.Add(1)
}
for n := 1; n < 200; n++ {
go run.Run(&wg, "foo/bare", t)
wg.Add(1)
}
for n := 1; n < 200; n++ {
go run.Run(&wg, "foo/bare/foo", t)
wg.Add(1)
}
wg.Wait()
}
+18
View File
@@ -42,6 +42,11 @@ func (oper *Operator) BlobExists(ctx context.Context, operatorID string, params
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
resName := params.Name
oper.iLock.WaitAndLock(resName)
defer oper.iLock.Done(resName)
// Check blob descriptor
descrExists, blobDescr, err := oper.mdb.GetBlobByNameDigest(ctx, params.Digest, params.Digest)
if err != nil {
@@ -198,6 +203,10 @@ func (oper *Operator) PutUpload(ctx context.Context, operatorID string, params *
return res, http.StatusBadRequest, err
}
resName := params.Name
oper.iLock.WaitAndLock(resName)
defer oper.iLock.Done(resName)
var contentLength int64
if params.ContentLength != "" {
contentLength, err = strconv.ParseInt(params.ContentLength, 10, 64)
@@ -247,6 +256,10 @@ func (oper *Operator) GetBlob(ctx context.Context, operatorID string, params *Ge
return res, http.StatusBadRequest, err
}
resName := params.Name
oper.iLock.WaitAndLock(resName)
defer oper.iLock.Done(resName)
blobExists, blobSize, err := oper.store.BlobExists(params.Digest)
if err != nil {
return res, http.StatusInternalServerError, err
@@ -294,6 +307,11 @@ func (oper *Operator) DeleteBlob(ctx context.Context, operatorID string, params
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
resName := params.Name
oper.iLock.WaitAndLock(resName)
defer oper.iLock.Done(resName)
// Check namespace record
descrExists, _, err := oper.mdb.GetBlobByNameDigest(ctx, params.Name, params.Digest)
if err != nil {
+22 -14
View File
@@ -95,8 +95,8 @@ const (
ddmMimeType = "application/vnd.docker.distribution.manifest.v2+json"
oimMimeType = "application/vnd.oci.image.manifest.v1+json"
oicMimeType = "application/vnd.oci.image.config.v1+json"
dciMimeType = "application/vnd.docker.container.image.v1+json"
XXXoicMimeType = "application/vnd.oci.image.config.v1+json"
XXXdciMimeType = "application/vnd.docker.container.image.v1+json"
)
// TODO: lock for the name-reference or simular?
@@ -112,12 +112,13 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
// Check Content-Type
var mimeIsAcceptably bool
mimeIsAcceptably = mimeIsAcceptably || params.ContentType == oimMimeType
mimeIsAcceptably = mimeIsAcceptably || params.ContentType == ddmMimeType
mimeIsAcceptably = mimeIsAcceptably || params.ContentType == oicMimeType
mimeIsAcceptably = mimeIsAcceptably || params.ContentType == dciMimeType
//mimeIsAcceptably = mimeIsAcceptably || params.ContentType == oicMimeType
//mimeIsAcceptably = mimeIsAcceptably || params.ContentType == dciMimeType
if !mimeIsAcceptably {
err = fmt.Errorf("Unknown or empty Content-Type: %s", params.ContentType)
return res, http.StatusNotFound, err
@@ -134,6 +135,10 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
return res, code, err
}
resName := params.Name
oper.iLock.WaitAndLock(resName)
defer oper.iLock.Done(resName)
// Copy manifest data
buffer := bytes.NewBuffer(nil)
_, err = io.Copy(buffer, params.Reader)
@@ -155,13 +160,6 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
return res, code, err
}
// Check point
oper.logg.Debugf("=== Incoming manifest type: %s", params.ContentType)
oper.logg.Debugf("=== Incoming manifest body: %s", string(incomingManifestBytes))
if params.ContentType == oicMimeType || params.ContentType == dciMimeType {
return res, http.StatusOK, err
}
incomingManifest, err := auxoci.ParseOCIManifest(incomingManifestBytes)
if err != nil {
err = fmt.Errorf("Parsing OCI manifest error: %v", err)
@@ -173,11 +171,12 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
name := params.Name
reference := params.Reference
arch := incomingManifest.Subject.Platform.Architecture
os := incomingManifest.Subject.Platform.OS
variant := incomingManifest.Subject.Platform.Variant
manifestExists, existengManifestDescr, err := oper.mdb.GetManifestsByReferenceArchitecture(ctx, name, reference, arch, os, variant)
manifestExists, existingManifestDescr, err := oper.mdb.GetManifestsByReferenceArchitecture(ctx, name, reference, arch, os, variant)
if err != nil {
return res, http.StatusInternalServerError, err
}
@@ -213,7 +212,7 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
}
*/
existingManifestBytes := []byte(existengManifestDescr.Payload)
existingManifestBytes := []byte(existingManifestDescr.Payload)
existingManifest, err := auxoci.ParseOCIManifest(existingManifestBytes)
if err != nil {
return res, http.StatusInternalServerError, err
@@ -263,7 +262,6 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
res.Location = fmt.Sprintf(`/v2/%s/manifests/%s`, params.Name, params.Reference)
return res, http.StatusCreated, err
}
type GetManifestParams struct {
@@ -290,6 +288,10 @@ func (oper *Operator) GetManifest(ctx context.Context, params *GetManifestParams
return res, http.StatusBadRequest, err
}
resName := params.Name
oper.iLock.WaitAndLock(resName)
defer oper.iLock.Done(resName)
manifestDescr := descr.Manifest{}
var exists bool
// TODO: checking layers?
@@ -311,6 +313,7 @@ func (oper *Operator) GetManifest(ctx context.Context, params *GetManifestParams
res.Payload = manifestDescr.Payload
} else {
// Create index of manifests
exists, manifestDescrs, err := oper.mdb.GetManifestsByReference(ctx, params.Name, params.Reference)
if err != nil {
return res, http.StatusInternalServerError, err
@@ -318,6 +321,7 @@ func (oper *Operator) GetManifest(ctx context.Context, params *GetManifestParams
if !exists {
return res, http.StatusNotFound, err
}
index, indexBytes, err := indexFromManigestDescrs(manifestDescrs)
if err != nil {
return res, http.StatusInternalServerError, err
@@ -351,6 +355,10 @@ func (oper *Operator) DeleteManifest(ctx context.Context, params *DeleteManifest
return res, http.StatusBadRequest, err
}
resName := params.Name
oper.iLock.WaitAndLock(resName)
defer oper.iLock.Done(resName)
var exists bool
var reference string
+7 -2
View File
@@ -21,11 +21,16 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
const (
oiiMediaType = "application/vnd.oci.image.index.v1+json"
oimMediaType = "application/vnd.oci.image.manifest.v1+json"
)
func indexFromManigestDescrs(manifestDescrs []descr.Manifest) (ocispec.Index, string, error) {
var err error
var payload string
index := ocispec.Index{
MediaType: ocispec.MediaTypeImageIndex,
MediaType: oiiMediaType,
Manifests: make([]ocispec.Descriptor, 0),
}
index.Versioned.SchemaVersion = 2
@@ -42,7 +47,7 @@ func indexFromManigestDescrs(manifestDescrs []descr.Manifest) (ocispec.Index, st
return index, payload, err
}
descriptor := ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageManifest,
MediaType: oimMediaType,
Digest: auxoci.SHA256DigestFromString(manifestDescr.Payload),
Size: int64(len(manifestDescr.Payload)),
Platform: ociManifest.Subject.Platform,
+3
View File
@@ -10,6 +10,7 @@
package operator
import (
"mstore/app/locker"
"mstore/app/logger"
"mstore/app/maindb"
"mstore/app/storage"
@@ -24,6 +25,7 @@ type Operator struct {
mdb *maindb.Database
store *storage.Storage
logg *logger.Logger
iLock *locker.Locker
}
func NewOperator(params *OperatorParams) (*Operator, error) {
@@ -32,6 +34,7 @@ func NewOperator(params *OperatorParams) (*Operator, error) {
mdb: params.MainDB,
store: params.Store,
}
oper.iLock = locker.NewLocker()
oper.logg = logger.NewLoggerWithSubject("operator")
return oper, err
}