Skip to content

Commit

Permalink
prettier changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Jaiden Ashmore authored and Jaiden Ashmore committed Oct 6, 2024
1 parent 592d4f3 commit aeca30b
Show file tree
Hide file tree
Showing 60 changed files with 241 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@
import com.jashmore.sqs.util.annotation.AnnotationUtils;
import com.jashmore.sqs.util.identifier.IdentifierUtils;
import com.jashmore.sqs.util.string.StringUtils;
import lombok.Builder;
import org.immutables.value.Value;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Builder;
import org.immutables.value.Value;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

/**
* {@link MessageListenerContainerFactory} that can be used to build against an annotated method.
*
* @param <A> annotation that is applied on the method
*/
public class AnnotationMessageListenerContainerFactory<A extends Annotation> implements MessageListenerContainerFactory {

private final Class<A> annotationClass;
private final Function<A, String> identifierMapper;
private final Function<A, String> sqsClientIdentifier;
Expand All @@ -52,15 +52,17 @@ public class AnnotationMessageListenerContainerFactory<A extends Annotation> imp
* @param argumentResolverService to map the parameters of the method to values in the message
* @param containerFactory converts details about the annotation to the final {@link MessageListenerContainer}
*/
public AnnotationMessageListenerContainerFactory(final Class<A> annotationClass,
final Function<A, String> identifierMapper,
final Function<A, String> sqsClientIdentifierMapper,
final Function<A, String> queueNameOrUrlMapper,
final QueueResolver queueResolver,
final SqsAsyncClientProvider sqsAsyncClientProvider,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory,
final ArgumentResolverService argumentResolverService,
final Function<AnnotationDetails<A>, MessageListenerContainer> containerFactory) {
public AnnotationMessageListenerContainerFactory(
final Class<A> annotationClass,
final Function<A, String> identifierMapper,
final Function<A, String> sqsClientIdentifierMapper,
final Function<A, String> queueNameOrUrlMapper,
final QueueResolver queueResolver,
final SqsAsyncClientProvider sqsAsyncClientProvider,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory,
final ArgumentResolverService argumentResolverService,
final Function<AnnotationDetails<A>, MessageListenerContainer> containerFactory
) {
this.annotationClass = annotationClass;
this.identifierMapper = identifierMapper;
this.sqsClientIdentifier = sqsClientIdentifierMapper;
Expand All @@ -73,56 +75,66 @@ public AnnotationMessageListenerContainerFactory(final Class<A> annotationClass,
}

@Override
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method) throws MessageListenerContainerInitialisationException {
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method)
throws MessageListenerContainerInitialisationException {
return AnnotationUtils
.findMethodAnnotation(method, this.annotationClass)
.map(annotation -> {
final SqsAsyncClient sqsAsyncClient = getSqsAsyncClient(annotation);
final QueueProperties queueProperties = QueueProperties.builder()
.queueUrl(queueResolver.resolveQueueUrl(sqsAsyncClient, queueNameOrUrlMapper.apply(annotation)))
.build();
final String identifier = IdentifierUtils.buildIdentifierForMethod(identifierMapper.apply(annotation), bean.getClass(), method);
.findMethodAnnotation(method, this.annotationClass)
.map(annotation -> {
final SqsAsyncClient sqsAsyncClient = getSqsAsyncClient(annotation);
final QueueProperties queueProperties = QueueProperties
.builder()
.queueUrl(queueResolver.resolveQueueUrl(sqsAsyncClient, queueNameOrUrlMapper.apply(annotation)))
.build();
final String identifier = IdentifierUtils.buildIdentifierForMethod(
identifierMapper.apply(annotation),
bean.getClass(),
method
);

final Supplier<MessageProcessor> messageProcessorSupplier = () ->
decoratingMessageProcessorFactory.decorateMessageProcessor(
sqsAsyncClient,
identifier,
queueProperties,
bean,
method,
new CoreMessageProcessor(argumentResolverService, queueProperties, sqsAsyncClient, method, bean)
);
final Supplier<MessageProcessor> messageProcessorSupplier = () ->
decoratingMessageProcessorFactory.decorateMessageProcessor(
sqsAsyncClient,
identifier,
queueProperties,
bean,
method,
new CoreMessageProcessor(argumentResolverService, queueProperties, sqsAsyncClient, method, bean)
);

return containerFactory.apply(AnnotationDetails.<A>builder()
.identifier(identifier)
.queueProperties(queueProperties)
.sqsAsyncClient(sqsAsyncClient)
.messageProcessorSupplier(messageProcessorSupplier)
.annotation(annotation)
.build());
});
return containerFactory.apply(
AnnotationDetails
.<A>builder()
.identifier(identifier)
.queueProperties(queueProperties)
.sqsAsyncClient(sqsAsyncClient)
.messageProcessorSupplier(messageProcessorSupplier)
.annotation(annotation)
.build()
);
});
}

private SqsAsyncClient getSqsAsyncClient(final A annotation) {
final String sqsClient = sqsClientIdentifier.apply(annotation);

if (!StringUtils.hasText(sqsClient)) {
return sqsAsyncClientProvider
.getDefaultClient()
.orElseThrow(() -> new MessageListenerContainerInitialisationException("Expected the default SQS Client but there is none")
);
.getDefaultClient()
.orElseThrow(() -> new MessageListenerContainerInitialisationException("Expected the default SQS Client but there is none")
);
}

return sqsAsyncClientProvider
.getClient(sqsClient)
.orElseThrow(() ->
new MessageListenerContainerInitialisationException("Expected a client with id '" + sqsClient + "' but none were found")
);
.getClient(sqsClient)
.orElseThrow(() ->
new MessageListenerContainerInitialisationException("Expected a client with id '" + sqsClient + "' but none were found")
);
}

@Value
@Builder
public static class AnnotationDetails<A extends Annotation> {

public String identifier;
public SqsAsyncClient sqsAsyncClient;
public QueueProperties queueProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainer;
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainerProperties;
import com.jashmore.sqs.processor.DecoratingMessageProcessorFactory;

import java.lang.reflect.Method;
import java.util.Optional;

Expand All @@ -23,13 +22,14 @@ public class BasicAnnotationMessageListenerContainerFactory implements MessageLi
private final AnnotationMessageListenerContainerFactory<QueueListener> delegate;

public BasicAnnotationMessageListenerContainerFactory(
final ArgumentResolverService argumentResolverService,
final SqsAsyncClientProvider sqsAsyncClientProvider,
final QueueResolver queueResolver,
final QueueListenerParser queueListenerParser,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory
final ArgumentResolverService argumentResolverService,
final SqsAsyncClientProvider sqsAsyncClientProvider,
final QueueResolver queueResolver,
final QueueListenerParser queueListenerParser,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory
) {
this.delegate = new AnnotationMessageListenerContainerFactory<>(
this.delegate =
new AnnotationMessageListenerContainerFactory<>(
QueueListener.class,
QueueListener::identifier,
QueueListener::sqsClient,
Expand All @@ -38,21 +38,22 @@ public BasicAnnotationMessageListenerContainerFactory(
sqsAsyncClientProvider,
decoratingMessageProcessorFactory,
argumentResolverService,
(details) -> {
details -> {
final BatchingMessageListenerContainerProperties properties = queueListenerParser.parse(details.annotation);
return new BatchingMessageListenerContainer(
details.identifier,
details.queueProperties,
details.sqsAsyncClient,
details.messageProcessorSupplier,
properties
details.identifier,
details.queueProperties,
details.sqsAsyncClient,
details.messageProcessorSupplier,
properties
);
}
);
);
}

@Override
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method) throws MessageListenerContainerInitialisationException {
public Optional<MessageListenerContainer> buildContainer(final Object bean, final Method method)
throws MessageListenerContainerInitialisationException {
return this.delegate.buildContainer(bean, method);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.aws.AwsConstants;
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBroker;
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.processor.CoreMessageProcessor;
import com.jashmore.sqs.retriever.batching.BatchingMessageRetriever;
import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
import com.jashmore.documentation.annotations.Nullable;
import com.jashmore.documentation.annotations.Positive;
import com.jashmore.documentation.annotations.PositiveOrZero;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.aws.AwsConstants;
import com.jashmore.sqs.container.batching.BatchingMessageListenerContainerProperties;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.util.string.StringUtils;

import java.time.Duration;
import java.util.function.Supplier;

Expand Down Expand Up @@ -153,7 +152,8 @@ protected Supplier<Duration> batchingPeriodSupplier(final QueueListener annotati
if (!StringUtils.hasText(annotation.batchingPeriodInMsString())) {
batchingPeriod = Duration.ofMillis(annotation.batchingPeriodInMs());
} else {
batchingPeriod = Duration.ofMillis(Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.batchingPeriodInMsString())));
batchingPeriod =
Duration.ofMillis(Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.batchingPeriodInMsString())));
}
return () -> batchingPeriod;
}
Expand Down Expand Up @@ -192,7 +192,9 @@ protected Supplier<Duration> messageVisibilityTimeoutSupplier(final QueueListene
}
} else {
messageVisibilityTimeout =
Duration.ofSeconds(Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString())));
Duration.ofSeconds(
Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString()))
);
}

return () -> messageVisibilityTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

import com.jashmore.sqs.annotations.container.AnnotationMessageListenerContainerFactory;
import com.jashmore.sqs.argument.ArgumentResolverService;
import com.jashmore.sqs.client.QueueResolver;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.container.MessageListenerContainerFactory;
import com.jashmore.sqs.container.MessageListenerContainerInitialisationException;
import com.jashmore.sqs.container.fifo.FifoMessageListenerContainer;
import com.jashmore.sqs.container.fifo.FifoMessageListenerContainerProperties;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainerFactory;
import com.jashmore.sqs.processor.DecoratingMessageProcessorFactory;
import com.jashmore.sqs.client.QueueResolver;

import java.lang.reflect.Method;
import java.util.Optional;

Expand All @@ -36,7 +35,8 @@ public FifoAnnotationMessageListenerContainerFactory(
final FifoQueueListenerParser annotationParser,
final DecoratingMessageProcessorFactory decoratingMessageProcessorFactory
) {
this.delegate = new AnnotationMessageListenerContainerFactory<>(
this.delegate =
new AnnotationMessageListenerContainerFactory<>(
FifoQueueListener.class,
FifoQueueListener::identifier,
FifoQueueListener::sqsClient,
Expand All @@ -45,21 +45,22 @@ public FifoAnnotationMessageListenerContainerFactory(
sqsAsyncClientProvider,
decoratingMessageProcessorFactory,
argumentResolverService,
(details) -> {
details -> {
final FifoMessageListenerContainerProperties properties = annotationParser.parse(details.annotation);
return new FifoMessageListenerContainer(
details.identifier,
details.queueProperties,
details.sqsAsyncClient,
details.messageProcessorSupplier,
properties
details.identifier,
details.queueProperties,
details.sqsAsyncClient,
details.messageProcessorSupplier,
properties
);
}
);
);
}

@Override
public Optional<MessageListenerContainer> buildContainer(Object bean, Method method) throws MessageListenerContainerInitialisationException {
public Optional<MessageListenerContainer> buildContainer(Object bean, Method method)
throws MessageListenerContainerInitialisationException {
return this.delegate.buildContainer(bean, method);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import com.jashmore.sqs.QueueProperties;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.aws.AwsConstants;
import com.jashmore.sqs.broker.grouping.GroupingMessageBroker;
import com.jashmore.sqs.broker.grouping.GroupingMessageBrokerProperties;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import com.jashmore.sqs.container.MessageListenerContainer;
import com.jashmore.sqs.container.fifo.FifoMessageListenerContainerProperties;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverProperties;
import com.jashmore.sqs.client.SqsAsyncClientProvider;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.jashmore.sqs.annotations.core.fifo;

import com.jashmore.documentation.annotations.Nullable;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.container.fifo.FifoMessageListenerContainerProperties;
import com.jashmore.sqs.placeholder.PlaceholderResolver;
import com.jashmore.sqs.util.string.StringUtils;

import java.time.Duration;
import java.util.function.Supplier;

Expand Down Expand Up @@ -140,7 +139,8 @@ protected Supplier<Integer> maximumCachedMessageGroupsSupplier(final FifoQueueLi
if (!StringUtils.hasText(annotation.maximumCachedMessageGroupsString())) {
maximumCachedMessageGroups = annotation.maximumCachedMessageGroups();
} else {
maximumCachedMessageGroups = Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.maximumCachedMessageGroupsString()));
maximumCachedMessageGroups =
Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.maximumCachedMessageGroupsString()));
}
return () -> maximumCachedMessageGroups;
}
Expand Down Expand Up @@ -178,7 +178,9 @@ protected Supplier<Duration> messageVisibilityTimeoutSupplier(final FifoQueueLis
}
} else {
messageVisibilityTimeout =
Duration.ofSeconds(Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString())));
Duration.ofSeconds(
Integer.parseInt(placeholderResolver.resolvePlaceholders(annotation.messageVisibilityTimeoutInSecondsString()))
);
}

return () -> messageVisibilityTimeout;
Expand Down
Loading

0 comments on commit aeca30b

Please sign in to comment.