import { interval, Observable, timer, Subject, EMPTY, Subscription } from 'rxjs';
import { Injectable } from '@angular/core';
import { QueueService } from '../queue.service';
import { AuthenticationService } from '../../services/authentication.service';
import { HttpClient } from '@angular/common/http';
import { catchError, finalize, shareReplay, switchMap, takeUntil } from 'rxjs/operators';
import { delayedRetry } from '../../shared/generic-retry-strategy';

@Injectable()
export class PollService extends QueueService {
  private intervalTime = 2000;
  private previousSubscription: Subscription;

  constructor(
    private http: HttpClient,
    private authenticationService: AuthenticationService
  ) {
    super(authenticationService);
  }

  push(obj: any) { }

  pull(url: string): Observable<any> {


    // Unsubscribe from previous subscription if it exists and is not closed
    if (this.previousSubscription && !this.previousSubscription.closed) {
      this.previousSubscription.unsubscribe();
      this.previousSubscription = null; // Clear previous subscription
    }

    const options = this.getOptions();
    const stopPolling$ = new Subject<void>();

    // Create the new subscription stream
    const polling$ = interval(this.intervalTime)
      .pipe(
        takeUntil(stopPolling$),
        switchMap(() => this.http.get(url, options)),
        delayedRetry(
          1000, this.maxRetries,
          () => {
            this.retryTimerValue.next(3000);
            return timer(3000);
          },
          () => {
            const retrying = this.retrying.getValue();
            if (!retrying) {
              this.retrying.next(true);
            }
            console.log('retrying...');
            let num_retries = this.retries.getValue();
            this.retries.next(++num_retries);
          }
        ),
        catchError(error => {
          if (error.status === 404) {
            this.forceLogout.next(true);
          }

          console.log("logout", error);
          throw error;
        }),
        shareReplay(),
        finalize(() => {
          console.log('Doing something at the end here.');
          stopPolling$.next(); // Notify to stop polling
          stopPolling$.complete(); // Complete the stopPolling$ subject
        })
      );

    // Assign the subscription to the previousSubscription property
    this.previousSubscription = polling$.subscribe();

    // Return the polling observable
    return polling$;
  }
}
