From 3900d93559b60a1ec1c130d8756feec46c28fc0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=B3=20=D0=91=D0=BE=D1=80=D0=BE=D0=B4?= =?UTF-8?q?=D0=B8=D0=BD?= Date: Wed, 4 Feb 2026 21:43:26 +0200 Subject: [PATCH] working commit --- app/handler/blob.go | 3 + app/maindb/manifest.go | 11 +-- app/operator/blob.go | 8 +- app/operator/manifest.go | 122 +++++++++++++++++++++++++- app/operator/ociaux.go | 65 ++++++++++++++ app/storage/storage.go | 181 ++++++++++++++++++++++++++++----------- pkg/auxoci/ociaux.go | 18 ++++ 7 files changed, 340 insertions(+), 68 deletions(-) create mode 100644 app/operator/ociaux.go create mode 100644 pkg/auxoci/ociaux.go diff --git a/app/handler/blob.go b/app/handler/blob.go index 6dbb7e4..7dc3aab 100644 --- a/app/handler/blob.go +++ b/app/handler/blob.go @@ -70,6 +70,8 @@ func (hand *Handler) PatchUpload(rctx *router.Context) { contentLength := rctx.GetHeader("Content-Length") contentType := rctx.GetHeader("Content-Type") + contentRange := rctx.GetHeader("Content-Range") + name, _ := rctx.GetSubpath("name") reference, _ := rctx.GetSubpath("reference") reader := rctx.Request.Body @@ -77,6 +79,7 @@ func (hand *Handler) PatchUpload(rctx *router.Context) { params := &operator.PatchUploadParams{ ContentLength: contentLength, ContentType: contentType, + ContentRange: contentRange, Name: name, Reference: reference, Reader: reader, diff --git a/app/maindb/manifest.go b/app/maindb/manifest.go index 40b40db..262dcbe 100644 --- a/app/maindb/manifest.go +++ b/app/maindb/manifest.go @@ -37,7 +37,7 @@ func (db *Database) UpdateManifest(ctx context.Context, manifest *descr.Manifest return err } -func (db *Database) InsertManifestWithBlobs(ctx context.Context, manifest *descr.Manifest, config *descr.Blob, layers []descr.Blob) error { +func (db *Database) InsertManifestWithLayers(ctx context.Context, manifest *descr.Manifest, layers []descr.Blob) error { var err error var request string @@ -53,15 +53,6 @@ func (db *Database) InsertManifestWithBlobs(ctx context.Context, manifest *descr if err != nil { return err } - // Config - request = ` - INSERT INTO blobs(id, name, reference, mediaType, digest, size, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8)` - _, err = tx.Exec(request, config.ID, config.Name, config.Reference, config.MediaType, - config.Digest, config.Size, config.CreatedAt, config.UpdatedAt) - if err != nil { - return err - } // Layers for _, layer := range layers { request = ` diff --git a/app/operator/blob.go b/app/operator/blob.go index 3e090e8..b314e3f 100644 --- a/app/operator/blob.go +++ b/app/operator/blob.go @@ -97,6 +97,7 @@ func (oper *Operator) PostUpload(ctx context.Context, params *PostUploadParams) type PatchUploadParams struct { ContentType string ContentLength string + ContentRange string Name string Reference string Reader io.Reader @@ -129,7 +130,8 @@ func (oper *Operator) PatchUpload(ctx context.Context, params *PatchUploadParams return res, http.StatusBadRequest, err } var contentLength int64 - // podman & github.com/containers/image don't set Content-length header for docker transport + + // Unfortunately, podman & github.com/containers/image don't set Content-length header for docker transport if params.ContentLength != "" { contentLength, err = strconv.ParseInt(params.ContentLength, 10, 64) if err != nil { @@ -138,7 +140,7 @@ func (oper *Operator) PatchUpload(ctx context.Context, params *PatchUploadParams } } - recsize, err := oper.store.WriteUpload(params.Reference, params.Reader) + recsize, _, err := oper.store.WriteUpload(params.Reference, params.Reader) if err != nil { return res, http.StatusInternalServerError, err } @@ -200,8 +202,6 @@ func (oper *Operator) PutUpload(ctx context.Context, params *PutUploadParams) (* // TODO err = fmt.Errorf("Unexpected Content-Length header: %s", params.ContentLength) return res, http.StatusInternalServerError, err - - Content - Range } err = oper.store.LinkUpload(params.Reference, params.Digest) diff --git a/app/operator/manifest.go b/app/operator/manifest.go index 30ca801..ffe896b 100644 --- a/app/operator/manifest.go +++ b/app/operator/manifest.go @@ -3,12 +3,14 @@ package operator import ( "bytes" "context" + "errors" "fmt" "io" "net/http" "strconv" "mstore/app/descr" + "mstore/pkg/auxoci" ocidigest "github.com/opencontainers/go-digest" ) @@ -94,8 +96,16 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams res := &PutManifestResult{} oper.logg.Debugf("Put manifest %s:%s", params.Name, params.Reference) + if params.Reference == "" { + err = fmt.Errorf("Empty reference") + return res, http.StatusBadRequest, err + } + if params.Name == "" { + err = fmt.Errorf("Empty name") + return res, http.StatusBadRequest, err + } // Check Content-Type - if params.ContentType != ddmMimeType && params.ContentType != oimMimeType { + if params.ContentType != oimMimeType { err = fmt.Errorf("Unknown or empty Content-Type: %s", params.ContentType) return res, http.StatusNotFound, err } @@ -106,8 +116,114 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams if err != nil { return res, http.StatusInternalServerError, err } - inBytes := buffer.Bytes() - oper.logg.Debugf("Manifest data: [%s]", string(inBytes)) + incomingManifestBytes := buffer.Bytes() + oper.logg.Debugf("Manifest data: [%s]", string(incomingManifestBytes)) + + incomingManifest, err := auxoci.ParseOCIManifest(incomingManifestBytes) + if err != nil { + err = fmt.Errorf("Parsing OCI manifest error: %v", err) + return res, http.StatusInternalServerError, err + } + if incomingManifest.MediaType != params.ContentType { + err := fmt.Errorf("Mismatch Content-Type header and manifest MediaType: %s vs %s", + params.ContentType, incomingManifest.MediaType) + return res, http.StatusInternalServerError, err + } + + manifestExists, _, err := oper.mdb.GetManifestByReference(ctx, params.Name, params.Reference) + if err != nil { + return res, http.StatusInternalServerError, err + } + if !manifestExists { + name := params.Name + reference := params.Reference + manifestDescr, layerDescrs, err := descrsFromManifest(name, reference, incomingManifest, incomingManifestBytes) + // Check layers + var blobError error + for _, layer := range layerDescrs { + layerExists, _, err := oper.store.BlobFileExists(layer.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + if !layerExists { + err := fmt.Errorf("Layer %s not found", layer.Digest) + blobError = errors.Join(blobError, err) + } + } + if blobError != nil { + return res, http.StatusInternalServerError, blobError + } + // Store manifest and layesrs data + err = oper.mdb.InsertManifestWithLayers(ctx, &manifestDescr, layerDescrs) + if err != nil { + return res, http.StatusInternalServerError, err + } + } + + /* + exists, existingManifestDescr, err := lg.mdb.GetManifestByReference(params.Name, params.Reference) + if err != nil { + return res, http.StatusInternalServerError, err + } + + if exists { + existingManifestBytes := []byte(existingManifestDescr.Payload) + // Exist if incoming and existing manyfest is equal + if bytes.Equal(existingManifestBytes, incomingManifestBytes) { + return res, http.StatusCreated, err + } + name := params.Name + reference := params.Reference + manifestDescr, newBlobDescrs, delBlobDescrs, err := blobsDiff(name, reference, existingManifestBytes, incomingManifestBytes) + if err != nil { + return res, http.StatusInternalServerError, err + } + err = lg.maindb.UpdateManifest(ctx, &manifestDescr, newBlobDescrs, delBlobDescrs) + if err != nil { + return res, http.StatusInternalServerError, err + } + // Clean blobs + for _, blob := range delBlobDescrs { + exists, _, err = lg.st.BlobFileExists(blob.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + blobUsage, err := lg.maindb.GetBlobUsage(blob.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + if exists && blobUsage == 0 { + lg.log.Debugf("Delete file %s:%s blob %s", params.Name, params.Reference, blob.Digest) + err = lg.st.DeleteBlobFile(blob.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + } + } + } else { + name := params.Name + reference := params.Reference + manifestDescr, configDescr, layerDescrs, err := descrsFromManifestBytes(name, reference, incomingManifestBytes) + // Check layer blobs + var blobError error + for _, layer := range layerDescrs { + exists, _, err = lg.st.BlobFileExists(layer.Digest) + if !exists { + err := fmt.Errorf("Blob %s not exists", layer.Digest) + blobError = errors.Join(blobError, err) + } + } + if blobError != nil { + return res, http.StatusInternalServerError, blobError + } + // Store manifest and blobs data + err = lg.maindb.InsertManifest(ctx, &manifestDescr, &configDescr, layerDescrs) + if err != nil { + return res, http.StatusInternalServerError, err + } + } + + */ res.Location = fmt.Sprintf(`/v2/%s/manifests/%s`, params.Name, params.Reference) return res, http.StatusCreated, err diff --git a/app/operator/ociaux.go b/app/operator/ociaux.go new file mode 100644 index 0000000..d4b15f5 --- /dev/null +++ b/app/operator/ociaux.go @@ -0,0 +1,65 @@ +package operator + +import ( + "mstore/app/descr" + "mstore/pkg/auxtool" + "mstore/pkg/auxuuid" + + ocidigest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +func descrsFromManifest(name, reference string, manifest *ocispec.Manifest, rawManifest []byte) (descr.Manifest, []descr.Blob, error) { + var err error + manifestDescr := descr.Manifest{} + //configDescr := descr.Blob{} + blobDescrs := make([]descr.Blob, 0) + + timestamp := auxtool.TimeNow() + + // Make manifest descriptor + manifestDigest := ocidigest.SHA256.FromBytes(rawManifest).String() + manifestDescr = descr.Manifest{ + ID: auxuuid.NewUUID(), + Name: name, + Reference: reference, + Digest: manifestDigest, + ContentType: manifest.MediaType, + Payload: string(rawManifest), + CreatedAt: timestamp, + UpdatedAt: timestamp, + } + // Make config descriptor + ociConfig := manifest.Config + configDescr := descr.Blob{ + ID: auxuuid.NewUUID(), + Name: name, + Reference: reference, + MediaType: ociConfig.MediaType, + Digest: string(ociConfig.Digest), + Size: ociConfig.Size, + CreatedAt: timestamp, + UpdatedAt: timestamp, + } + blobDescrs = append(blobDescrs, configDescr) + // Make blob descriptions + layerMap := make(map[string]bool) + for _, layer := range manifest.Layers { + blobDescr := descr.Blob{ + ID: auxuuid.NewUUID(), + Name: name, + Reference: reference, + MediaType: layer.MediaType, + Digest: string(layer.Digest), + Size: layer.Size, + CreatedAt: timestamp, + UpdatedAt: timestamp, + } + _, alreadyAdded := layerMap[string(layer.Digest)] + if !alreadyAdded { + blobDescrs = append(blobDescrs, blobDescr) + layerMap[string(layer.Digest)] = true + } + } + return manifestDescr, blobDescrs, err +} diff --git a/app/storage/storage.go b/app/storage/storage.go index 308eab8..0743dbd 100644 --- a/app/storage/storage.go +++ b/app/storage/storage.go @@ -3,6 +3,7 @@ package storage import ( "crypto/sha256" "encoding/hex" + "errors" "fmt" "io" "os" @@ -12,6 +13,12 @@ import ( "mstore/pkg/auxuuid" ) +const ( + sha256prefix = "sha256:" + filesubdir = "files" + tmpsubdir = "tmps" +) + type Storage struct { basepath string logg *logger.Logger @@ -25,11 +32,6 @@ func NewStorage(basepath string) *Storage { return res } -const ( - filesubdir = "files" - tmpsubdir = "tmps" -) - func (store *Storage) makeCollecionpath(collection string) string { return filepath.Join(store.basepath, filesubdir, collection) } @@ -63,10 +65,51 @@ func (store *Storage) GetFileReader(collection, filename string) (io.ReadCloser, return res, err } +func (store *Storage) WriteTempFile(source io.Reader) (string, int64, string, error) { + var err error + var size int64 + var csum string + + tmpname := auxuuid.NewUUID() + tmpname = fmt.Sprintf("file-%s.tmp", tmpname) + tmppath := store.makeTmppath(tmpname) + + tmpdirpath := store.makeTmpsubdir() + err = os.MkdirAll(tmpdirpath, 0750) + if err != nil { + return tmpname, size, csum, err + } + + file, err := os.OpenFile(tmppath, os.O_WRONLY|os.O_CREATE, 0640) + if err != nil { + return tmpname, size, csum, err + } + defer file.Close() + + hasher := sha256.New() + writer := io.MultiWriter(file, hasher) + + size, err = io.Copy(writer, source) + if err != nil { + return tmpname, size, csum, err + } + csum = hex.EncodeToString(hasher.Sum(nil)) + csum = sha256prefix + csum + + return tmpname, size, csum, err +} + func (store *Storage) HardlinkFile(tmpname, collection, filename string) error { var err error + dirname := store.makeCollecionpath(collection) - err = os.MkdirAll(dirname, 0750) + _, err = os.Stat(dirname) + if errors.Is(err, os.ErrNotExist) { + err = os.MkdirAll(dirname, 0750) + if err != nil { + return err + } + } if err != nil { return err } @@ -86,40 +129,6 @@ func (store *Storage) HardlinkFile(tmpname, collection, filename string) error { return err } -func (store *Storage) WriteTempFile(source io.Reader) (string, int64, string, error) { - var err error - var size int64 - var digest string - - tmpname := auxuuid.NewUUID() - tmpname = fmt.Sprintf("file-%s.tmp", tmpname) - tmppath := store.makeTmppath(tmpname) - - tmpdirpath := store.makeTmpsubdir() - err = os.MkdirAll(tmpdirpath, 0750) - if err != nil { - return tmpname, size, digest, err - } - - file, err := os.OpenFile(tmppath, os.O_WRONLY|os.O_CREATE, 0640) - if err != nil { - return tmpname, size, digest, err - } - defer file.Close() - - hasher := sha256.New() - writer := io.MultiWriter(file, hasher) - - size, err = io.Copy(writer, source) - if err != nil { - return tmpname, size, digest, err - } - digest = hex.EncodeToString(hasher.Sum(nil)) - digest = fmt.Sprintf("sha256:%s", digest) - - return tmpname, size, digest, err -} - func (store *Storage) DeleteFile(collection, filename string) error { var err error filename = store.makeFilepath(collection, filename) @@ -127,10 +136,8 @@ func (store *Storage) DeleteFile(collection, filename string) error { if err != nil { return err } - - // TODO: more safe removing + // TODO: safe dir removing dirname := store.makeCollecionpath(collection) - err = os.RemoveAll(dirname) if err != nil { return err @@ -159,20 +166,27 @@ func (store *Storage) makeBlobsubdir() string { return filepath.Join(store.basepath, blobsubdir) } -func (store *Storage) WriteUpload(digest string, source io.Reader) (int64, error) { +func (store *Storage) WriteUpload(uploadID string, source io.Reader) (int64, string, error) { var err error var recsize int64 + var recsum string uploadDir := store.makeUpsubdir() - err = os.MkdirAll(uploadDir, 0750) + _, err = os.Stat(uploadDir) + if errors.Is(err, os.ErrNotExist) { + err = os.MkdirAll(uploadDir, 0750) + if err != nil { + return recsize, recsum, err + } + } if err != nil { - return recsize, err + return recsize, recsum, err } - uploadPath := store.makeUppath(digest) + uploadPath := store.makeUppath(uploadID) uploadFile, err := os.OpenFile(uploadPath, os.O_WRONLY|os.O_CREATE, 0644) if err != nil { - return recsize, err + return recsize, recsum, err } defer uploadFile.Close() @@ -180,9 +194,12 @@ func (store *Storage) WriteUpload(digest string, source io.Reader) (int64, error streamWriter := io.MultiWriter(uploadFile, hasher) recsize, err = io.Copy(streamWriter, source) if err != nil { - return recsize, err + return recsize, recsum, err } - return recsize, err + recsum = hex.EncodeToString(hasher.Sum(nil)) + recsum = sha256prefix + recsum + + return recsize, recsum, err } func (store *Storage) LinkUpload(reference, digest string) error { @@ -191,7 +208,13 @@ func (store *Storage) LinkUpload(reference, digest string) error { blobPath := store.makeBlobpath(digest) blobdir := store.makeBlobsubdir() - err = os.MkdirAll(blobdir, 0750) + _, err = os.Stat(blobdir) + if errors.Is(err, os.ErrNotExist) { + err = os.MkdirAll(blobdir, 0750) + if err != nil { + return err + } + } if err != nil { return err } @@ -216,3 +239,59 @@ func (store *Storage) RemoveUpload(digest string) error { } return err } + +func (store *Storage) WriteBlob(digest string, source io.Reader) (int64, string, error) { + var err error + var recsize int64 + var recsum string + + //defer source.Close() + + blobdir := store.makeBlobsubdir() + _, err = os.Stat(blobdir) + if errors.Is(err, os.ErrNotExist) { + err = os.MkdirAll(blobdir, 0750) + if err != nil { + return recsize, digest, err + } + } + if err != nil { + return recsize, digest, err + } + + blobpath := store.makeBlobpath(digest) + blobfile, err := os.OpenFile(blobpath, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return recsize, digest, err + } + defer blobfile.Close() + + hasher := sha256.New() // TODO + multiWriter := io.MultiWriter(blobfile, hasher) + recsize, err = io.Copy(multiWriter, source) + if err != nil { + return recsize, digest, err + } + recsum = hex.EncodeToString(hasher.Sum(nil)) + recsum = sha256prefix + recsum + + return recsize, recsum, err +} + +func (st *Storage) BlobFileExists(digest string) (bool, int64, error) { + var err error + var fileSize int64 + + blobPath := st.makeBlobpath(digest) + + fileStat, err := os.Stat(blobPath) + if errors.Is(err, os.ErrNotExist) { + return false, 0, nil + } + if err != nil { + return false, 0, err + } + + fileSize = fileStat.Size() + return true, fileSize, err +} diff --git a/pkg/auxoci/ociaux.go b/pkg/auxoci/ociaux.go new file mode 100644 index 0000000..6296a02 --- /dev/null +++ b/pkg/auxoci/ociaux.go @@ -0,0 +1,18 @@ +package auxoci + +import ( + "encoding/json" + "fmt" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +func ParseOCIManifest(source []byte) (*ocispec.Manifest, error) { + manifest := &ocispec.Manifest{} + err := json.Unmarshal(source, &manifest) + if err != nil { + err = fmt.Errorf("Manifest parsing error: %v", err) + return manifest, err + } + return manifest, err +}