working commit

This commit is contained in:
2026-02-04 12:32:54 +02:00
parent 24b9acd678
commit 219a4cb890
17 changed files with 736 additions and 43 deletions
+2
View File
@@ -9,4 +9,6 @@ type Blob struct {
Size int64 `db:"size" json:"size"` Size int64 `db:"size" json:"size"`
CreatedAt string `db:"created_at" json:"createdAt"` CreatedAt string `db:"created_at" json:"createdAt"`
UpdatedAt string `db:"updated_at" json:"updatedAt"` UpdatedAt string `db:"updated_at" json:"updatedAt"`
CreatedBy string `db:"created_by" json:"createdBy,omitempty"`
UpdatedBy string `db:"updated_by" json:"updatedBy,omitempty"`
} }
+2
View File
@@ -9,4 +9,6 @@ type Manifest struct {
Digest string `db:"digest" json:"digest"` Digest string `db:"digest" json:"digest"`
CreatedAt string `db:"created_at" json:"createdAt"` CreatedAt string `db:"created_at" json:"createdAt"`
UpdatedAt string `db:"updated_at" json:"updatedAt"` UpdatedAt string `db:"updated_at" json:"updatedAt"`
CreatedBy string `db:"created_by" json:"createdBy,omitempty"`
UpdatedBy string `db:"updated_by" json:"updatedBy,omitempty"`
} }
+82
View File
@@ -0,0 +1,82 @@
package handler
import (
"mstore/app/operator"
"mstore/app/router"
)
// HEAD /v2/<name>/blobs/<digest> 200 404
func (hand *Handler) BlobExists(rctx *router.Context) {
name, _ := rctx.GetSubpath("name")
digest, _ := rctx.GetSubpath("digest")
hand.logg.Debugf("Handle BlobExists with name=[%s] digest=[%s]", name, digest)
params := &operator.BlobExistsParams{
Name: name,
Digest: digest,
}
ctx := rctx.GetContext()
res, code, err := hand.oper.BlobExists(ctx, params)
if err != nil {
hand.logg.Errorf("BlobExist error: %v", err)
} else if res.Exists {
rctx.SetHeader("Docker-Content-Digest", res.DockerContentDigest)
rctx.SetHeader("Content-Length", res.ContentLength)
}
rctx.SetStatus(code)
}
// POST /v2/<name>/blobs/uploads/ 202 404
func (hand *Handler) PostUpload(rctx *router.Context) {
name, _ := rctx.GetSubpath("name")
digest := rctx.GetQuery("digest")
mount := rctx.GetQuery("mount")
from := rctx.GetQuery("from")
hand.logg.Debugf("Handle PostUpload with name=[%s] digest=[%s]", name, digest)
params := &operator.PostUploadParams{
Name: name,
Digest: digest,
Mount: mount,
From: from,
}
res, code, err := hand.oper.PostUpload(rctx.Ctx, params)
if err != nil {
hand.logg.Errorf("PostUpload error: %v", err)
} else {
hand.logg.Debugf("PostUpload send location=[%s] code=%d", res.Location, code)
rctx.SetHeader("Location", res.Location)
rctx.SetHeader("Content-Length", res.ContentLength)
rctx.SetHeader("Docker-Upload-UUID", res.DockerUploadUUID)
}
rctx.SetStatus(code)
}
// POST /v2/<name>/blobs/uploads/?digest=<digest> 201/202 404/400
// POST /v2/<name>/blobs/uploads/?mount=<digest>&from=<other_name> 201 404
// PATCH /v2/<name>/blobs/uploads/<reference> 202 404/416
func (hand *Handler) PatchUpload(rctx *router.Context) {
contentLength := rctx.GetHeader("Content-Length")
contentType := rctx.GetHeader("Content-Type")
name, _ := rctx.GetSubpath("name")
reference, _ := rctx.GetSubpath("reference")
reader := rctx.Request.Body
params := &operator.PatchUploadParams{
ContentLength: contentLength,
ContentType: contentType,
Name: name,
Reference: reference,
Reader: reader,
}
ctx := rctx.GetContext()
res, code, err := hand.oper.PatchUpload(ctx, params)
if err != nil {
hand.logg.Errorf("PatchUpload error: %v", err)
}
rctx.SetHeader("Location", res.Location)
rctx.SetStatus(code)
}
+4 -4
View File
@@ -24,10 +24,10 @@ func (hand *Handler) FileExists(rctx *router.Context) {
return return
} }
// TODO // TODO
rctx.SetHeader("X-Content-Type", res.ContentType) rctx.SetHeader("Content-Type", res.ContentType)
rctx.SetHeader("X-Content-Length", res.ContentLength) rctx.SetHeader("Content-Length", res.ContentLength)
rctx.SetHeader("X-Content-Digest", res.ContentDigest) rctx.SetHeader("Content-Digest", res.ContentDigest)
rctx.SetHeader("Content-Length", zeroContentLength) //rctx.SetHeader("Content-Length", zeroContentLength)
rctx.SetStatus(code) rctx.SetStatus(code)
} }
+107
View File
@@ -0,0 +1,107 @@
package handler
import (
//"mstore/app/descr"
"mstore/app/operator"
"mstore/app/router"
)
// https://github.com/opencontainers/distribution-spec/blob/main/spec.md
//
// Open Container Initiative Distribution Specification
//
// Existing Manifests
//
// The image manifest can be checked for existence with the following url:
//
// HEAD /v2/<name>/manifests/<reference>
//
// The name and reference parameter identify the image and are required. The reference may include a tag or digest.
//
// A 404 Not Found response will be returned if the image is unknown to the registry.
// If the image exists and the response is successful the response will be as follows:
//
// 200 OK
// Content-Length: <length of manifest>
// Docker-Content-Digest: <digest>
func (hand *Handler) ManifestExists(rctx *router.Context) {
name, _ := rctx.GetSubpath("name")
reference, _ := rctx.GetSubpath("reference")
params := &operator.ManifestExistsParams{
Name: name,
Reference: reference,
}
ctx := rctx.GetContext()
res, code, err := hand.oper.ManifestExists(ctx, params)
if err != nil {
hand.logg.Errorf("ManifestExist error: %v", err)
} else if res.Exists {
rctx.SetHeader("Content-Length", res.ContentLength)
rctx.SetHeader("Content-Type", res.ContentType)
rctx.SetHeader("Docker-Content-Digest", res.DockerContentDigest)
}
rctx.SetStatus(code)
}
// https://github.com/opencontainers/distribution-spec/blob/main/spec.md
//
// Pushing Manifests
//
// To push a manifest, perform a PUT request to a path in the following format,
// and with the following headers and body: /v2/<name>/manifests/<reference>
//
// Clients SHOULD set the Content-Type header to the type of the manifest being pushed.
// The client SHOULD NOT include parameters on the Content-Type header (see RFC7231).
// The registry SHOULD ignore parameters on the Content-Type header.
//
// All manifests SHOULD include a mediaType field declaring the type of the manifest being pushed.
// If a manifest includes a mediaType field, clients MUST set the Content-Type header to
// the value specified by the mediaType field.
//
// Content-Type: application/vnd.oci.image.manifest.v1+json
//
// <name> is the namespace of the repository, and the <reference> MUST be either a) a digest or b) a tag.
//
// The uploaded manifest MUST reference any blobs that make up the object.
// However, the list of blobs MAY be empty.
//
// The registry MUST store the manifest in the exact byte representation provided by the client.
// Upon a successful upload, the registry MUST return response code 201 Created,
// and MUST have the following header:
//
// Location: <location>
//
// The <location> is a pullable manifest URL. The Docker-Content-Digest header returns
// the canonical digest of the uploaded blob, and MUST be equal to the client provided digest.
// Clients MAY ignore the value but if it is used, the client SHOULD verify
// the value against the uploaded blob data.
//
// An attempt to pull a nonexistent repository MUST return response code 404 Not Found.
//
// A registry SHOULD enforce some limit on the maximum manifest size that it can accept.
// A registry that enforces this limit SHOULD respond to a request to push a manifest over this
// limit with a response code 413 Payload Too Large.
// Client and registry implementations SHOULD expect to be able to support manifest
// pushes of at least 4 megabytes.
// PUT /v2/<name>/manifests/<reference> 201 404
func (hand *Handler) PutManifest(rctx *router.Context) {
name, _ := rctx.GetSubpath("name")
reference, _ := rctx.GetSubpath("reference")
contentType := rctx.GetHeader("Content-Type")
params := &operator.PutManifestParams{
ContentType: contentType,
Name: name,
Reference: reference,
Reader: rctx.Request.Body,
}
ctx := rctx.GetContext()
res, code, err := hand.oper.PutManifest(ctx, params)
if err != nil {
hand.logg.Errorf("PutManifest error: %v", err)
} else {
rctx.SetHeader("Location", res.Location)
}
rctx.SetStatus(code)
}
+18 -18
View File
@@ -20,6 +20,23 @@ func (db *Database) InsertBlob(ctx context.Context, layer *descr.Blob) error {
return err return err
} }
func (db *Database) GetBlobByDigest(ctx context.Context, digest string) (bool, descr.Blob, error) {
var err error
blobs := make([]descr.Blob, 0)
res := descr.Blob{}
exists := false
request := `SELECT * FROM blobs WHERE digest = $1 LIMIT 1`
err = db.db.Select(&blobs, request, digest)
if err != nil {
return exists, res, err
}
if len(blobs) > 0 {
res = blobs[0]
exists = true
}
return exists, res, err
}
func (db *Database) ListAllBlobs(ctx context.Context) ([]descr.Blob, error) { func (db *Database) ListAllBlobs(ctx context.Context) ([]descr.Blob, error) {
var err error var err error
blobs := make([]descr.Blob, 0) blobs := make([]descr.Blob, 0)
@@ -31,7 +48,7 @@ func (db *Database) ListAllBlobs(ctx context.Context) ([]descr.Blob, error) {
return blobs, err return blobs, err
} }
func (db *Database) BlobExists(ctx context.Context, name, reference, digest string) (bool, error) { func (db *Database) xxxBlobExists(ctx context.Context, name, reference, digest string) (bool, error) {
var err error var err error
blobs := make([]descr.Blob, 0) blobs := make([]descr.Blob, 0)
request := ` request := `
@@ -58,23 +75,6 @@ func (db *Database) GetBlobsByReferense(ctx context.Context, name, reference str
return blobs, err return blobs, err
} }
func (db *Database) GetBlobByDigest(ctx context.Context, digest string) (bool, descr.Blob, error) {
var err error
blobs := make([]descr.Blob, 0)
res := descr.Blob{}
exists := false
request := `SELECT * FROM blobs WHERE digest = $1 LIMIT 1`
err = db.db.Select(&blobs, request, digest)
if err != nil {
return exists, res, err
}
if len(blobs) > 0 {
res = blobs[0]
exists = true
}
return exists, res, err
}
func (db *Database) GetBlobUsage(ctx context.Context, digest string) (int64, error) { func (db *Database) GetBlobUsage(ctx context.Context, digest string) (int64, error) {
var err error var err error
var usage int64 var usage int64
+32
View File
@@ -15,4 +15,36 @@ const schema = `
); );
CREATE UNIQUE INDEX IF NOT EXISTS file_index CREATE UNIQUE INDEX IF NOT EXISTS file_index
ON file(collection, name); ON file(collection, name);
--- DROP TABLE IF EXISTS manifests;
CREATE TABLE IF NOT EXISTS manifests (
id VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
reference VARCHAR(255) NOT NULL,
digest VARCHAR(1024) NOT NULL,
contentType VARCHAR(255) NOT NULL,
payload VARCHAR(4096) NOT NULL,
created_at VARCHAR(255) NOT NULL,
updated_at VARCHAR(255) NOT NULL,
created_by VARCHAR(255) NOT NULL,
updated_by VARCHAR(255) NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS manifest_index
ON manifests(name, reference);
CREATE TABLE IF NOT EXISTS blobs (
id VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
reference VARCHAR(255) NOT NULL,
mediaType VARCHAR(255) NOT NULL,
digest VARCHAR(255) NOT NULL,
size INTEGER NOT NULL,
created_at VARCHAR(255) NOT NULL,
updated_at VARCHAR(255) NOT NULL,
created_by VARCHAR(255) NOT NULL,
updated_by VARCHAR(255) NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS blobs_index
ON blobs(name, reference, digest);
` `
+153
View File
@@ -0,0 +1,153 @@
package operator
import (
"context"
"fmt"
"io"
"net/http"
"strconv"
"mstore/pkg/auxuuid"
)
type BlobExistsParams struct {
Name string
Digest string
}
type BlobExistsResult struct {
DockerContentDigest string
ContentLength string
Exists bool
}
func (oper *Operator) BlobExists(ctx context.Context, params *BlobExistsParams) (*BlobExistsResult, int, error) {
var err error
res := &BlobExistsResult{}
oper.logg.Debugf("Call BlobExists")
if params.Digest == "" {
err = fmt.Errorf("Empty reference")
return res, http.StatusBadRequest, err
}
if params.Name == "" {
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
exists, blobDescr, err := oper.mdb.GetBlobByDigest(ctx, params.Digest)
if err != nil {
return res, http.StatusInternalServerError, err
}
if !exists {
return res, http.StatusNotFound, err
}
res.ContentLength = strconv.FormatInt(blobDescr.Size, 10)
res.DockerContentDigest = params.Digest
res.Exists = exists
return res, http.StatusOK, err
}
// https://github.com/opencontainers/distribution-spec/blob/v1.1.1/spec.md
//
// PostUpload
//
// POST then PUT
//
// To push a blob monolithically by using a POST request followed by a PUT request, there are two steps:
//
// - Obtain a session id (upload URL)
// - Upload the blob to said URL
//
// A chunked blob upload is accomplished in three phases:
//
// - Obtain a session ID (upload URL) (POST)
// - Upload the chunks (PATCH)
// Close the session (PUT)
type PostUploadParams struct {
Name string
Digest string
Mount string
From string
}
type PostUploadResult struct {
DockerUploadUUID string
Location string
ContentLength string
}
func (oper *Operator) PostUpload(ctx context.Context, params *PostUploadParams) (*PostUploadResult, int, error) {
var err error
res := &PostUploadResult{}
oper.logg.Debugf("PostUpload")
if params.Digest == "" {
uuid := auxuuid.NewUUID()
location := fmt.Sprintf("/v2/%s/blobs/uploads/%s", params.Name, uuid)
res.DockerUploadUUID = uuid
res.Location = location
res.ContentLength = strconv.FormatInt(0, 10)
return res, http.StatusAccepted, err
} else {
err = fmt.Errorf("PostUpload: Not empty digest header")
return res, http.StatusInternalServerError, err
}
return res, http.StatusOK, err
}
type PatchUploadParams struct {
ContentType string
ContentLength string
Name string
Reference string
Reader io.Reader
}
type PatchUploadResult struct {
Location string
}
// The response for each successful chunk upload MUST be 202 Accepted, and MUST have the following headers:
//
// Location: <location>
// Range: 0-<end-of-range>
func (oper *Operator) PatchUpload(ctx context.Context, params *PatchUploadParams) (*PatchUploadResult, int, error) {
var err error
res := &PatchUploadResult{}
oper.logg.Debugf("Call PatchUpload")
if params.Reference == "" {
err = fmt.Errorf("Empty reference")
return res, http.StatusBadRequest, err
}
if params.Name == "" {
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
if params.ContentType != "application/octet-stream" {
err = fmt.Errorf("Wrong Conten-Type header: %s", params.ContentType)
return res, http.StatusBadRequest, err
}
if params.ContentLength == "" {
err = fmt.Errorf("Empty Content-length header")
return res, http.StatusBadRequest, err
}
contentLength, err := strconv.ParseInt(params.ContentLength, 10, 64)
if err != nil {
err = fmt.Errorf("Wrong Content-length header")
return res, http.StatusBadRequest, err
}
recsize, err := oper.store.WriteUpload(params.Reference, params.Reader)
if err != nil {
return res, http.StatusInternalServerError, err
}
if recsize != contentLength {
oper.store.RemoveUpload(params.Reference)
err = fmt.Errorf("Mismatch upload recorded size and content length")
return res, http.StatusInternalServerError, err
}
res.Location = fmt.Sprintf("/v2/%s/uploads/%s", params.Name, params.Reference)
return res, http.StatusAccepted, err // http.StatusCreated
}
+22
View File
@@ -0,0 +1,22 @@
package operator
import (
"encoding/hex"
"strings"
)
const sha256prefix = "sha256:"
func stringLikeSHA256Digest(some string) bool {
if strings.HasPrefix(some, sha256prefix) {
some = strings.TrimPrefix(some, sha256prefix)
}
_, err := hex.DecodeString(some)
if err != nil {
return false
}
if len(some) == 64 {
return true
}
return false
}
+115
View File
@@ -0,0 +1,115 @@
package operator
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strconv"
"mstore/app/descr"
ocidigest "github.com/opencontainers/go-digest"
)
const (
ddmMimeType = "application/vnd.docker.distribution.manifest.v2+json"
oimMimeType = "application/vnd.oci.image.manifest.v1+json"
)
type ManifestExistsParams struct {
Name string
Reference string
}
type ManifestExistsResult struct {
ContentLength string
ContentType string
DockerContentDigest string
Exists bool
}
func (oper *Operator) ManifestExists(ctx context.Context, params *ManifestExistsParams) (*ManifestExistsResult, int, error) {
var err error
res := &ManifestExistsResult{}
if params.Name == "" {
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
if params.Reference == "" {
err = fmt.Errorf("Empty reference")
return res, http.StatusBadRequest, err
}
oper.logg.Debugf("Head manifest [%s:%s]", params.Name, params.Reference)
var manifest descr.Manifest
var exists bool
if stringLikeSHA256Digest(params.Reference) {
digest := fmt.Sprintf("%s:%s", sha256prefix, params.Reference)
oper.logg.Debugf("Find manifest %s by digest %s", params.Name, params.Reference)
exists, manifest, err = oper.mdb.GetManifestByDigest(ctx, params.Name, digest)
if err != nil {
return res, http.StatusInternalServerError, err
}
if !exists {
return res, http.StatusNotFound, err
}
} else {
oper.logg.Debugf("Find manifest %s by reference %s", params.Name, params.Reference)
exists, manifest, err = oper.mdb.GetManifestByReference(ctx, params.Name, params.Reference)
if err != nil {
return res, http.StatusInternalServerError, err
}
if !exists {
return res, http.StatusNotFound, err
}
}
digest := ocidigest.SHA256.FromString(manifest.Payload)
payloadSize := len(manifest.Payload)
res.ContentLength = strconv.FormatInt(int64(payloadSize), 10)
res.ContentType = manifest.ContentType
res.DockerContentDigest = digest.String()
res.Exists = exists
return res, http.StatusOK, err
}
type PutManifestParams struct {
ContentType string
Name string
Reference string
Reader io.Reader
}
type PutManifestResult struct {
Location string
}
// TODO: control size 413 Payload Too Large
func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams) (*PutManifestResult, int, error) {
var err error
res := &PutManifestResult{}
oper.logg.Debugf("Put manifest %s:%s", params.Name, params.Reference)
// Check Content-Type
if params.ContentType != ddmMimeType && params.ContentType != oimMimeType {
err = fmt.Errorf("Unknown or empty Content-Type: %s", params.ContentType)
return res, http.StatusNotFound, err
}
// Copy manifest data
buffer := bytes.NewBuffer(nil)
_, err = io.Copy(buffer, params.Reader)
if err != nil {
return res, http.StatusInternalServerError, err
}
inBytes := buffer.Bytes()
oper.logg.Debugf("Manifest data: [%s]", string(inBytes))
res.Location = fmt.Sprintf(`/v2/%s/manifests/%s`, params.Name, params.Reference)
return res, http.StatusCreated, err
}
+9
View File
@@ -25,6 +25,15 @@ func NewContext(writer http.ResponseWriter, request *http.Request) *Context {
return rctx return rctx
} }
func (rctx *Context) GetSubpath(key string) (string, bool) {
value, exists := rctx.PathMap[key]
return value, exists
}
func (rctx *Context) GetQuery(key string) string {
return rctx.Request.URL.Query().Get(key)
}
func (rctx *Context) SetHeader(key, value string) { func (rctx *Context) SetHeader(key, value string) {
rctx.Writer.Header().Set(key, value) rctx.Writer.Header().Set(key, value)
} }
+120 -1
View File
@@ -67,9 +67,128 @@ func (svc *Service) Build() error {
svc.rout.Get("/v3/api/file/{filepath}", svc.hand.GetFile) svc.rout.Get("/v3/api/file/{filepath}", svc.hand.GetFile)
svc.rout.Delete("/v3/api/file/{filepath}", svc.hand.DeleteFile) svc.rout.Delete("/v3/api/file/{filepath}", svc.hand.DeleteFile)
svc.rout.Get("/v3/api/files/{filepath}", svc.hand.ListFiles) svc.rout.Get("/v3/api/files/{filepath}", svc.hand.ListFiles)
svc.rout.Get("/v2/", svc.hand.GetVersion) svc.rout.Get("/v2/", svc.hand.GetVersion)
// https://github.com/opencontainers/distribution-spec/blob/main/spec.md
//
// Pulling manifests
//
// To pull a manifest, perform a GET request to a URL in the following form:
// /v2/<name>/manifests/<reference> end-3
//
// <name> refers to the namespace of the repository.
// <reference> MUST be either (a) the digest of the manifest or (b) a tag.
// The <reference> MUST NOT be in any other format.
//
// Throughout this document, <name> MUST match the following regular expression:
//
// [a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*(\/[a-z0-9]+((\.|_|__|-+)[a-z0-9]+)*)*
//
// Throughout this document, <reference> as a tag MUST be at most 128 characters
// in length and MUST match the following regular expression:
//
// [a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}
// Pushing Manifests
//
// To push a manifest, perform a PUT request to a path in the following format,
// and with the following headers and body: /v2/<name>/manifests/<reference>
const reference = `{reference:[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}}`
svc.rout.Head(`/v2/{name}/manifests/{reference}`, svc.hand.ManifestExists)
svc.rout.Put(`/v2/{name}/manifests/{reference}`, svc.hand.PutManifest)
// Pulling blobs
//
// To pull a blob, perform a GET request to a URL in the following form:
// /v2/<name>/blobs/<digest>
//
// <name> is the namespace of the repository, and <digest> is the blob's digest.
//
// A GET request to an existing blob URL MUST provide the expected blob,
// with a response code that MUST be 200 OK. A successful response SHOULD contain
// the digest of the uploaded blob in the header Docker-Content-Digest. //
// If present, the value of this header MUST be a digest matching that of the
// response body. Most clients MAY ignore the value, but if it is used,
// the client MUST verify the value matches the returned response body.
// Clients SHOULD verify that the response body matches the requested digest.
//
// If the blob is not found in the repository, the response code MUST be 404 Not Found.
svc.rout.Head(`/v2/{name}/blobs/{digest}`, svc.hand.BlobExists)
// Single POST
//
// Registries MAY support pushing blobs using a single POST request.
//
// To push a blob monolithically by using a single POST request, perform a POST request
// to a URL in the following form, and with the following headers and body:
//
// /v2/<name>/blobs/uploads/?digest=<digest>
//
// Content-Length: <length>
// Content-Type: application/octet-stream
//
// <upload byte stream>
//
// Here, <name> is the repository's namespace, <digest> is the blob's digest,
// and <length> is the size (in bytes) of the blob.
//
// The Content-Length header MUST match the blob's actual content length.
// Likewise, the <digest> MUST match the blob's digest.
//
// Registries that do not support single request monolithic uploads SHOULD
// return a 202 Accepted status code and Location header and clients SHOULD
// proceed with a subsequent PUT request, as described by the POST then PUT upload method.
//
// Successful completion of the request MUST return a 201 Created and MUST include the following header:
//
// Location: <blob-location>
//
// Here, <blob-location> is a pullable blob URL. This location does not necessarily
// have to be served by your registry, for example, in the case of a signed URL from
// some cloud storage provider that your registry generates.
svc.rout.Post(`/v2/{name}/blobs/uploads/`, svc.hand.PostUpload)
// Pushing a blob in chunks
//
// A chunked blob upload is accomplished in three phases:
//
// - Obtain a session ID (upload URL) (POST)
// - Upload the chunks (PATCH)
// - Close the session (PUT)
//
// For information on obtaining a session ID, reference the above section on pushing
// a blob monolithically via the POST/PUT method. The process remains unchanged for
// chunked upload, except that the post request MUST include the following header:
//
// Content-Length: 0
//
// If the registry has a minimum chunk size, the POST response SHOULD include
// the following header, where <size> is the size in bytes
// (see the blob PATCH definition for usage):
//
// OCI-Chunk-Min-Length: <size>
//
// Please reference the above section for restrictions on the <location>.
//
// To upload a chunk, issue a PATCH request to a URL path in the following format,
// and with the following headers and body:
//
// URL path: <location>
//
// Content-Type: application/octet-stream
// Content-Range: <range>
// Content-Length: <length>
//
// <upload byte stream of chunk>
//
// The <location> refers to the URL obtained from the preceding POST request.
svc.rout.Patch(`/v2/{name}/blobs/uploads/{reference}`, svc.hand.PatchUpload)
svc.rout.NotFound(svc.hand.NotFound) svc.rout.NotFound(svc.hand.NotFound)
selector := svc.rout.Selector() selector := svc.rout.Selector()
+42 -2
View File
@@ -25,8 +25,10 @@ func NewStorage(basepath string) *Storage {
return res return res
} }
const filesubdir = "files" const (
const tmpsubdir = "tmps" filesubdir = "files"
tmpsubdir = "tmps"
)
func (store *Storage) makeCollecionpath(collection string) string { func (store *Storage) makeCollecionpath(collection string) string {
return filepath.Join(store.basepath, filesubdir, collection) return filepath.Join(store.basepath, filesubdir, collection)
@@ -135,3 +137,41 @@ func (store *Storage) DeleteFile(collection, filename string) error {
} }
return err return err
} }
const (
upsubdir = "uploads"
)
func (store *Storage) makeUppath(upname string) string {
return filepath.Join(store.basepath, upsubdir, upname)
}
func (store *Storage) WriteUpload(digest string, source io.Reader) (int64, error) {
var err error
var recsize int64
uploadPath := store.makeUppath(digest)
uploadFile, err := os.OpenFile(uploadPath, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return recsize, err
}
defer uploadFile.Close()
hasher := sha256.New() // TODO: upload cheking
streamWriter := io.MultiWriter(uploadFile, hasher)
recsize, err = io.Copy(streamWriter, source)
if err != nil {
return recsize, err
}
return recsize, err
}
func (st *Storage) RemoveUpload(digest string) error {
var err error
uploadPath := st.makeUppath(digest)
err = os.Remove(uploadPath)
if err != nil {
return err
}
return err
}
+17 -15
View File
@@ -1,4 +1,4 @@
package client package auxutar
import ( import (
"archive/tar" "archive/tar"
@@ -8,16 +8,18 @@ import (
"strings" "strings"
) )
func archiveDir(srcDir, dstPath string) error { // TODO: file and dir modes
var err error
srcDir = filepath.Clean(srcDir)
dstPath = filepath.Clean(dstPath)
err = os.MkdirAll(filepath.Dir(dstPath), 0755) func Archive(srcdir, dstpath string) error {
var err error
srcdir = filepath.Clean(srcdir)
dstpath = filepath.Clean(dstpath)
err = os.MkdirAll(filepath.Dir(dstpath), 0755)
if err != nil { if err != nil {
return err return err
} }
tarFile, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640) tarFile, err := os.OpenFile(dstpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640)
if err != nil { if err != nil {
return err return err
} }
@@ -26,7 +28,7 @@ func archiveDir(srcDir, dstPath string) error {
tarWriter := tar.NewWriter(tarFile) tarWriter := tar.NewWriter(tarFile)
defer tarWriter.Close() defer tarWriter.Close()
walker := func(filePath string, fileInfo os.FileInfo, err error) error { walker := func(filename string, fileInfo os.FileInfo, err error) error {
if err != nil { if err != nil {
return err return err
} }
@@ -37,14 +39,14 @@ func archiveDir(srcDir, dstPath string) error {
if err != nil { if err != nil {
return err return err
} }
header.Name = strings.TrimPrefix(filePath, filepath.Clean(srcDir)) header.Name = strings.TrimPrefix(filename, filepath.Clean(srcdir))
header.Name = strings.TrimPrefix(header.Name, string(filepath.Separator)) header.Name = strings.TrimPrefix(header.Name, string(filepath.Separator))
err = tarWriter.WriteHeader(header) err = tarWriter.WriteHeader(header)
if err != nil { if err != nil {
return err return err
} }
file, err := os.Open(filePath) file, err := os.Open(filename)
if err != nil { if err != nil {
return err return err
} }
@@ -55,20 +57,20 @@ func archiveDir(srcDir, dstPath string) error {
} }
return nil return nil
} }
err = filepath.Walk(srcDir, walker) err = filepath.Walk(srcdir, walker)
if err != nil { if err != nil {
return err return err
} }
return err return err
} }
func unarchive(filePath, dstDir string) error { func Unarchive(filename, dstdir string) error {
var err error var err error
err = os.MkdirAll(dstDir, 0755) err = os.MkdirAll(dstdir, 0755)
if err != nil { if err != nil {
return err return err
} }
file, err := os.OpenFile(filePath, os.O_RDONLY, 0) file, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil { if err != nil {
return err return err
} }
@@ -85,7 +87,7 @@ func unarchive(filePath, dstDir string) error {
case header == nil: case header == nil:
continue continue
} }
target := filepath.Join(dstDir, header.Name) target := filepath.Join(dstdir, header.Name)
target = filepath.Clean(target) target = filepath.Clean(target)
//fileInfo := header.FileInfo() //fileInfo := header.FileInfo()
switch header.Typeflag { switch header.Typeflag {
+1
View File
@@ -44,6 +44,7 @@ func TestImageLife(t *testing.T) {
} }
stopFunc := func() { stopFunc := func() {
time.Sleep(5 * time.Second)
srv.Service().Stop() srv.Service().Stop()
svcWG.Wait() svcWG.Wait()
err = <-errPipe err = <-errPipe
+3 -1
View File
@@ -5,6 +5,8 @@ import (
"os" "os"
"time" "time"
"mstore/pkg/auxutar"
"github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/crane" "github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/name"
@@ -63,7 +65,7 @@ func (cli *Client) PullImage(ctx context.Context, filepath, imagepath string, ti
if err != nil { if err != nil {
return err return err
} }
err = archiveDir(dstdir, filepath) err = auxutar.Archive(dstdir, filepath)
if err != nil { if err != nil {
return err return err
} }
+7 -2
View File
@@ -6,6 +6,8 @@ import (
"os" "os"
"time" "time"
"mstore/pkg/auxutar"
"github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/crane" "github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/name"
@@ -57,20 +59,23 @@ func (cli *Client) PushImage(ctx context.Context, filepath, imagepath string, ti
} }
dstdir := makeTmpFileName(filepath) dstdir := makeTmpFileName(filepath)
err = unarchive(filepath, dstdir) err = auxutar.Unarchive(filepath, dstdir)
if err != nil { if err != nil {
os.RemoveAll(dstdir) os.RemoveAll(dstdir)
return err return err
} }
image, err := imageLoader(dstdir) image, err := imageLoader(dstdir)
if err != nil { if err != nil {
os.RemoveAll(dstdir)
return err return err
} }
err = crane.Push(image, imagepath, options...) err = crane.Push(image, imagepath, options...)
if err != nil { if err != nil {
return err return err
} }
err = os.RemoveAll(dstdir)
if err != nil {
return err
}
return err return err
} }