diff --git a/common/stats/HStream/Stats.hs b/common/stats/HStream/Stats.hs index 40d9da876..7fd1e90f3 100644 --- a/common/stats/HStream/Stats.hs +++ b/common/stats/HStream/Stats.hs @@ -312,7 +312,7 @@ PER_X_STAT_GETALL_SEP(connector_stat_, name) #include "../include/per_connector_stats.inc" -- cache_store -#define STAT_DEFINE(name, _) \ +#define STAT_DEFINE(name, _) \ PER_X_STAT_ADD(cache_store_stat_, name) \ PER_X_STAT_SET(cache_store_stat_, name) \ PER_X_STAT_GET(cache_store_stat_, name) \ diff --git a/hstream/src/HStream/Server/Core/ShardReader.hs b/hstream/src/HStream/Server/Core/ShardReader.hs index da4e10b60..82b516fef 100644 --- a/hstream/src/HStream/Server/Core/ShardReader.hs +++ b/hstream/src/HStream/Server/Core/ShardReader.hs @@ -143,12 +143,6 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do readRecords r@ShardReader{..} = do let cStreamName = textToCBytes targetStream !read_start <- getPOSIXTime - -- records <- readProcessGap r (fromIntegral readShardRequestMaxRecords) - -- Stats.serverHistogramAdd scStatsHolder Stats.SHL_ReadLatency =<< msecSince read_start - -- Stats.stream_stat_add_read_in_bytes scStatsHolder cStreamName (fromIntegral . sum $ map (BS.length . S.recordPayload) records) - -- Stats.stream_stat_add_read_in_batches scStatsHolder cStreamName (fromIntegral $ length records) - -- let (records', _) = filterRecords shardReaderStartTs shardReaderEndTs records - -- receivedRecordsVecs <- forM records' decodeRecordBatch state <- readIORef serverState receivedRecordsVecs <- case state of ServerNormal -> do diff --git a/hstream/src/HStream/Server/Handler/Cluster.hs b/hstream/src/HStream/Server/Handler/Cluster.hs index cbe280ef2..547c3a8d8 100644 --- a/hstream/src/HStream/Server/Handler/Cluster.hs +++ b/hstream/src/HStream/Server/Handler/Cluster.hs @@ -58,7 +58,6 @@ lookupResourceHandler sc@ServerContext{..} (ServerNormalRequest _meta req@Lookup case lookupResourceRequestResType of Enumerated (Right rType) -> do validateResourceIdAndThrow rType lookupResourceRequestResId - -- returnResp =<< lookupResource sc rType lookupResourceRequestResId state <- readIORef serverState case state of ServerNormal -> do @@ -113,12 +112,6 @@ handleDescribeCluster sc _ _ = catchDefaultEx $ C.describeCluster sc handleLookupResource :: ServerContext -> G.UnaryHandler LookupResourceRequest ServerNode handleLookupResource sc@ServerContext{..} _sc req@LookupResourceRequest{..} = catchDefaultEx $ do - -- Log.debug $ "receive lookup resource request: " <> Log.build (show req) - -- case lookupResourceRequestResType of - -- Enumerated (Right rType) -> do - -- validateResourceIdAndThrow rType lookupResourceRequestResId - -- C.lookupResource sc rType lookupResourceRequestResId - -- x -> throwIO $ HE.InvalidResourceType (show x) Log.info $ "receive lookup resource request: " <> Log.build (show req) case lookupResourceRequestResType of Enumerated (Right rType) -> do diff --git a/hstream/test/HStream/ConfigSpec.hs b/hstream/test/HStream/ConfigSpec.hs index e3bde07fe..3f374d464 100644 --- a/hstream/test/HStream/ConfigSpec.hs +++ b/hstream/test/HStream/ConfigSpec.hs @@ -9,7 +9,6 @@ import Data.Bifunctor (second) import Data.ByteString (ByteString) import qualified Data.HashMap.Strict as HM import qualified Data.Map.Strict as M -import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe, isJust) import qualified Data.Set as Set import Data.Text (Text) @@ -29,10 +28,7 @@ import qualified Z.Data.CBytes as CB import HStream.Gossip (GossipOpts (..), defaultGossipOpts) import HStream.IO.Types (IOOptions (..)) -import HStream.Server.Config (CliOptions (..), - MetaStoreAddr (..), - ServerOpts (..), - TlsConfig (..), +import HStream.Server.Config (ServerOpts (..), parseHostPorts, parseJSONToOptions) import HStream.Server.Configuration.Cli @@ -54,12 +50,12 @@ spec = describe "HStream.ConfigSpec" $ do addr2 = "hstream://127.0.0.1:6571" listener2 = SAI.Listener{listenerAddress = "127.0.0.1", listenerPort = 6571} - either error Map.toList (parseAdvertisedListeners ("l1:" <> addr1)) - `shouldBe` Map.toList (M.singleton "l1" (Set.singleton listener1)) - either error Map.toList (parseAdvertisedListeners ("l1:" <> addr1 <> ",l2:" <> addr2)) - `shouldBe` Map.toList (M.fromList [ ("l1", Set.singleton listener1) - , ("l2", Set.singleton listener2) - ]) + either error M.toList (parseAdvertisedListeners ("l1:" <> addr1)) + `shouldBe` M.toList (M.singleton "l1" (Set.singleton listener1)) + either error M.toList (parseAdvertisedListeners ("l1:" <> addr1 <> ",l2:" <> addr2)) + `shouldBe` M.toList (M.fromList [ ("l1", Set.singleton listener1) + , ("l2", Set.singleton listener2) + ]) xdescribe "TODO: parseConfig" $ do it "basic config test" $ do @@ -126,6 +122,8 @@ defaultConfig = ServerOpts , _ioOptions = defaultIOOptions , _querySnapshotPath = "/data/query_snapshots" , experimentalFeatures = [] + , _enableServerCache = False + , _cacheStorePath = "" , grpcChannelArgs = [] , serverTokens = [] } @@ -249,6 +247,9 @@ emptyCliOptions = CliOptions , cliQuerySnapshotPath = Nothing , cliExperimentalFeatures = [] + + , cliEnableServerCache = False + , cliCacheStorePath = Nothing } @@ -294,6 +295,8 @@ instance Arbitrary ServerOpts where let experimentalFeatures = [] let grpcChannelArgs = [] let serverTokens = [] + let _enableServerCache = False + let _cacheStorePath = "" pure ServerOpts{..} instance Arbitrary CliOptions where @@ -305,7 +308,7 @@ instance Arbitrary CliOptions where cliServerGossipAddress <- genMaybe addressGen cliServerAdvertisedAddress <- genMaybe addressGen cliServerAdvertisedListeners <- arbitrary - cliListenersSecurityProtocolMap <- M.fromList . zip (Map.keys cliServerAdvertisedListeners) . repeat <$> elements ["plaintext", "tls"] + cliListenersSecurityProtocolMap <- M.fromList . zip (M.keys cliServerAdvertisedListeners) . repeat <$> elements ["plaintext", "tls"] cliServerInternalPort <- genMaybe $ fromIntegral <$> portGen cliServerID <- arbitrary cliServerLogLevel <- genMaybe $ read <$> logLevelGen @@ -328,6 +331,8 @@ instance Arbitrary CliOptions where cliIoConnectorImages <- listOf5' $ T.pack <$> connectorImageCliOptGen let cliQuerySnapshotPath = Just "/data/query_snapshots" let cliExperimentalFeatures = [] + let cliEnableServerCache = False + let cliCacheStorePath = Nothing pure CliOptions{..} instance Arbitrary Listener where @@ -447,8 +452,8 @@ updateServerOptsWithCliOpts CliOptions{..} x@ServerOpts{..} = x { , _serverInternalPort = fromMaybe _serverInternalPort cliServerInternalPort , _serverAddress = fromMaybe _serverAddress cliServerAdvertisedAddress , _serverGossipAddress = fromMaybe _serverGossipAddress cliServerGossipAddress - , _serverAdvertisedListeners = Map.union cliServerAdvertisedListeners _serverAdvertisedListeners - , _listenersSecurityProtocolMap = Map.union cliListenersSecurityProtocolMap _listenersSecurityProtocolMap + , _serverAdvertisedListeners = M.union cliServerAdvertisedListeners _serverAdvertisedListeners + , _listenersSecurityProtocolMap = M.union cliListenersSecurityProtocolMap _listenersSecurityProtocolMap , _serverID = fromMaybe _serverID cliServerID , _metaStore = fromMaybe _metaStore cliMetaStore , _ldConfigPath = cliStoreConfigPath @@ -462,7 +467,7 @@ updateServerOptsWithCliOpts CliOptions{..} x@ServerOpts{..} = x { , _ldLogLevel = fromMaybe _ldLogLevel cliLdLogLevel , _ckpRepFactor = fromMaybe _ckpRepFactor cliCkpRepFactor , _ioOptions = cliIoOptions - , _securityProtocolMap = Map.insert "tls" tlsConfig' _securityProtocolMap} + , _securityProtocolMap = M.insert "tls" tlsConfig' _securityProtocolMap} where port = fromMaybe _serverPort cliServerPort updateSeedsPort = second $ fromMaybe (fromIntegral port)