// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/rs/zerolog/log"
	"golang.org/x/exp/slices"
)

var (
	ErrClosed = errors.New("pubsub: subscriber is closed")
)

type InMemory struct {
	config   Config
	mutex    sync.Mutex
	registry []*inMemorySubscriber
}

// NewInMemory create an instance of memory pubsub implementation.
func NewInMemory(options ...Option) *InMemory {
	config := Config{
		App:            "app",
		Namespace:      "default",
		HealthInterval: 3 * time.Second,
		SendTimeout:    60,
		ChannelSize:    100,
	}

	for _, f := range options {
		f.Apply(&config)
	}
	return &InMemory{
		config:   config,
		registry: make([]*inMemorySubscriber, 0, 16),
	}
}

// Subscribe consumer to process the event with payload.
func (r *InMemory) Subscribe(
	ctx context.Context,
	topic string,
	handler func(payload []byte) error,
	options ...SubscribeOption,
) Consumer {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	config := SubscribeConfig{
		topics:      make([]string, 0, 8),
		app:         r.config.App,
		namespace:   r.config.Namespace,
		sendTimeout: r.config.SendTimeout,
		channelSize: r.config.ChannelSize,
	}

	for _, f := range options {
		f.Apply(&config)
	}

	// create subscriber and map it to the registry
	subscriber := &inMemorySubscriber{
		config:  &config,
		handler: handler,
	}

	config.topics = append(config.topics, topic)
	subscriber.topics = subscriber.formatTopics(config.topics...)

	// start subscriber
	go subscriber.start(ctx)

	// register subscriber
	r.registry = append(r.registry, subscriber)

	return subscriber
}

// Publish event to message broker with payload.
func (r *InMemory) Publish(ctx context.Context, topic string, payload []byte, opts ...PublishOption) error {
	if len(r.registry) == 0 {
		log.Ctx(ctx).Warn().Msg("in pubsub Publish: no subscribers registered")
		return nil
	}
	pubConfig := PublishConfig{
		app:       r.config.App,
		namespace: r.config.Namespace,
	}
	for _, f := range opts {
		f.Apply(&pubConfig)
	}

	topic = formatTopic(pubConfig.app, pubConfig.namespace, topic)
	wg := sync.WaitGroup{}
	for _, sub := range r.registry {
		if slices.Contains(sub.topics, topic) && !sub.isClosed() {
			wg.Add(1)
			go func(subscriber *inMemorySubscriber) {
				defer wg.Done()
				// timer is based on subscriber data
				t := time.NewTimer(subscriber.config.sendTimeout)
				defer t.Stop()
				select {
				case <-ctx.Done():
					return
				case subscriber.channel <- payload:
					log.Ctx(ctx).Trace().Msgf("in pubsub Publish: message %v sent to topic %s", string(payload), topic)
				case <-t.C:
					// channel is full for topic (message is dropped)
					log.Ctx(ctx).Warn().Msgf("in pubsub Publish: %s topic is full for %s (message is dropped)",
						topic, subscriber.config.sendTimeout)
				}
			}(sub)
		}
	}

	// Wait for all subscribers to complete
	// Otherwise, we might fail notifying some subscribers due to context completion.
	wg.Wait()

	return nil
}

func (r *InMemory) Close(_ context.Context) error {
	for _, subscriber := range r.registry {
		if err := subscriber.Close(); err != nil {
			return err
		}
	}
	return nil
}

type inMemorySubscriber struct {
	config  *SubscribeConfig
	handler func([]byte) error
	channel chan []byte
	once    sync.Once
	mutex   sync.RWMutex
	topics  []string
	closed  bool
}

func (s *inMemorySubscriber) start(ctx context.Context) {
	s.channel = make(chan []byte, s.config.channelSize)
	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-s.channel:
			if !ok {
				return
			}
			if err := s.handler(msg); err != nil {
				// TODO: bump err to caller
				log.Ctx(ctx).Err(err).Msgf("in pubsub start: error while running handler for topic")
			}
		}
	}
}

func (s *inMemorySubscriber) Subscribe(_ context.Context, topics ...string) error {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	topics = s.formatTopics(topics...)
	for _, ch := range topics {
		if slices.Contains(s.topics, ch) {
			continue
		}
		s.topics = append(s.topics, ch)
	}
	return nil
}

func (s *inMemorySubscriber) Unsubscribe(_ context.Context, topics ...string) error {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	topics = s.formatTopics(topics...)
	for i, ch := range topics {
		if slices.Contains(s.topics, ch) {
			s.topics[i] = s.topics[len(s.topics)-1]
			s.topics = s.topics[:len(s.topics)-1]
		}
	}
	return nil
}

func (s *inMemorySubscriber) Close() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.closed {
		return ErrClosed
	}
	s.closed = true
	s.once.Do(func() {
		close(s.channel)
	})
	return nil
}

func (s *inMemorySubscriber) isClosed() bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.closed
}

func (s *inMemorySubscriber) formatTopics(topics ...string) []string {
	result := make([]string, len(topics))
	for i, topic := range topics {
		result[i] = formatTopic(s.config.app, s.config.namespace, topic)
	}
	return result
}