/*
 * VM-Operator
 * Copyright (C) 2023, 2025 Michael N. Lipp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.manager;

import com.google.gson.JsonObject;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Comparator;
import java.util.Optional;
import java.util.logging.Level;
import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.Constants.Status;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmPool;
import org.jdrupes.vmoperator.manager.events.AssignVm;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.Exit;
import org.jdrupes.vmoperator.manager.events.GetPools;
import org.jdrupes.vmoperator.manager.events.GetVms;
import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.PodChanged;
import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmPoolChanged;
import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.HandlingError;
import org.jgrapes.core.events.Start;
import org.jgrapes.util.events.ConfigurationUpdate;

/**
 * Implements a controller as defined in the
 * [Operator Whitepaper](https://github.com/cncf/tag-app-delivery/blob/eece8f7307f2970f46f100f51932db106db46968/operator-wg/whitepaper/Operator-WhitePaper_v1-0.md#operator-components-in-kubernetes).
 *
 * The implementation splits the controller into two components: the
 * {@link VmMonitor} and the {@link Reconciler}. The former watches
 * the VM definitions (CRs) and generates {@link VmResourceChanged} events
 * when they change. The latter handles the changes and reconciles the
 * resources in the cluster.
 *
 * The controller itself supports a single configuration property:
 * ```yaml
 * "/Manager":
 *   "/Controller":
 *     namespace: vmop-dev
 * ```
 * This may only be set when running the Manager (and thus the Controller)
 * outside a container during development.
 *
 *
 *
 * @startuml controller-components.svg
 * skinparam component {
 *   BackGroundColor #FEFECE
 *   BorderColor #A80036
 *   BorderThickness 1.25
 *   BackgroundColor<<internal>> #F1F1F1
 *   BorderColor<<internal>> #181818
 *   BorderThickness<<internal>> 1
 * }
 *
 * [Controller]
 * [Controller] *--> [VmWatcher]
 * [Controller] *--> [Reconciler]
 * @enduml
 */
public class Controller extends Component {

    private String namespace;
    private final ChannelManager<String, VmChannel, EventPipeline> chanMgr;

    /**
     * Creates a new instance and attaches the monitors and the
     * reconciler as child components.
     *
     * @param componentChannel the component's channel
     */
    @SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
    public Controller(Channel componentChannel) {
        super(componentChannel);
        // Prepare component tree
        chanMgr = new ChannelManager<>(name -> {
            try {
                return new VmChannel(channel(), newEventPipeline(),
                    new K8sClient());
            } catch (IOException e) {
                logger.log(Level.SEVERE, e, () -> "Failed to create client"
                    + " for handling changes: " + e.getMessage());
                return null;
            }
        });
        attach(new VmMonitor(channel(), chanMgr));
        attach(new DisplaySecretMonitor(channel(), chanMgr));
        // Currently, we don't use the IP assigned by the load balancer
        // to access the VM's console. Might change in the future.
        // attach(new ServiceMonitor(channel()).channelManager(chanMgr));
        attach(new Reconciler(channel()));
        attach(new PoolMonitor(channel()));
        attach(new PodMonitor(channel(), chanMgr));
    }

    /**
     * Special handling of {@link ApiException} thrown by handlers.
     * Such exceptions are logged as warnings (including the response
     * body) and the error event is stopped. Other throwables are
     * left untouched.
     *
     * @param event the event
     */
    @Handler(channels = Channel.class)
    public void onHandlingError(HandlingError event) {
        if (event.throwable() instanceof ApiException exc) {
            logger.log(Level.WARNING, exc,
                () -> "Problem accessing kubernetes: " + exc.getResponseBody());
            event.stop();
        }
    }

    /**
     * Configure the component. Picks up the "namespace" property
     * from the configuration for this component's path, if present.
     *
     * @param event the event
     */
    @Handler
    public void onConfigurationUpdate(ConfigurationUpdate event) {
        event.structured(componentPath()).ifPresent(c -> {
            if (c.containsKey("namespace")) {
                namespace = (String) c.get("namespace");
            }
        });
    }

    /**
     * Handle the start event. Has higher priority because it configures
     * the default Kubernetes client.
     *
     * If no namespace has been configured, the namespace is read from
     * the service account directory. If it still cannot be determined,
     * the start event is cancelled and an {@link Exit} event is fired.
     *
     * @param event the event
     * @throws IOException if the namespace file cannot be read
     * @throws ApiException the api exception
     */
    @Handler(priority = 100)
    public void onStart(Start event) throws IOException, ApiException {
        // Make sure to use thread specific client
        // https://github.com/kubernetes-client/java/issues/100
        Configuration.setDefaultApiClient(null);

        // Verify that a namespace has been configured
        if (namespace == null) {
            var path = Path
                .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
            if (Files.isReadable(path)) {
                // Files.lines keeps the file open until the stream is
                // closed, so make sure that it is closed.
                try (var lines = Files.lines(path)) {
                    namespace = lines.findFirst().orElse(null);
                }
                fire(new ConfigurationUpdate().add(componentPath(), "namespace",
                    namespace));
            }
        }
        if (namespace == null) {
            logger.severe(() -> "Namespace to control not configured and"
                + " no file in kubernetes directory.");
            event.cancel(true);
            fire(new Exit(2));
            return;
        }
        logger.config(() -> "Controlling namespace \"" + namespace + "\".");
    }

    /**
     * Returns the VM data. The known VMs are filtered by each criterion
     * that is set on the event (name, user/roles with permissions,
     * pool assigned from, user assigned to). Criteria that are not
     * set do not restrict the result.
     *
     * @param event the event
     */
    @Handler
    public void onGetVms(GetVms event) {
        event.setResult(chanMgr.channels().stream()
            .filter(c -> event.name().isEmpty()
                || c.vmDefinition().name().equals(event.name().get()))
            .filter(c -> event.user().isEmpty() && event.roles().isEmpty()
                || !c.vmDefinition().permissionsFor(event.user().orElse(null),
                    event.roles()).isEmpty())
            .filter(c -> event.fromPool().isEmpty()
                || c.vmDefinition().assignment().map(Assignment::pool)
                    .map(p -> p.equals(event.fromPool().get())).orElse(false))
            .filter(c -> event.toUser().isEmpty()
                || c.vmDefinition().assignment().map(Assignment::user)
                    .map(u -> u.equals(event.toUser().get())).orElse(false))
            .map(c -> new VmData(c.vmDefinition(), c))
            .toList());
    }

    /**
     * Assign a VM if not already assigned.
     *
     * If a VM from the requested pool is already assigned to the user,
     * it is returned as result. Otherwise the assignable VM that has
     * been used least recently is chosen (preferring a running VM among
     * VMs with the same last usage). If the pool is unknown or no VM
     * is assignable, the event is left without a result. If updating
     * the assignment does not succeed, the whole procedure is repeated.
     *
     * @param event the event
     * @throws ApiException the api exception
     * @throws InterruptedException if interrupted while waiting for
     * the result of a fired event
     */
    @Handler
    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
    public void onAssignVm(AssignVm event)
            throws ApiException, InterruptedException {
        while (true) {
            // Search for existing assignment.
            var vmQuery = chanMgr.channels().stream()
                .filter(c -> c.vmDefinition().assignment().map(Assignment::pool)
                    .map(p -> p.equals(event.fromPool())).orElse(false))
                .filter(c -> c.vmDefinition().assignment().map(Assignment::user)
                    .map(u -> u.equals(event.toUser())).orElse(false))
                .findFirst();
            if (vmQuery.isPresent()) {
                var vmDef = vmQuery.get().vmDefinition();
                event.setResult(new VmData(vmDef, vmQuery.get()));
                return;
            }

            // Get the pool definition for checking possible assignment
            VmPool vmPool = newEventPipeline().fire(new GetPools()
                .withName(event.fromPool())).get().stream().findFirst()
                .orElse(null);
            if (vmPool == null) {
                return;
            }

            // Find available VM.
            vmQuery = chanMgr.channels().stream()
                .filter(c -> vmPool.isAssignable(c.vmDefinition()))
                .sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition()
                    .assignment().map(Assignment::lastUsed)
                    .orElse(Instant.ofEpochSecond(0)))
                    .thenComparing(preferRunning))
                .findFirst();

            // None found
            if (vmQuery.isEmpty()) {
                return;
            }

            // Assign to user
            var chosenVm = vmQuery.get();
            if (Optional.ofNullable(chosenVm.fire(new UpdateAssignment(
                vmPool, event.toUser())).get()).orElse(false)) {
                var vmDef = chosenVm.vmDefinition();
                event.setResult(new VmData(vmDef, chosenVm));

                // Make sure that a newly assigned VM is running.
                chosenVm.fire(new ModifyVm(vmDef.name(), "state", "Running"));
                return;
            }
        }
    }

    /**
     * Orders channels with a running VM before channels with a VM that
     * is not running. The comparison is symmetric (as required by the
     * {@link Comparator} contract): swapping the arguments negates
     * the result.
     */
    private static final Comparator<VmChannel> preferRunning
        = (ch1, ch2) -> Boolean.compare(isRunning(ch2), isRunning(ch1));

    /**
     * Checks if the VM's "Running" condition is set to true. A missing
     * condition is treated as "not running".
     *
     * @param channel the VM's channel
     * @return true, if running
     */
    private static boolean isRunning(VmChannel channel) {
        return channel.vmDefinition().conditionStatus("Running")
            .orElse(false);
    }

    /**
     * When a pool is deleted, remove all related assignments.
     *
     * @param event the event
     * @throws InterruptedException if interrupted while waiting for
     * the result of the VM query
     */
    @Handler
    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
    public void onPoolChanged(VmPoolChanged event) throws InterruptedException {
        if (!event.deleted()) {
            return;
        }
        var vms = newEventPipeline()
            .fire(new GetVms().assignedFrom(event.vmPool().name())).get();
        for (var vm : vms) {
            vm.channel().fire(new UpdateAssignment(event.vmPool(), null));
        }
    }

    /**
     * Remove runner version from status when pod is deleted.
     *
     * @param event the event
     * @param channel the channel
     * @throws ApiException the api exception
     */
    @Handler
    public void onPodChange(PodChanged event, VmChannel channel)
            throws ApiException {
        if (event.type() == ResponseType.DELETED) {
            // Remove runner info from status
            var vmDef = channel.vmDefinition();
            var vmStub = VmDefinitionStub.get(channel.client(),
                new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
                vmDef.namespace(), vmDef.name());
            vmStub.updateStatus(from -> {
                JsonObject status = from.statusJson();
                status.remove(Status.RUNNER_VERSION);
                return status;
            });
        }
    }
}