diff --git a/app/descr/blob.go b/app/descr/blob.go index 604ff46..6cfddbf 100644 --- a/app/descr/blob.go +++ b/app/descr/blob.go @@ -9,4 +9,6 @@ type Blob struct { Size int64 `db:"size" json:"size"` CreatedAt string `db:"created_at" json:"createdAt"` UpdatedAt string `db:"updated_at" json:"updatedAt"` + CreatedBy string `db:"created_by" json:"createdBy,omitempty"` + UpdatedBy string `db:"updated_by" json:"updatedBy,omitempty"` } diff --git a/app/descr/manifest.go b/app/descr/manifest.go index 7694036..871505d 100644 --- a/app/descr/manifest.go +++ b/app/descr/manifest.go @@ -9,4 +9,6 @@ type Manifest struct { Digest string `db:"digest" json:"digest"` CreatedAt string `db:"created_at" json:"createdAt"` UpdatedAt string `db:"updated_at" json:"updatedAt"` + CreatedBy string `db:"created_by" json:"createdBy,omitempty"` + UpdatedBy string `db:"updated_by" json:"updatedBy,omitempty"` } diff --git a/app/handler/blob.go b/app/handler/blob.go new file mode 100644 index 0000000..268fcfc --- /dev/null +++ b/app/handler/blob.go @@ -0,0 +1,82 @@ +package handler + +import ( + "mstore/app/operator" + "mstore/app/router" +) + +// HEAD /v2//blobs/ 200 404 + +func (hand *Handler) BlobExists(rctx *router.Context) { + name, _ := rctx.GetSubpath("name") + digest, _ := rctx.GetSubpath("digest") + + hand.logg.Debugf("Handle BlobExists with name=[%s] digest=[%s]", name, digest) + params := &operator.BlobExistsParams{ + Name: name, + Digest: digest, + } + ctx := rctx.GetContext() + res, code, err := hand.oper.BlobExists(ctx, params) + if err != nil { + hand.logg.Errorf("BlobExist error: %v", err) + } else if res.Exists { + rctx.SetHeader("Docker-Content-Digest", res.DockerContentDigest) + rctx.SetHeader("Content-Length", res.ContentLength) + } + rctx.SetStatus(code) +} + +// POST /v2//blobs/uploads/ 202 404 +func (hand *Handler) PostUpload(rctx *router.Context) { + name, _ := rctx.GetSubpath("name") + digest := rctx.GetQuery("digest") + mount := rctx.GetQuery("mount") + from := rctx.GetQuery("from") + + hand.logg.Debugf("Handle PostUpload with name=[%s] digest=[%s]", name, digest) + params := &operator.PostUploadParams{ + Name: name, + Digest: digest, + Mount: mount, + From: from, + } + res, code, err := hand.oper.PostUpload(rctx.Ctx, params) + if err != nil { + hand.logg.Errorf("PostUpload error: %v", err) + } else { + hand.logg.Debugf("PostUpload send location=[%s] code=%d", res.Location, code) + rctx.SetHeader("Location", res.Location) + rctx.SetHeader("Content-Length", res.ContentLength) + rctx.SetHeader("Docker-Upload-UUID", res.DockerUploadUUID) + } + rctx.SetStatus(code) +} + +// POST /v2//blobs/uploads/?digest= 201/202 404/400 +// POST /v2//blobs/uploads/?mount=&from= 201 404 + +// PATCH /v2//blobs/uploads/ 202 404/416 +func (hand *Handler) PatchUpload(rctx *router.Context) { + + contentLength := rctx.GetHeader("Content-Length") + contentType := rctx.GetHeader("Content-Type") + name, _ := rctx.GetSubpath("name") + reference, _ := rctx.GetSubpath("reference") + reader := rctx.Request.Body + + params := &operator.PatchUploadParams{ + ContentLength: contentLength, + ContentType: contentType, + Name: name, + Reference: reference, + Reader: reader, + } + ctx := rctx.GetContext() + res, code, err := hand.oper.PatchUpload(ctx, params) + if err != nil { + hand.logg.Errorf("PatchUpload error: %v", err) + } + rctx.SetHeader("Location", res.Location) + rctx.SetStatus(code) +} diff --git a/app/handler/file.go b/app/handler/file.go index 57b7308..da72d66 100644 --- a/app/handler/file.go +++ b/app/handler/file.go @@ -24,10 +24,10 @@ func (hand *Handler) FileExists(rctx *router.Context) { return } // TODO - rctx.SetHeader("X-Content-Type", res.ContentType) - rctx.SetHeader("X-Content-Length", res.ContentLength) - rctx.SetHeader("X-Content-Digest", res.ContentDigest) - rctx.SetHeader("Content-Length", zeroContentLength) + rctx.SetHeader("Content-Type", res.ContentType) + rctx.SetHeader("Content-Length", res.ContentLength) + rctx.SetHeader("Content-Digest", res.ContentDigest) + //rctx.SetHeader("Content-Length", zeroContentLength) rctx.SetStatus(code) } diff --git a/app/handler/manifest.go b/app/handler/manifest.go new file mode 100644 index 0000000..f15ce8a --- /dev/null +++ b/app/handler/manifest.go @@ -0,0 +1,107 @@ +package handler + +import ( + //"mstore/app/descr" + "mstore/app/operator" + "mstore/app/router" +) + +// https://github.com/opencontainers/distribution-spec/blob/main/spec.md +// +// Open Container Initiative Distribution Specification +// +// Existing Manifests +// +// The image manifest can be checked for existence with the following url: +// +// HEAD /v2//manifests/ +// +// The name and reference parameter identify the image and are required. The reference may include a tag or digest. +// +// A 404 Not Found response will be returned if the image is unknown to the registry. +// If the image exists and the response is successful the response will be as follows: +// +// 200 OK +// Content-Length: +// Docker-Content-Digest: + +func (hand *Handler) ManifestExists(rctx *router.Context) { + name, _ := rctx.GetSubpath("name") + reference, _ := rctx.GetSubpath("reference") + params := &operator.ManifestExistsParams{ + Name: name, + Reference: reference, + } + ctx := rctx.GetContext() + res, code, err := hand.oper.ManifestExists(ctx, params) + if err != nil { + hand.logg.Errorf("ManifestExist error: %v", err) + } else if res.Exists { + rctx.SetHeader("Content-Length", res.ContentLength) + rctx.SetHeader("Content-Type", res.ContentType) + rctx.SetHeader("Docker-Content-Digest", res.DockerContentDigest) + } + rctx.SetStatus(code) +} + +// https://github.com/opencontainers/distribution-spec/blob/main/spec.md +// +// Pushing Manifests +// +// To push a manifest, perform a PUT request to a path in the following format, +// and with the following headers and body: /v2//manifests/ +// +// Clients SHOULD set the Content-Type header to the type of the manifest being pushed. +// The client SHOULD NOT include parameters on the Content-Type header (see RFC7231). +// The registry SHOULD ignore parameters on the Content-Type header. +// +// All manifests SHOULD include a mediaType field declaring the type of the manifest being pushed. +// If a manifest includes a mediaType field, clients MUST set the Content-Type header to +// the value specified by the mediaType field. +// +// Content-Type: application/vnd.oci.image.manifest.v1+json +// +// is the namespace of the repository, and the MUST be either a) a digest or b) a tag. +// +// The uploaded manifest MUST reference any blobs that make up the object. +// However, the list of blobs MAY be empty. +// +// The registry MUST store the manifest in the exact byte representation provided by the client. +// Upon a successful upload, the registry MUST return response code 201 Created, +// and MUST have the following header: +// +// Location: +// +// The is a pullable manifest URL. The Docker-Content-Digest header returns +// the canonical digest of the uploaded blob, and MUST be equal to the client provided digest. +// Clients MAY ignore the value but if it is used, the client SHOULD verify +// the value against the uploaded blob data. +// +// An attempt to pull a nonexistent repository MUST return response code 404 Not Found. +// +// A registry SHOULD enforce some limit on the maximum manifest size that it can accept. +// A registry that enforces this limit SHOULD respond to a request to push a manifest over this +// limit with a response code 413 Payload Too Large. +// Client and registry implementations SHOULD expect to be able to support manifest +// pushes of at least 4 megabytes. + +// PUT /v2//manifests/ 201 404 +func (hand *Handler) PutManifest(rctx *router.Context) { + name, _ := rctx.GetSubpath("name") + reference, _ := rctx.GetSubpath("reference") + contentType := rctx.GetHeader("Content-Type") + params := &operator.PutManifestParams{ + ContentType: contentType, + Name: name, + Reference: reference, + Reader: rctx.Request.Body, + } + ctx := rctx.GetContext() + res, code, err := hand.oper.PutManifest(ctx, params) + if err != nil { + hand.logg.Errorf("PutManifest error: %v", err) + } else { + rctx.SetHeader("Location", res.Location) + } + rctx.SetStatus(code) +} diff --git a/app/maindb/blob.go b/app/maindb/blob.go index 7edd029..7fbef76 100644 --- a/app/maindb/blob.go +++ b/app/maindb/blob.go @@ -20,6 +20,23 @@ func (db *Database) InsertBlob(ctx context.Context, layer *descr.Blob) error { return err } +func (db *Database) GetBlobByDigest(ctx context.Context, digest string) (bool, descr.Blob, error) { + var err error + blobs := make([]descr.Blob, 0) + res := descr.Blob{} + exists := false + request := `SELECT * FROM blobs WHERE digest = $1 LIMIT 1` + err = db.db.Select(&blobs, request, digest) + if err != nil { + return exists, res, err + } + if len(blobs) > 0 { + res = blobs[0] + exists = true + } + return exists, res, err +} + func (db *Database) ListAllBlobs(ctx context.Context) ([]descr.Blob, error) { var err error blobs := make([]descr.Blob, 0) @@ -31,7 +48,7 @@ func (db *Database) ListAllBlobs(ctx context.Context) ([]descr.Blob, error) { return blobs, err } -func (db *Database) BlobExists(ctx context.Context, name, reference, digest string) (bool, error) { +func (db *Database) xxxBlobExists(ctx context.Context, name, reference, digest string) (bool, error) { var err error blobs := make([]descr.Blob, 0) request := ` @@ -58,23 +75,6 @@ func (db *Database) GetBlobsByReferense(ctx context.Context, name, reference str return blobs, err } -func (db *Database) GetBlobByDigest(ctx context.Context, digest string) (bool, descr.Blob, error) { - var err error - blobs := make([]descr.Blob, 0) - res := descr.Blob{} - exists := false - request := `SELECT * FROM blobs WHERE digest = $1 LIMIT 1` - err = db.db.Select(&blobs, request, digest) - if err != nil { - return exists, res, err - } - if len(blobs) > 0 { - res = blobs[0] - exists = true - } - return exists, res, err -} - func (db *Database) GetBlobUsage(ctx context.Context, digest string) (int64, error) { var err error var usage int64 diff --git a/app/maindb/schema.go b/app/maindb/schema.go index bd49b23..adc7e75 100644 --- a/app/maindb/schema.go +++ b/app/maindb/schema.go @@ -15,4 +15,36 @@ const schema = ` ); CREATE UNIQUE INDEX IF NOT EXISTS file_index ON file(collection, name); + + --- DROP TABLE IF EXISTS manifests; + CREATE TABLE IF NOT EXISTS manifests ( + id VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + reference VARCHAR(255) NOT NULL, + digest VARCHAR(1024) NOT NULL, + contentType VARCHAR(255) NOT NULL, + payload VARCHAR(4096) NOT NULL, + created_at VARCHAR(255) NOT NULL, + updated_at VARCHAR(255) NOT NULL, + created_by VARCHAR(255) NOT NULL, + updated_by VARCHAR(255) NOT NULL + ); + CREATE UNIQUE INDEX IF NOT EXISTS manifest_index + ON manifests(name, reference); + + CREATE TABLE IF NOT EXISTS blobs ( + id VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + reference VARCHAR(255) NOT NULL, + mediaType VARCHAR(255) NOT NULL, + digest VARCHAR(255) NOT NULL, + size INTEGER NOT NULL, + created_at VARCHAR(255) NOT NULL, + updated_at VARCHAR(255) NOT NULL, + created_by VARCHAR(255) NOT NULL, + updated_by VARCHAR(255) NOT NULL + ); + CREATE UNIQUE INDEX IF NOT EXISTS blobs_index + ON blobs(name, reference, digest); + ` diff --git a/app/operator/blob.go b/app/operator/blob.go new file mode 100644 index 0000000..6a9c040 --- /dev/null +++ b/app/operator/blob.go @@ -0,0 +1,153 @@ +package operator + +import ( + "context" + "fmt" + "io" + "net/http" + "strconv" + + "mstore/pkg/auxuuid" +) + +type BlobExistsParams struct { + Name string + Digest string +} +type BlobExistsResult struct { + DockerContentDigest string + ContentLength string + Exists bool +} + +func (oper *Operator) BlobExists(ctx context.Context, params *BlobExistsParams) (*BlobExistsResult, int, error) { + var err error + res := &BlobExistsResult{} + oper.logg.Debugf("Call BlobExists") + if params.Digest == "" { + err = fmt.Errorf("Empty reference") + return res, http.StatusBadRequest, err + } + if params.Name == "" { + err = fmt.Errorf("Empty name") + return res, http.StatusBadRequest, err + } + exists, blobDescr, err := oper.mdb.GetBlobByDigest(ctx, params.Digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + if !exists { + return res, http.StatusNotFound, err + } + res.ContentLength = strconv.FormatInt(blobDescr.Size, 10) + res.DockerContentDigest = params.Digest + res.Exists = exists + + return res, http.StatusOK, err +} + +// https://github.com/opencontainers/distribution-spec/blob/v1.1.1/spec.md +// +// PostUpload +// +// POST then PUT +// +// To push a blob monolithically by using a POST request followed by a PUT request, there are two steps: +// +// - Obtain a session id (upload URL) +// - Upload the blob to said URL +// +// A chunked blob upload is accomplished in three phases: +// +// - Obtain a session ID (upload URL) (POST) +// - Upload the chunks (PATCH) +// Close the session (PUT) + +type PostUploadParams struct { + Name string + Digest string + Mount string + From string +} +type PostUploadResult struct { + DockerUploadUUID string + Location string + ContentLength string +} + +func (oper *Operator) PostUpload(ctx context.Context, params *PostUploadParams) (*PostUploadResult, int, error) { + var err error + res := &PostUploadResult{} + oper.logg.Debugf("PostUpload") + + if params.Digest == "" { + uuid := auxuuid.NewUUID() + location := fmt.Sprintf("/v2/%s/blobs/uploads/%s", params.Name, uuid) + res.DockerUploadUUID = uuid + res.Location = location + res.ContentLength = strconv.FormatInt(0, 10) + return res, http.StatusAccepted, err + } else { + err = fmt.Errorf("PostUpload: Not empty digest header") + return res, http.StatusInternalServerError, err + } + return res, http.StatusOK, err +} + +type PatchUploadParams struct { + ContentType string + ContentLength string + Name string + Reference string + Reader io.Reader +} +type PatchUploadResult struct { + Location string +} + +// The response for each successful chunk upload MUST be 202 Accepted, and MUST have the following headers: +// +// Location: +// Range: 0- + +func (oper *Operator) PatchUpload(ctx context.Context, params *PatchUploadParams) (*PatchUploadResult, int, error) { + var err error + res := &PatchUploadResult{} + oper.logg.Debugf("Call PatchUpload") + + if params.Reference == "" { + err = fmt.Errorf("Empty reference") + return res, http.StatusBadRequest, err + } + if params.Name == "" { + err = fmt.Errorf("Empty name") + return res, http.StatusBadRequest, err + } + + if params.ContentType != "application/octet-stream" { + err = fmt.Errorf("Wrong Conten-Type header: %s", params.ContentType) + return res, http.StatusBadRequest, err + } + if params.ContentLength == "" { + err = fmt.Errorf("Empty Content-length header") + return res, http.StatusBadRequest, err + } + contentLength, err := strconv.ParseInt(params.ContentLength, 10, 64) + if err != nil { + err = fmt.Errorf("Wrong Content-length header") + return res, http.StatusBadRequest, err + } + + recsize, err := oper.store.WriteUpload(params.Reference, params.Reader) + if err != nil { + return res, http.StatusInternalServerError, err + } + if recsize != contentLength { + oper.store.RemoveUpload(params.Reference) + err = fmt.Errorf("Mismatch upload recorded size and content length") + return res, http.StatusInternalServerError, err + } + res.Location = fmt.Sprintf("/v2/%s/uploads/%s", params.Name, params.Reference) + + return res, http.StatusAccepted, err // http.StatusCreated +} diff --git a/app/operator/imgaux.go b/app/operator/imgaux.go new file mode 100644 index 0000000..22aa801 --- /dev/null +++ b/app/operator/imgaux.go @@ -0,0 +1,22 @@ +package operator + +import ( + "encoding/hex" + "strings" +) + +const sha256prefix = "sha256:" + +func stringLikeSHA256Digest(some string) bool { + if strings.HasPrefix(some, sha256prefix) { + some = strings.TrimPrefix(some, sha256prefix) + } + _, err := hex.DecodeString(some) + if err != nil { + return false + } + if len(some) == 64 { + return true + } + return false +} diff --git a/app/operator/manifest.go b/app/operator/manifest.go new file mode 100644 index 0000000..30ca801 --- /dev/null +++ b/app/operator/manifest.go @@ -0,0 +1,115 @@ +package operator + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "strconv" + + "mstore/app/descr" + + ocidigest "github.com/opencontainers/go-digest" +) + +const ( + ddmMimeType = "application/vnd.docker.distribution.manifest.v2+json" + oimMimeType = "application/vnd.oci.image.manifest.v1+json" +) + +type ManifestExistsParams struct { + Name string + Reference string +} +type ManifestExistsResult struct { + ContentLength string + ContentType string + DockerContentDigest string + Exists bool +} + +func (oper *Operator) ManifestExists(ctx context.Context, params *ManifestExistsParams) (*ManifestExistsResult, int, error) { + var err error + res := &ManifestExistsResult{} + + if params.Name == "" { + err = fmt.Errorf("Empty name") + return res, http.StatusBadRequest, err + } + if params.Reference == "" { + err = fmt.Errorf("Empty reference") + return res, http.StatusBadRequest, err + } + + oper.logg.Debugf("Head manifest [%s:%s]", params.Name, params.Reference) + + var manifest descr.Manifest + var exists bool + if stringLikeSHA256Digest(params.Reference) { + digest := fmt.Sprintf("%s:%s", sha256prefix, params.Reference) + oper.logg.Debugf("Find manifest %s by digest %s", params.Name, params.Reference) + exists, manifest, err = oper.mdb.GetManifestByDigest(ctx, params.Name, digest) + if err != nil { + return res, http.StatusInternalServerError, err + } + if !exists { + return res, http.StatusNotFound, err + } + } else { + oper.logg.Debugf("Find manifest %s by reference %s", params.Name, params.Reference) + exists, manifest, err = oper.mdb.GetManifestByReference(ctx, params.Name, params.Reference) + if err != nil { + return res, http.StatusInternalServerError, err + } + if !exists { + return res, http.StatusNotFound, err + } + } + + digest := ocidigest.SHA256.FromString(manifest.Payload) + payloadSize := len(manifest.Payload) + res.ContentLength = strconv.FormatInt(int64(payloadSize), 10) + res.ContentType = manifest.ContentType + res.DockerContentDigest = digest.String() + res.Exists = exists + + return res, http.StatusOK, err +} + +type PutManifestParams struct { + ContentType string + Name string + Reference string + Reader io.Reader +} +type PutManifestResult struct { + Location string +} + +// TODO: control size 413 Payload Too Large + +func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams) (*PutManifestResult, int, error) { + var err error + res := &PutManifestResult{} + oper.logg.Debugf("Put manifest %s:%s", params.Name, params.Reference) + + // Check Content-Type + if params.ContentType != ddmMimeType && params.ContentType != oimMimeType { + err = fmt.Errorf("Unknown or empty Content-Type: %s", params.ContentType) + return res, http.StatusNotFound, err + } + + // Copy manifest data + buffer := bytes.NewBuffer(nil) + _, err = io.Copy(buffer, params.Reader) + if err != nil { + return res, http.StatusInternalServerError, err + } + inBytes := buffer.Bytes() + oper.logg.Debugf("Manifest data: [%s]", string(inBytes)) + + res.Location = fmt.Sprintf(`/v2/%s/manifests/%s`, params.Name, params.Reference) + return res, http.StatusCreated, err + +} diff --git a/app/router/context.go b/app/router/context.go index cbfef2d..a05243f 100644 --- a/app/router/context.go +++ b/app/router/context.go @@ -25,6 +25,15 @@ func NewContext(writer http.ResponseWriter, request *http.Request) *Context { return rctx } +func (rctx *Context) GetSubpath(key string) (string, bool) { + value, exists := rctx.PathMap[key] + return value, exists +} + +func (rctx *Context) GetQuery(key string) string { + return rctx.Request.URL.Query().Get(key) +} + func (rctx *Context) SetHeader(key, value string) { rctx.Writer.Header().Set(key, value) } diff --git a/app/service/service.go b/app/service/service.go index cdb0d89..0fdfcba 100644 --- a/app/service/service.go +++ b/app/service/service.go @@ -67,9 +67,128 @@ func (svc *Service) Build() error { svc.rout.Get("/v3/api/file/{filepath}", svc.hand.GetFile) svc.rout.Delete("/v3/api/file/{filepath}", svc.hand.DeleteFile) svc.rout.Get("/v3/api/files/{filepath}", svc.hand.ListFiles) - svc.rout.Get("/v2/", svc.hand.GetVersion) + // https://github.com/opencontainers/distribution-spec/blob/main/spec.md + // + // Pulling manifests + // + // To pull a manifest, perform a GET request to a URL in the following form: + // /v2//manifests/ end-3 + // + // refers to the namespace of the repository. + // MUST be either (a) the digest of the manifest or (b) a tag. + // The MUST NOT be in any other format. + // + // Throughout this document, MUST match the following regular expression: + // + // [a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)* + // + // Throughout this document, as a tag MUST be at most 128 characters + // in length and MUST match the following regular expression: + // + // [a-zA-Z0-9_][a-zA-Z0-9._-]{0,127} + + // Pushing Manifests + // + // To push a manifest, perform a PUT request to a path in the following format, + // and with the following headers and body: /v2//manifests/ + + const reference = `{reference:[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}}` + svc.rout.Head(`/v2/{name}/manifests/{reference}`, svc.hand.ManifestExists) + + svc.rout.Put(`/v2/{name}/manifests/{reference}`, svc.hand.PutManifest) + + // Pulling blobs + // + // To pull a blob, perform a GET request to a URL in the following form: + // /v2//blobs/ + // + // is the namespace of the repository, and is the blob's digest. + // + // A GET request to an existing blob URL MUST provide the expected blob, + // with a response code that MUST be 200 OK. A successful response SHOULD contain + // the digest of the uploaded blob in the header Docker-Content-Digest. // + // If present, the value of this header MUST be a digest matching that of the + // response body. Most clients MAY ignore the value, but if it is used, + // the client MUST verify the value matches the returned response body. + // Clients SHOULD verify that the response body matches the requested digest. + // + // If the blob is not found in the repository, the response code MUST be 404 Not Found. + + svc.rout.Head(`/v2/{name}/blobs/{digest}`, svc.hand.BlobExists) + + // Single POST + // + // Registries MAY support pushing blobs using a single POST request. + // + // To push a blob monolithically by using a single POST request, perform a POST request + // to a URL in the following form, and with the following headers and body: + // + // /v2//blobs/uploads/?digest= + // + // Content-Length: + // Content-Type: application/octet-stream + // + // + // + // Here, is the repository's namespace, is the blob's digest, + // and is the size (in bytes) of the blob. + // + // The Content-Length header MUST match the blob's actual content length. + // Likewise, the MUST match the blob's digest. + // + // Registries that do not support single request monolithic uploads SHOULD + // return a 202 Accepted status code and Location header and clients SHOULD + // proceed with a subsequent PUT request, as described by the POST then PUT upload method. + // + // Successful completion of the request MUST return a 201 Created and MUST include the following header: + // + // Location: + // + // Here, is a pullable blob URL. This location does not necessarily + // have to be served by your registry, for example, in the case of a signed URL from + // some cloud storage provider that your registry generates. + + svc.rout.Post(`/v2/{name}/blobs/uploads/`, svc.hand.PostUpload) + + // Pushing a blob in chunks + // + // A chunked blob upload is accomplished in three phases: + // + // - Obtain a session ID (upload URL) (POST) + // - Upload the chunks (PATCH) + // - Close the session (PUT) + // + // For information on obtaining a session ID, reference the above section on pushing + // a blob monolithically via the POST/PUT method. The process remains unchanged for + // chunked upload, except that the post request MUST include the following header: + // + // Content-Length: 0 + // + // If the registry has a minimum chunk size, the POST response SHOULD include + // the following header, where is the size in bytes + // (see the blob PATCH definition for usage): + // + // OCI-Chunk-Min-Length: + // + // Please reference the above section for restrictions on the . + // + // To upload a chunk, issue a PATCH request to a URL path in the following format, + // and with the following headers and body: + // + // URL path: + // + // Content-Type: application/octet-stream + // Content-Range: + // Content-Length: + // + // + // + // The refers to the URL obtained from the preceding POST request. + + svc.rout.Patch(`/v2/{name}/blobs/uploads/{reference}`, svc.hand.PatchUpload) + svc.rout.NotFound(svc.hand.NotFound) selector := svc.rout.Selector() diff --git a/app/storage/storage.go b/app/storage/storage.go index 1f33c9f..6240554 100644 --- a/app/storage/storage.go +++ b/app/storage/storage.go @@ -25,8 +25,10 @@ func NewStorage(basepath string) *Storage { return res } -const filesubdir = "files" -const tmpsubdir = "tmps" +const ( + filesubdir = "files" + tmpsubdir = "tmps" +) func (store *Storage) makeCollecionpath(collection string) string { return filepath.Join(store.basepath, filesubdir, collection) @@ -135,3 +137,41 @@ func (store *Storage) DeleteFile(collection, filename string) error { } return err } + +const ( + upsubdir = "uploads" +) + +func (store *Storage) makeUppath(upname string) string { + return filepath.Join(store.basepath, upsubdir, upname) +} + +func (store *Storage) WriteUpload(digest string, source io.Reader) (int64, error) { + var err error + var recsize int64 + + uploadPath := store.makeUppath(digest) + uploadFile, err := os.OpenFile(uploadPath, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return recsize, err + } + defer uploadFile.Close() + + hasher := sha256.New() // TODO: upload cheking + streamWriter := io.MultiWriter(uploadFile, hasher) + recsize, err = io.Copy(streamWriter, source) + if err != nil { + return recsize, err + } + return recsize, err +} + +func (st *Storage) RemoveUpload(digest string) error { + var err error + uploadPath := st.makeUppath(digest) + err = os.Remove(uploadPath) + if err != nil { + return err + } + return err +} diff --git a/pkg/client/taraux.go b/pkg/auxutar/utar.go similarity index 72% rename from pkg/client/taraux.go rename to pkg/auxutar/utar.go index ebaca39..1aceef9 100644 --- a/pkg/client/taraux.go +++ b/pkg/auxutar/utar.go @@ -1,4 +1,4 @@ -package client +package auxutar import ( "archive/tar" @@ -8,16 +8,18 @@ import ( "strings" ) -func archiveDir(srcDir, dstPath string) error { - var err error - srcDir = filepath.Clean(srcDir) - dstPath = filepath.Clean(dstPath) +// TODO: file and dir modes - err = os.MkdirAll(filepath.Dir(dstPath), 0755) +func Archive(srcdir, dstpath string) error { + var err error + srcdir = filepath.Clean(srcdir) + dstpath = filepath.Clean(dstpath) + + err = os.MkdirAll(filepath.Dir(dstpath), 0755) if err != nil { return err } - tarFile, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640) + tarFile, err := os.OpenFile(dstpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640) if err != nil { return err } @@ -26,7 +28,7 @@ func archiveDir(srcDir, dstPath string) error { tarWriter := tar.NewWriter(tarFile) defer tarWriter.Close() - walker := func(filePath string, fileInfo os.FileInfo, err error) error { + walker := func(filename string, fileInfo os.FileInfo, err error) error { if err != nil { return err } @@ -37,14 +39,14 @@ func archiveDir(srcDir, dstPath string) error { if err != nil { return err } - header.Name = strings.TrimPrefix(filePath, filepath.Clean(srcDir)) + header.Name = strings.TrimPrefix(filename, filepath.Clean(srcdir)) header.Name = strings.TrimPrefix(header.Name, string(filepath.Separator)) err = tarWriter.WriteHeader(header) if err != nil { return err } - file, err := os.Open(filePath) + file, err := os.Open(filename) if err != nil { return err } @@ -55,20 +57,20 @@ func archiveDir(srcDir, dstPath string) error { } return nil } - err = filepath.Walk(srcDir, walker) + err = filepath.Walk(srcdir, walker) if err != nil { return err } return err } -func unarchive(filePath, dstDir string) error { +func Unarchive(filename, dstdir string) error { var err error - err = os.MkdirAll(dstDir, 0755) + err = os.MkdirAll(dstdir, 0755) if err != nil { return err } - file, err := os.OpenFile(filePath, os.O_RDONLY, 0) + file, err := os.OpenFile(filename, os.O_RDONLY, 0) if err != nil { return err } @@ -85,7 +87,7 @@ func unarchive(filePath, dstDir string) error { case header == nil: continue } - target := filepath.Join(dstDir, header.Name) + target := filepath.Join(dstdir, header.Name) target = filepath.Clean(target) //fileInfo := header.FileInfo() switch header.Typeflag { diff --git a/pkg/client/imagelife_test.go b/pkg/client/imagelife_test.go index 703d827..776b2d0 100644 --- a/pkg/client/imagelife_test.go +++ b/pkg/client/imagelife_test.go @@ -44,6 +44,7 @@ func TestImageLife(t *testing.T) { } stopFunc := func() { + time.Sleep(5 * time.Second) srv.Service().Stop() svcWG.Wait() err = <-errPipe diff --git a/pkg/client/imagepull.go b/pkg/client/imagepull.go index 95f8dfa..c71f636 100644 --- a/pkg/client/imagepull.go +++ b/pkg/client/imagepull.go @@ -5,6 +5,8 @@ import ( "os" "time" + "mstore/pkg/auxutar" + "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/crane" "github.com/google/go-containerregistry/pkg/name" @@ -63,7 +65,7 @@ func (cli *Client) PullImage(ctx context.Context, filepath, imagepath string, ti if err != nil { return err } - err = archiveDir(dstdir, filepath) + err = auxutar.Archive(dstdir, filepath) if err != nil { return err } diff --git a/pkg/client/imagepush.go b/pkg/client/imagepush.go index c8c707a..177b2a5 100644 --- a/pkg/client/imagepush.go +++ b/pkg/client/imagepush.go @@ -6,6 +6,8 @@ import ( "os" "time" + "mstore/pkg/auxutar" + "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/crane" "github.com/google/go-containerregistry/pkg/name" @@ -57,20 +59,23 @@ func (cli *Client) PushImage(ctx context.Context, filepath, imagepath string, ti } dstdir := makeTmpFileName(filepath) - err = unarchive(filepath, dstdir) + err = auxutar.Unarchive(filepath, dstdir) if err != nil { os.RemoveAll(dstdir) return err } image, err := imageLoader(dstdir) if err != nil { - os.RemoveAll(dstdir) return err } err = crane.Push(image, imagepath, options...) if err != nil { return err } + err = os.RemoveAll(dstdir) + if err != nil { + return err + } return err }