package org.mule.extension.s3.internal.source;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflexity.software.qflex.mule.policies.jwt.JwtGenerator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import org.mule.extension.s3.api.model.ApiOwner;
import org.mule.extension.s3.api.response.S3ObjectTriggerResponse;
import org.mule.extension.s3.internal.config.S3Configuration;
import org.mule.extension.s3.internal.connection.S3Connection;
import org.mule.extension.s3.internal.error.exception.NotificationConfigurationNotValidException;
import org.mule.extension.s3.internal.error.exception.S3RuntimeException;
import org.mule.extension.s3.internal.source.model.Message;
import org.mule.extension.s3.internal.source.model.Record;
import org.mule.extension.s3.internal.source.param.SourceNotificationConfiguration;
import org.mule.extension.s3.internal.utils.SafeUtils;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.core.api.lifecycle.StartException;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.model.Event;
import software.amazon.awssdk.services.s3.model.FilterRule;
import software.amazon.awssdk.services.s3.model.FilterRuleName;
import software.amazon.awssdk.services.s3.model.GetBucketNotificationConfigurationResponse;
import software.amazon.awssdk.services.s3.model.NotificationConfiguration;
import software.amazon.awssdk.services.s3.model.NotificationConfigurationFilter;
import software.amazon.awssdk.services.s3.model.QueueConfiguration;
import software.amazon.awssdk.services.s3.model.S3KeyFilter;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

/**
 * Base polling source that turns S3 event notifications, delivered through an SQS queue,
 * into flow events.
 *
 * <p>On start the source connects, resolves (or creates) the notification queue and the
 * bucket notification configuration, and registers any configuration it had to create.
 * Each poll reads up to ten messages from the queue, emits one item per notification
 * record and then deletes the message.
 *
 * <p>Pending notification configurations are kept in a static map shared by every source
 * instance; start-up work that touches it is serialized with a static semaphore.
 */
@ClusterSupport(SourceClusterSupport.NOT_SUPPORTED)
/* loaded from: input_file:repository/com/mulesoft/connectors/mule-amazon-s3-connector/6.0.2/mule-amazon-s3-connector-6.0.2-mule-plugin.jar:org/mule/extension/s3/internal/source/AbstractObjectSource.class */
public abstract class AbstractObjectSource extends PollingSource<S3ObjectTriggerResponse, Void> {
    /** Format arguments: trigger type, bucket name, folder. */
    private static final String DEFAULT_NOTIFICATION_QUEUE_NAME = "mule-s3-trigger-%s-queue-%s-folder-%s";
    /** Format arguments: trigger type, bucket name, folder. */
    private static final String DEFAULT_NOTIFICATION_CONFIGURATION_NAME = "mule-s3-trigger-%s-object-conf-%s-folder-%s";
    /** Queue policy letting S3 send messages to the queue. Format arguments: queue ARN, bucket name. */
    private static final String POLICY_STRING = "{\n \"Version\": \"2012-10-17\",\n \"Id\": \"__default_policy_ID\",\n \"Statement\": [\n  {\n   \"Sid\": \"__owner_statement\",\n   \"Effect\": \"Allow\",\n   \"Principal\": {\n    \"Service\": \"s3.amazonaws.com\"\n   },\n   \"Action\": [\n    \"SQS:SendMessage\"\n   ],\n   \"Resource\": \"%s\",\n   \"Condition\": {\n      \"ArnLike\": { \"aws:SourceArn\": \"arn:aws:s3:*:*:%s\" }\n   }\n  }\n ]\n}";
    private S3Connection connection;

    @Config
    private S3Configuration config;

    @Connection
    private ConnectionProvider<S3Connection> connectionProvider;

    @Parameter
    @Summary("The name of the bucket to which the source listens for changes")
    private String bucketName;

    @Optional
    @Parameter
    @Summary("The name of the folder in the specified bucket")
    private String folder;

    @ParameterGroup(name = "Notification Configuration")
    private SourceNotificationConfiguration notificationConfiguration;
    /** URL of the SQS queue that is polled; set during start-up. */
    private String notificationQueueUrl;
    private static final Logger logger = LoggerFactory.getLogger(AbstractObjectSource.class);
    private static final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    /** Queue configurations created by the sources but not yet pushed to AWS, keyed by bucket name. */
    private static HashMap<String, List<QueueConfiguration>> notificationConfigurationsPerBucket = new HashMap<>();
    /** Serializes the start-up steps that read or fill {@link #notificationConfigurationsPerBucket}. */
    private static final Semaphore mutex = new Semaphore(1);

    /** @return the trigger type label used when building default queue and configuration names */
    abstract String triggerType();

    /** @return the S3 event a notification configuration must contain to be usable by this source */
    abstract Event triggerEvent();

    /**
     * Connects and prepares the queue and bucket notification configuration.
     *
     * @throws MuleException a {@link StartException} wrapping any runtime failure of the
     *         initialization, or the connection error raised by the provider
     */
    @Override
    protected void doStart() throws MuleException {
        try {
            this.connection = this.connectionProvider.connect();
            try {
                mutex.acquire();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            }
            try {
                initializeComponentNotificationConfigurationQueue();
                initializeConnectorNotificationConfiguration();
            } finally {
                // Always hand the permit back; otherwise one failed start blocks every other source.
                mutex.release();
            }
            // Only push to AWS when no other source is still waiting to register its configuration.
            if (!mutex.hasQueuedThreads()) {
                initializeAllNotificationConfigurationsOnAws();
            }
        } catch (RuntimeException failure) {
            // Prefer the underlying cause, but never pass null when the exception has none.
            Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
            throw new StartException(cause, (Startable) null);
        }
    }

    /**
     * Resolves the URL of the notification queue when the user did not name an existing
     * notification configuration. If the queue does not exist it is created and given a
     * policy that lets the bucket send messages to it.
     */
    private void initializeComponentNotificationConfigurationQueue() {
        if (!StringUtils.isEmpty(this.notificationConfiguration.getConfigurationName())) {
            return;
        }
        final String queueName = java.util.Optional.ofNullable(this.notificationConfiguration.getQueueName())
                .filter(name -> !name.isEmpty())
                .orElse(String.format(DEFAULT_NOTIFICATION_QUEUE_NAME, triggerType(), this.bucketName, this.folder));
        this.connection.getQueueUrl(request -> {
            request.queueName(queueName);
        }).thenApply(queueUrlResponse -> {
            String queueUrl = queueUrlResponse.queueUrl();
            this.notificationQueueUrl = queueUrl;
            return queueUrl;
        }).exceptionally(lookupError -> {
            if (!isCausedByMissingQueue(lookupError)) {
                // Best effort as before (no rethrow), but leave a trace instead of dropping the error.
                logger.warn("Could not resolve the URL of notification queue '{}'", queueName, lookupError);
                return null;
            }
            logger.debug("Specified queue '{}' for notification does not exists. Default queue created with name", queueName);
            createQueueWithBucketPolicy(queueName);
            return null;
        }).join();
    }

    /** Tells whether any exception in the cause chain of {@code error} is a {@link QueueDoesNotExistException}. */
    private static boolean isCausedByMissingQueue(Throwable error) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            if (current instanceof QueueDoesNotExistException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the queue, stores its URL and attaches {@link #POLICY_STRING} so the bucket may
     * send messages to it. Blocks until all three calls have completed.
     */
    private void createQueueWithBucketPolicy(String queueName) {
        this.connection.createQueue(createRequest -> {
            createRequest.queueName(queueName);
        }).whenComplete((createQueueResponse, createError) -> {
            logAndThrowIfNeeded(createError, "Exception thrown while creating queue " + queueName);
            this.notificationQueueUrl = createQueueResponse.queueUrl();
            logger.debug("Setting policies for default queue '{}' ...", this.notificationQueueUrl);
            this.connection.getQueueAttributes(attributesRequest -> {
                attributesRequest.queueUrl(this.notificationQueueUrl).attributeNames(QueueAttributeName.QUEUE_ARN);
            }).whenComplete((attributesResponse, attributesError) -> {
                // Without this check a failed lookup surfaced as an NPE on the null response.
                logAndThrowIfNeeded(attributesError, "Exception thrown while reading attributes of queue " + this.notificationQueueUrl);
                String policy = String.format(POLICY_STRING, attributesResponse.attributes().get(QueueAttributeName.QUEUE_ARN), this.bucketName);
                this.connection.setQueueAttributes(policyRequest -> {
                    policyRequest.queueUrl(this.notificationQueueUrl).attributes(Collections.singletonMap(QueueAttributeName.POLICY, policy));
                }).join();
            }).join();
        }).join();
    }

    /**
     * Pushes every pending queue configuration to its bucket, merged with what the bucket
     * already has.
     */
    private void initializeAllNotificationConfigurationsOnAws() {
        // The guarded map is static, so the lock has to be class-wide rather than per instance.
        synchronized (AbstractObjectSource.class) {
            for (String bucket : notificationConfigurationsPerBucket.keySet()) {
                logger.debug("processing configurations for bucket {}", bucket);
                this.connection.getBucketNotificationConfiguration(readRequest -> {
                    readRequest.bucket(bucket);
                }).whenComplete((currentConfiguration, readError) -> {
                    logAndThrowIfNeeded(readError, "Exception thrown while calling getBucketNotificationConfiguration");
                    logger.debug("Bucket with existing configurations: {}", currentConfiguration.hasQueueConfigurations());
                    List<QueueConfiguration> pending = notificationConfigurationsPerBucket.get(bucket);
                    logger.debug("bucket {} has {} notification configurations to add", bucket, pending.size());
                    // The put call replaces the bucket's whole notification configuration, so the
                    // topic and Lambda targets already present must be sent back with it.
                    // NOTE(review): an EventBridge configuration, if the SDK version in use exposes
                    // one, would need the same treatment - confirm against the SDK version.
                    NotificationConfiguration replacement = NotificationConfiguration.builder()
                            .queueConfigurations(mergeQueueConfigurations(pending, currentConfiguration.queueConfigurations()))
                            .topicConfigurations(currentConfiguration.topicConfigurations())
                            .lambdaFunctionConfigurations(currentConfiguration.lambdaFunctionConfigurations())
                            .build();
                    this.connection.putBucketNotificationConfiguration(writeRequest -> {
                        writeRequest.bucket(bucket).notificationConfiguration(replacement);
                    }).whenComplete((putResponse, writeError) -> {
                        logAndThrowIfNeeded(writeError, "Exception thrown while calling putBucketNotificationConfiguration");
                        logger.debug("Notification configurations successfully added for bucket {}", bucket);
                    }).join();
                }).join();
            }
        }
    }

    /**
     * Returns a new list holding {@code pending} followed by every entry of {@code existing}
     * whose id is not already used by a pending entry. Neither argument is modified, so
     * running the merge again after a successful put yields no duplicate ids.
     */
    private static List<QueueConfiguration> mergeQueueConfigurations(List<QueueConfiguration> pending, List<QueueConfiguration> existing) {
        List<QueueConfiguration> merged = new ArrayList<>(pending);
        for (QueueConfiguration candidate : existing) {
            boolean superseded = pending.stream()
                    .anyMatch(queued -> queued.id() != null && queued.id().equals(candidate.id()));
            if (!superseded) {
                merged.add(candidate);
            }
        }
        return merged;
    }

    /**
     * Logs and rethrows {@code th} wrapped in a {@link RuntimeException}; does nothing when
     * {@code th} is {@code null}.
     */
    private void logAndThrowIfNeeded(Throwable th, String str) {
        if (th != null) {
            // Trailing throwable argument makes SLF4J print the stack trace as well.
            logger.error("{}: {}", str, th.getMessage(), th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Reads up to ten messages from the notification queue, emits one item per record found
     * in each message body and deletes the message afterwards. Any failure is reported to
     * the poll context as a connection exception.
     */
    @Override
    public void poll(PollContext<S3ObjectTriggerResponse, Void> pollContext) {
        if (pollContext.isSourceStopping()) {
            return;
        }
        try {
            logger.debug("Request to receive messages from SQS from queue '{}' ...", this.notificationQueueUrl);
            this.connection.receiveMessage(receiveRequest -> {
                receiveRequest.queueUrl(this.notificationQueueUrl).maxNumberOfMessages(10).visibilityTimeout(10).waitTimeSeconds(1);
            }).whenComplete((receiveResponse, receiveError) -> {
                logAndThrowIfNeeded(receiveError, "Exception while calling receiveMessage");
                if (!receiveResponse.hasMessages()) {
                    return;
                }
                logger.debug("Message containing messages, will be processed...");
                receiveResponse.messages().forEach(message -> {
                    try {
                        List<Record> records = mapper.readValue(message.body(), Message.class).getRecords();
                        if (records != null && !records.isEmpty()) {
                            logger.debug("Message containing records, will be processed...");
                            records.forEach(eventRecord -> {
                                pollContext.accept(item -> {
                                    item.setResult(Result.<S3ObjectTriggerResponse, Void>builder().output(responseFromRecord(eventRecord)).build()).setId(message.messageId());
                                });
                            });
                        }
                        // Messages without records are deleted too, so they are not redelivered.
                        this.connection.deleteMessage(deleteRequest -> {
                            deleteRequest.queueUrl(this.notificationQueueUrl).receiptHandle(message.receiptHandle());
                        }).join();
                    } catch (JsonProcessingException parseError) {
                        throw new S3RuntimeException("Failed to process record from received event notification ", parseError);
                    }
                });
            }).join();
        } catch (Exception e) {
            logger.debug("Handle message request failed with exception: ", e);
            pollContext.onConnectionException(new ConnectionException(e));
        }
    }

    /** Releases the connection, if one was obtained, and drops all pending configurations. */
    @Override
    protected void doStop() {
        // connect() may have failed in doStart, leaving no connection to release.
        if (this.connection != null) {
            this.connectionProvider.disconnect(this.connection);
        }
        notificationConfigurationsPerBucket = new HashMap<>();
    }

    /** Only logs that the item was rejected. */
    @Override
    public void onRejectedItem(Result<S3ObjectTriggerResponse, Void> result, SourceCallbackContext sourceCallbackContext) {
        logger.debug("Event notification was rejected.");
    }

    /**
     * Determines the queue URL from the bucket's notification configuration. A user-supplied
     * configuration name must match an existing, valid configuration; otherwise the default
     * name is looked up and a new queue configuration is prepared when it is missing.
     */
    private void initializeConnectorNotificationConfiguration() {
        String configurationName = this.notificationConfiguration.getConfigurationName();
        this.connection.getBucketNotificationConfiguration(request -> {
            request.bucket(this.bucketName);
        }).whenComplete((bucketConfiguration, error) -> {
            logAndThrowIfNeeded(error, "Error calling getBucketNotificationConfiguration");
            if (!StringUtils.isEmpty(configurationName)) {
                logger.debug("Notification configuration is specified by user: '{}'. Validating...", configurationName);
                this.notificationQueueUrl = getConfiguredQueueArn(bucketConfiguration, configurationName)
                        .orElseThrow(() -> new NotificationConfigurationNotValidException(String.format("Specified bucket configuration is not valid, please check the NAME, QUEUE or EVENT TYPE is of TYPE '%s'", triggerEvent())));
                logger.debug("Valid queue notification configuration :: '{}' exists and will be used with queue url :: '{}' ", configurationName, this.notificationQueueUrl);
            } else {
                String defaultName = String.format(DEFAULT_NOTIFICATION_CONFIGURATION_NAME, triggerType(), this.bucketName, this.folder);
                logger.debug("Notification configuration not specified by user. Using default '{}'.", defaultName);
                this.notificationQueueUrl = getConfiguredQueueArn(bucketConfiguration, defaultName).orElseGet(() -> {
                    logger.debug("Default notification configuration does not exist yet. Creating new ...");
                    return createAndSetDefaultQueueConfiguration(defaultName);
                });
                logger.debug("Default notification configuration created :: '{}' and will be used with queue url :: '{}'", defaultName, this.notificationQueueUrl);
            }
        }).join();
    }

    /**
     * Finds a queue configuration whose id equals {@code str} (ignoring case) and which
     * contains {@link #triggerEvent()}.
     *
     * @return the queue URL derived from the matching configuration's ARN, or empty
     */
    private java.util.Optional<String> getConfiguredQueueArn(GetBucketNotificationConfigurationResponse getBucketNotificationConfigurationResponse, String str) {
        return getBucketNotificationConfigurationResponse.queueConfigurations().stream()
                // Both conditions are tested per element, so a same-named entry lacking the
                // event cannot hide a valid one.
                .filter(candidate -> str.equalsIgnoreCase(candidate.id()) && candidate.events().contains(triggerEvent()))
                .findAny()
                .map(QueueConfiguration::queueArn)
                .map(this::generateUrlFromARN);
    }

    /**
     * Builds a queue configuration with id {@code str} for the current queue and records it
     * as pending for this bucket.
     *
     * @return the queue URL derived from the queue's ARN
     */
    private String createAndSetDefaultQueueConfiguration(String str) {
        this.connection.getQueueAttributes(request -> {
            request.queueUrl(this.notificationQueueUrl).attributeNames(QueueAttributeName.QUEUE_ARN);
        }).whenComplete((attributesResponse, error) -> {
            logAndThrowIfNeeded(error, "Exception while calling getQueueAttributes");
            QueueConfiguration queueConfiguration = QueueConfiguration.builder()
                    .id(str)
                    .queueArn(attributesResponse.attributes().get(QueueAttributeName.QUEUE_ARN))
                    .events(Collections.singleton(triggerEvent()))
                    .filter(buildFolderFilter())
                    .build();
            notificationConfigurationsPerBucket
                    .computeIfAbsent(this.bucketName, bucket -> new ArrayList<>())
                    .add(queueConfiguration);
            this.notificationQueueUrl = generateUrlFromARN(queueConfiguration.queueArn());
        }).join();
        return this.notificationQueueUrl;
    }

    /** @return a key-prefix filter for the configured folder, or {@code null} when no folder is set */
    private NotificationConfigurationFilter buildFolderFilter() {
        if (this.folder == null) {
            return null;
        }
        FilterRule prefixRule = FilterRule.builder().name(FilterRuleName.PREFIX).value(getSanitizedFolderName()).build();
        S3KeyFilter keyFilter = S3KeyFilter.builder().filterRules(prefixRule).build();
        return NotificationConfigurationFilter.builder().key(keyFilter).build();
    }

    /** @return the folder name, guaranteed to end with {@code "/"} */
    private String getSanitizedFolderName() {
        return this.folder.endsWith("/") ? this.folder : this.folder + "/";
    }

    /**
     * Converts a queue ARN ({@code arn:partition:service:region:account:name}) into
     * {@code https://service.region.amazonaws.com/account/name}.
     *
     * @throws IllegalArgumentException when the ARN has fewer than six colon-separated parts
     */
    private String generateUrlFromARN(String str) {
        String[] parts = str == null ? new String[0] : str.split(":");
        if (parts.length < 6) {
            throw new IllegalArgumentException("Malformed queue ARN: " + str);
        }
        // Service and region are joined with a plain dot to form the endpoint host name.
        return "https://" + parts[2] + "." + parts[3] + ".amazonaws.com/" + parts[4] + "/" + parts[5];
    }

    /** Copies the bucket and object details of a notification record, when present, into a response. */
    private S3ObjectTriggerResponse responseFromRecord(Record record) {
        S3ObjectTriggerResponse.S3ObjectTriggerResponseBuilder response = S3ObjectTriggerResponse.builder();
        java.util.Optional.ofNullable(record.getS3()).ifPresent(s3 -> {
            java.util.Optional.ofNullable(s3.getBucket()).ifPresent(bucket -> {
                String ownerId = (String) SafeUtils.callNotNull(bucket.getOwnerIdentity(), identity -> identity.getPrincipalId());
                response.bucketName(bucket.getName()).owner(ApiOwner.builder().id(ownerId).build());
            });
            java.util.Optional.ofNullable(s3.getObject()).ifPresent(s3Object -> {
                response.key(s3Object.getKey()).eTag(s3Object.getETag()).size(s3Object.getSize()).versionId(s3Object.getVersionId()).sequencer(s3Object.getSequencer());
            });
        });
        return response.build();
    }
}
