Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jul 4, 2024
1 parent 7f94aaf commit 7e853c8
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 29 deletions.
2 changes: 1 addition & 1 deletion common/stats/HStream/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ PER_X_STAT_GETALL_SEP(connector_stat_, name)
#include "../include/per_connector_stats.inc"

-- cache_store
#define STAT_DEFINE(name, _) \
#define STAT_DEFINE(name, _) \
PER_X_STAT_ADD(cache_store_stat_, name) \
PER_X_STAT_SET(cache_store_stat_, name) \
PER_X_STAT_GET(cache_store_stat_, name) \
Expand Down
6 changes: 0 additions & 6 deletions hstream/src/HStream/Server/Core/ShardReader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,6 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do
readRecords r@ShardReader{..} = do
let cStreamName = textToCBytes targetStream
!read_start <- getPOSIXTime
-- records <- readProcessGap r (fromIntegral readShardRequestMaxRecords)
-- Stats.serverHistogramAdd scStatsHolder Stats.SHL_ReadLatency =<< msecSince read_start
-- Stats.stream_stat_add_read_in_bytes scStatsHolder cStreamName (fromIntegral . sum $ map (BS.length . S.recordPayload) records)
-- Stats.stream_stat_add_read_in_batches scStatsHolder cStreamName (fromIntegral $ length records)
-- let (records', _) = filterRecords shardReaderStartTs shardReaderEndTs records
-- receivedRecordsVecs <- forM records' decodeRecordBatch
state <- readIORef serverState
receivedRecordsVecs <- case state of
ServerNormal -> do
Expand Down
7 changes: 0 additions & 7 deletions hstream/src/HStream/Server/Handler/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ lookupResourceHandler sc@ServerContext{..} (ServerNormalRequest _meta req@Lookup
case lookupResourceRequestResType of
Enumerated (Right rType) -> do
validateResourceIdAndThrow rType lookupResourceRequestResId
-- returnResp =<< lookupResource sc rType lookupResourceRequestResId
state <- readIORef serverState
case state of
ServerNormal -> do
Expand Down Expand Up @@ -113,12 +112,6 @@ handleDescribeCluster sc _ _ = catchDefaultEx $ C.describeCluster sc

handleLookupResource :: ServerContext -> G.UnaryHandler LookupResourceRequest ServerNode
handleLookupResource sc@ServerContext{..} _sc req@LookupResourceRequest{..} = catchDefaultEx $ do
-- Log.debug $ "receive lookup resource request: " <> Log.build (show req)
-- case lookupResourceRequestResType of
-- Enumerated (Right rType) -> do
-- validateResourceIdAndThrow rType lookupResourceRequestResId
-- C.lookupResource sc rType lookupResourceRequestResId
-- x -> throwIO $ HE.InvalidResourceType (show x)
Log.info $ "receive lookup resource request: " <> Log.build (show req)
case lookupResourceRequestResType of
Enumerated (Right rType) -> do
Expand Down
35 changes: 20 additions & 15 deletions hstream/test/HStream/ConfigSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import qualified Data.HashMap.Strict as HM
import qualified Data.Map.Strict as M
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isJust)
import qualified Data.Set as Set
import Data.Text (Text)
Expand All @@ -29,10 +28,7 @@ import qualified Z.Data.CBytes as CB
import HStream.Gossip (GossipOpts (..),
defaultGossipOpts)
import HStream.IO.Types (IOOptions (..))
import HStream.Server.Config (CliOptions (..),
MetaStoreAddr (..),
ServerOpts (..),
TlsConfig (..),
import HStream.Server.Config (ServerOpts (..),
parseHostPorts,
parseJSONToOptions)
import HStream.Server.Configuration.Cli
Expand All @@ -54,12 +50,12 @@ spec = describe "HStream.ConfigSpec" $ do
addr2 = "hstream://127.0.0.1:6571"
listener2 = SAI.Listener{listenerAddress = "127.0.0.1", listenerPort = 6571}

either error Map.toList (parseAdvertisedListeners ("l1:" <> addr1))
`shouldBe` Map.toList (M.singleton "l1" (Set.singleton listener1))
either error Map.toList (parseAdvertisedListeners ("l1:" <> addr1 <> ",l2:" <> addr2))
`shouldBe` Map.toList (M.fromList [ ("l1", Set.singleton listener1)
, ("l2", Set.singleton listener2)
])
either error M.toList (parseAdvertisedListeners ("l1:" <> addr1))
`shouldBe` M.toList (M.singleton "l1" (Set.singleton listener1))
either error M.toList (parseAdvertisedListeners ("l1:" <> addr1 <> ",l2:" <> addr2))
`shouldBe` M.toList (M.fromList [ ("l1", Set.singleton listener1)
, ("l2", Set.singleton listener2)
])

xdescribe "TODO: parseConfig" $ do
it "basic config test" $ do
Expand Down Expand Up @@ -126,6 +122,8 @@ defaultConfig = ServerOpts
, _ioOptions = defaultIOOptions
, _querySnapshotPath = "/data/query_snapshots"
, experimentalFeatures = []
, _enableServerCache = False
, _cacheStorePath = ""
, grpcChannelArgs = []
, serverTokens = []
}
Expand Down Expand Up @@ -249,6 +247,9 @@ emptyCliOptions = CliOptions
, cliQuerySnapshotPath = Nothing

, cliExperimentalFeatures = []

, cliEnableServerCache = False
, cliCacheStorePath = Nothing
}


Expand Down Expand Up @@ -294,6 +295,8 @@ instance Arbitrary ServerOpts where
let experimentalFeatures = []
let grpcChannelArgs = []
let serverTokens = []
let _enableServerCache = False
let _cacheStorePath = ""
pure ServerOpts{..}

instance Arbitrary CliOptions where
Expand All @@ -305,7 +308,7 @@ instance Arbitrary CliOptions where
cliServerGossipAddress <- genMaybe addressGen
cliServerAdvertisedAddress <- genMaybe addressGen
cliServerAdvertisedListeners <- arbitrary
cliListenersSecurityProtocolMap <- M.fromList . zip (Map.keys cliServerAdvertisedListeners) . repeat <$> elements ["plaintext", "tls"]
cliListenersSecurityProtocolMap <- M.fromList . zip (M.keys cliServerAdvertisedListeners) . repeat <$> elements ["plaintext", "tls"]
cliServerInternalPort <- genMaybe $ fromIntegral <$> portGen
cliServerID <- arbitrary
cliServerLogLevel <- genMaybe $ read <$> logLevelGen
Expand All @@ -328,6 +331,8 @@ instance Arbitrary CliOptions where
cliIoConnectorImages <- listOf5' $ T.pack <$> connectorImageCliOptGen
let cliQuerySnapshotPath = Just "/data/query_snapshots"
let cliExperimentalFeatures = []
let cliEnableServerCache = False
let cliCacheStorePath = Nothing
pure CliOptions{..}

instance Arbitrary Listener where
Expand Down Expand Up @@ -447,8 +452,8 @@ updateServerOptsWithCliOpts CliOptions{..} x@ServerOpts{..} = x {
, _serverInternalPort = fromMaybe _serverInternalPort cliServerInternalPort
, _serverAddress = fromMaybe _serverAddress cliServerAdvertisedAddress
, _serverGossipAddress = fromMaybe _serverGossipAddress cliServerGossipAddress
, _serverAdvertisedListeners = Map.union cliServerAdvertisedListeners _serverAdvertisedListeners
, _listenersSecurityProtocolMap = Map.union cliListenersSecurityProtocolMap _listenersSecurityProtocolMap
, _serverAdvertisedListeners = M.union cliServerAdvertisedListeners _serverAdvertisedListeners
, _listenersSecurityProtocolMap = M.union cliListenersSecurityProtocolMap _listenersSecurityProtocolMap
, _serverID = fromMaybe _serverID cliServerID
, _metaStore = fromMaybe _metaStore cliMetaStore
, _ldConfigPath = cliStoreConfigPath
Expand All @@ -462,7 +467,7 @@ updateServerOptsWithCliOpts CliOptions{..} x@ServerOpts{..} = x {
, _ldLogLevel = fromMaybe _ldLogLevel cliLdLogLevel
, _ckpRepFactor = fromMaybe _ckpRepFactor cliCkpRepFactor
, _ioOptions = cliIoOptions
, _securityProtocolMap = Map.insert "tls" tlsConfig' _securityProtocolMap}
, _securityProtocolMap = M.insert "tls" tlsConfig' _securityProtocolMap}
where
port = fromMaybe _serverPort cliServerPort
updateSeedsPort = second $ fromMaybe (fromIntegral port)
Expand Down

0 comments on commit 7e853c8

Please sign in to comment.