Skip to content

Commit

Permalink
add admin cli command to support list and clean connector related met…
Browse files Browse the repository at this point in the history
…a data (#1827)

* hadmin-server: add connector resource type for meta command

* hadmin-server: add metaClean subcommand to support clean dirty connector meta

* hadmin-server: update cli help info for meta get-task command
  • Loading branch information
YangKian authored Jun 5, 2024
1 parent b1820d8 commit 42d98cb
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 17 deletions.
20 changes: 16 additions & 4 deletions hstream-admin/server/HStream/Admin/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ data MetaCommand
= MetaCmdList Text
| MetaCmdGet Text Text
| MetaCmdTask MetaTaskCommand
| MetaCmdClean MetaCleanCommand
| MetaCmdInfo
deriving (Show)

Expand All @@ -283,20 +284,22 @@ metaCmdParser = O.hsubparser
<> O.short 'r'
<> O.metavar "RESOURCE_CATEGORY"
<> O.help ("The category of the resource, currently support: "
<> "[subscription|query-info|view-info|qv-relation]")))
<> "[subscription|query-info|view-info|qv-relation|connectors|connector-infos]")))
(O.progDesc "List all metadata of specific resource"))
<> O.command "get" (O.info (MetaCmdGet <$> O.strOption ( O.long "resource"
<> O.short 'r'
<> O.metavar "RESOURCE_CATEGORY"
<> O.help ("The category of the resource, currently support: "
<> "[subscription|query-info|query-status|view-info|qv-relation]"))
<> "[subscription|query-info|query-status|view-info|qv-relation|connector|connector-info]"))
<*> O.strOption ( O.long "id"
<> O.short 'i'
<> O.metavar "RESOURCE_ID"
<> O.help "The Id of the resource"))
(O.progDesc "Get metadata of specific resource"))
<> O.command "info" (O.info (pure MetaCmdInfo) (O.progDesc "Get meta info"))
) O.<|> MetaCmdTask <$> metaTaskCmdParser
)
O.<|> MetaCmdTask <$> metaTaskCmdParser
O.<|> MetaCmdClean <$> metaCleanCmdParser

data MetaTaskCommand
= MetaTaskGet Text Text
Expand All @@ -307,14 +310,23 @@ metaTaskCmdParser = O.hsubparser
( O.command "get-task" (O.info (MetaTaskGet <$> O.strOption ( O.long "resource"
<> O.short 'r'
<> O.metavar "RESOURCE_CATEGORY"
<> O.help "The category of the resource")
<> O.help ("The category of the resource, currently support: "
<> "[stream|subscription|query|view|connector|shard|shard-reader]"))
<*> O.strOption ( O.long "id"
<> O.short 'i'
<> O.metavar "RESOURCE_ID"
<> O.help "The Id of the resource"))
(O.progDesc "Get task allocation metadata of specific resource"))
)

data MetaCleanCommand = CleanConnectors
deriving (Show)

metaCleanCmdParser :: O.Parser MetaCleanCommand
metaCleanCmdParser = O.hsubparser
( O.command "clean-connectors" (O.info (pure CleanConnectors) (O.progDesc "Clean up the taskMeta of connectors in deleted state."))
)

-------------------------------------------------------------------------------

-- TODO: auto generate from commom/stats/include/*.inc
Expand Down
49 changes: 36 additions & 13 deletions hstream/src/HStream/Server/Handler/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ module HStream.Server.Handler.Admin

import Control.Concurrent (readMVar, tryReadMVar)
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Monad (forM, void)
import Control.Exception (catch, throw)
import Control.Monad (forM, void, when)
import Data.Aeson ((.=))
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Text as Aeson
import qualified Data.HashMap.Strict as HM
import qualified Data.List as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as ST
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Lazy as TL
Expand All @@ -36,7 +38,6 @@ import Proto3.Suite (Enumerated (Enumerated),
import qualified Z.Data.CBytes as CB
import Z.Data.CBytes (CBytes)

import Control.Exception (throw)
import qualified HStream.Admin.Server.Types as AT
import HStream.Base (rmTrailingZeros)
import qualified HStream.Exception as HE
Expand All @@ -61,6 +62,7 @@ import qualified HStream.Server.Core.View as HC
#endif
import HStream.Common.Server.MetaData (TaskAllocation,
renderTaskAllocationsToTable)
import HStream.IO.Types (TaskIdMeta (..), TaskMeta)
import HStream.Server.Exception (catchDefaultEx,
defaultExceptionHandle)
import qualified HStream.Server.HStreamApi as API
Expand All @@ -69,6 +71,8 @@ import HStream.Server.MetaData (QVRelation, QueryInfo,
renderQVRelationToTable,
renderQueryInfosToTable,
renderQueryStatusToTable,
renderTaskIdMetaMapToTable,
renderTaskMetaMapToTable,
renderViewInfosToTable)
import HStream.Server.Types
import qualified HStream.Stats as Stats
Expand All @@ -78,6 +82,7 @@ import HStream.Utils (Interval (..), cBytesToText,
returnResp, showNodeStatus,
structToJsonObject,
timestampToMsTimestamp)
import ZooKeeper.Exception (ZNONODE)

-------------------------------------------------------------------------------
-- All command line data types are defined in 'HStream.Admin.Types'
Expand Down Expand Up @@ -244,19 +249,25 @@ getResType resType =
runMeta :: ServerContext -> AT.MetaCommand -> IO Text
runMeta ServerContext{..} (AT.MetaCmdList resType) = do
case resType of
"subscription" -> pure <$> AT.tableResponse . renderSubscriptionWrapToTable =<< M.listMeta @SubscriptionWrap metaHandle
"query-info" -> pure <$> AT.plainResponse . renderQueryInfosToTable =<< M.listMeta @QueryInfo metaHandle
"view-info" -> pure <$> AT.plainResponse . renderViewInfosToTable =<< M.listMeta @ViewInfo metaHandle
"qv-relation" -> pure <$> AT.tableResponse . renderQVRelationToTable =<< M.listMeta @QVRelation metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try [subscription|query-info|view-info|qv-relateion]"
"subscription" -> pure <$> AT.tableResponse . renderSubscriptionWrapToTable =<< M.listMeta @SubscriptionWrap metaHandle
"query-info" -> pure <$> AT.plainResponse . renderQueryInfosToTable =<< M.listMeta @QueryInfo metaHandle
"view-info" -> pure <$> AT.plainResponse . renderViewInfosToTable =<< M.listMeta @ViewInfo metaHandle
"qv-relation" -> pure <$> AT.tableResponse . renderQVRelationToTable =<< M.listMeta @QVRelation metaHandle
"connectors" -> pure <$> AT.tableResponse . renderTaskIdMetaMapToTable =<< M.getAllMeta @TaskIdMeta metaHandle
"connector-infos" -> pure <$> AT.tableResponse . renderTaskMetaMapToTable =<< M.getAllMeta @TaskMeta metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try "
<> "[subscription|query-info|view-info|qv-relateion|connectors|connector-infos]"
runMeta ServerContext{..} (AT.MetaCmdGet resType rId) = do
case resType of
"subscription" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderSubscriptionWrapToTable .L.singleton) =<< M.getMeta @SubscriptionWrap rId metaHandle
"query-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderQueryInfosToTable . L.singleton) =<< M.getMeta @QueryInfo rId metaHandle
"query-status" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQueryStatusToTable . L.singleton) =<< M.getMeta @QueryStatus rId metaHandle
"view-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderViewInfosToTable . L.singleton) =<< M.getMeta @ViewInfo rId metaHandle
"qv-relation" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQVRelationToTable . L.singleton) =<< M.getMeta @QVRelation rId metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try [subscription|query-info|query-status|view-info|qv-relateion]"
"subscription" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderSubscriptionWrapToTable .L.singleton) =<< M.getMeta @SubscriptionWrap rId metaHandle
"query-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderQueryInfosToTable . L.singleton) =<< M.getMeta @QueryInfo rId metaHandle
"query-status" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQueryStatusToTable . L.singleton) =<< M.getMeta @QueryStatus rId metaHandle
"view-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.plainResponse . renderViewInfosToTable . L.singleton) =<< M.getMeta @ViewInfo rId metaHandle
"qv-relation" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderQVRelationToTable . L.singleton) =<< M.getMeta @QVRelation rId metaHandle
"connector" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskIdMetaMapToTable . Map.singleton rId) =<< M.getMeta @TaskIdMeta rId metaHandle
"connector-info" -> pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskMetaMapToTable . Map.singleton rId) =<< M.getMeta @TaskMeta rId metaHandle
_ -> return $ AT.errorResponse "invalid resource type, try "
<> "[subscription|query-info|query-status|view-info|qv-relateion|connector|connector-info]"
runMeta ServerContext{serverOpts=ServerOpts{..}} AT.MetaCmdInfo = do
let headers = ["Meta Type" :: Text, "Connection Info"]
rows = case _metaStore of
Expand All @@ -266,12 +277,24 @@ runMeta ServerContext{serverOpts=ServerOpts{..}} AT.MetaCmdInfo = do
content = Aeson.object ["headers" .= headers, "rows" .= rows]
return $ AT.tableResponse content
runMeta sc (AT.MetaCmdTask taskCmd) = runMetaTask sc taskCmd
runMeta sc (AT.MetaCmdClean cmd) = runMetaCleanTask sc cmd

runMetaTask :: ServerContext -> AT.MetaTaskCommand -> IO Text
runMetaTask ServerContext{..} (AT.MetaTaskGet resType rId) = do
let metaId = mkAllocationKey (getResType resType) rId
pure <$> maybe (AT.plainResponse "Not Found") (AT.tableResponse . renderTaskAllocationsToTable . L.singleton) =<< M.getMeta @TaskAllocation metaId metaHandle

runMetaCleanTask :: ServerContext -> AT.MetaCleanCommand -> IO Text
runMetaCleanTask ServerContext{..} AT.CleanConnectors = do
activIds <- ST.fromList . map taskIdMeta <$> M.listMeta @TaskIdMeta metaHandle
allTaskMetas <- M.getAllMeta @TaskMeta metaHandle
void $ Map.traverseWithKey (\k _ -> removeMeta k activIds) allTaskMetas
return $ AT.plainResponse "OK"
where
removeMeta key ids = do
when (ST.notMember key ids) $
catch (M.deleteMeta @TaskMeta key Nothing metaHandle) $ \(_ :: ZNONODE) -> return ()

-------------------------------------------------------------------------------
-- Admin Stream Command

Expand Down
30 changes: 30 additions & 0 deletions hstream/src/HStream/Server/MetaData/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ module HStream.Server.MetaData.Types
, renderQueryStatusToTable
, renderViewInfosToTable
, renderQVRelationToTable
, renderTaskMetaMapToTable
, renderTaskIdMetaMapToTable

#ifdef HStreamEnableSchema
, hstreamColumnCatalogToColumnCatalog
Expand All @@ -55,8 +57,11 @@ import Data.Int (Int64)
import qualified Data.IntMap as IntMap
import Data.IORef
import qualified Data.List as L
import Data.Map (Map)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Data.Time.Clock.System (SystemTime (MkSystemTime),
getSystemTime)
import qualified Data.Vector as V
Expand All @@ -67,6 +72,9 @@ import GHC.Stack

import HStream.Common.Server.MetaData (rootPath)
import HStream.Common.ZookeeperClient (ZookeeperClient)
import HStream.IO.Types (TaskIdMeta (..),
TaskInfo (..),
TaskMeta (..))
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (FHandle, HasPath (..),
MetaHandle,
Expand All @@ -79,6 +87,7 @@ import qualified HStream.Server.HStreamApi as API
import HStream.Server.MetaData.Exception
import HStream.Server.Types (SubscriptionWrap (..))
import qualified HStream.Store as S
import HStream.ThirdParty.Protobuf (toRFC3339)
import HStream.Utils
#ifdef HStreamUseV2Engine
import DiffFlow.Types
Expand Down Expand Up @@ -157,6 +166,27 @@ renderQVRelationToTable relations =
rows = map (\QVRelation{..} -> [qvRelationQueryName, qvRelationViewName]) relations
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]

renderTaskMetaMapToTable :: Map Text TaskMeta -> Aeson.Value
renderTaskMetaMapToTable mp =
let headers = [ "Connector Name" :: Text
, "Task Id" :: Text
, "Task Type" :: Text
, "Created Time" :: Text
, "State" :: Text
]
rows = map getMetaInfo $ M.toList mp
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]
where
getMetaInfo (taskId, TaskMeta{taskInfoMeta=TaskInfo{..}, ..}) =
let createTime = TL.toStrict . toRFC3339 $ taskCreatedTime
in [taskName, taskId, T.pack . show $ taskType, createTime, T.pack . show $ taskStateMeta]

renderTaskIdMetaMapToTable :: Map Text TaskIdMeta -> Aeson.Value
renderTaskIdMetaMapToTable mp =
let headers = ["Connector Name" :: Text , "Task Id"]
rows = map (\(name, TaskIdMeta{..}) -> [name, taskIdMeta]) $ M.toList mp
in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows]

type SourceStreams = [Text]
type SinkStream = Text
type RelatedStreams = (SourceStreams, SinkStream)
Expand Down

0 comments on commit 42d98cb

Please sign in to comment.