# Source code for stardate.stardate

import os
import numpy as np
import pandas as pd
import h5py
import tqdm
from .lhf import lnprob, lnlike, nll  # , ptform
from isochrones import StarModel, SingleStarModel, get_ichrone
import emcee
import scipy.optimize as spo
# from dynesty import NestedSampler

from isochrones.priors import FlatPrior, GaussianPrior
from isochrones.mist import MIST_Isochrone

bands = ["B", "V", "J", "H", "K", "BP", "RP", "G"]
# mist = MIST_Isochrone(bands)
mist = get_ichrone("mist", bands=bands)


class Star(object):
    """The star object.

    Creates the star object which will be set up with stellar parameters
    and instructions for saving the MCMC results.

    Args:
        iso_params (dict): A dictionary containing all available
            photometric and spectroscopic parameters for a star, as well
            as its parallax. All parameters should also have associated
            uncertainties. This dictionary should be similar to the
            standard one created for isochrones.py.
        prot (Optional[float]): The rotation period of the star in days.
        prot_err (Optional[float]): The uncertainty on the stellar
            rotation period in days.
        Av (Optional[float]): The V-band extinction (if known).
        Av_err (Optional[float]): The V-band extinction uncertainty
            (if known).
        savedir (Optional[str]): The name of the directory where the
            samples will be saved. Default is the current working
            directory.
        filename (Optional[str]): The name of the h5 file in which the
            posterior samples will be saved.
    """

    def __init__(self, iso_params, prot=None, prot_err=None, Av=None,
                 Av_err=None, savedir=".", filename="samples"):

        if prot is not None and prot <= 0.:
            print("WARNING: rotation period <= 0, the isochrone "
                  "likelihood function will be used.")
        if prot_err is not None and prot_err <= 0.:
            print("WARNING: rotation period uncertainty <= 0, the "
                  "isochrone likelihood function will be used.")

        self.iso_params = iso_params
        self.prot = prot
        self.prot_err = prot_err
        self.Av_mean = Av
        self.Av_sigma = Av_err
        self.savedir = savedir
        self.filename = filename

    # def dynesty_fit(self, iso_only=False, gyro_only=False, rossby=True,
    #                 model="praesepe"):
    #     """Run nested sampling on a star using dynesty.
    #
    #     Explore the posterior probability density function of the
    #     stellar parameters using nested sampling (via dynesty).
    #
    #     Args:
    #         iso_only (Optional[bool]): If True, only the isochronal
    #             likelihood function will be used.
    #         gyro_only (Optional[bool]): If True, only the gyrochronal
    #             likelihood function will be used. Beware: this may not
    #             do what you might assume it does... In general this
    #             setting is not currently very useful!
    #         rossby (Optional[bool]): If True, magnetic braking will
    #             cease after Ro = 2. Default is True.
    #         model (Optional[str]): The gyrochronology model to use. The
    #             default is "praesepe" (the Praesepe-based model). Can
    #             also be "angus15" for the Angus et al. (2015) model.
    #     """
    #     mod = StarModel(mist, **self.iso_params)  # isochrones StarModel
    #     # mod.set_prior(age=FlatPrior(8, 10.14))
    #
    #     # lnlike arguments
    #     args = [mod, self.prot, self.prot_err, iso_only, gyro_only,
    #             rossby, model]
    #     self.args = args
    #
    #     ndim = 5
    #     sampler = NestedSampler(lnlike, ptform, ndim, logl_args=args,
    #                             nlive=1500, bound="balls")
    #     sampler.run_nested()
    #
    #     # normalized weights
    #     self.weights = np.exp(sampler.results.logwt
    #                           - sampler.results.logz[-1])
    #     self.samples = sampler.results.samples
    #     df = pd.DataFrame({"samples": [self.samples],
    #                        "weights": [self.weights]})
    #     fname = "{0}/{1}.h5".format(self.savedir, self.filename)
    #     df.to_hdf(fname, key="samples", mode="w")
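
    # Usage sketch (illustrative only, kept as a comment so the module
    # stays importable): construct a Star from an isochrones-style
    # dictionary of (value, uncertainty) pairs plus a rotation period.
    # The photometry, parallax and period below are hypothetical
    # placeholders, not real measurements.
    #
    #     iso_params = {"G": (12.204, 0.01), "BP": (12.641, 0.01),
    #                   "RP": (11.610, 0.01),
    #                   "parallax": (12.354, 0.02)}  # parallax in mas
    #     star = Star(iso_params, prot=26.3, prot_err=1.0,
    #                 savedir=".", filename="star_samples")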

    def fit(self, inits=[329.58, 9.5596, -.0478, 260, .0045],
            nwalkers=50, max_n=100000, thin_by=100, burnin=0,
            iso_only=False, gyro_only=False, optimize=False, rossby=True,
            model="praesepe", seed=None, save_samples=False):
        """Run MCMC on a star using emcee.

        Explore the posterior probability density function of the stellar
        parameters using MCMC (via emcee).

        Args:
            inits (Optional[array-like]): A list of initial values to use
                for EEP, age (in log10[yrs]), feh, distance (in pc) and
                Av.
            nwalkers (Optional[int]): The number of walkers to use with
                emcee. The default is 50.
            max_n (Optional[int]): The maximum number of samples to obtain
                (although not necessarily to save -- see thin_by). The
                default is 100000.
            thin_by (Optional[int]): Only one in every thin_by samples
                will be saved. The default is 100. Set to 1 to save every
                sample (note that this substantially slows down the MCMC
                process because of the additional I/O time).
            burnin (Optional[int]): The number of SAVED samples to throw
                away when accessing the results. This number cannot exceed
                the number of saved samples (which is max_n/thin_by).
                Default is 0.
            iso_only (Optional[bool]): If True, only the isochronal
                likelihood function will be used.
            gyro_only (Optional[bool]): If True, only the gyrochronal
                likelihood function will be used. Beware: this may not do
                what you might assume it does... In general this setting
                is not currently very useful!
            optimize (Optional[bool]): If True, initial parameters will be
                found via optimization. Default is False.
            rossby (Optional[bool]): If True, magnetic braking will cease
                after Ro = 2. Default is True.
            model (Optional[str]): The gyrochronology model to use. The
                default is "praesepe" (the Praesepe-based model). Can also
                be "angus15" for the Angus et al. (2015) model.
            seed (Optional[int]): The random number seed. Set this if you
                want to regenerate exactly the same samples each time.
            save_samples (Optional[bool]): Saving samples is the
                computational bottleneck. If you want to save time and
                don't need to save the samples using the HDF5 backend, set
                this to False.
        """
        self.max_n = max_n
        self.nwalkers = nwalkers
        self.thin_by = thin_by
        self.save_samples = save_samples

        if burnin > max_n/thin_by:
            burnin = int(max_n/thin_by/3)
            print("Automatically setting burn in to {}".format(burnin))

        p_init = [inits[0], inits[1], inits[2], np.log(inits[3]),
                  inits[4]]
        self.p_init = p_init

        if seed is not None:
            np.random.seed(seed)

        # Create the directory if it doesn't already exist.
        if save_samples:
            if not os.path.exists(self.savedir):
                os.makedirs(self.savedir)

        # Set up the backend. Don't forget to clear it in case the file
        # already exists.
        ndim = 5
        if save_samples:
            fn = "{0}/{1}.h5".format(self.savedir, self.filename)
            backend = emcee.backends.HDFBackend(fn)
            backend.reset(nwalkers, ndim)
            self.backend = backend

        # Set up the StarModel object needed to calculate the likelihood.
        mod = SingleStarModel(mist, **self.iso_params)
        # mod.set_prior(age=FlatPrior(bounds=(8, 10.14)))

        # Set up a Gaussian prior on the extinction, if provided.
        if self.Av_mean is not None and self.Av_sigma is not None:
            mod.set_prior(AV=GaussianPrior(self.Av_mean, self.Av_sigma,
                                           bounds=(0, 1)))

        # lnprob arguments
        args = [mod, self.prot, self.prot_err, iso_only, gyro_only,
                rossby, model]
        self.args = args

        # Optimize. Try a few inits and pick the best.
        if optimize:
            neep, nage = 5, 5
            likes1, likes2 = np.zeros(nage), np.zeros(neep)
            likes = np.zeros((neep, nage))
            inds = np.zeros(neep)
            result_list = np.zeros((neep, nage, 5))
            EEPS, AGES = np.meshgrid(np.linspace(200, 500, neep),
                                     np.linspace(7, 10., nage),
                                     indexing="ij")
            for i in range(neep):
                for j in range(nage):
                    inits = [EEPS[i, j], AGES[i, j], 0., np.log(1000.),
                             .01]
                    results = spo.minimize(nll, inits, args=args)
                    likes1[j] = lnprob(results.x, *args)[0]
                    likes[i, j] = likes1[j]
                    result_list[i, j, :] = results.x
                inds[i] = np.argmax(likes1)
                likes2[i] = max(likes1)
            eep_ind = np.argmax(likes2)
            age_ind = int(inds[eep_ind])
            self.p_init = result_list[eep_ind, age_ind, :]

        # Run the MCMC
        sampler = self.run_mcmc()
        self.sampler = sampler

        nwalkers, nsteps, ndim = np.shape(self.sampler.chain)
        print("nsteps", nsteps, "burnin", burnin)
        self.samples = np.reshape(self.sampler.chain[:, burnin:, :],
                                  (nwalkers*(nsteps-burnin), ndim))
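
    # A minimal call sketch (hypothetical settings): run a short chain
    # with a fixed seed for reproducibility, without writing an HDF5 file.
    #
    #     star.fit(max_n=10000, seed=42, save_samples=False)
    #
    # With optimize=True, the 5 x 5 EEP-age grid above replaces the
    # default inits with the best maximum-likelihood starting point found.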

    def run_mcmc(self):
        """Runs the MCMC.

        Runs the MCMC using emcee. Saves progress to the file specified as
        <filename>.h5 in the <savedir> directory. Samples are saved to
        this file after every <thin_by> samples are obtained, and only one
        sample in every <thin_by> is saved. Sampling continues until
        either max_n samples are obtained or convergence is achieved (this
        usually takes many more than the default maximum of 100,000
        samples). Convergence is achieved if both 1) the change in the
        autocorrelation time estimate is extremely small (less than 1 per
        cent) and 2) 100 or more independent samples have been obtained.

        Returns:
            sampler: the emcee sampler object.
        """
        max_n = self.max_n
        if self.save_samples:
            max_n = self.max_n//self.thin_by

        ndim = len(self.p_init)  # Should always be 5. Hard code it?

        # Initialize the walkers in a tight Gaussian ball around p_init.
        p0 = [np.random.randn(ndim)*1e-4 + self.p_init
              for j in range(self.nwalkers)]

        # Broader gaussian for EEP initialization
        # p0 = np.empty((self.nwalkers, ndim))
        # p0[:, 0] = np.random.randn(self.nwalkers)*1e-4 + self.p_init[0]
        # p0[:, 1] = np.random.randn(self.nwalkers)*1e-4 + self.p_init[1]
        # p0[:, 2] = np.random.randn(self.nwalkers)*1e-4 + self.p_init[2]
        # p0[:, 3] = np.random.randn(self.nwalkers)*1e-4 + self.p_init[3]
        # p0[:, 4] = np.random.randn(self.nwalkers)*1e-4 + self.p_init[4]
        # p0 = list(p0)

        if self.save_samples:
            sampler = emcee.EnsembleSampler(
                self.nwalkers, ndim, lnprob, args=self.args,
                backend=self.backend)

            # Adapted from
            # https://emcee.readthedocs.io/en/latest/tutorials/monitor/
            # =============================================================
            # We'll track how the average autocorrelation time estimate
            # changes.
            index = 0
            autocorr = np.empty(max_n)

            # This will be useful for testing convergence.
            old_tau = np.inf

            # Now we'll sample for up to max_n steps.
            for sample in sampler.sample(p0, iterations=max_n,
                                         thin_by=self.thin_by, store=True,
                                         progress=True):

                # Only check convergence every 100 steps
                # if sampler.iteration % 100:
                #     continue

                # Compute the autocorrelation time so far. Using tol=0
                # means that we'll always get an estimate, even if it
                # isn't trustworthy.
                if sampler.iteration > 100:
                    tau = sampler.get_autocorr_time(tol=0) * self.thin_by
                    autocorr[index] = np.mean(tau)
                    index += 1

                    # Check convergence: the chain is at least 100
                    # autocorrelation times long and tau has stabilized.
                    converged = np.all(tau * 100 < sampler.iteration)
                    converged &= np.all(np.abs(old_tau - tau) / tau < 0.01)
                    converged &= np.all(tau > 1)
                    if converged:
                        break
                    old_tau = tau
            # =============================================================

        else:
            sampler = emcee.EnsembleSampler(self.nwalkers, ndim, lnprob,
                                            args=self.args)
            sampler.run_mcmc(p0, max_n, progress=True)

        return sampler
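
    # If fit was run with save_samples=True, the chain can be re-read
    # later without the Star object via emcee's standard backend API (a
    # sketch; the path is whatever <savedir>/<filename>.h5 was used):
    #
    #     reader = emcee.backends.HDFBackend("./star_samples.h5",
    #                                        read_only=True)
    #     chain = reader.get_chain(discard=100, flat=True)  # shape (N, 5)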

    def age_results(self, burnin=0):
        """The age samples.

        The posterior samples for age, optionally with a specified number
        of burn in steps thrown away.

        Args:
            burnin (Optional[int]): The number of samples to throw away.
                Default is 0.

        Returns:
            The median age in Gyr, its lower and upper uncertainties (from
            the 16th and 84th percentiles, in Gyr), and the raw age
            samples in units of log10(age/yr).
        """
        nwalkers, nsteps, ndim = np.shape(self.sampler.chain)
        assert nsteps > burnin, (
            "The number of burn in samples to throw away ({0}) cannot "
            "exceed the number of steps that were saved ({1}). Try "
            "setting the burnin keyword argument.".format(burnin, nsteps))

        samples = self.sampler.chain[:, burnin:, 1]
        samps = np.reshape(samples, nwalkers*(nsteps - burnin))
        a = np.median((10**samps)*1e-9)
        errp = np.percentile((10**samps)*1e-9, 84) - a
        errm = a - np.percentile((10**samps)*1e-9, 16)
        return a, errm, errp, samps
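
    # Example of unpacking the returned tuple (assumes fit has been run):
    #
    #     age_gyr, age_errm, age_errp, log10_age_samps = \
    #         star.age_results(burnin=100)
    #     print("age = {0:.2f} +{1:.2f} -{2:.2f} Gyr".format(
    #         age_gyr, age_errp, age_errm))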

    def eep_results(self, burnin=0):
        """The EEP samples.

        The posterior samples for Equivalent Evolutionary Point,
        optionally with a specified number of burn in steps thrown away.

        Args:
            burnin (Optional[int]): The number of samples to throw away.
                Default is 0.

        Returns:
            The median EEP, its 16th and 84th percentile lower and upper
            uncertainties and the EEP samples.
        """
        nwalkers, nsteps, ndim = np.shape(self.sampler.chain)
        assert nsteps > burnin, (
            "The number of burn in samples to throw away ({0}) cannot "
            "exceed the number of steps that were saved ({1}). Try "
            "setting the burnin keyword argument.".format(burnin, nsteps))

        samples = self.sampler.chain[:, burnin:, 0]
        samps = np.reshape(samples, nwalkers*(nsteps - burnin))
        e = np.median(samps)
        errp = np.percentile(samps, 84) - e
        errm = e - np.percentile(samps, 16)
        return e, errm, errp, samps

    def mass_results(self, burnin=0):
        """The mass samples.

        The posterior samples for mass, calculated from the EEP, age and
        feh samples, optionally with a specified number of burn in steps
        thrown away.

        Args:
            burnin (Optional[int]): The number of samples to throw away.
                Default is 0.

        Returns:
            The median mass, its 16th and 84th percentile lower and upper
            uncertainties and the mass 'samples' in units of Solar masses.
        """
        nwalkers, nsteps, ndim = np.shape(self.sampler.chain)
        assert nsteps > burnin, (
            "The number of burn in samples to throw away ({0}) cannot "
            "exceed the number of steps that were saved ({1}). Try "
            "setting the burnin keyword argument.".format(burnin, nsteps))

        samples = self.sampler.chain[:, burnin:, :]
        nwalkers, nsteps, ndim = np.shape(samples)
        samps = np.reshape(samples, (nwalkers*nsteps, ndim))
        msamps = mist.interp_value([samps[:, 0], samps[:, 1],
                                    samps[:, 2]], ["mass"])
        m = np.median(msamps)
        errp = np.percentile(msamps, 84) - m
        errm = m - np.percentile(msamps, 16)
        return m, errm, errp, msamps
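
    # Mass is not a sampled dimension: each (EEP, log10-age, feh) sample
    # is mapped through the MIST grid above, so the returned "samples"
    # inherit the posterior spread of those three parameters. A standalone
    # sketch of the same interpolation for one point (placeholder values,
    # passed as arrays to mirror the call above):
    #
    #     one_mass = mist.interp_value(
    #         [np.array([355.]), np.array([9.5]), np.array([0.])],
    #         ["mass"])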

    def feh_results(self, burnin=0):
        """The metallicity samples.

        The posterior samples for metallicity, optionally with a specified
        number of burn in steps thrown away.

        Args:
            burnin (Optional[int]): The number of samples to throw away.
                Default is 0.

        Returns:
            The median metallicity, its 16th and 84th percentile lower and
            upper uncertainties and the metallicity samples.
        """
        nwalkers, nsteps, ndim = np.shape(self.sampler.chain)
        assert nsteps > burnin, (
            "The number of burn in samples to throw away ({0}) cannot "
            "exceed the number of steps that were saved ({1}). Try "
            "setting the burnin keyword argument.".format(burnin, nsteps))

        samples = self.sampler.chain[:, burnin:, 2]
        samps = np.reshape(samples, nwalkers*(nsteps - burnin))
        f = np.median(samps)
        errp = np.percentile(samps, 84) - f
        errm = f - np.percentile(samps, 16)
        return f, errm, errp, samps

    def distance_results(self, burnin=0):
        """The ln(distance) samples.

        The posterior samples for the natural log of distance in parsecs,
        optionally with a specified number of burn in steps thrown away.

        Args:
            burnin (Optional[int]): The number of samples to throw away.
                Default is 0.

        Returns:
            The median ln(distance), its 16th and 84th percentile lower
            and upper uncertainties and the ln(distance) samples.
        """
        nwalkers, nsteps, ndim = np.shape(self.sampler.chain)
        assert nsteps > burnin, (
            "The number of burn in samples to throw away ({0}) cannot "
            "exceed the number of steps that were saved ({1}). Try "
            "setting the burnin keyword argument.".format(burnin, nsteps))

        samples = self.sampler.chain[:, burnin:, 3]
        samps = np.reshape(samples, nwalkers*(nsteps - burnin))
        d = np.median(samps)
        errp = np.percentile(samps, 84) - d
        errm = d - np.percentile(samps, 16)
        return d, errm, errp, samps

    def Av_results(self, burnin=0):
        """The Av samples.

        The posterior samples for V-band extinction, optionally with a
        specified number of burn in steps thrown away.

        Args:
            burnin (Optional[int]): The number of samples to throw away.
                Default is 0.

        Returns:
            The median Av, its 16th and 84th percentile lower and upper
            uncertainties and the Av samples.
        """
        nwalkers, nsteps, ndim = np.shape(self.sampler.chain)
        assert nsteps > burnin, (
            "The number of burn in samples to throw away ({0}) cannot "
            "exceed the number of steps that were saved ({1}). Try "
            "setting the burnin keyword argument.".format(burnin, nsteps))

        samples = self.sampler.chain[:, burnin:, 4]
        samps = np.reshape(samples, nwalkers*(nsteps - burnin))
        a_v = np.median(samps)
        errp = np.percentile(samps, 84) - a_v
        errm = a_v - np.percentile(samps, 16)
        return a_v, errm, errp, samps


def percentiles_from_samps(samps, lp):
    """
    Calculate percentiles and maximum-likelihood values from a 1D sample
    array.

    Args:
        samps (array): The 1D sample array for a single star.
        lp (array): The 1D log-probability array.

    Returns:
        med, errm, errp, std, max_like: The median value, lower and upper
        uncertainties, standard deviation and maximum-likelihood value.
    """
    med = np.median(samps)
    std = np.std(samps)
    upper = np.percentile(samps, 84)
    lower = np.percentile(samps, 16)
    errp = upper - med
    errm = med - lower
    max_like = samps[lp == max(lp)][0]
    return med, errm, errp, std, max_like


def load_samples(fname, burnin=0):
    """
    Read in an H5 file containing samples and return the best-fit
    parameters.

    Args:
        fname (str): The full path to the H5 file.
        burnin (Optional[int]): The number of burn in steps to discard.
            Default is 0.

    Returns:
        flatsamps (array): The flattened 2D sample array, useful for
            making corner plots, with the log-probability samples appended
            as the final column. The shape is
            nwalkers*(nsteps - burnin) x (ndim + 1).
        augmented (array): The 3D sample array, useful for plotting
            chains, with log-probabilities appended as the extra
            dimension.
        prior_samples (array): The flattened 1D array of prior samples.
        lnprobs (array): The flattened 1D array of log-probability
            samples.
    """
    reader = emcee.backends.HDFBackend(fname, read_only=True)
    samples = reader.get_chain()
    lnprobs = reader.get_log_prob()
    prior_samples = reader.get_blobs()

    nsteps, nwalkers, ndim = np.shape(samples)
    augmented = np.zeros((nsteps, nwalkers, ndim+1))
    augmented[:, :, :-1] = samples
    augmented[:, :, -1] = lnprobs

    nsteps, nwalkers, ndim = np.shape(augmented)
    flatsamps = np.reshape(
        augmented[burnin:, :, :], (nwalkers*(nsteps - burnin), ndim))

    return flatsamps, augmented, \
        np.reshape(prior_samples, nsteps*nwalkers), \
        np.reshape(lnprobs, nsteps*nwalkers)


def read_samples(samps, burnin=0):
    """
    Extract best-fit parameters from samples.

    Args:
        samps (array): The 2D sample array (steps x dimensions), with the
            log-probability samples appended as the final column. Designed
            to take the flatsamps output of load_samples.
        burnin (Optional[int]): The number of burn in steps to discard.
            Default is 0.

    Returns:
        param_dict (pandas.DataFrame): A single-row dataframe of results
            with columns: EEP_med, EEP_errm, EEP_errp, EEP_std, EEP_ml,
            age_med_gyr, age_errm, age_errp, age_std, age_ml_gyr, feh_med,
            feh_errm, feh_errp, feh_std, feh_ml, distance_med_pc,
            distance_errm, distance_errp, distance_std_pc, distance_ml,
            Av_med, Av_errm, Av_errp, Av_std and Av_ml. Columns ending in
            "ml" hold the maximum-likelihood value and columns ending in
            "med" hold the median. We recommend using the ml value for
            stars bluer (hotter) than G_BP - G_RP = 1.3 and the median
            value for stars redder (cooler) than 1.3. Age is provided in
            units of Gyr and distance in parsecs.
""" e, em, ep, _estd, eml = percentiles_from_samps( samps[burnin:, 0], samps[burnin:, 5]) a, am, ap, _astd, aml = percentiles_from_samps( (10**samps[burnin:, 1])*1e-9, samps[burnin:, 5]) f, fm, fp, _fstd, fml = percentiles_from_samps( samps[burnin:, 2], samps[burnin:, 5]) d, dm, dp, _dstd, dml = percentiles_from_samps( np.exp(samps[burnin:, 3]), samps[burnin:, 5]) av, avm, avp, _avstd, avml = percentiles_from_samps( samps[burnin:, 4], samps[burnin:, 5]) param_dict = pd.DataFrame(dict({ "EEP_med": e, "EEP_errm": em, "EEP_errp": ep, "EEP_std": _estd, "EEP_ml": eml, "age_med_gyr": a, "age_errm": am, "age_errp": ap, "age_std": _astd, "age_ml_gyr": aml, "feh_med": f, "feh_errm": fm, "feh_errp": fp, "feh_std": _fstd, "feh_ml": fml, "distance_med_pc": d, "distance_errm": dm, "distance_errp": dp, "distance_std_pc": _dstd, "distance_ml": dml, "Av_med": av, "Av_errm": avm, "Av_errp": avp, "Av_std": _avstd, "Av_ml": avml}, index=[0])) return param_dict