From e81037d75fb3a743071eb106175e01c23dec1b85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3=20=D0=91=D0=BE=D1=80=D0=BE=D0=B4?= =?UTF-8?q?=D0=B8=D0=BD?= Date: Thu, 5 Feb 2026 14:37:54 +0200 Subject: [PATCH] working commit --- app/handler/blob.go | 52 +++++++++++++ app/handler/manifest.go | 2 +- app/maindb/blob.go | 32 +++++++- app/operator/blob.go | 88 ++++++++++++++++++++++ app/operator/manifest.go | 159 +++++++++++++++++---------------------- app/operator/ociaux.go | 65 ++++++++++++++++ app/service/service.go | 38 ++++++++++ app/storage/storage.go | 40 +++++++++- pkg/auxoci/ociaux.go | 4 +- pkg/auxtool/cleandir.go | 47 ++++++------ 10 files changed, 408 insertions(+), 119 deletions(-) diff --git a/app/handler/blob.go b/app/handler/blob.go index 695857e..bd1947b 100644 --- a/app/handler/blob.go +++ b/app/handler/blob.go @@ -1,6 +1,9 @@ package handler import ( + "io" + "net/http" + "mstore/app/operator" "mstore/app/router" @@ -125,3 +128,52 @@ func (hand *Handler) PutUpload(rctx *router.Context) { rctx.SetHeader("Location", res.Location) rctx.SetStatus(code) } + +// GET /v2//blobs/ 200 404 +func (hand *Handler) GetBlob(rctx *router.Context) { + + name, _ := rctx.GetSubpath("name") + digest, _ := rctx.GetSubpath("digest") + + params := &operator.GetBlobParams{ + Name: name, + Digest: digest, + } + ctx := rctx.GetContext() + res, code, err := hand.oper.GetBlob(ctx, params) + if err != nil { + hand.logg.Errorf("GetBlob error: %v", err) + } + + rctx.SetHeader("Content-Length", res.ContentLength) + rctx.SetHeader("Content-Type", res.ContentType) + rctx.SetHeader("Docker-Content-Digest", res.DockerContentDigest) + rctx.SetStatus(code) + + defer res.ReadCloser.Close() + _, err = io.Copy(rctx.Writer, res.ReadCloser) + if err != nil { + hand.logg.Errorf("GetFile error: %v", err) + rctx.SetStatus(http.StatusInternalServerError) + return + } +} + +// DELETE /v2//blobs/ 202 404/405 +func (hand *Handler) DeleteBlob(rctx *router.Context) { + + name, _ := rctx.GetSubpath("name") + digest, _ := rctx.GetSubpath("digest") + + params := &operator.DeleteBlobParams{ + Name: name, + Digest: digest, + } + ctx := rctx.GetContext() + _, code, err := hand.oper.DeleteBlob(ctx, params) + if err != nil { + hand.logg.Errorf("DeleteBlob error: %v", err) + } + + rctx.SetStatus(code) +} diff --git a/app/handler/manifest.go b/app/handler/manifest.go index 4a35d16..f201371 100644 --- a/app/handler/manifest.go +++ b/app/handler/manifest.go @@ -89,7 +89,7 @@ func (hand *Handler) ManifestExists(rctx *router.Context) { func (hand *Handler) PutManifest(rctx *router.Context) { - hand.DumpHeaders("PutManifest headers", rctx) + //hand.DumpHeaders("PutManifest headers", rctx) contentType := rctx.GetHeader("Content-Type") contentLength := rctx.GetHeader("Content-Length") diff --git a/app/maindb/blob.go b/app/maindb/blob.go index 7fbef76..908f575 100644 --- a/app/maindb/blob.go +++ b/app/maindb/blob.go @@ -37,6 +37,23 @@ func (db *Database) GetBlobByDigest(ctx context.Context, digest string) (bool, d return exists, res, err } +func (db *Database) GetBlobByNameDigest(ctx context.Context, name, digest string) (bool, descr.Blob, error) { + var err error + blobs := make([]descr.Blob, 0) + res := descr.Blob{} + exists := false + request := `SELECT * FROM blobs WHERE name = $1 AND digest = $1 LIMIT 1` + err = db.db.Select(&blobs, request, name, digest) + if err != nil { + return exists, res, err + } + if len(blobs) > 0 { + res = blobs[0] + exists = true + } + return exists, res, err +} + func (db *Database) ListAllBlobs(ctx context.Context) ([]descr.Blob, error) { var err error blobs := make([]descr.Blob, 0) @@ -79,8 +96,7 @@ func (db *Database) GetBlobUsage(ctx context.Context, digest string) (int64, err var err error var usage int64 count := make([]int64, 0) - request := ` - SELECT count(id) AS count FROM blobs WHERE digest = $1` + request := `SELECT count(id) AS count FROM blobs WHERE digest = $1` err = db.db.Select(&count, request, digest) if err != nil { return usage, err @@ -89,7 +105,7 @@ func (db *Database) GetBlobUsage(ctx context.Context, digest string) (int64, err return usage, err } -func (db *Database) DeleteBlobByDigest(ctx context.Context, digest string) error { +func (db *Database) xxxxDeleteBlobByDigest(ctx context.Context, digest string) error { var err error request := `DELETE FROM blobs WHERE digest = $1;` _, err = db.db.Exec(request, digest) @@ -98,3 +114,13 @@ func (db *Database) DeleteBlobByDigest(ctx context.Context, digest string) error } return err } + +func (db *Database) DeleteBlobByNameDigest(ctx context.Context, name, digest string) error { + var err error + request := `DELETE FROM blobs WHERE name = $1 AND digest = $2;` + _, err = db.db.Exec(request, name, digest) + if err != nil { + return err + } + return err +} diff --git a/app/operator/blob.go b/app/operator/blob.go index b928d51..f88660d 100644 --- a/app/operator/blob.go +++ b/app/operator/blob.go @@ -24,6 +24,7 @@ func (oper *Operator) BlobExists(ctx context.Context, params *BlobExistsParams) var err error res := &BlobExistsResult{} oper.logg.Debugf("Call BlobExists") + if params.Digest == "" { err = fmt.Errorf("Empty reference") return res, http.StatusBadRequest, err @@ -224,3 +225,90 @@ func (oper *Operator) PutUpload(ctx context.Context, params *PutUploadParams) (* res.Location = fmt.Sprintf("/v2/%s/blobs/%s", params.Name, params.Digest) return res, http.StatusCreated, err } + +type GetBlobParams struct { + Name string + Digest string +} +type GetBlobResult struct { + ContentLength string + ContentType string + DockerContentDigest string + ReadCloser io.ReadCloser +} + +func (oper *Operator) GetBlob(ctx context.Context, params *GetBlobParams) (*GetBlobResult, int, error) { + var err error + res := &GetBlobResult{} + oper.logg.Debugf("Calling GetBlob %s:%s", params.Name, params.Digest) + + if params.Name == "" { + err = fmt.Errorf("Empty name") + return res, http.StatusBadRequest, err + } + if params.Digest == "" { + err = fmt.Errorf("Empty digest") + return res, http.StatusBadRequest, err + } + + blobExists, blobSize, err := oper.store.BlobExists(params.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + if !blobExists { + oper.logg.Debugf("Blob %s:%s not exists", params.Name, params.Digest) + return res, http.StatusNotFound, err + } + + _, readCloser, err := oper.store.BlobReader(params.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + + res.ContentLength = strconv.FormatInt(blobSize, 10) + res.DockerContentDigest = params.Digest + res.ReadCloser = readCloser + return res, http.StatusOK, err +} + +type DeleteBlobParams struct { + Name string + Digest string +} +type DeleteBlobResult struct{} + +func (oper *Operator) DeleteBlob(ctx context.Context, params *DeleteBlobParams) (*DeleteBlobResult, int, error) { + var err error + res := &DeleteBlobResult{} + oper.logg.Debugf("DeleteBlob") + + if params.Digest == "" { + err = fmt.Errorf("Empty digest") + return res, http.StatusBadRequest, err + } + if params.Name == "" { + err = fmt.Errorf("Empty name") + return res, http.StatusBadRequest, err + } + // Check namespace record + exists, _, err := oper.mdb.GetBlobByNameDigest(ctx, params.Name, params.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + if !exists { + return res, http.StatusNotFound, err + } + err = oper.mdb.DeleteBlobByNameDigest(ctx, params.Name, params.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + // Removing blob file if usage == 0 + blobUsage, err := oper.mdb.GetBlobUsage(ctx, params.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + if blobUsage == 0 { + oper.store.DeleteBlob(params.Digest) + } + return res, http.StatusOK, err +} diff --git a/app/operator/manifest.go b/app/operator/manifest.go index 7c59a56..db225b6 100644 --- a/app/operator/manifest.go +++ b/app/operator/manifest.go @@ -90,8 +90,7 @@ type PutManifestResult struct { Location string } -// TODO: control size 413 Payload Too Large - +// TODO: lock for the name-reference or simular? func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams) (*PutManifestResult, int, error) { var err error res := &PutManifestResult{} @@ -128,6 +127,7 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams if err != nil { return res, http.StatusInternalServerError, err } + incomingManifestBytes := buffer.Bytes() if int64(len(incomingManifestBytes)) != contentLength { err = fmt.Errorf("Mismatch Content-Length and received manifest size: %d vs %d", @@ -136,7 +136,12 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams return res, code, err } - oper.logg.Debugf("Manifest data: [%s]", string(incomingManifestBytes)) + if len(incomingManifestBytes) > (4 * 1024 * 1024) { + err = fmt.Errorf("Payload more 4M: %d bytes", len(incomingManifestBytes)) + code := http.StatusRequestEntityTooLarge + return res, code, err + } + //oper.logg.Debugf("Manifest data: [%s]", string(incomingManifestBytes)) incomingManifest, err := auxoci.ParseOCIManifest(incomingManifestBytes) if err != nil { @@ -147,100 +152,76 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams incomingManifest.MediaType = params.ContentType } - manifestExists, _, err := oper.mdb.GetManifestByReference(ctx, params.Name, params.Reference) + manifestExists, existengManifestDescr, err := oper.mdb.GetManifestByReference(ctx, params.Name, params.Reference) if err != nil { return res, http.StatusInternalServerError, err } - if !manifestExists { - name := params.Name - reference := params.Reference - manifestDescr, layerDescrs, err := descrsFromManifest(name, reference, incomingManifest, incomingManifestBytes) - // Check layers - var blobError error - for _, layer := range layerDescrs { - layerExists, _, err := oper.store.BlobFileExists(layer.Digest) - if err != nil { - return res, http.StatusInternalServerError, err - } - if !layerExists { - err := fmt.Errorf("Layer %s not found", layer.Digest) - blobError = errors.Join(blobError, err) - } - } - if blobError != nil { - return res, http.StatusInternalServerError, blobError - } - // Store manifest and layesrs data - err = oper.mdb.InsertManifestWithLayers(ctx, &manifestDescr, layerDescrs) + + name := params.Name + reference := params.Reference + + incomingManifestDescr, incomingLayerDescrs, err := descrsFromManifest(name, reference, incomingManifest, incomingManifestBytes) + // Always check layer files for availability + var blobError error + for _, layer := range incomingLayerDescrs { + layerExists, _, err := oper.store.BlobExists(layer.Digest) if err != nil { return res, http.StatusInternalServerError, err } + if !layerExists { + err := fmt.Errorf("Layer %s not found.", layer.Digest) + blobError = errors.Join(blobError, err) + } } + if blobError != nil { + return res, http.StatusInternalServerError, blobError + } + if !manifestExists { + // Store manifest and layesrs data + err = oper.mdb.InsertManifestWithLayers(ctx, &incomingManifestDescr, incomingLayerDescrs) + if err != nil { + return res, http.StatusInternalServerError, err + } + } else { + /* TODO: only update descr + if bytes.Equal(existingManifestBytes, incomingManifestBytes) { + return res, http.StatusCreated, err + } + */ - /* - exists, existingManifestDescr, err := lg.mdb.GetManifestByReference(params.Name, params.Reference) - if err != nil { - return res, http.StatusInternalServerError, err - } - - if exists { - existingManifestBytes := []byte(existingManifestDescr.Payload) - // Exist if incoming and existing manyfest is equal - if bytes.Equal(existingManifestBytes, incomingManifestBytes) { - return res, http.StatusCreated, err - } - name := params.Name - reference := params.Reference - manifestDescr, newBlobDescrs, delBlobDescrs, err := blobsDiff(name, reference, existingManifestBytes, incomingManifestBytes) - if err != nil { - return res, http.StatusInternalServerError, err - } - err = lg.maindb.UpdateManifest(ctx, &manifestDescr, newBlobDescrs, delBlobDescrs) - if err != nil { - return res, http.StatusInternalServerError, err - } - // Clean blobs - for _, blob := range delBlobDescrs { - exists, _, err = lg.st.BlobFileExists(blob.Digest) - if err != nil { - return res, http.StatusInternalServerError, err - } - blobUsage, err := lg.maindb.GetBlobUsage(blob.Digest) - if err != nil { - return res, http.StatusInternalServerError, err - } - if exists && blobUsage == 0 { - lg.log.Debugf("Delete file %s:%s blob %s", params.Name, params.Reference, blob.Digest) - err = lg.st.DeleteBlobFile(blob.Digest) - if err != nil { - return res, http.StatusInternalServerError, err - } - } - } - } else { - name := params.Name - reference := params.Reference - manifestDescr, configDescr, layerDescrs, err := descrsFromManifestBytes(name, reference, incomingManifestBytes) - // Check layer blobs - var blobError error - for _, layer := range layerDescrs { - exists, _, err = lg.st.BlobFileExists(layer.Digest) - if !exists { - err := fmt.Errorf("Blob %s not exists", layer.Digest) - blobError = errors.Join(blobError, err) - } - } - if blobError != nil { - return res, http.StatusInternalServerError, blobError - } - // Store manifest and blobs data - err = lg.maindb.InsertManifest(ctx, &manifestDescr, &configDescr, layerDescrs) - if err != nil { - return res, http.StatusInternalServerError, err - } - } - - */ + existingManifestBytes := []byte(existengManifestDescr.Payload) + existingManifest, err := auxoci.ParseOCIManifest(existingManifestBytes) + if err != nil { + return res, http.StatusInternalServerError, err + } + addedBlobDescrs, uselessBlobDescrs, err := layersDiff(name, reference, + existingManifest, incomingManifest, incomingManifestBytes) + if err != nil { + return res, http.StatusInternalServerError, err + } + // Starting manifest and blobs transaction + err = oper.mdb.UpdateManifestWithBlobs(ctx, &incomingManifestDescr, addedBlobDescrs, uselessBlobDescrs) + if err != nil { + return res, http.StatusInternalServerError, err + } + for _, blob := range uselessBlobDescrs { + exists, _, err := oper.store.BlobExists(blob.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + blobUsage, err := oper.mdb.GetBlobUsage(ctx, blob.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + if exists && blobUsage == 0 { + oper.logg.Debugf("Delete file %s:%s blob %s", name, reference, blob.Digest) + err = oper.store.DeleteBlob(blob.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + } + } + } res.Location = fmt.Sprintf(`/v2/%s/manifests/%s`, params.Name, params.Reference) return res, http.StatusCreated, err diff --git a/app/operator/ociaux.go b/app/operator/ociaux.go index d4b15f5..fbdd767 100644 --- a/app/operator/ociaux.go +++ b/app/operator/ociaux.go @@ -63,3 +63,68 @@ func descrsFromManifest(name, reference string, manifest *ocispec.Manifest, rawM } return manifestDescr, blobDescrs, err } + +func layersDiff(name, reference string, existingManifest, incomingManifest *ocispec.Manifest, rawManifest []byte) ([]descr.Blob, []descr.Blob, error) { + var err error + newBlobDescrs := make([]descr.Blob, 0) + delBlobDescrs := make([]descr.Blob, 0) + + // Calculating old layers + delLayers := make([]ocispec.Descriptor, 0) + for _, oldLayer := range existingManifest.Layers { + uniq := true + for _, newLayer := range incomingManifest.Layers { + if oldLayer.Digest == newLayer.Digest { + uniq = false + } + } + if uniq { + delLayers = append(delLayers, oldLayer) + } + } + // Calculating new layers + newLayers := make([]ocispec.Descriptor, 0) + for _, newLayer := range incomingManifest.Layers { + uniq := true + for _, oldLayer := range existingManifest.Layers { + if oldLayer.Digest == newLayer.Digest { + uniq = false + } + } + if uniq { + newLayers = append(newLayers, newLayer) + } + } + if incomingManifest.Config.Digest != existingManifest.Config.Digest { + delLayers = append(delLayers, existingManifest.Config) + newLayers = append(newLayers, incomingManifest.Config) + } + // Converting to new blob + timestamp := auxtool.TimeNow() + for _, layer := range newLayers { + blobDescr := descr.Blob{ + ID: auxuuid.NewUUID(), + Name: name, + Reference: reference, + MediaType: layer.MediaType, + Digest: string(layer.Digest), + Size: layer.Size, + CreatedAt: timestamp, + UpdatedAt: timestamp, + } + newBlobDescrs = append(newBlobDescrs, blobDescr) + } + // Converting to old blobs + for _, layer := range delLayers { + blobDescr := descr.Blob{ + ID: auxuuid.NewUUID(), + Name: name, + Reference: reference, + MediaType: layer.MediaType, + Digest: string(layer.Digest), + Size: layer.Size, + } + delBlobDescrs = append(delBlobDescrs, blobDescr) + } + return newBlobDescrs, delBlobDescrs, err +} diff --git a/app/service/service.go b/app/service/service.go index a519439..272d575 100644 --- a/app/service/service.go +++ b/app/service/service.go @@ -213,6 +213,44 @@ func (svc *Service) Build() error { svc.rout.Put(`/v2/{name}/uploads/{reference}`, svc.hand.PutUpload) + // Pulling blobs + // + // To pull a blob, perform a GET request to a URL in the following form: + // + // /v2//blobs/ end-2 + // + // is the namespace of the repository, and is the blob's digest. + // + // A GET request to an existing blob URL MUST provide the expected blob, + // with a response code that MUST be 200 OK. + // A successful response SHOULD contain the digest of the uploaded blob in the header Docker-Content-Digest. + // If present, the value of this header MUST be a digest matching that of the response body. + // Most clients MAY ignore the value, but if it is used, the client MUST verify the value + // matches the returned response body. Clients SHOULD verify that the response body + // matches the requested digest. + // + // If the blob is not found in the repository, the response code MUST be 404 Not Found. + // + // A registry SHOULD support the Range request header in accordance with RFC 9110. + + svc.rout.Get(`/v2/{name}/blobs/{digest}`, svc.hand.GetBlob) + + // Deleting Blobs + // + // To delete a blob, perform a DELETE request to a path in the following format: + // + // /v2//blobs/ + // + // is the namespace of the repository, and is the digest + // of the blob to be deleted. + // + // Upon success, the registry MUST respond with code 202 Accepted. + // If the blob is not found, a 404 Not Found code MUST be returned. + // If blob deletion is disabled, the registry MUST respond with either + // a 400 Bad Request or a 405 Method Not Allowed. + + svc.rout.Delete(`/v2/{name}/blobs/{digest}`, svc.hand.DeleteBlob) + svc.rout.NotFound(svc.hand.NotFound) selector := svc.rout.Selector() diff --git a/app/storage/storage.go b/app/storage/storage.go index 037245c..6873358 100644 --- a/app/storage/storage.go +++ b/app/storage/storage.go @@ -1,6 +1,7 @@ package storage import ( + "bytes" "crypto/sha256" "encoding/hex" "errors" @@ -309,7 +310,7 @@ func (store *Storage) WriteBlob(digest string, source io.Reader) (int64, string, return recsize, recsum, err } -func (st *Storage) BlobFileExists(digest string) (bool, int64, error) { +func (st *Storage) BlobExists(digest string) (bool, int64, error) { var err error var fileSize int64 @@ -326,3 +327,40 @@ func (st *Storage) BlobFileExists(digest string) (bool, int64, error) { fileSize = fileStat.Size() return true, fileSize, err } + +func (store *Storage) BlobReader(digest string) (int64, io.ReadCloser, error) { + var err error + var filesize int64 + blobpath := store.makeBlobpath(digest) + + emptyReadCloser := io.NopCloser(bytes.NewReader(nil)) + + file, err := os.OpenFile(blobpath, os.O_RDONLY, 0) + if err != nil { + return filesize, emptyReadCloser, err + } + defer func() { + if err != nil { + file.Close() + } + }() + filestat, err := file.Stat() + if err != nil { + return filesize, emptyReadCloser, err + } + filesize = filestat.Size() + return filesize, file, err +} + +func (store *Storage) DeleteBlob(digest string) error { + var err error + blobpath := store.makeBlobpath(digest) + err = os.Remove(blobpath) + if errors.Is(err, os.ErrNotExist) { + err = nil + } + if err != nil { + return err + } + return err +} diff --git a/pkg/auxoci/ociaux.go b/pkg/auxoci/ociaux.go index 6296a02..541375d 100644 --- a/pkg/auxoci/ociaux.go +++ b/pkg/auxoci/ociaux.go @@ -7,9 +7,9 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) -func ParseOCIManifest(source []byte) (*ocispec.Manifest, error) { +func ParseOCIManifest(rawManifest []byte) (*ocispec.Manifest, error) { manifest := &ocispec.Manifest{} - err := json.Unmarshal(source, &manifest) + err := json.Unmarshal(rawManifest, &manifest) if err != nil { err = fmt.Errorf("Manifest parsing error: %v", err) return manifest, err diff --git a/pkg/auxtool/cleandir.go b/pkg/auxtool/cleandir.go index a52a410..e5538fc 100644 --- a/pkg/auxtool/cleandir.go +++ b/pkg/auxtool/cleandir.go @@ -1,33 +1,34 @@ package auxtool import ( - "path/filepath" - "os" - "strings" + "os" + "path/filepath" + "strings" ) +// Clean only overbase elements of dir path if possible func CleanDirs(basedir, datadir string) { - separator := string(os.PathSeparator) + separator := string(os.PathSeparator) - basedir = filepath.Clean(separator + basedir) - datadir = filepath.Clean(separator + datadir) + basedir = filepath.Clean(separator + basedir) + datadir = filepath.Clean(separator + datadir) items := strings.Split(datadir, separator) - for i := len(items); i > 0; i-- { - p := filepath.Join(items[0:i]...) - p = filepath.Clean(separator + p) - if p == basedir { - break - } - fileInfo, err := os.Stat(p) - if err != nil { - return - } - if fileInfo.IsDir() { - err = os.Remove(p) - if err != nil { - return - } - } - } + for i := len(items); i > 0; i-- { + p := filepath.Join(items[0:i]...) + p = filepath.Clean(separator + p) + if p == basedir { + break + } + fileInfo, err := os.Stat(p) + if err != nil { + return + } + if fileInfo.IsDir() { + err = os.Remove(p) + if err != nil { + return + } + } + } }