From d6d2721c8893c766be2717f98a549fe2c2135db4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3=20=D0=91=D0=BE=D1=80=D0=BE=D0=B4?= =?UTF-8?q?=D0=B8=D0=BD?= Date: Tue, 24 Feb 2026 15:52:31 +0200 Subject: [PATCH] working commit --- app/locker/locker.go | 66 +++++++++++++++++++++++++++++++++++++++ app/locker/locker_test.go | 62 ++++++++++++++++++++++++++++++++++++ app/operator/blob.go | 18 +++++++++++ app/operator/manifest.go | 36 ++++++++++++--------- app/operator/ociaux.go | 9 ++++-- app/operator/operator.go | 3 ++ 6 files changed, 178 insertions(+), 16 deletions(-) create mode 100644 app/locker/locker.go create mode 100644 app/locker/locker_test.go diff --git a/app/locker/locker.go b/app/locker/locker.go new file mode 100644 index 0000000..b88f099 --- /dev/null +++ b/app/locker/locker.go @@ -0,0 +1,66 @@ +package locker + +import ( + "sync" +) + +type Elem struct { + Pipe chan bool + Usage int +} + +func NewElem() *Elem { + return &Elem{ + Pipe: make(chan bool, 1), + } +} + +type Locker struct { + mtx sync.Mutex + lMap map[string]*Elem +} + +func NewLocker() *Locker { + lock := &Locker{ + lMap: make(map[string]*Elem), + } + return lock +} + +func (lock *Locker) WaitAndLock(name string) { + lock.mtx.Lock() + p, exist := lock.lMap[name] + if !exist { + p = NewElem() + lock.lMap[name] = p + p.Pipe <- true + } + p.Usage += 1 + lock.mtx.Unlock() + select { + case <-p.Pipe: + // NOP + } +} + +func (lock *Locker) Done(name string) { + lock.mtx.Lock() + p, exist := lock.lMap[name] + if exist { + p.Pipe <- true + if p.Usage > 0 { + p.Usage -= 1 + } + } + garbageKeys := make([]string, 0) + for key, _ := range lock.lMap { + elem := lock.lMap[key] + if elem.Usage == 0 && key != name { + garbageKeys = append(garbageKeys, key) + } + } + for _, key := range garbageKeys { + delete(lock.lMap, key) + } + lock.mtx.Unlock() +} diff --git a/app/locker/locker_test.go b/app/locker/locker_test.go new file mode 100644 index 0000000..3177f42 --- /dev/null +++ b/app/locker/locker_test.go @@ -0,0 +1,62 @@ +package locker + +import ( + "fmt" + "math/rand" + "sync" + "testing" + "time" +) + +type Runner struct { + lock *Locker + res sync.Map +} + +func NewRunner() *Runner { + return &Runner{ + lock: NewLocker(), + } +} + +func (r *Runner) Run(wg *sync.WaitGroup, resName string, t *testing.T) { + for n := 2; n < 1000; n++ { + r.lock.WaitAndLock(resName) + + val := fmt.Sprintf("%d", n) + r.res.Store(resName, val) + + td := time.Duration(rand.Uint64()%1000 + 1) + time.Sleep(td * time.Nanosecond) + + foo, exist := r.res.Load(resName) + if !exist { + t.Errorf("not exist!\n") + } + if foo != val { + t.Errorf("not val!\n") + } + r.res.Delete(resName) + r.lock.Done(resName) + time.Sleep(1 * time.Millisecond) + } + wg.Done() +} + +func TestLocker(t *testing.T) { + run := NewRunner() + var wg sync.WaitGroup + for n := 1; n < 200; n++ { + go run.Run(&wg, "foo", t) + wg.Add(1) + } + for n := 1; n < 200; n++ { + go run.Run(&wg, "foo/bare", t) + wg.Add(1) + } + for n := 1; n < 200; n++ { + go run.Run(&wg, "foo/bare/foo", t) + wg.Add(1) + } + wg.Wait() +} diff --git a/app/operator/blob.go b/app/operator/blob.go index bd0731b..f0f7946 100644 --- a/app/operator/blob.go +++ b/app/operator/blob.go @@ -42,6 +42,11 @@ func (oper *Operator) BlobExists(ctx context.Context, operatorID string, params err = fmt.Errorf("Empty name") return res, http.StatusBadRequest, err } + + resName := params.Name + oper.iLock.WaitAndLock(resName) + defer oper.iLock.Done(resName) + // Check blob descriptor descrExists, blobDescr, err := oper.mdb.GetBlobByNameDigest(ctx, params.Digest, params.Digest) if err != nil { @@ -198,6 +203,10 @@ func (oper *Operator) PutUpload(ctx context.Context, operatorID string, params * return res, http.StatusBadRequest, err } + resName := params.Name + oper.iLock.WaitAndLock(resName) + defer oper.iLock.Done(resName) + var contentLength int64 if params.ContentLength != "" { contentLength, err = strconv.ParseInt(params.ContentLength, 10, 64) @@ -247,6 +256,10 @@ func (oper *Operator) GetBlob(ctx context.Context, operatorID string, params *Ge return res, http.StatusBadRequest, err } + resName := params.Name + oper.iLock.WaitAndLock(resName) + defer oper.iLock.Done(resName) + blobExists, blobSize, err := oper.store.BlobExists(params.Digest) if err != nil { return res, http.StatusInternalServerError, err @@ -294,6 +307,11 @@ func (oper *Operator) DeleteBlob(ctx context.Context, operatorID string, params err = fmt.Errorf("Empty name") return res, http.StatusBadRequest, err } + + resName := params.Name + oper.iLock.WaitAndLock(resName) + defer oper.iLock.Done(resName) + // Check namespace record descrExists, _, err := oper.mdb.GetBlobByNameDigest(ctx, params.Name, params.Digest) if err != nil { diff --git a/app/operator/manifest.go b/app/operator/manifest.go index eab099d..cd8b04e 100644 --- a/app/operator/manifest.go +++ b/app/operator/manifest.go @@ -95,8 +95,8 @@ const ( ddmMimeType = "application/vnd.docker.distribution.manifest.v2+json" oimMimeType = "application/vnd.oci.image.manifest.v1+json" - oicMimeType = "application/vnd.oci.image.config.v1+json" - dciMimeType = "application/vnd.docker.container.image.v1+json" + XXXoicMimeType = "application/vnd.oci.image.config.v1+json" + XXXdciMimeType = "application/vnd.docker.container.image.v1+json" ) // TODO: lock for the name-reference or simular? @@ -112,12 +112,13 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams err = fmt.Errorf("Empty name") return res, http.StatusBadRequest, err } + // Check Content-Type var mimeIsAcceptably bool mimeIsAcceptably = mimeIsAcceptably || params.ContentType == oimMimeType mimeIsAcceptably = mimeIsAcceptably || params.ContentType == ddmMimeType - mimeIsAcceptably = mimeIsAcceptably || params.ContentType == oicMimeType - mimeIsAcceptably = mimeIsAcceptably || params.ContentType == dciMimeType + //mimeIsAcceptably = mimeIsAcceptably || params.ContentType == oicMimeType + //mimeIsAcceptably = mimeIsAcceptably || params.ContentType == dciMimeType if !mimeIsAcceptably { err = fmt.Errorf("Unknown or empty Content-Type: %s", params.ContentType) return res, http.StatusNotFound, err @@ -134,6 +135,10 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams return res, code, err } + resName := params.Name + oper.iLock.WaitAndLock(resName) + defer oper.iLock.Done(resName) + // Copy manifest data buffer := bytes.NewBuffer(nil) _, err = io.Copy(buffer, params.Reader) @@ -155,13 +160,6 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams return res, code, err } - // Check point - oper.logg.Debugf("=== Incoming manifest type: %s", params.ContentType) - oper.logg.Debugf("=== Incoming manifest body: %s", string(incomingManifestBytes)) - if params.ContentType == oicMimeType || params.ContentType == dciMimeType { - return res, http.StatusOK, err - } - incomingManifest, err := auxoci.ParseOCIManifest(incomingManifestBytes) if err != nil { err = fmt.Errorf("Parsing OCI manifest error: %v", err) @@ -173,11 +171,12 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams name := params.Name reference := params.Reference + arch := incomingManifest.Subject.Platform.Architecture os := incomingManifest.Subject.Platform.OS variant := incomingManifest.Subject.Platform.Variant - manifestExists, existengManifestDescr, err := oper.mdb.GetManifestsByReferenceArchitecture(ctx, name, reference, arch, os, variant) + manifestExists, existingManifestDescr, err := oper.mdb.GetManifestsByReferenceArchitecture(ctx, name, reference, arch, os, variant) if err != nil { return res, http.StatusInternalServerError, err } @@ -213,7 +212,7 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams } */ - existingManifestBytes := []byte(existengManifestDescr.Payload) + existingManifestBytes := []byte(existingManifestDescr.Payload) existingManifest, err := auxoci.ParseOCIManifest(existingManifestBytes) if err != nil { return res, http.StatusInternalServerError, err @@ -263,7 +262,6 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams res.Location = fmt.Sprintf(`/v2/%s/manifests/%s`, params.Name, params.Reference) return res, http.StatusCreated, err - } type GetManifestParams struct { @@ -290,6 +288,10 @@ func (oper *Operator) GetManifest(ctx context.Context, params *GetManifestParams return res, http.StatusBadRequest, err } + resName := params.Name + oper.iLock.WaitAndLock(resName) + defer oper.iLock.Done(resName) + manifestDescr := descr.Manifest{} var exists bool // TODO: checking layers? @@ -311,6 +313,7 @@ func (oper *Operator) GetManifest(ctx context.Context, params *GetManifestParams res.Payload = manifestDescr.Payload } else { + // Create index of manifests exists, manifestDescrs, err := oper.mdb.GetManifestsByReference(ctx, params.Name, params.Reference) if err != nil { return res, http.StatusInternalServerError, err @@ -318,6 +321,7 @@ func (oper *Operator) GetManifest(ctx context.Context, params *GetManifestParams if !exists { return res, http.StatusNotFound, err } + index, indexBytes, err := indexFromManigestDescrs(manifestDescrs) if err != nil { return res, http.StatusInternalServerError, err @@ -351,6 +355,10 @@ func (oper *Operator) DeleteManifest(ctx context.Context, params *DeleteManifest return res, http.StatusBadRequest, err } + resName := params.Name + oper.iLock.WaitAndLock(resName) + defer oper.iLock.Done(resName) + var exists bool var reference string diff --git a/app/operator/ociaux.go b/app/operator/ociaux.go index 85bd8a0..3edd8e0 100644 --- a/app/operator/ociaux.go +++ b/app/operator/ociaux.go @@ -21,11 +21,16 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) +const ( + oiiMediaType = "application/vnd.oci.image.index.v1+json" + oimMediaType = "application/vnd.oci.image.manifest.v1+json" +) + func indexFromManigestDescrs(manifestDescrs []descr.Manifest) (ocispec.Index, string, error) { var err error var payload string index := ocispec.Index{ - MediaType: ocispec.MediaTypeImageIndex, + MediaType: oiiMediaType, Manifests: make([]ocispec.Descriptor, 0), } index.Versioned.SchemaVersion = 2 @@ -42,7 +47,7 @@ func indexFromManigestDescrs(manifestDescrs []descr.Manifest) (ocispec.Index, st return index, payload, err } descriptor := ocispec.Descriptor{ - MediaType: ocispec.MediaTypeImageManifest, + MediaType: oimMediaType, Digest: auxoci.SHA256DigestFromString(manifestDescr.Payload), Size: int64(len(manifestDescr.Payload)), Platform: ociManifest.Subject.Platform, diff --git a/app/operator/operator.go b/app/operator/operator.go index 72de918..70be449 100644 --- a/app/operator/operator.go +++ b/app/operator/operator.go @@ -10,6 +10,7 @@ package operator import ( + "mstore/app/locker" "mstore/app/logger" "mstore/app/maindb" "mstore/app/storage" @@ -24,6 +25,7 @@ type Operator struct { mdb *maindb.Database store *storage.Storage logg *logger.Logger + iLock *locker.Locker } func NewOperator(params *OperatorParams) (*Operator, error) { @@ -32,6 +34,7 @@ func NewOperator(params *OperatorParams) (*Operator, error) { mdb: params.MainDB, store: params.Store, } + oper.iLock = locker.NewLocker() oper.logg = logger.NewLoggerWithSubject("operator") return oper, err }