diff --git a/common/server/HStream/Common/Server/Lookup.hs b/common/server/HStream/Common/Server/Lookup.hs index d17487f04..3adc7c5f8 100644 --- a/common/server/HStream/Common/Server/Lookup.hs +++ b/common/server/HStream/Common/Server/Lookup.hs @@ -11,8 +11,7 @@ module HStream.Common.Server.Lookup ) where import Control.Concurrent.STM -import Control.Exception (SomeException (..), throwIO, - try) +import Control.Exception (SomeException (..), try) import Data.List (find) import Data.Text (Text) import qualified Data.Vector as V @@ -22,7 +21,6 @@ import HStream.Common.Server.HashRing (LoadBalanceHashRing, readLoadBalanceHashRing) import HStream.Common.Server.MetaData (TaskAllocation (..)) import HStream.Common.Types (fromInternalServerNodeWithKey) -import qualified HStream.Exception as HE import HStream.Gossip (GossipContext, getMemberList) import qualified HStream.Logger as Log import qualified HStream.MetaStore.Types as M @@ -31,8 +29,7 @@ import qualified HStream.Server.HStreamApi as A lookupNode :: LoadBalanceHashRing -> Text -> Maybe Text -> IO A.ServerNode lookupNode loadBalanceHashRing key advertisedListenersKey = do (_, hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing) - theNode <- getResNode hashRing key advertisedListenersKey - return theNode + getResNode hashRing key advertisedListenersKey lookupNodePersist :: M.MetaHandle diff --git a/hstream-io/HStream/IO/Meta.hs b/hstream-io/HStream/IO/Meta.hs index 9bf6c799a..517a35a9e 100644 --- a/hstream-io/HStream/IO/Meta.hs +++ b/hstream-io/HStream/IO/Meta.hs @@ -11,6 +11,7 @@ import qualified Data.Text as T import GHC.Stack (HasCallStack) import qualified Data.Aeson as J +import qualified Data.Map.Strict as M import HStream.IO.Types import qualified HStream.IO.Types as Types import HStream.MetaStore.Types (MetaHandle, MetaStore (..)) @@ -23,8 +24,8 @@ createIOTaskMeta h taskName taskId taskInfo = do insertMeta taskName (TaskIdMeta taskId) h listIOTaskMeta :: MetaHandle -> IO [API.Connector] -listIOTaskMeta h = do - map convertTaskMeta . filter (\TaskMeta{..} -> taskStateMeta /= DELETED) <$> listMeta @TaskMeta h +listIOTaskMeta h = + map convertTaskMeta . filter (\(_, TaskMeta{..}) -> taskStateMeta /= DELETED) . M.toList <$> getAllMeta @TaskMeta h getIOTaskMeta :: MetaHandle -> T.Text -> IO (Maybe TaskMeta) getIOTaskMeta h tid = getMeta tid h diff --git a/hstream-io/HStream/IO/Types.hs b/hstream-io/HStream/IO/Types.hs index 1453525df..bcb80d01d 100644 --- a/hstream-io/HStream/IO/Types.hs +++ b/hstream-io/HStream/IO/Types.hs @@ -157,13 +157,14 @@ instance HasPath TaskIdMeta FHandle where instance HasPath TaskKvMeta FHandle where myRootPath = "ioTaskKvs" -convertTaskMeta :: TaskMeta -> API.Connector -convertTaskMeta TaskMeta {..} = +convertTaskMeta :: (T.Text, TaskMeta) -> API.Connector +convertTaskMeta (taskId, TaskMeta {..}) = def { API.connectorName = taskName taskInfoMeta , API.connectorType = ioTaskTypeToText . taskType $ taskInfoMeta , API.connectorTarget = taskTarget taskInfoMeta , API.connectorCreationTime = Just . taskCreatedTime $ taskInfoMeta , API.connectorStatus = ioTaskStatusToText taskStateMeta + , API.connectorTaskId = taskId , API.connectorConfig = TL.toStrict . J.encodeToLazyText . J.lookup "connector" $ connectorConfig taskInfoMeta } diff --git a/hstream-io/HStream/IO/Worker.hs b/hstream-io/HStream/IO/Worker.hs index 514e3ebc9..325856a7d 100644 --- a/hstream-io/HStream/IO/Worker.hs +++ b/hstream-io/HStream/IO/Worker.hs @@ -89,7 +89,7 @@ createIOTask worker@Worker{..} name typ target cfg = do createIOTaskFromTaskInfo :: HasCallStack => Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO () -createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..} +createIOTaskFromTaskInfo Worker{..} taskId taskInfo@TaskInfo {..} ioOptions cleanIfExists createMetaData enableCheck = do M.getIOTaskFromName workerHandle taskName >>= \case Nothing -> pure () @@ -132,9 +132,8 @@ showIOTask_ worker@Worker{..} name = do Nothing -> throwIO $ HE.ConnectorNotFound name Just c -> do dockerStatus <- getDockerStatus task - let connector = convertTaskMeta c + let connector = convertTaskMeta (taskId, c) return $ connector { API.connectorOffsets = taskOffsets - , API.connectorTaskId = taskId , API.connectorNode = fromMaybe "" (getServerNode connectorConfig) , API.connectorConfig = getConnectorConfig connectorConfig , API.connectorImage = tcImage taskConfig diff --git a/hstream/src/HStream/Server/Handler/Admin.hs b/hstream/src/HStream/Server/Handler/Admin.hs index 193a18e8d..ccf63e0ab 100644 --- a/hstream/src/HStream/Server/Handler/Admin.hs +++ b/hstream/src/HStream/Server/Handler/Admin.hs @@ -458,12 +458,11 @@ runQuery sc AT.QueryCmdList = do runConnector :: ServerContext -> AT.ConnectorCommand -> IO Text runConnector ServerContext{..} AT.ConnectorCmdList = do connectors <- HC.listIOTasks scIOWorker - let headers = ["Connector Name" :: Text, "Type", "Target", "Node", "Task ID", "Status", "CreatedTime"] + let headers = ["Connector Name" :: Text, "Type", "Target", "Task ID", "Status", "CreatedTime"] rows <- forM connectors $ \API.Connector{..} -> do return [ connectorName , connectorType , connectorTarget - , connectorNode , connectorTaskId , connectorStatus , maybe "unknown" (Text.pack . show . timestampToMsTimestamp) connectorCreationTime