5050import java .util .concurrent .ExecutorService ;
5151import java .util .concurrent .Future ;
5252import java .util .concurrent .ThreadPoolExecutor ;
53+ import java .util .concurrent .TimeUnit ;
5354import java .util .concurrent .atomic .AtomicBoolean ;
5455import java .util .concurrent .atomic .AtomicInteger ;
5556import java .util .concurrent .atomic .AtomicReference ;
8687import org .apache .accumulo .core .lock .ServiceLockData .ServiceDescriptor ;
8788import org .apache .accumulo .core .lock .ServiceLockData .ServiceDescriptors ;
8889import org .apache .accumulo .core .lock .ServiceLockData .ThriftService ;
90+ import org .apache .accumulo .core .lock .ServiceLockPaths ;
8991import org .apache .accumulo .core .lock .ServiceLockPaths .AddressSelector ;
9092import org .apache .accumulo .core .lock .ServiceLockPaths .ResourceGroupPredicate ;
9193import org .apache .accumulo .core .lock .ServiceLockPaths .ServiceLockPath ;
94+ import org .apache .accumulo .core .lock .ServiceLockSupport ;
9295import org .apache .accumulo .core .lock .ServiceLockSupport .HAServiceLockWatcher ;
9396import org .apache .accumulo .core .logging .ConditionalLogger .DeduplicatingLogger ;
9497import org .apache .accumulo .core .manager .state .tables .TableState ;
115118import org .apache .accumulo .core .zookeeper .ZcStat ;
116119import org .apache .accumulo .manager .compaction .coordinator .CompactionCoordinator ;
117120import org .apache .accumulo .manager .fate .FateManager ;
121+ import org .apache .accumulo .manager .fate .FateWorker ;
118122import org .apache .accumulo .manager .merge .FindMergeableRangeTask ;
119123import org .apache .accumulo .manager .metrics .ManagerMetrics ;
120124import org .apache .accumulo .manager .metrics .fate .FateExecutorMetricsProducer ;
169173 * <p>
170174 * The manager will also coordinate log recoveries and reports general status.
171175 */
176+ // TODO create standalone PrimaryFateEnv class and pull everything into there related to
177+ // FateEnv... this will make it much more clear the env is for metadata ops only
172178public class Manager extends AbstractServer
173179 implements LiveTServerSet .Listener , FateEnv , HighlyAvailableService {
174180
@@ -206,6 +212,7 @@ public class Manager extends AbstractServer
206212 private AuthenticationTokenKeyManager authenticationTokenKeyManager ;
207213
208214 ServiceLock managerLock = null ;
215+ ServiceLock primaryManagerLock = null ;
209216 private final BalanceManager balanceManager ;
210217
211218 private ManagerState state = ManagerState .INITIAL ;
@@ -219,7 +226,6 @@ public class Manager extends AbstractServer
219226 private final AtomicReference <Map <FateInstanceType ,Fate <FateEnv >>> fateRefs =
220227 new AtomicReference <>();
221228 private volatile FateManager fateManager ;
222- private volatile ManagerAssistant assitantManager ;
223229
224230 private final ManagerMetrics managerMetrics = new ManagerMetrics ();
225231
@@ -758,7 +764,7 @@ public void run() {
758764 for (TServerInstance server : currentServers ) {
759765 try {
760766 serversToShutdown .add (server );
761- tserverSet .getConnection (server ).fastHalt (managerLock );
767+ tserverSet .getConnection (server ).fastHalt (primaryManagerLock );
762768 } catch (TException e ) {
763769 // its probably down, and we don't care
764770 } finally {
@@ -823,7 +829,7 @@ private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> ts
823829 try {
824830 TServerConnection connection = tserverSet .getConnection (instance );
825831 if (connection != null ) {
826- connection .fastHalt (managerLock );
832+ connection .fastHalt (primaryManagerLock );
827833 }
828834 } catch (TException e ) {
829835 log .error ("{}" , e .getMessage (), e );
@@ -884,7 +890,7 @@ private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> ts
884890 try {
885891 TServerConnection connection2 = tserverSet .getConnection (server );
886892 if (connection2 != null ) {
887- connection2 .halt (managerLock );
893+ connection2 .halt (primaryManagerLock );
888894 }
889895 } catch (TTransportException e1 ) {
890896 // ignore: it's probably down
@@ -955,8 +961,12 @@ public void run() {
955961 CompactionCoordinatorService .Iface wrappedCoordinator =
956962 HighlyAvailableServiceWrapper .service (compactionCoordinator .getThriftService (), this );
957963
964+ // This is not wrapped w/ HighlyAvailableServiceWrapper because it can be run by any manager.
965+ // TODO However, should probably consider upgrade?
966+ FateWorker fateWorker = new FateWorker (context , tserverSet , this ::createFateInstance );
967+
958968 var processor = ThriftProcessorTypes .getManagerTProcessor (this , fateServiceHandler ,
959- wrappedCoordinator , managerClientHandler , getContext ());
969+ wrappedCoordinator , managerClientHandler , fateWorker , getContext ());
960970 try {
961971 updateThriftServer (() -> {
962972 return TServerUtils .createThriftServer (context , getBindAddress (),
@@ -975,14 +985,18 @@ public void run() {
975985
976986 MetricsInfo metricsInfo = getContext ().getMetricsInfo ();
977987
978- // Start manager assistant before getting lock, this allows non primary manager processes to
979- // work on stuff.
980- var shutdownComplete = getShutdownComplete ();
981- assitantManager = new ManagerAssistant (getContext (), getBindAddress (), tserverSet ,
982- this ::createFateInstance , shutdownComplete ::get );
983- assitantManager .start ();
988+ try {
989+ // Acquire the lock that all managers get before the primary lock, this allows non primary
990+ // manager processes to work on stuff.
991+ getManagerLock ();
992+ } catch (KeeperException | InterruptedException e ) {
993+ throw new IllegalStateException ("Unable to get manager lock " , e );
994+ }
995+
996+ fateWorker .setLock (managerLock );
997+
984998 metricsInfo
985- .addMetricsProducers (assitantManager .getMetricsProducers ().toArray (new MetricsProducer [0 ]));
999+ .addMetricsProducers (fateWorker .getMetricsProducers ().toArray (new MetricsProducer [0 ]));
9861000
9871001 metricsInfo .init (MetricsInfo .serviceTags (getContext ().getInstanceName (), getApplicationName (),
9881002 getAdvertiseAddress (), getResourceGroup ()));
@@ -995,7 +1009,7 @@ public void run() {
9951009 // for each of these services.
9961010 ServiceLockData sld ;
9971011 try {
998- sld = getManagerLock (context .getServerPaths ().createManagerPath ());
1012+ sld = getPrimaryManagerLock (context .getServerPaths ().createManagerPath ());
9991013 } catch (KeeperException | InterruptedException e ) {
10001014 throw new IllegalStateException ("Exception getting manager lock" , e );
10011015 }
@@ -1006,7 +1020,7 @@ public void run() {
10061020 recoveryManager = new RecoveryManager (this , timeToCacheRecoveryWalExistence );
10071021
10081022 context .getZooCache ().addZooCacheWatcher (new TableStateWatcher ((tableId , event ) -> {
1009- TableState state = getTableManager ().getTableState (tableId );
1023+ TableState state = context . getTableManager ().getTableState (tableId );
10101024 log .debug ("Table state transition to {} @ {}" , state , event );
10111025 nextEvent .event (tableId , "Table state in zookeeper changed for %s to %s" , tableId , state );
10121026 }));
@@ -1225,9 +1239,9 @@ boolean canSuspendTablets() {
12251239 }
12261240
12271241 sld = new ServiceLockData (descriptors );
1228- log .info ("Setting manager lock data to {}" , sld );
1242+ log .info ("Setting primary manager lock data to {}" , sld );
12291243 try {
1230- managerLock .replaceLockData (sld );
1244+ primaryManagerLock .replaceLockData (sld );
12311245 } catch (KeeperException | InterruptedException e ) {
12321246 throw new IllegalStateException ("Exception updating manager lock" , e );
12331247 }
@@ -1257,7 +1271,6 @@ boolean canSuspendTablets() {
12571271 log .debug ("Shutting down fate." );
12581272 fate (FateInstanceType .META ).close ();
12591273 fateManager .stop (Duration .ZERO );
1260- assitantManager .stop ();
12611274
12621275 splitter .stop ();
12631276
@@ -1315,8 +1328,8 @@ private void setupFate(ServerContext context, MetricsInfo metricsInfo) {
13151328 try {
13161329 Predicate <ZooUtil .LockID > isLockHeld =
13171330 lock -> ServiceLock .isLockHeld (context .getZooCache (), lock );
1318- var metaStore =
1319- new MetaFateStore < FateEnv >( context . getZooSession (), managerLock .getLockID (), isLockHeld );
1331+ var metaStore = new MetaFateStore < FateEnv >( context . getZooSession (),
1332+ primaryManagerLock .getLockID (), isLockHeld );
13201333 var metaInstance = createFateInstance (this , metaStore , context );
13211334 // configure this instance to process all data
13221335 metaInstance .setPartitions (Set .of (FatePartition .all (FateInstanceType .META )));
@@ -1444,16 +1457,63 @@ private long remaining(long deadline) {
14441457 return Math .max (1 , deadline - System .currentTimeMillis ());
14451458 }
14461459
1460+ private void getManagerLock () throws KeeperException , InterruptedException {
1461+ log .info ("trying to get assistant manager lock" );
1462+
1463+ final ZooReaderWriter zoo = getContext ().getZooSession ().asReaderWriter ();
1464+ try {
1465+
1466+ var advertiseAddress = getAdvertiseAddress ();
1467+
1468+ final ServiceLockPaths .ServiceLockPath zLockPath = getContext ().getServerPaths ()
1469+ .createManagerWorkerPath (getResourceGroup (), advertiseAddress );
1470+ ServiceLockSupport .createNonHaServiceLockPath (Type .MANAGER , zoo , zLockPath );
1471+ // TODO use same uuid?
1472+ var serverLockUUID = UUID .randomUUID ();
1473+ managerLock = new ServiceLock (getContext ().getZooSession (), zLockPath , serverLockUUID );
1474+ // TODO is the correct thing being done for shutdown?
1475+ ServiceLock .LockWatcher lw = new ServiceLockSupport .ServiceLockWatcher (Type .MANAGER ,
1476+ () -> getShutdownComplete ().get (),
1477+ (type ) -> getContext ().getLowMemoryDetector ().logGCInfo (getContext ().getConfiguration ()));
1478+
1479+ for (int i = 0 ; i < 120 / 5 ; i ++) {
1480+ zoo .putPersistentData (zLockPath .toString (), new byte [0 ], ZooUtil .NodeExistsPolicy .SKIP );
1481+
1482+ ServiceLockData .ServiceDescriptors descriptors = new ServiceLockData .ServiceDescriptors ();
1483+ for (ServiceLockData .ThriftService svc : new ServiceLockData .ThriftService [] {
1484+ ThriftService .FATE_WORKER }) { // TODO is this thrift service correct?
1485+ descriptors .addService (new ServiceLockData .ServiceDescriptor (serverLockUUID , svc ,
1486+ advertiseAddress .toString (), this .getResourceGroup ()));
1487+ }
1488+
1489+ if (managerLock .tryLock (lw , new ServiceLockData (descriptors ))) {
1490+ log .info ("Obtained manager assistant lock {}" , managerLock .getLockPath ());
1491+ this .getContext ().setServiceLock (managerLock );
1492+ return ;
1493+ }
1494+ log .info ("Waiting for manager assistant lock" );
1495+ sleepUninterruptibly (5 , TimeUnit .SECONDS );
1496+ }
1497+ String msg = "Too many retries, exiting." ;
1498+ log .info (msg );
1499+ throw new RuntimeException (msg );
1500+ } catch (Exception e ) {
1501+ log .info ("Could not obtain manager assistant lock, exiting." , e );
1502+ throw new RuntimeException (e );
1503+ }
1504+ }
1505+
/**
 * Returns the service lock for this manager. After this change that is the primary manager lock
 * ({@code primaryManagerLock}) rather than {@code managerLock}. The field is initialized to
 * {@code null} and only assigned in {@code getPrimaryManagerLock}, so this may return
 * {@code null} before that method has run.
 */
14471506 @ Override
14481507 public ServiceLock getServiceLock () {
1449- return managerLock ;
1508+ return primaryManagerLock ;
14501509 }
14511510
1452- private ServiceLockData getManagerLock (final ServiceLockPath zManagerLoc )
1511+ private ServiceLockData getPrimaryManagerLock (final ServiceLockPath zManagerLoc )
14531512 throws KeeperException , InterruptedException {
14541513 var zooKeeper = getContext ().getZooSession ();
1455- log .info ("trying to get manager lock" );
1514+ log .info ("trying to get primary manager lock" );
14561515
1516+ // TODO do both locks need the same UUID? in #3262 both had the same uuid
14571517 UUID zooLockUUID = UUID .randomUUID ();
14581518
14591519 ServiceDescriptors descriptors = new ServiceDescriptors ();
@@ -1464,13 +1524,13 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc)
14641524 descriptors .addService (new ServiceDescriptor (zooLockUUID , ThriftService .NONE ,
14651525 ServerOpts .BIND_ALL_ADDRESSES , this .getResourceGroup ()));
14661526 ServiceLockData sld = new ServiceLockData (descriptors );
1467- managerLock = new ServiceLock (zooKeeper , zManagerLoc , zooLockUUID );
1527+ primaryManagerLock = new ServiceLock (zooKeeper , zManagerLoc , zooLockUUID );
14681528 HAServiceLockWatcher managerLockWatcher =
14691529 new HAServiceLockWatcher (Type .MANAGER , () -> getShutdownComplete ().get ());
14701530
14711531 while (true ) {
14721532
1473- managerLock .lock (managerLockWatcher , sld );
1533+ primaryManagerLock .lock (managerLockWatcher , sld );
14741534
14751535 managerLockWatcher .waitForChange ();
14761536
@@ -1483,12 +1543,10 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc)
14831543 throw new IllegalStateException ("manager lock in unknown state" );
14841544 }
14851545
1486- managerLock .tryToCancelAsyncLockOrUnlock ();
1546+ primaryManagerLock .tryToCancelAsyncLockOrUnlock ();
14871547
14881548 sleepUninterruptibly (TIME_TO_WAIT_BETWEEN_LOCK_CHECKS , MILLISECONDS );
14891549 }
1490-
1491- this .getContext ().setServiceLock (getServiceLock ());
14921550 return sld ;
14931551 }
14941552
@@ -1582,7 +1640,6 @@ public Set<TableId> onlineTables() {
15821640 return result ;
15831641 }
15841642
1585- @ Override
/**
 * Returns the tablet servers contained in the snapshot obtained from {@code tserverSet} at the
 * time of the call.
 */
15861643 public Set <TServerInstance > onlineTabletServers () {
15871644 return tserverSet .getSnapshot ().getTservers ();
15881645 }
0 commit comments