Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrently stat shares in OCS service #2941

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Concurrently stat received shares
  • Loading branch information
ishank011 committed Jun 9, 2022
commit 1a52165f471a2440ab25916f37a127a29b161034
2 changes: 0 additions & 2 deletions internal/grpc/interceptors/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ func NewUnary(m map[string]interface{}, unprotected []string) (grpc.UnaryServerI
log := appctx.GetLogger(ctx)

if utils.Skip(info.FullMethod, unprotected) {
log.Debug().Str("method", info.FullMethod).Msg("skipping auth")

// If a token is present, set it anyway, as we might need the user info
// to decide the storage provider.
tkn, ok := ctxpkg.ContextGetToken(ctx)
Expand Down
26 changes: 14 additions & 12 deletions internal/grpc/interceptors/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,15 @@ func NewUnary() grpc.UnaryServerInterceptor {
var event *zerolog.Event
if code != codes.OK {
event = log.Error()
} else {
event = log.Debug()
event.Str("user-agent", userAgent).
Str("from", fromAddress).
Str("uri", info.FullMethod).
Str("start", start.Format("02/Jan/2006:15:04:05 -0700")).
Str("end", end.Format("02/Jan/2006:15:04:05 -0700")).Int("time_ns", int(diff)).
Str("code", code.String()).
Msg("unary")
}

event.Str("user-agent", userAgent).
Str("from", fromAddress).
Str("uri", info.FullMethod).
Str("start", start.Format("02/Jan/2006:15:04:05 -0700")).
Str("end", end.Format("02/Jan/2006:15:04:05 -0700")).Int("time_ns", int(diff)).
Str("code", code.String()).
Msg("unary")

return res, err
}
return interceptor
Expand Down Expand Up @@ -93,8 +90,13 @@ func NewStream() grpc.StreamServerInterceptor {
var event *zerolog.Event
if code != codes.OK {
event = log.Error()
} else {
event = log.Info()
event.Str("user-agent", userAgent).
Str("from", fromAddress).
Str("uri", info.FullMethod).
Str("start", start.Format("02/Jan/2006:15:04:05 -0700")).
Str("end", end.Format("02/Jan/2006:15:04:05 -0700")).Int("time_ns", int(diff)).
Str("code", code.String()).
Msg("stream")
}

event.Str("user-agent", userAgent).
Expand Down
4 changes: 2 additions & 2 deletions internal/http/services/owncloud/ocdav/propfind.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (s *svc) newPropRaw(key, val string) *propertyXML {
// ns is the CS3 namespace that needs to be removed from the CS3 path before
// prefixing it with the baseURI
func (s *svc) mdToPropResponse(ctx context.Context, pf *propfindXML, md *provider.ResourceInfo, ns string, linkshares map[string]struct{}) (*responseXML, error) {
sublog := appctx.GetLogger(ctx).With().Interface("md", md).Str("ns", ns).Logger()
sublog := appctx.GetLogger(ctx).With().Str("ns", ns).Logger()
md.Path = strings.TrimPrefix(md.Path, ns)

baseURI := ctx.Value(ctxKeyBaseURI).(string)
Expand Down Expand Up @@ -713,7 +713,7 @@ func (s *svc) mdToPropResponse(ctx context.Context, pf *propfindXML, md *provide
propstatOK.Prop = append(propstatOK.Prop, s.newProp("oc:public-link-share-owner", u.Username))
} else {
u, _ := ctxpkg.ContextGetUser(ctx)
sublog.Error().Interface("share", ls).Interface("user", u).Msg("the current user in the context should be the owner of a public link share")
sublog.Error().Interface("share", ls).Interface("user", u.GetId()).Msg("the current user in the context should be the owner of a public link share")
propstatNotFound.Prop = append(propstatNotFound.Prop, s.newProp("oc:public-link-share-owner", ""))
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func (h *Handler) updateReceivedShare(w http.ResponseWriter, r *http.Request, sh

data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share)
if err != nil {
logger.Debug().Interface("share", rs.Share).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping")
logger.Debug().Interface("share", rs.Share.GetId()).Err(err).Msg("could not CS3Share2ShareData, skipping")
}

data.State = mapState(rs.GetState())

if err := h.addFileInfo(ctx, data, info); err != nil {
logger.Debug().Interface("received_share", rs).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping")
logger.Debug().Interface("received_share", rs.Share.GetId()).Err(err).Msg("could not add file info, skipping")
}
h.mapUserIds(r.Context(), client, data)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"text/template"
"time"

Expand Down Expand Up @@ -658,84 +659,107 @@ func (h *Handler) listSharesWithMe(w http.ResponseWriter, r *http.Request) {

shares := make([]*conversions.ShareData, 0, len(lrsRes.GetShares()))

// TODO(refs) filter out "invalid" shares
for _, rs := range lrsRes.GetShares() {
if stateFilter != ocsStateUnknown && rs.GetState() != stateFilter {
continue
}
var info *provider.ResourceInfo
if pinfo != nil {
// check if the shared resource matches the path resource
if !utils.ResourceIDEqual(rs.Share.ResourceId, pinfo.Id) {
// try next share
continue
}
// we can reuse the stat info
info = pinfo
} else {
var status *rpc.Status
info, status, err = h.getResourceInfoByID(ctx, client, rs.Share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
h.logProblems(status, err, "could not stat, skipping")
continue
}
}
var wg sync.WaitGroup
workers := 50
input := make(chan *collaboration.ReceivedShare, len(lrsRes.GetShares()))
output := make(chan *conversions.ShareData, len(lrsRes.GetShares()))

data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share)
if err != nil {
log.Debug().Interface("share", rs.Share).Interface("shareData", data).Err(err).Msg("could not CS3Share2ShareData, skipping")
continue
}

data.State = mapState(rs.GetState())
for i := 0; i < workers; i++ {
wg.Add(1)
go func(ctx context.Context, client gateway.GatewayAPIClient, input chan *collaboration.ReceivedShare, output chan *conversions.ShareData, wg *sync.WaitGroup) {
defer wg.Done()

if err := h.addFileInfo(ctx, data, info); err != nil {
log.Debug().Interface("received_share", rs).Interface("info", info).Interface("shareData", data).Err(err).Msg("could not add file info, skipping")
continue
}
h.mapUserIds(r.Context(), client, data)
for rs := range input {
if stateFilter != ocsStateUnknown && rs.GetState() != stateFilter {
return
}
var info *provider.ResourceInfo
if pinfo != nil {
// check if the shared resource matches the path resource
if !utils.ResourceIDEqual(rs.Share.ResourceId, pinfo.Id) {
// try next share
return
}
// we can reuse the stat info
info = pinfo
} else {
var status *rpc.Status
info, status, err = h.getResourceInfoByID(ctx, client, rs.Share.ResourceId)
if err != nil || status.Code != rpc.Code_CODE_OK {
h.logProblems(status, err, "could not stat, skipping")
return
}
}

if data.State == ocsStateAccepted {
// only accepted shares can be accessed when jailing users into their home.
// in this case we cannot stat shared resources that are outside the users home (/home),
// the path (/users/u-u-i-d/foo) will not be accessible
data, err := conversions.CS3Share2ShareData(r.Context(), rs.Share)
if err != nil {
log.Debug().Interface("share", rs.Share.GetId()).Err(err).Msg("CS3Share2ShareData call failed, skipping")
return
}

// in a global namespace we can access the share using the full path
// in a jailed namespace we have to point to the mount point in the users /Shares jail
// - needed for oc10 hot migration
// or use the /dav/spaces/<space id> endpoint?
data.State = mapState(rs.GetState())

// list /Shares and match fileids with list of received shares
// - only works for a /Shares folder jail
// - does not work for freely mountable shares as in oc10 because we would need to iterate over the whole tree, there is no listing of mountpoints, yet
if err := h.addFileInfo(ctx, data, info); err != nil {
log.Debug().Interface("received_share", rs.Share.GetId()).Err(err).Msg("could not add file info, skipping")
return
}
h.mapUserIds(r.Context(), client, data)

if data.State == ocsStateAccepted {
// only accepted shares can be accessed when jailing users into their home.
// in this case we cannot stat shared resources that are outside the users home (/home),
// the path (/users/u-u-i-d/foo) will not be accessible

// in a global namespace we can access the share using the full path
// in a jailed namespace we have to point to the mount point in the users /Shares jail
// - needed for oc10 hot migration
// or use the /dav/spaces/<space id> endpoint?

// list /Shares and match fileids with list of received shares
// - only works for a /Shares folder jail
// - does not work for freely mountable shares as in oc10 because we would need to iterate over the whole tree, there is no listing of mountpoints, yet

// can we return the mountpoint when the gateway resolves the listing of shares?
// - no, the gateway only sees the same list any has the same options as the ocs service
// - we would need to have a list of mountpoints for the shares -> owncloudstorageprovider for hot migration migration

// best we can do for now is stat the /Shares jail if it is set and return those paths

// if we are in a jail and the current share has been accepted use the stat from the share jail
// Needed because received shares can be jailed in a folder in the users home

if h.sharePrefix != "/" {
// if we have share jail infos use them to build the path
if sji := findMatch(shareJailInfos, rs.Share.ResourceId); sji != nil {
// override path with info from share jail
data.FileTarget = path.Join(h.sharePrefix, path.Base(sji.Path))
data.Path = path.Join(h.sharePrefix, path.Base(sji.Path))
} else {
data.FileTarget = path.Join(h.sharePrefix, path.Base(info.Path))
data.Path = path.Join(h.sharePrefix, path.Base(info.Path))
}
} else {
data.FileTarget = info.Path
data.Path = info.Path
}
}

// can we return the mountpoint when the gateway resolves the listing of shares?
// - no, the gateway only sees the same list any has the same options as the ocs service
// - we would need to have a list of mountpoints for the shares -> owncloudstorageprovider for hot migration migration
output <- data

// best we can do for now is stat the /Shares jail if it is set and return those paths
}
}(ctx, client, input, output, &wg)
}

// if we are in a jail and the current share has been accepted use the stat from the share jail
// Needed because received shares can be jailed in a folder in the users home
for _, share := range lrsRes.GetShares() {
input <- share
}

if h.sharePrefix != "/" {
// if we have share jail infos use them to build the path
if sji := findMatch(shareJailInfos, rs.Share.ResourceId); sji != nil {
// override path with info from share jail
data.FileTarget = path.Join(h.sharePrefix, path.Base(sji.Path))
data.Path = path.Join(h.sharePrefix, path.Base(sji.Path))
} else {
data.FileTarget = path.Join(h.sharePrefix, path.Base(info.Path))
data.Path = path.Join(h.sharePrefix, path.Base(info.Path))
}
} else {
data.FileTarget = info.Path
data.Path = info.Path
}
}
close(input)
wg.Wait()
close(output)

shares = append(shares, data)
log.Debug().Msgf("share: %+v", *data)
for s := range output {
shares = append(shares, s)
}

response.WriteOCSSuccess(w, r, shares)
Expand Down