Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PSL - first functions WIP #13

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ description = "SolarMonitor Active Regtion Tracking (SMART)"
requires-python = ">=3.9"
readme = { file = "README.rst", content-type = "text/x-rst" }
license = { file = "licenses/LICENSE.rst", content-type = "text/plain" }
dependencies = []
dependencies = [
"numpy",
"sunpy",
"scikit-image",
"matplotlib"
]
dynamic = ["version"]

[project.optional-dependencies]
Expand Down
34 changes: 34 additions & 0 deletions smart/characterization/PSL.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import numpy as np
import skimage as ski
from skimage.morphology import disk

import astropy.units as u


@u.quantity_input
def separate_polarities(im_map, feature_map, dilation_radius: u.Quantity[u.arcsec] = 2 * u.arcsec):
"""
Separate feature masks into a negative mask and positive mask, and dilating both masks.

Parameters
----------
im_map : Map
Processed SunPy magnetogram map.
feature_map :
"""
posmask = (feature_map > 0).astype(int)
negmask = (feature_map < 0).astype(int)

arcsec_to_pixel = ((im_map.scale[0] + im_map.scale[1]) / 2) ** (-1)
dilation_radius = (np.round(dilation_radius * arcsec_to_pixel)).to_value(u.pix)

dilated_posmask = ski.morphology.binary_dilation(posmask, disk(dilation_radius))
dilated_negmask = ski.morphology.binary_dilation(negmask, disk(dilation_radius))
return dilated_posmask, dilated_negmask


def PSL_length(dilated_posmask, dilated_negmask):
PSL_mask = ((dilated_posmask + dilated_negmask) > 1).astype(int)
PSL_thinmask = ski.morphology.thin(PSL_mask)
PSL_length = np.sum(PSL_thinmask)
return PSL_length
117 changes: 117 additions & 0 deletions smart/segmentation/IGM.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import matplotlib.pyplot as plt
import numpy as np
import skimage as ski
from skimage.morphology import disk

import astropy.units as u

from sunpy.map import Map

from smart.segmentation.map_processing import smooth_los_threshold


def index_and_grow_mask(
current_map: Map, previous_map: Map, dilation_radius: u.Quantity[u.arcsec] = 2.5 * u.arcsec
):
"""
Performing indexing and growing of the Mask.

Transient features are removed by comparing the mask at time 't' and the mask differentially
rotated to time 't'. ARs are then assigned ascending integer values (starting from one) in
order of decreasing size.

Parameters
----------
current_map : Map
Processed magnetogram map from time 't'.
previous_map : Map
Processed magnetogtam map from time 't - delta_t' differentially rotated to time t.
dilation_radius : int, optional
Radius of the disk for binary dilation (default is 2.5 arcsecs).

Returns
-------
sorted_labels : numpy.ndarray
Individual contiguous features are indexed by assigning ascending integer
values (beginning with one) in order of decreasing feature size.

"""
pixel_size = (current_map.scale[0] + current_map.scale[1]) / 2
dilation_radius = (np.round(dilation_radius / pixel_size)).to_value(u.pix)

filtered_labels = smooth_los_threshold(current_map)[1]
filtered_labels_dt = smooth_los_threshold(previous_map)[1]

dilated_mask = ski.morphology.binary_dilation(filtered_labels, disk(dilation_radius))
dilated_mask_dt = ski.morphology.binary_dilation(filtered_labels_dt, disk(dilation_radius))

transient_features = dilated_mask_dt & ~dilated_mask
final_mask = dilated_mask & ~transient_features
final_labels = ski.measure.label(final_mask)

regions = ski.measure.regionprops(final_labels)
region_sizes = [(region.label, region.area) for region in regions]

sorted_region_sizes = sorted(region_sizes, key=lambda x: x[1], reverse=True)
sorted_labels = np.zeros_like(final_labels)
for new_label, (old_label, _) in enumerate(sorted_region_sizes, start=1):
sorted_labels[final_labels == old_label] = new_label

return sorted_labels


def plot_indexed_grown_mask(im_map: Map, sorted_labels, contours=True, labels=True, figtext=True):
"""
Plotting the fully processed and segmented magnetogram with labels and AR contours optionally displayed.

Parameters
----------
im_map : Map
Processed magnetogram map from time 't'.
diff_map : Map
Processed magnetogtam map from time 't - delta_t' differentially rotated to time t.
contours : bool, optional
If True, contours of the detected regions displayed on map (default is True).
labels : bool, optional
If True, labels with the region numbers will be overlaid on the regions (default is True).
figtext : bool, optional
If True, figtext with the total number of detected regions is displayed on the map (default is True).

Returns
-------
None.

"""

fig = plt.figure()
ax = fig.add_subplot(projection=im_map)
im_map.plot(axes=ax)

unique_labels = np.unique(sorted_labels)
unique_labels = unique_labels[unique_labels != 0]

if contours:
for label_value in unique_labels:
region_mask = sorted_labels == label_value
ax.contour(region_mask, colors="purple", linewidths=1)

if labels:
for label_value in unique_labels:
region_mask = sorted_labels == label_value
region = ski.measure.regionprops(region_mask.astype(int))[0]
centroid = region.centroid
ax.text(
centroid[1],
centroid[0],
str(label_value),
color="red",
fontsize=12,
weight="bold",
ha="center",
va="center",
)

if figtext:
plt.figtext(0.47, 0.2, f"Number of regions = {len(unique_labels)}", color="white")

plt.show()
27 changes: 27 additions & 0 deletions smart/segmentation/differential_rotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from sunpy.coordinates import propagate_with_solar_surface
from sunpy.map import Map


def diff_rotation(im_map: Map, delta_im_map: Map):
"""
Performing differential rotation on a map in order to to correct for feature
motions due to solar rotation.

Parameters
----------
im_map : Map
Processed SunPy magnetogram map.
delta_im_map : Map
Processed SunPy magnetogram taken at time Δt before im_map.

Returns
-------
diff_map : Map
delta_im_map differentially rotated to match im_map.

"""

with propagate_with_solar_surface():
diff_map = delta_im_map.reproject_to(im_map.wcs)

return diff_map, delta_im_map.date
160 changes: 160 additions & 0 deletions smart/segmentation/map_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import numpy as np
import skimage as ski
from matplotlib import colors
from skimage.morphology import disk

import astropy.units as u

from sunpy.map import Map, all_coordinates_from_map, coordinate_is_on_solar_disk


def map_threshold(im_map):
"""
Creating a map from a fits file and processing said map.

Parameters
----------
file : Map
Unprocessed magnetogram map.

Returns
-------
im_map : Map
Processed magnetogram map.

"""
im_map.data[~coordinate_is_on_solar_disk(all_coordinates_from_map(im_map))] = np.nan
im_map.cmap.set_bad("k")
im_map.plot_settings["norm"] = colors.Normalize(vmin=-200, vmax=200)
return im_map


@u.quantity_input
def smooth_los_threshold(
im_map: Map,
thresh: u.Quantity[u.Gauss] = 100 * u.Gauss,
dilation_radius: u.Quantity[u.arcsec] = 2 * u.arcsec,
sigma: u.Quantity[u.arcsec] = 10 * u.arcsec,
min_size: u.Quantity[u.arcsec] = 2250 * u.arcsec,
):
"""

We apply Smoothing, a noise Threshold, and an LOS correction, respectively, to the data.

Parameters
----------
im_map : Map
Processed SunPy magnetogram map.
thresh : int, optional
Threshold value to identify regions of interest (default is 100 Gauss).
dilation_radius : int, optional
Radius of the disk for binary dilation (default is 2 arcsecs).
sigma : int, optional
Standard deviation for Gaussian smoothing (default is 10 arcsecs).
min_size : int, optional
Minimum size of regions to keep in final mask (default is 2250 arcsecs**2).

Returns
-------
smooth_map : Map
Map after applying Gaussian smoothing.
filtered_labels : numpy.ndarray
2D array with each pixel labelled.
mask_sizes : numpy.ndarray
Boolean array indicating the sizes of each labeled region.

"""

pixel_size = (im_map.scale[0] + im_map.scale[1]) / 2
dilation_radius = (np.round(dilation_radius / pixel_size)).to_value(u.pix)
sigma = (np.round(sigma / pixel_size)).to_value(u.pix)
min_size = (np.round(min_size / pixel_size)).to_value(u.pix)

cosmap_data = cosine_correction(im_map)

negmask = cosmap_data < -thresh
posmask = cosmap_data > thresh
mask = negmask | posmask

dilated_mask = ski.morphology.binary_dilation(mask, disk(dilation_radius))
smoothed_data = ski.filters.gaussian(np.nan_to_num(cosmap_data) * ~dilated_mask, sigma)
smooth_map = Map(smoothed_data, im_map.meta)

labels = ski.measure.label(dilated_mask)
label_sizes = np.bincount(labels.ravel())
mask_sizes = label_sizes > min_size
mask_sizes[0] = 0
filtered_labels = mask_sizes[labels]
return smooth_map, filtered_labels, mask_sizes


def get_cosine_correction(im_map: Map):
"""
Get the cosine map and off-limb pixel map using WCS.

Parameters
----------
im_map : Map
Processed SunPy magnetogram map.

Returns
-------
cos_cor : numpy.ndarray
Array of cosine correction factors for each pixel. Values greater than a threshold (edge) are set to 1.
d_angular : numpy.ndarray
Array of angular distances from the disk center in radians.
off_limb : numpy.ndarray
Binary array where pixels on the disk are 1 and pixels off the disk are 0.
"""

# Take off an extra percent from the disk to get rid of limb effects
edge = 0.99

coordinates = all_coordinates_from_map(im_map)
x = coordinates.Tx.to(u.arcsec)
y = coordinates.Ty.to(u.arcsec)
d_radial = np.sqrt(x**2 + y**2)

cos_cor_ratio = d_radial / im_map.rsun_obs
cos_cor_ratio = np.clip(cos_cor_ratio, -1, 1)
d_angular = np.arcsin(cos_cor_ratio)
cos_cor = 1 / np.cos(d_angular)

off_disk = d_radial > (im_map.rsun_obs * edge)
cos_cor[off_disk] = 1

off_limb = np.zeros_like(d_radial.value)
off_limb[off_disk] = 0
off_limb[~off_disk] = 1
return cos_cor, d_angular, off_limb


def cosine_correction(im_map: Map, cosmap=None):
"""
Perform magnetic field cosine correction.

Parameters
----------
inmap : Map
Processed SunPy magnetogram map.
cosmap : numpy.ndarray, optional
An array of the cosine correction factors for each pixel.
If not provided, computed using get_cosine_correction.

Returns
-------
corrected_data : numpy.ndarray
The magnetic field data after applying the cosine correction (units = Gauss).

"""
if cosmap is None:
cosmap = get_cosine_correction(im_map)[0]

scale = (im_map.scale[0] + im_map.scale[1]) / 2

angle_limit = np.arcsin(1 - (scale / im_map.rsun_obs).value)
cos_limit = 1 / np.cos(angle_limit)
cosmap = np.clip(cosmap, None, cos_limit)

corrected_data = im_map.data * cosmap * u.Gauss
return corrected_data
Loading
Loading