001/* 002 * VM-Operator 003 * Copyright (C) 2023, 2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import com.google.gson.JsonObject; 022import io.kubernetes.client.apimachinery.GroupVersionKind; 023import io.kubernetes.client.openapi.ApiException; 024import io.kubernetes.client.openapi.Configuration; 025import java.io.IOException; 026import java.nio.file.Files; 027import java.nio.file.Path; 028import java.time.Instant; 029import java.util.Comparator; 030import java.util.Optional; 031import java.util.logging.Level; 032import org.jdrupes.vmoperator.common.Constants.Crd; 033import org.jdrupes.vmoperator.common.Constants.Status; 034import org.jdrupes.vmoperator.common.K8sClient; 035import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 036import org.jdrupes.vmoperator.common.VmDefinition.Assignment; 037import org.jdrupes.vmoperator.common.VmDefinitionStub; 038import org.jdrupes.vmoperator.common.VmPool; 039import org.jdrupes.vmoperator.manager.events.AssignVm; 040import org.jdrupes.vmoperator.manager.events.ChannelManager; 041import org.jdrupes.vmoperator.manager.events.Exit; 042import org.jdrupes.vmoperator.manager.events.GetPools; 043import org.jdrupes.vmoperator.manager.events.GetVms; 044import org.jdrupes.vmoperator.manager.events.GetVms.VmData; 045import org.jdrupes.vmoperator.manager.events.ModifyVm; 046import org.jdrupes.vmoperator.manager.events.PodChanged; 047import org.jdrupes.vmoperator.manager.events.UpdateAssignment; 048import org.jdrupes.vmoperator.manager.events.VmChannel; 049import org.jdrupes.vmoperator.manager.events.VmPoolChanged; 050import org.jdrupes.vmoperator.manager.events.VmResourceChanged; 051import org.jgrapes.core.Channel; 052import org.jgrapes.core.Component; 053import org.jgrapes.core.annotation.Handler; 054import org.jgrapes.core.events.HandlingError; 055import org.jgrapes.core.events.Start; 056import org.jgrapes.util.events.ConfigurationUpdate; 057 058/** 059 * Implements a controller as defined in the 060 * [Operator Whitepaper](https://github.com/cncf/tag-app-delivery/blob/eece8f7307f2970f46f100f51932db106db46968/operator-wg/whitepaper/Operator-WhitePaper_v1-0.md#operator-components-in-kubernetes). 061 * 062 * The implementation splits the controller in two components. The 063 * {@link VmMonitor} and the {@link Reconciler}. The former watches 064 * the VM definitions (CRs) and generates {@link VmResourceChanged} events 065 * when they change. The latter handles the changes and reconciles the 066 * resources in the cluster. 067 * 068 * The controller itself supports a single configuration property: 069 * ```yaml 070 * "/Manager": 071 * "/Controller": 072 * namespace: vmop-dev 073 * ``` 074 * This may only be set when running the Manager (and thus the Controller) 075 * outside a container during development. 076 * 077 *  078 * 079 * @startuml controller-components.svg 080 * skinparam component { 081 * BackGroundColor #FEFECE 082 * BorderColor #A80036 083 * BorderThickness 1.25 084 * BackgroundColor<<internal>> #F1F1F1 085 * BorderColor<<internal>> #181818 086 * BorderThickness<<internal>> 1 087 * } 088 * 089 * [Controller] 090 * [Controller] *--> [VmWatcher] 091 * [Controller] *--> [Reconciler] 092 * @enduml 093 */ 094public class Controller extends Component { 095 096 private String namespace; 097 private final ChannelManager<String, VmChannel, ?> chanMgr; 098 099 /** 100 * Creates a new instance. 101 */ 102 @SuppressWarnings("PMD.ConstructorCallsOverridableMethod") 103 public Controller(Channel componentChannel) { 104 super(componentChannel); 105 // Prepare component tree 106 chanMgr = new ChannelManager<>(name -> { 107 try { 108 return new VmChannel(channel(), newEventPipeline(), 109 new K8sClient()); 110 } catch (IOException e) { 111 logger.log(Level.SEVERE, e, () -> "Failed to create client" 112 + " for handling changes: " + e.getMessage()); 113 return null; 114 } 115 }); 116 attach(new VmMonitor(channel(), chanMgr)); 117 attach(new DisplaySecretMonitor(channel(), chanMgr)); 118 // Currently, we don't use the IP assigned by the load balancer 119 // to access the VM's console. Might change in the future. 120 // attach(new ServiceMonitor(channel()).channelManager(chanMgr)); 121 attach(new Reconciler(channel())); 122 attach(new PoolMonitor(channel())); 123 attach(new PodMonitor(channel(), chanMgr)); 124 } 125 126 /** 127 * Special handling of {@link ApiException} thrown by handlers. 128 * 129 * @param event the event 130 */ 131 @Handler(channels = Channel.class) 132 public void onHandlingError(HandlingError event) { 133 if (event.throwable() instanceof ApiException exc) { 134 logger.log(Level.WARNING, exc, 135 () -> "Problem accessing kubernetes: " + exc.getResponseBody()); 136 event.stop(); 137 } 138 } 139 140 /** 141 * Configure the component. 142 * 143 * @param event the event 144 */ 145 @Handler 146 public void onConfigurationUpdate(ConfigurationUpdate event) { 147 event.structured(componentPath()).ifPresent(c -> { 148 if (c.containsKey("namespace")) { 149 namespace = (String) c.get("namespace"); 150 } 151 }); 152 } 153 154 /** 155 * Handle the start event. Has higher priority because it configures 156 * the default Kubernetes client. 157 * 158 * @param event the event 159 * @throws IOException 160 * @throws ApiException 161 */ 162 @Handler(priority = 100) 163 public void onStart(Start event) throws IOException, ApiException { 164 // Make sure to use thread specific client 165 // https://github.com/kubernetes-client/java/issues/100 166 Configuration.setDefaultApiClient(null); 167 168 // Verify that a namespace has been configured 169 if (namespace == null) { 170 var path = Path 171 .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); 172 if (Files.isReadable(path)) { 173 namespace = Files.lines(path).findFirst().orElse(null); 174 fire(new ConfigurationUpdate().add(componentPath(), "namespace", 175 namespace)); 176 } 177 } 178 if (namespace == null) { 179 logger.severe(() -> "Namespace to control not configured and" 180 + " no file in kubernetes directory."); 181 event.cancel(true); 182 fire(new Exit(2)); 183 return; 184 } 185 logger.config(() -> "Controlling namespace \"" + namespace + "\"."); 186 } 187 188 /** 189 * Returns the VM data. 190 * 191 * @param event the event 192 */ 193 @Handler 194 public void onGetVms(GetVms event) { 195 event.setResult(chanMgr.channels().stream() 196 .filter(c -> event.name().isEmpty() 197 || c.vmDefinition().name().equals(event.name().get())) 198 .filter(c -> event.user().isEmpty() && event.roles().isEmpty() 199 || !c.vmDefinition().permissionsFor(event.user().orElse(null), 200 event.roles()).isEmpty()) 201 .filter(c -> event.fromPool().isEmpty() 202 || c.vmDefinition().assignment().map(Assignment::pool) 203 .map(p -> p.equals(event.fromPool().get())).orElse(false)) 204 .filter(c -> event.toUser().isEmpty() 205 || c.vmDefinition().assignment().map(Assignment::user) 206 .map(u -> u.equals(event.toUser().get())).orElse(false)) 207 .map(c -> new VmData(c.vmDefinition(), c)) 208 .toList()); 209 } 210 211 /** 212 * Assign a VM if not already assigned. 213 * 214 * @param event the event 215 * @throws ApiException the api exception 216 * @throws InterruptedException 217 */ 218 @Handler 219 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 220 public void onAssignVm(AssignVm event) 221 throws ApiException, InterruptedException { 222 while (true) { 223 // Search for existing assignment. 224 var vmQuery = chanMgr.channels().stream() 225 .filter(c -> c.vmDefinition().assignment().map(Assignment::pool) 226 .map(p -> p.equals(event.fromPool())).orElse(false)) 227 .filter(c -> c.vmDefinition().assignment().map(Assignment::user) 228 .map(u -> u.equals(event.toUser())).orElse(false)) 229 .findFirst(); 230 if (vmQuery.isPresent()) { 231 var vmDef = vmQuery.get().vmDefinition(); 232 event.setResult(new VmData(vmDef, vmQuery.get())); 233 return; 234 } 235 236 // Get the pool definition for checking possible assignment 237 VmPool vmPool = newEventPipeline().fire(new GetPools() 238 .withName(event.fromPool())).get().stream().findFirst() 239 .orElse(null); 240 if (vmPool == null) { 241 return; 242 } 243 244 // Find available VM. 245 vmQuery = chanMgr.channels().stream() 246 .filter(c -> vmPool.isAssignable(c.vmDefinition())) 247 .sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition() 248 .assignment().map(Assignment::lastUsed) 249 .orElse(Instant.ofEpochSecond(0))) 250 .thenComparing(preferRunning)) 251 .findFirst(); 252 253 // None found 254 if (vmQuery.isEmpty()) { 255 return; 256 } 257 258 // Assign to user 259 var chosenVm = vmQuery.get(); 260 if (Optional.ofNullable(chosenVm.fire(new UpdateAssignment( 261 vmPool, event.toUser())).get()).orElse(false)) { 262 var vmDef = chosenVm.vmDefinition(); 263 event.setResult(new VmData(vmDef, chosenVm)); 264 265 // Make sure that a newly assigned VM is running. 266 chosenVm.fire(new ModifyVm(vmDef.name(), "state", "Running")); 267 return; 268 } 269 } 270 } 271 272 private static Comparator<VmChannel> preferRunning 273 = new Comparator<>() { 274 @Override 275 public int compare(VmChannel ch1, VmChannel ch2) { 276 if (ch1.vmDefinition().conditionStatus("Running").orElse(false) 277 && !ch2.vmDefinition().conditionStatus("Running") 278 .orElse(false)) { 279 return -1; 280 } 281 return 0; 282 } 283 }; 284 285 /** 286 * When s pool is deleted, remove all related assignments. 287 * 288 * @param event the event 289 * @throws InterruptedException 290 */ 291 @Handler 292 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 293 public void onPoolChanged(VmPoolChanged event) throws InterruptedException { 294 if (!event.deleted()) { 295 return; 296 } 297 var vms = newEventPipeline() 298 .fire(new GetVms().assignedFrom(event.vmPool().name())).get(); 299 for (var vm : vms) { 300 vm.channel().fire(new UpdateAssignment(event.vmPool(), null)); 301 } 302 } 303 304 /** 305 * Remove runner version from status when pod is deleted 306 * 307 * @param event the event 308 * @param channel the channel 309 * @throws ApiException the api exception 310 */ 311 @Handler 312 public void onPodChange(PodChanged event, VmChannel channel) 313 throws ApiException { 314 if (event.type() == ResponseType.DELETED) { 315 // Remove runner info from status 316 var vmDef = channel.vmDefinition(); 317 var vmStub = VmDefinitionStub.get(channel.client(), 318 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), 319 vmDef.namespace(), vmDef.name()); 320 vmStub.updateStatus(from -> { 321 JsonObject status = from.statusJson(); 322 status.remove(Status.RUNNER_VERSION); 323 return status; 324 }); 325 } 326 } 327}