Skip to content

Commit

Permalink
Merge pull request #29 from oscoin/conc
Browse files Browse the repository at this point in the history
Git Remote Helper: Concurrency
  • Loading branch information
kim authored Jan 23, 2019
2 parents be5b230 + 42294ab commit 8a3444b
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 70 deletions.
10 changes: 4 additions & 6 deletions git-remote-ipfs/exe/git-remote-ipfs/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ data Error
= ParseError String
| ProcError ProcessError

instance DisplayError Error where
displayError = renderError

renderError :: Error -> Text
renderError = \case
ParseError e -> "Command failed to parse: " <> Text.pack e
Expand All @@ -58,12 +61,7 @@ main = do
.| sinkHandle stdout

case res of
Left e -> do
Text.hPutStrLn stderr $
renderError (errError e)
<> renderSourceLoc (errCallStack e)
exitFailure

Left e -> Text.hPutStrLn stderr (displayError e) *> exitFailure
Right _ -> exitSuccess
where
optInfo = info (helper <*> parseOptions) fullDesc
Expand Down
3 changes: 3 additions & 0 deletions git-remote-ipfs/git-remote-ipfs.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ library

build-depends:
aeson
, async
, attoparsec
, base >= 4.9 && < 5
, bytestring
Expand Down Expand Up @@ -106,6 +107,8 @@ executable git-remote-ipfs
, optparse-applicative
, text

ghc-options: -threaded -rtsopts "-with-rtsopts=-maxN4 -A8m"

test-suite e2e-tests
import: common

Expand Down
121 changes: 58 additions & 63 deletions git-remote-ipfs/src/Network/IPFS/Git/RemoteHelper.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ module Network.IPFS.Git.RemoteHelper
)
where

import Control.Concurrent.MVar (modifyMVar, newMVar, withMVar)
import Control.Exception.Safe
( MonadCatch
, MonadMask
, SomeException
, catchAny
, tryAny
Expand All @@ -21,7 +21,7 @@ import Control.Monad.Except
import Control.Monad.Reader
import Data.Bifunctor (first)
import qualified Data.ByteString.Lazy as L
import Data.Foldable (toList, traverse_)
import Data.Foldable (toList)
import Data.IORef (atomicModifyIORef')
import Data.Maybe (catMaybes)
import Data.Text (Text)
Expand All @@ -30,6 +30,7 @@ import Data.Text.Encoding (decodeUtf8With)
import Data.Text.Encoding.Error (lenientDecode)
import qualified Data.Text.Read as Text
import Data.Traversable (for)
import GHC.Stack (HasCallStack)
import System.FilePath (joinPath, splitDirectories)

import Data.IPLD.CID (CID, cidFromText, cidToText)
Expand All @@ -51,6 +52,10 @@ data ProcessError
| IPFSError ClientError
| CidError String
| UnknownLocalRef Text
deriving Show

instance DisplayError ProcessError where
displayError = renderProcessError

renderProcessError :: ProcessError -> Text
renderProcessError = \case
Expand All @@ -59,10 +64,7 @@ renderProcessError = \case
CidError e -> fmt ("Cid conversion error: " % fstr) e
UnknownLocalRef r -> fmt ("Ref not found locally: " % ftxt) r

processCommand
:: (MonadMask m, MonadIO m)
=> Command
-> RemoteHelperT ProcessError m CommandResult
processCommand :: Command -> RemoteHelper ProcessError CommandResult
processCommand Capabilities =
pure $ CapabilitiesResult ["push", "fetch", "option"]

Expand Down Expand Up @@ -112,10 +114,11 @@ processCommand ListForPush = fmap ListForPushResult $ do

remoteRefs <- do
cids <-
fmap catMaybes . for branches $ \branch ->
ipfs $ resolvePath (cidToText root <> "/" <> branch)
forConcurrently clientMaxConns branches $ \branch ->
ipfs (resolvePath (cidToText root <> "/" <> branch))

traverse (liftEitherRH . first CidError . hexShaFromCidText) cids
for (catMaybes cids) $
liftEitherRH . first CidError . hexShaFromCidText

logDebug $ "list for-push: remoteRefs: " <> Text.pack (show remoteRefs)

Expand All @@ -138,27 +141,22 @@ processCommand (Fetch sha _) = FetchOk <$ processFetch sha

--------------------------------------------------------------------------------

processPush
:: (MonadMask m, MonadIO m)
=> Bool
-> Text
-> Text
-> RemoteHelperT ProcessError m CID
processPush :: Bool -> Text -> Text -> RemoteHelper ProcessError CID
processPush _ localRef remoteRef = do
root <- asks envIpfsRoot

localHeadRef <- do
localRefCid <- do
ref <- git $ Git.resolve (Git.Revision (Text.unpack localRef) [])
maybe (throwRH $ UnknownLocalRef localRef) (pure . refToCid) ref

remoteHeadRef <- do
remoteRefCid <- do
refCid <- ipfs $ resolvePath (cidToText root <> "/" <> remoteRef)
pure $ refCid >>= hush . cidFromText

go root remoteHeadRef localHeadRef
unless (Just localRefCid == remoteRefCid) $ go root localRefCid

-- patch link remoteRef
root' <- ipfs $ patchLink root remoteRef localHeadRef
root' <- ipfs $ patchLink root remoteRef localRefCid

-- The remote HEAD denotes the default branch to check out. If it is not
-- present, git clone will refuse to check out the worktree and exit with a
Expand All @@ -170,83 +168,80 @@ processPush _ localRef remoteRef = do
Nothing -> linkedObject root' "HEAD" "refs/heads/master"
root'' <$ ipfs (updateRemoteUrl root'')
where
go !root remoteHeadRef localHeadRef
| Just localHeadRef == remoteHeadRef = pure ()
| otherwise = do

logDebug $
fmt ("processPush: " % fcid % " " % fcid % " " % shown)
root
localHeadRef
(cidToRef @Git.SHA1 <$> remoteHeadRef)

go !root localRefCid = do
logDebug $ fmt ("processPush: " % fcid % " " % fcid) root localRefCid
obj <- do
sha <- liftEitherRH . first CidError $ cidToRef localHeadRef
sha <-
liftEitherRH . first CidError $
cidToRef @Git.SHA1 localRefCid
logDebug $ "sha " <> Text.pack (show sha)
localHeadRef' <- git $ Git.resolve (sha :: Git.Ref Git.SHA1)
case localHeadRef' of
Nothing -> throwRH $
UnknownLocalRef $ Text.pack (Git.toHexString sha)
Just r -> do
dir <- Git.gitRepoPath <$> Git.getGit
git . liftIO $ Git.looseRead dir r
dir <- Git.gitRepoPath <$> Git.getGit
git . liftIO $ Git.looseRead dir sha

let raw = Git.looseMarshall obj

logDebug $ "BlockPut: " <> decodeUtf8With lenientDecode (L.toStrict raw)
blockCid <- do
cid <- ipfs $ putBlock raw
-- check 'res' CID matches SHA
when (localHeadRef /= cid) $
when (localRefCid /= cid) $
throwRH
. CidError
$ sfmt ("CID mismatch: expected `" % fcid % "`, actual `" % fcid % "`")
localHeadRef
localRefCid
cid
pure cid

logDebug $ fmt ("blockCid: " % fcid) blockCid

-- if loose object > 2048k, create object + link block to it
objCid <-
if L.length raw > 2048000 then
linkedObject root ("objects/" <> cidToText blockCid) raw
else
pure blockCid
when (L.length raw > 2048000) $
void $ linkedObject root ("objects/" <> cidToText blockCid) raw

logDebug $ fmt ("objCid: " % fcid) objCid
-- process links
traverse_ (go root remoteHeadRef) (objectLinks obj)
forConcurrently_ clientMaxConns (objectLinks obj) $ go root

linkedObject base name raw = ipfs $ addObject raw >>= patchLink base name

processFetch
:: (MonadCatch m, MonadIO m)
=> Text
-> RemoteHelperT ProcessError m ()
processFetch :: Text -> RemoteHelper ProcessError ()
processFetch sha = do
repo <- Git.gitRepoPath <$> Git.getGit
root <- asks envIpfsRoot
cid <- liftEitherRH . first CidError $ cidFromHexShaText sha
lobjs <- ipfs $ largeObjects root -- XXX: load lobjs only once
go repo root lobjs cid
repo <- Git.getGit
root <- asks envIpfsRoot
cid <- liftEitherRH . first CidError $ cidFromHexShaText sha
lck <- liftIO $ newMVar ()
lobs <- do
env <- ask
(>>= either throwError pure)
. liftIO . modifyMVar (envLobs env) $ \case
Just ls -> pure (Just ls, Right ls)
Nothing ->
runRemoteHelper env (ipfs (largeObjects root)) >>= \case
Left e -> pure (Nothing, Left e)
Right ls -> pure (Just ls, Right ls)

go repo root lobs lck cid
where
go repo root lobjs cid = do
go !repo !root !lobs lck cid = do
ref <- liftEitherRH . first CidError $ cidToRef @Git.SHA1 cid
have <- Git.getGit >>= \g -> git . liftIO $ Git.getObject g ref True
have <-
-- Nb. mutex here as we might access the same packfile concurrently
git . liftIO . withMVar lck . const $
Git.getObject repo ref True
case have of
Just _ ->
logInfo $ fmt ("fetch: Skipping " % fref % " (" % fcid % ")") ref cid
Just _ -> logInfo $
fmt ("fetch: Skipping " % fref % " (" % fcid % ")") ref cid
Nothing -> do
raw <- do
blk <- ipfs $ provideBlock lobjs cid
blk <- ipfs $ provideBlock lobs cid
case blk of
Just b -> pure b
Nothing -> ipfs $ getBlock cid

let obj = Git.looseUnmarshall @Git.SHA1 raw
void . git . liftIO $ Git.looseWrite repo obj -- XXX: check sha matches
traverse_ (go repo root lobjs) (objectLinks obj)
void . git . liftIO $
Git.looseWrite (Git.gitRepoPath repo) obj -- XXX: check sha matches
forConcurrently_ clientMaxConns (objectLinks obj) $
go repo root lobs lck

--------------------------------------------------------------------------------

Expand All @@ -257,7 +252,7 @@ ipfs = mapError IPFSError

-- XXX: hs-git uses 'error' deliberately, should be using 'tryAnyDeep' here.
-- Requires patch to upstream to get 'NFData' instances everywhere.
git :: MonadCatch m
git :: (MonadCatch m, HasCallStack)
=> RemoteHelperT ProcessError m a
-> RemoteHelperT ProcessError m a
git f = either throwRH pure =<< fmap (first GitError) (tryAny f)
10 changes: 10 additions & 0 deletions git-remote-ipfs/src/Network/IPFS/Git/RemoteHelper/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module Network.IPFS.Git.RemoteHelper.Client
, ClientError
, renderClientError

, clientMaxConns

, listPaths
, getRef
, resolvePath
Expand Down Expand Up @@ -75,6 +77,10 @@ data ClientError
| InvalidResponse Text Aeson.Value
| CidError String
| StreamingError String
deriving Show

instance DisplayError ClientError where
displayError = renderClientError

renderClientError :: ClientError -> Text
renderClientError = \case
Expand All @@ -91,6 +97,10 @@ data RefPath = RefPath

data RefPathType = RefPathRef | RefPathHead

-- FIXME(kim): We may want this to be configurable somehow
clientMaxConns :: Int
clientMaxConns = 30

listPaths
:: MonadIO m
=> Text
Expand Down
Loading

0 comments on commit 8a3444b

Please sign in to comment.