# Copyright 2015 by Justin MacCallum, Alberto Perez, Ken Dill
#

import numpy as np  # type: ignore
import math
from collections import namedtuple
from typing import List, Union, Tuple, Optional

# named tuple to hold the results

"""

Repeat adaptation on a regular schedule with an optional burn-in and
"""

[docs]    def __init__(
self,
growth_factor: float,
burn_in: int,
stop_after: Optional[int] = None,
) -> None:
"""

Args:
growth_factor: increase adapt_every by a factor of growth_factor
burn_in: number of steps to ignore at the beginning
"""
self.growth_factor = growth_factor
self.burn_in: Optional[int] = burn_in
self.stop_after = stop_after

"""

Args:
step: the current simulation step
Returns:
indicating if adaptation or resetting is necessary

"""
if self.stop_after is not None:
if step > self.stop_after:

if self.burn_in:
if step >= self.burn_in:
self.burn_in = None
else:
else:
else:
return result

class _AcceptanceCounter:
def __init__(self, n_replicas: int) -> None:
self.n_replicas = n_replicas
self.successes = np.zeros(self.n_replicas - 1)
self.attemps = np.zeros(self.n_replicas - 1)
self.reset()

def reset(self) -> None:
"""
Reset statistics
"""
self.successes = np.zeros(self.n_replicas - 1)
self.attempts = np.zeros(self.n_replicas - 1)

def update(self, i: int, accepted: bool) -> None:
"""
Update statistics

Args:
i: lower replica index of pair with attempted swap
accepted: :const:`True` if swap was successful
"""
assert i in range(self.n_replicas - 1)
self.attempts[i] += 1
if accepted:
self.successes[i] += 1

def get_acceptance_probabilities(self) -> np.ndarray:
"""
Get acceptance probabilities
"""
return self.successes / (self.attempts + 1e-9)

"""
"""

[docs]    def __init__(self, n_replicas: int) -> None:
"""

Args:
n_replicas: number of replicas
"""
_AcceptanceCounter.__init__(self, n_replicas)
self.reset()

[docs]    def update(self, i: int, accepted: bool) -> None:
_AcceptanceCounter.update(self, i, accepted)

def adapt(self, previous_lambdas: List[float], step: int) -> List[float]:
return previous_lambdas

[docs]    def reset(self) -> None:
_AcceptanceCounter.reset(self)

"""
Adaptor based on making acceptance rates uniform
"""

accept_probs: np.ndarray
t_lens: np.ndarray

[docs]    def __init__(
self,
n_replicas: int,
min_acc_prob: float = 0.1,
) -> None:
"""

Args:
n_replicas: number of replicas
min_acc_prob: floor on the acceptance probability used for adaptation
"""
_AcceptanceCounter.__init__(self, n_replicas)

self.min_acc_prob = min_acc_prob
self.reset()

[docs]    def update(self, i: int, accepted: bool) -> None:
"""
Update statistics

Args:
i: lower replica index of pair with attempted swap
accepted: :const:`True` if swap was successful
"""
_AcceptanceCounter.update(self, i, accepted)

[docs]    def adapt(self, previous_lambdas: List[float], step: int) -> List[float]:
"""
Compute new optimal values of lambda.

Args:
previous_lambdas: the previous lambda values
step: the current simulation step

Returns:
the new, optimized lambda values
"""

self._compute_accept_probs()
self._compute_t_len()

# put the t_lens on a grid
lambda_grid = np.linspace(0.0, 1.0, 5000)
t_lens = np.interp(lambda_grid, previous_lambdas, self.t_lens)

# compute the desired t_lens based on equal spacing
even_spacing = np.linspace(0, t_lens[-1], self.n_replicas)

# compute the values of lambda that will give the desired evenly
# spaced t_lens
new_lambdas = list(np.interp(even_spacing[1:-1], t_lens, lambda_grid))
new_lambdas = [0.0] + new_lambdas + [1.0]
else:
new_lambdas = previous_lambdas

self.reset()

return new_lambdas

[docs]    def reset(self) -> None:
"""

Resets all internal counters and statistics to zero.
"""
_AcceptanceCounter.reset(self)

def _compute_accept_probs(self) -> None:
# default to 50 percent if there hasn't been a trial
self.successes[self.attempts == 0] = 1.0
self.attempts[self.attempts == 0] = 2.0

self.accept_probs = self.successes / self.attempts

# set minimum percentage
index = self.accept_probs < self.min_acc_prob
self.accept_probs[index] = self.min_acc_prob

def _compute_t_len(self) -> None:
# compute the t_len between adjacent pairs
delta_ts = [math.sqrt(-2.0 * math.log(acc)) for acc in self.accept_probs]

# compute a running total
t_lens = [0.0]
total = 0.0
for dt in delta_ts:
total += dt
t_lens.append(total)
self.t_lens = np.array(t_lens)

"""
An adaptor that switches between two strategies at a specified time
"""
[docs]    def __init__(
) -> None:
"""

Args:
"""
self.switching_time = switching_time

[docs]    def update(self, i: int, accepted: bool) -> None:
"""
Update statistics

Args:
i: lower replica index of pair with attempted swap
accepted: :const:`True` if swap was successful
"""

[docs]    def adapt(self, previous_lambdas: List[float], step: int) -> List[float]:
"""
Compute new optimal values of lambda.

Args:
previous_lambdas: the previous lambda values
step: the current simulation step

Returns:
the new, optimized lambda values
"""
if step <= self.switching_time:
return lambdas_from_first
else:
return lambdas_from_second

[docs]    def reset(self) -> None:
"""