Skip to content

Commit

Permalink
hstream-store: add OffsetCheckpointStore (#1604)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Sep 14, 2023
1 parent b81084d commit c7d74dd
Showing 1 changed file with 52 additions and 20 deletions.
72 changes: 52 additions & 20 deletions hstream-store/HStream/Store/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ module HStream.Store.Stream
, LD.ckpStoreRemoveCheckpoints
, LD.ckpStoreRemoveAllCheckpoints
--
, initOffsetCheckpointDir
, allocOffsetCheckpointId
, freeOffsetCheckpointId
--
, initSubscrCheckpointDir
, allocSubscrCheckpointId
, getSubscrCheckpointId
Expand Down Expand Up @@ -225,6 +229,9 @@ updateGloStreamSettings f = atomicModifyIORef' gloStreamSettings $ \s -> (f s, (
subscriptionCheckpointDir :: CBytes
subscriptionCheckpointDir = "/hstream/subscription/checkpoint"

offsetCheckpointDir :: CBytes
offsetCheckpointDir = "/hstream/offset/checkpoint"

-------------------------------------------------------------------------------

#ifdef HSTREAM_USE_LOCAL_STREAM_CACHE
Expand Down Expand Up @@ -665,36 +672,61 @@ checkpointStoreLogID :: FFI.C_LogID
checkpointStoreLogID = bit 56

initSubscrCheckpointDir :: FFI.LDClient -> LD.LogAttributes -> IO ()
initSubscrCheckpointDir client attrs = catch f (\(_ :: E.EXISTS) -> return ())
initSubscrCheckpointDir client =
initCheckpointDir client subscriptionCheckpointDir

allocSubscrCheckpointId :: FFI.LDClient -> CBytes -> IO FFI.C_LogID
allocSubscrCheckpointId client =
allocCheckpointId client subscriptionCheckpointDir

freeSubscrCheckpointId :: FFI.LDClient -> CBytes -> IO ()
freeSubscrCheckpointId client =
freeCheckpointId client subscriptionCheckpointDir

-- Prefer to using 'allocSubscrCheckpointId' instead
getSubscrCheckpointId :: HasCallStack => FFI.LDClient -> CBytes -> IO FFI.C_LogID
getSubscrCheckpointId client = getCheckpointId client subscriptionCheckpointDir

initOffsetCheckpointDir :: FFI.LDClient -> LD.LogAttributes -> IO ()
initOffsetCheckpointDir client = initCheckpointDir client offsetCheckpointDir

allocOffsetCheckpointId :: FFI.LDClient -> CBytes -> IO FFI.C_LogID
allocOffsetCheckpointId client = allocCheckpointId client offsetCheckpointDir

freeOffsetCheckpointId :: FFI.LDClient -> CBytes -> IO ()
freeOffsetCheckpointId client = freeCheckpointId client offsetCheckpointDir

initCheckpointDir :: FFI.LDClient -> CBytes -> LD.LogAttributes -> IO ()
initCheckpointDir client dir attrs = catch f (\(_ :: E.EXISTS) -> return ())
where
f = do
dir <- LD.makeLogDirectory client subscriptionCheckpointDir attrs True
LD.syncLogsConfigVersion client =<< LD.logDirectoryGetVersion dir
logdir <- LD.makeLogDirectory client dir attrs True
LD.syncLogsConfigVersion client =<< LD.logDirectoryGetVersion logdir

allocSubscrCheckpointId :: FFI.LDClient -> CBytes -> IO FFI.C_LogID
allocSubscrCheckpointId client key = do
r <- try $ getSubscrCheckpointId client key
allocCheckpointId :: FFI.LDClient -> CBytes -> CBytes -> IO FFI.C_LogID
allocCheckpointId client dir key = do
r <- try $ getCheckpointId client dir key
case r of
Left (_ :: E.NOTFOUND) ->
createRandomLogGroup client (subscriptionCheckpointDir <> "/" <> key) def
createRandomLogGroup client (dir <> "/" <> key) def
Right logid -> return logid

freeSubscrCheckpointId :: FFI.LDClient -> CBytes -> IO ()
freeSubscrCheckpointId client key = do
catch (deleteSubscrCheckpointId client key) $ \(_ :: E.NOTFOUND) -> do
Log.warning $ "freeSubscrCheckpointId NotFound: " <> Log.buildString' key
return ()
-- Prefer to using 'allocSubscrCheckpointId' instead
getCheckpointId :: HasCallStack => FFI.LDClient -> CBytes-> CBytes -> IO FFI.C_LogID
getCheckpointId client dir key = do
let logpath = dir <> "/" <> key
fst <$> (LD.logGroupGetRange =<< LD.getLogGroup client logpath)

deleteSubscrCheckpointId :: FFI.LDClient -> CBytes -> IO ()
deleteSubscrCheckpointId client key = do
let logpath = subscriptionCheckpointDir <> "/" <> key
deleteCheckpointId :: FFI.LDClient -> CBytes -> CBytes -> IO ()
deleteCheckpointId client dir key = do
let logpath = dir <> "/" <> key
LD.syncLogsConfigVersion client =<< LD.removeLogGroup client logpath

-- Prefer to using 'allocSubscrCheckpointId' instead
getSubscrCheckpointId :: HasCallStack => FFI.LDClient -> CBytes -> IO FFI.C_LogID
getSubscrCheckpointId client key = do
let logpath = subscriptionCheckpointDir <> "/" <> key
fst <$> (LD.logGroupGetRange =<< LD.getLogGroup client logpath)
freeCheckpointId :: FFI.LDClient -> CBytes -> CBytes -> IO ()
freeCheckpointId client dir key = do
catch (deleteCheckpointId client dir key) $ \(_ :: E.NOTFOUND) -> do
Log.warning $ "freeCheckpointId NotFound: " <> Log.buildString' key
return ()

-------------------------------------------------------------------------------
-- Reader
Expand Down

0 comments on commit c7d74dd

Please sign in to comment.