Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Git Remote Helper: Concurrency #29

Merged
merged 2 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions git-remote-ipfs/exe/git-remote-ipfs/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ data Error
= ParseError String
| ProcError ProcessError

instance DisplayError Error where
displayError = renderError

renderError :: Error -> Text
renderError = \case
ParseError e -> "Command failed to parse: " <> Text.pack e
Expand All @@ -58,12 +61,7 @@ main = do
.| sinkHandle stdout

case res of
Left e -> do
Text.hPutStrLn stderr $
renderError (errError e)
<> renderSourceLoc (errCallStack e)
exitFailure

Left e -> Text.hPutStrLn stderr (displayError e) *> exitFailure
Right _ -> exitSuccess
where
optInfo = info (helper <*> parseOptions) fullDesc
Expand Down
3 changes: 3 additions & 0 deletions git-remote-ipfs/git-remote-ipfs.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ library

build-depends:
aeson
, async
, attoparsec
, base >= 4.9 && < 5
, bytestring
Expand Down Expand Up @@ -106,6 +107,8 @@ executable git-remote-ipfs
, optparse-applicative
, text

ghc-options: -threaded -rtsopts "-with-rtsopts=-maxN4 -A8m"
kim marked this conversation as resolved.
Show resolved Hide resolved

test-suite e2e-tests
import: common

Expand Down
121 changes: 58 additions & 63 deletions git-remote-ipfs/src/Network/IPFS/Git/RemoteHelper.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ module Network.IPFS.Git.RemoteHelper
)
where

import Control.Concurrent.MVar (modifyMVar, newMVar, withMVar)
import Control.Exception.Safe
( MonadCatch
, MonadMask
, SomeException
, catchAny
, tryAny
Expand All @@ -21,7 +21,7 @@ import Control.Monad.Except
import Control.Monad.Reader
import Data.Bifunctor (first)
import qualified Data.ByteString.Lazy as L
import Data.Foldable (toList, traverse_)
import Data.Foldable (toList)
import Data.IORef (atomicModifyIORef')
import Data.Maybe (catMaybes)
import Data.Text (Text)
Expand All @@ -30,6 +30,7 @@ import Data.Text.Encoding (decodeUtf8With)
import Data.Text.Encoding.Error (lenientDecode)
import qualified Data.Text.Read as Text
import Data.Traversable (for)
import GHC.Stack (HasCallStack)
import System.FilePath (joinPath, splitDirectories)

import Data.IPLD.CID (CID, cidFromText, cidToText)
Expand All @@ -51,6 +52,10 @@ data ProcessError
| IPFSError ClientError
| CidError String
| UnknownLocalRef Text
deriving Show

instance DisplayError ProcessError where
displayError = renderProcessError

renderProcessError :: ProcessError -> Text
renderProcessError = \case
Expand All @@ -59,10 +64,7 @@ renderProcessError = \case
CidError e -> fmt ("Cid conversion error: " % fstr) e
UnknownLocalRef r -> fmt ("Ref not found locally: " % ftxt) r

processCommand
:: (MonadMask m, MonadIO m)
=> Command
-> RemoteHelperT ProcessError m CommandResult
processCommand :: Command -> RemoteHelper ProcessError CommandResult
processCommand Capabilities =
pure $ CapabilitiesResult ["push", "fetch", "option"]

Expand Down Expand Up @@ -112,10 +114,11 @@ processCommand ListForPush = fmap ListForPushResult $ do

remoteRefs <- do
cids <-
fmap catMaybes . for branches $ \branch ->
ipfs $ resolvePath (cidToText root <> "/" <> branch)
forConcurrently clientMaxConns branches $ \branch ->
ipfs (resolvePath (cidToText root <> "/" <> branch))

traverse (liftEitherRH . first CidError . hexShaFromCidText) cids
for (catMaybes cids) $
liftEitherRH . first CidError . hexShaFromCidText

logDebug $ "list for-push: remoteRefs: " <> Text.pack (show remoteRefs)

Expand All @@ -138,27 +141,22 @@ processCommand (Fetch sha _) = FetchOk <$ processFetch sha

--------------------------------------------------------------------------------

processPush
:: (MonadMask m, MonadIO m)
=> Bool
-> Text
-> Text
-> RemoteHelperT ProcessError m CID
processPush :: Bool -> Text -> Text -> RemoteHelper ProcessError CID
processPush _ localRef remoteRef = do
root <- asks envIpfsRoot

localHeadRef <- do
localRefCid <- do
ref <- git $ Git.resolve (Git.Revision (Text.unpack localRef) [])
maybe (throwRH $ UnknownLocalRef localRef) (pure . refToCid) ref

remoteHeadRef <- do
remoteRefCid <- do
refCid <- ipfs $ resolvePath (cidToText root <> "/" <> remoteRef)
pure $ refCid >>= hush . cidFromText

go root remoteHeadRef localHeadRef
unless (Just localRefCid == remoteRefCid) $ go root localRefCid

-- patch link remoteRef
root' <- ipfs $ patchLink root remoteRef localHeadRef
root' <- ipfs $ patchLink root remoteRef localRefCid

-- The remote HEAD denotes the default branch to check out. If it is not
-- present, git clone will refuse to check out the worktree and exit with a
Expand All @@ -170,83 +168,80 @@ processPush _ localRef remoteRef = do
Nothing -> linkedObject root' "HEAD" "refs/heads/master"
root'' <$ ipfs (updateRemoteUrl root'')
where
go !root remoteHeadRef localHeadRef
| Just localHeadRef == remoteHeadRef = pure ()
| otherwise = do

logDebug $
fmt ("processPush: " % fcid % " " % fcid % " " % shown)
root
localHeadRef
(cidToRef @Git.SHA1 <$> remoteHeadRef)

go !root localRefCid = do
logDebug $ fmt ("processPush: " % fcid % " " % fcid) root localRefCid
obj <- do
sha <- liftEitherRH . first CidError $ cidToRef localHeadRef
sha <-
liftEitherRH . first CidError $
cidToRef @Git.SHA1 localRefCid
logDebug $ "sha " <> Text.pack (show sha)
localHeadRef' <- git $ Git.resolve (sha :: Git.Ref Git.SHA1)
case localHeadRef' of
Nothing -> throwRH $
UnknownLocalRef $ Text.pack (Git.toHexString sha)
Just r -> do
dir <- Git.gitRepoPath <$> Git.getGit
git . liftIO $ Git.looseRead dir r
dir <- Git.gitRepoPath <$> Git.getGit
git . liftIO $ Git.looseRead dir sha

let raw = Git.looseMarshall obj

logDebug $ "BlockPut: " <> decodeUtf8With lenientDecode (L.toStrict raw)
blockCid <- do
cid <- ipfs $ putBlock raw
-- check 'res' CID matches SHA
when (localHeadRef /= cid) $
when (localRefCid /= cid) $
throwRH
. CidError
$ sfmt ("CID mismatch: expected `" % fcid % "`, actual `" % fcid % "`")
localHeadRef
localRefCid
cid
pure cid

logDebug $ fmt ("blockCid: " % fcid) blockCid

-- if loose object > 2048k, create object + link block to it
objCid <-
if L.length raw > 2048000 then
linkedObject root ("objects/" <> cidToText blockCid) raw
else
pure blockCid
when (L.length raw > 2048000) $
void $ linkedObject root ("objects/" <> cidToText blockCid) raw

logDebug $ fmt ("objCid: " % fcid) objCid
-- process links
traverse_ (go root remoteHeadRef) (objectLinks obj)
forConcurrently_ clientMaxConns (objectLinks obj) $ go root

linkedObject base name raw = ipfs $ addObject raw >>= patchLink base name

processFetch
:: (MonadCatch m, MonadIO m)
=> Text
-> RemoteHelperT ProcessError m ()
processFetch :: Text -> RemoteHelper ProcessError ()
processFetch sha = do
repo <- Git.gitRepoPath <$> Git.getGit
root <- asks envIpfsRoot
cid <- liftEitherRH . first CidError $ cidFromHexShaText sha
lobjs <- ipfs $ largeObjects root -- XXX: load lobjs only once
go repo root lobjs cid
repo <- Git.getGit
root <- asks envIpfsRoot
cid <- liftEitherRH . first CidError $ cidFromHexShaText sha
lck <- liftIO $ newMVar ()
lobs <- do
env <- ask
(>>= either throwError pure)
. liftIO . modifyMVar (envLobs env) $ \case
Just ls -> pure (Just ls, Right ls)
Nothing ->
runRemoteHelper env (ipfs (largeObjects root)) >>= \case
Left e -> pure (Nothing, Left e)
Right ls -> pure (Just ls, Right ls)

go repo root lobs lck cid
where
go repo root lobjs cid = do
go !repo !root !lobs lck cid = do
ref <- liftEitherRH . first CidError $ cidToRef @Git.SHA1 cid
have <- Git.getGit >>= \g -> git . liftIO $ Git.getObject g ref True
have <-
-- Nb. mutex here as we might access the same packfile concurrently
git . liftIO . withMVar lck . const $
Git.getObject repo ref True
case have of
Just _ ->
logInfo $ fmt ("fetch: Skipping " % fref % " (" % fcid % ")") ref cid
Just _ -> logInfo $
fmt ("fetch: Skipping " % fref % " (" % fcid % ")") ref cid
Nothing -> do
raw <- do
blk <- ipfs $ provideBlock lobjs cid
blk <- ipfs $ provideBlock lobs cid
case blk of
Just b -> pure b
Nothing -> ipfs $ getBlock cid

let obj = Git.looseUnmarshall @Git.SHA1 raw
void . git . liftIO $ Git.looseWrite repo obj -- XXX: check sha matches
traverse_ (go repo root lobjs) (objectLinks obj)
void . git . liftIO $
Git.looseWrite (Git.gitRepoPath repo) obj -- XXX: check sha matches
forConcurrently_ clientMaxConns (objectLinks obj) $
go repo root lobs lck

--------------------------------------------------------------------------------

Expand All @@ -257,7 +252,7 @@ ipfs = mapError IPFSError

-- XXX: hs-git uses 'error' deliberately, should be using 'tryAnyDeep' here.
-- Requires patch to upstream to get 'NFData' instances everywhere.
git :: MonadCatch m
git :: (MonadCatch m, HasCallStack)
=> RemoteHelperT ProcessError m a
-> RemoteHelperT ProcessError m a
git f = either throwRH pure =<< fmap (first GitError) (tryAny f)
10 changes: 10 additions & 0 deletions git-remote-ipfs/src/Network/IPFS/Git/RemoteHelper/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module Network.IPFS.Git.RemoteHelper.Client
, ClientError
, renderClientError

, clientMaxConns

, listPaths
, getRef
, resolvePath
Expand Down Expand Up @@ -75,6 +77,10 @@ data ClientError
| InvalidResponse Text Aeson.Value
| CidError String
| StreamingError String
deriving Show

instance DisplayError ClientError where
displayError = renderClientError

renderClientError :: ClientError -> Text
renderClientError = \case
Expand All @@ -91,6 +97,10 @@ data RefPath = RefPath

data RefPathType = RefPathRef | RefPathHead

-- FIXME(kim): We may want this to be configurable somehow
clientMaxConns :: Int
clientMaxConns = 30

listPaths
:: MonadIO m
=> Text
Expand Down
Loading