Skip to content

Commit

Permalink
Fetch only Org peers for Chaincode operations & Configurable offset r…
Browse files Browse the repository at this point in the history
…eset policy (#135)

* Configurable offset reset policy

Signed-off-by: “Nithin <nithin.pankaj@walmartlabs.com>

* Fetch only Org peers for Chaincode operations

Signed-off-by: “Nithin <nithin.pankaj@walmartlabs.com>

---------

Signed-off-by: “Nithin <nithin.pankaj@walmartlabs.com>
Co-authored-by: “Nithin <nithin.pankaj@walmartlabs.com>
  • Loading branch information
nithin-pankaj and “Nithin authored Feb 23, 2024
1 parent 84c249b commit 5afb0d1
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public DefaultKafkaConsumerFactory<String, String> consumerFactory(
kafkaConsumerProperties.getSaslJaasConfig());
}

if (StringUtils.isNotBlank(kafkaConsumerProperties.getOffsetResetPolicy())) {
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerProperties.getOffsetResetPolicy());
}

// Adding SSL configuration if Kafka Cluster is SSL secured
if (kafkaConsumerProperties.isSslAuthRequired()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public static class Consumer extends SSLProperties {
private int topicPartitions = 1;
private boolean enableParallelListenerCapabilities = false;
private String saslJaasConfig;
private String offsetResetPolicy;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public String getCurrentVersion(String networkName, String chaincodeName) {
Network network = gateway.getNetwork(networkName);
Channel channel = network.getChannel();

Collection<Peer> peers = channel.getPeers();
Collection<Peer> peers = channel.getPeersForOrganization(gateway.getIdentity().getMspId());

final QueryLifecycleQueryChaincodeDefinitionRequest
queryLifecycleQueryChaincodeDefinitionRequest =
Expand Down Expand Up @@ -146,7 +146,7 @@ public String getCurrentSequence(
Network network = gateway.getNetwork(networkName);
Channel channel = network.getChannel();

Collection<Peer> peers = channel.getPeers();
Collection<Peer> peers = channel.getPeersForOrganization(gateway.getIdentity().getMspId());

final QueryLifecycleQueryChaincodeDefinitionRequest
queryLifecycleQueryChaincodeDefinitionRequest =
Expand Down Expand Up @@ -191,9 +191,11 @@ public String getCurrentPackageId(

Network network = gateway.getNetwork(networkName);
Channel channel = network.getChannel();
Collection<Peer> peers = channel.getPeers();

try {

Collection<Peer> peers = channel.getPeersForOrganization(gateway.getIdentity().getMspId());

Collection<LifecycleQueryInstalledChaincodesProposalResponse> results =
hfClientWrapper
.getHfClient()
Expand Down Expand Up @@ -237,7 +239,7 @@ public String getCollectionConfig(
Network network = gateway.getNetwork(networkName);
Channel channel = network.getChannel();

Collection<Peer> peers = channel.getPeers();
Collection<Peer> peers = channel.getPeersForOrganization(gateway.getIdentity().getMspId());

final QueryLifecycleQueryChaincodeDefinitionRequest
queryLifecycleQueryChaincodeDefinitionRequest =
Expand Down Expand Up @@ -345,29 +347,30 @@ private String approveChaincode(
ChaincodeOperations chaincodeOperationsModel,
Optional<ChaincodeCollectionConfiguration> chaincodeCollectionConfigurationOptional) {

Collection<Peer> peers = channel.getPeers();
try {

Collection<Peer> peers = channel.getPeersForOrganization(gateway.getIdentity().getMspId());

if (!CollectionUtils.isEmpty(chaincodeOperationsModel.getPeerNames())) {
if (!CollectionUtils.isEmpty(chaincodeOperationsModel.getPeerNames())) {

Set<String> peerFilter = chaincodeOperationsModel.getPeerNames();
Set<String> peerFilter = chaincodeOperationsModel.getPeerNames();

peers =
peers.stream()
.filter(channelPeer -> peerFilter.contains(channelPeer.getName()))
.collect(Collectors.toList());
peers =
peers.stream()
.filter(channelPeer -> peerFilter.contains(channelPeer.getName()))
.collect(Collectors.toList());

if (CollectionUtils.isEmpty(peers)) {
log.error(
"No Peers identified with the names {} in channel {}. Skipping approval",
peerFilter,
channel.getName());
throw new ServiceException(
ErrorCode.HYPERLEDGER_FABRIC_CHAINCODE_OPERATIONS_REQUEST_REJECTION,
"Invalid Peer details");
if (CollectionUtils.isEmpty(peers)) {
log.error(
"No Peers identified with the names {} in channel {}. Skipping approval",
peerFilter,
channel.getName());
throw new ServiceException(
ErrorCode.HYPERLEDGER_FABRIC_CHAINCODE_OPERATIONS_REQUEST_REJECTION,
"Invalid Peer details");
}
}
}

try {
LifecycleApproveChaincodeDefinitionForMyOrgRequest
lifecycleApproveChaincodeDefinitionForMyOrgRequest =
hfClientWrapper.getHfClient().newLifecycleApproveChaincodeDefinitionForMyOrgRequest();
Expand Down Expand Up @@ -419,8 +422,10 @@ private String commitChaincode(
ChaincodeOperations chaincodeOperationsModel,
Optional<ChaincodeCollectionConfiguration> chaincodeCollectionConfigurationOptional) {

Collection<Peer> peers = channel.getPeers();
try {

Collection<Peer> peers = channel.getPeersForOrganization(gateway.getIdentity().getMspId());

LifecycleCommitChaincodeDefinitionRequest lifecycleCommitChaincodeDefinitionRequest =
hfClientWrapper.getHfClient().newLifecycleCommitChaincodeDefinitionRequest();

Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.template
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ kafka:
security-protocol: <Only supports SSL>
ssl-keystore-base64: <if ssl-enabled is true, provide the Base64 encoded value of keystore file>
ssl-truststore-base64: <if ssl-enabled is true, provide the Base64 encoded value of Trsutstore file>
offsetResetPolicy: <possible values are earliest / latest> if not provided, default will be latest.
event-listener:
brokerHost: <Comma separated list of boostrap servers>
topic: <topic to publish Block or Chaincode Events>
Expand Down

0 comments on commit 5afb0d1

Please sign in to comment.