diff --git a/examples/offline_inference_blend.py b/examples/offline_inference_blend.py new file mode 100644 index 00000000..4d9adbbe --- /dev/null +++ b/examples/offline_inference_blend.py @@ -0,0 +1,225 @@ +import contextlib +import json +import os +import re +import time +import csv +from dataclasses import asdict + +from vllm.v1.metrics.reader import Counter, Gauge, Histogram, Vector +from tqdm import tqdm +import random +random.seed(0) + +from transformers import AutoTokenizer +from vllm import LLM, SamplingParams +from vllm.config import KVTransferConfig +from vllm.engine.arg_utils import EngineArgs + +from ucm.logger import init_logger +from vllm.inputs import TokensPrompt + +import sys +logger = init_logger(__name__) + +model = "" +data_dir = "" +tokenizer = None +# 28705 is the token id for the "▁" char in the llama model +# 151643 is the pad token id in the qwen model +chunk_end_token_id = -1 +chunk_pad_token_id = -1 +block_size = 64 + +def setup_environment_variables(): + os.environ["VLLM_USE_V1"] = "1" + os.environ["PYTHONHASHSEED"] = "123456" + + global model, data_dir, tokenizer, chunk_end_token_id, chunk_pad_token_id + model = os.getenv("MODEL_PATH", "/home/models/mistralai/Mistral-7B-Instruct-v0.2") + if not os.path.isdir(model): + model = input("Enter path to model, e.g. /home/models/mistralai/Mistral-7B-Instruct-v0.2: ") + if not os.path.isdir(model): + print("Exiting. Incorrect model_path") + sys.exit(1) + + + data_dir = os.getenv("DATA_DIR", "/home/data/kv_cache") + if not os.path.isdir(data_dir): + data_dir = input( + "Enter the directory for UCMStore to save kv cache, e.g. /home/data/kv_cache: " + ) + create = input(f"Directory {data_dir} does not exist. Create it? (Y/n): ") + if create.strip().lower() in ("y", ""): + os.makedirs(data_dir, exist_ok=True) + else: + print("Exiting. 
Directory not created.") + sys.exit(1) + + tokenizer = AutoTokenizer.from_pretrained(model, use_chat_template=True) + # as for Qwen model, use pad_token_id for padding block + # as for Llama model, current use unk_token for padding block + chunk_pad_token_id = tokenizer.encode("▁", add_special_tokens=False)[0] + chunk_end_token_id = chunk_pad_token_id + + if tokenizer.pad_token_id is not None: + chunk_pad_token_id = tokenizer.pad_token_id + chunk_end_token_id = tokenizer.pad_token_id + + +@contextlib.contextmanager +def build_llm_with_uc(module_path: str, name: str, model: str): + ktc = KVTransferConfig( + kv_connector=name, + kv_connector_module_path=module_path, + kv_role="kv_both", + kv_connector_extra_config={ + "ucm_connector_name": "UcmNfsStore", + "ucm_connector_config": { + "storage_backends": data_dir, + "kv_block_size": 33554432, + }, + "use_layerwise": True, + "ucm_sparse_config": { + "Blend": { + "chunk_end_token_id": chunk_end_token_id, + "compute_meta": { + "model.layers.1.self_attn.attn": { + "ratio": 0.2, + }, + } + } + }, + }, + ) + + llm_args = EngineArgs( + model=model, + enforce_eager=True, + kv_transfer_config=ktc, + max_model_len=16384 * 2, + max_num_batched_tokens=16384 * 2, + gpu_memory_utilization=0.8, + block_size=block_size, + enable_prefix_caching=False, + distributed_executor_backend="mp", + tensor_parallel_size=1, + trust_remote_code=True, + ) + + llm = LLM(**asdict(llm_args)) + try: + yield llm + finally: + logger.info("LLM engine is exiting.") + + +def get_output( + llm: LLM, + prompt, + sampling_params: SamplingParams, +): + start = time.time() + outputs = llm.generate(prompt, sampling_params) + print("-" * 50) + generated_text = None + for output in outputs: + generated_text = output.outputs[0].text + e2e_time = time.time() - start + print("-" * 50) + return e2e_time, generated_text + + +def pad_rag_chunks(token_ids, block_size, pad_id, end_id): + """ + pad token_ids with pad_id and end up with end_id + """ + # assert pad_id != end_id + remainder = len(token_ids) % block_size + + if remainder == 0 and token_ids[-1] in [pad_id, end_id]: + # no need to pad + token_ids[-1] = end_id + return token_ids + + pad_len = block_size - remainder - 1 + padded = token_ids + [pad_id] * pad_len + [end_id] + return padded + + +def main(): + module_path = "ucm.integration.vllm.uc_connector" + name = "UnifiedCacheConnectorV1" + + setup_environment_variables() + + with build_llm_with_uc(module_path, name, model) as llm: + prefill_sampling_params = SamplingParams(temperature=0.0, top_p=0.95, max_tokens=1) + sampling_params = SamplingParams(temperature=0, top_p=0.95, max_tokens=128) + # choose one data row in LongBenchV1 (wikimqa) + dataset_row = {"input": "Where was the wife of Francis I Rákóczi born?", + "context": "Passage 1:\nWaldrada of Lotharingia\nWaldrada was the mistress, and later the wife, of Lothair II of Lotharingia.\n\nBiography\nWaldrada's family origin is uncertain. The prolific 19th-century French writer Baron Ernouf suggested that Waldrada was of noble Gallo-Roman descent, sister of Thietgaud, the bishop of Trier, and niece of Gunther, archbishop of Cologne. 
However, these suggestions are not supported by any evidence, and more recent studies have instead suggested she was of relatively undistinguished social origins, though still from an aristocratic milieu.\nThe Vita Sancti Deicoli states that Waldrada was related to Eberhard II, Count of Nordgau (included Strasbourg) and the family of Etichonids, though this is a late 10th-century source and so may not be entirely reliable on this question.In 855 the Carolingian king Lothar II married Teutberga, a Carolingian aristocrat and the daughter of Bosonid Boso the Elder. The marriage was arranged by Lothar's father Lothar I for political reasons. It is very probable that Waldrada was already Lothar II's mistress at this time.Teutberga was allegedly not capable of bearing children and Lothar's reign was chiefly occupied by his efforts to obtain an annulment of their marriage, and his relations with his uncles Charles the Bald and Louis the German were influenced by his desire to obtain their support for this endeavour. Lothair, whose desire for annulment was arguably prompted by his affection for Waldrada, put away Teutberga. However, Hucbert took up arms on his sister's behalf, and after she had submitted successfully to the ordeal of water, Lothair was compelled to restore her in 858. Still pursuing his purpose, he won the support of his brother, Emperor Louis II, by a cession of lands and obtained the consent of the local clergy to the annulment and to his marriage with Waldrada, which took place in 862. However, Pope Nicholas I was suspicious of this and sent legates to investigate at the Council of Metz in 863. The Council found in favour of Lothair's divorce, which led to rumours that the papal legates may have bribed and thus meant that Nicholas order Lothair to take Teutberga back or face excommunication. \nWith the support of Charles the Bald and Louis the German, Teutberga appealed the annulment to Pope Nicholas. Nicholas refused to recognize the annulment and excommunicated Waldrada in 866, forcing Lothair to abandon Waldrada in favour of Teutberga. Lothair accepted this begrudgingly for a time, but shortly afterward at the end of 867 Pope Nicholas I died. Thus, Lothair began to seek the permission of the newly appointed Pope Adrian II to again put Teutberga aside and marry Waldrada, riding to Rome to speak with him on the matter in 869. However, on his way home, Lothair died.\n\nChildren\nWaldrada and Lothair II had some sons and probably three daughters, all of whom were declared illegitimate:\n\nHugh (c. 855–895), Duke of Alsace (867–885)\nGisela (c. 865–908), who in 883 married Godfrey, the Viking leader ruling in Frisia, who was murdered in 885\nBertha (c. 863–925), who married Theobald of Arles (c. 854–895), count of Arles, nephew of Teutberga. They had two sons, Hugh of Italy and Boso of Tuscany. After Theobald's death, between 895 and 898 she married Adalbert II of Tuscany (c. 875–915) They had at least three children: Guy, who succeeded his father as count and duke of Lucca and margrave of Tuscany, Lambert succeeded his brother in 929, but lost the titles in 931 to his half-brother Boso of Tuscany, and Ermengard.\nErmengarde (d. 90?)\nOdo (d. 
c.879)\nPassage 2:\nFrancis I Rákóczi\nFrancis I Rákóczi (February 24, 1645, Gyulafehérvár, Transylvania – July 8, 1676, Zboró, Royal Hungary) was a Hungarian aristocrat, elected prince of Transylvania and father of Hungarian national hero Francis Rákóczi II.Francis Rákóczi was the son of George Rákóczi II, prince of Transylvania, and Sophia Báthory. He was elected prince by the Transylvanian Diet in 1652, during his father's life. However, because of the disastrous Polish campaign of 1657 and its consequences, the Ottoman Empire removed his father from the throne in 1660, and prohibited any Rákóczi to ascend the Transylvanian throne. This left Francis unable to come by his father's legacy; he therefore withdrew to his estates in Royal Hungary.\nNotably, the Rákóczi family was Calvinist, and they were staunch supporters of the Reformed Church in Hungary. However, Francis' mother, Sophia Báthory, had converted to Calvinism merely for the sake of her marriage. After her husband's death, she returned to Catholicism and supported the Counter Reformation. Francis Rákóczi also became a Catholic, thus acquiring favour with the Catholic Habsburg Court. His mother converted him to Catholicism. He was made a count in 1664.\nIn 1666 Francis married Jelena Zrinska (Hungarian: Zrínyi Ilona), a Croatian countess, and joined the Wesselényi conspiracy (Zrinski-Frankopan conspiracy in Croatia), one leader of which was Jelena's father, Petar Zrinski (Hungarian: Zrínyi Péter). Francis soon became the leader of the conspiracy, and, as a culmination of their anti-Habsburg stratagems, started an armed uprising of nobles in Upper Hungary, while the other conspirators were supposed to start the fight in Croatia. Due to poor organization and discord between the conspirators, however, the Austrian authorities were well informed; they quickly suppressed the Croatian branch of the revolt.\nWhen Rákóczi learned that Petar Zrinski had been captured by the Austrians, he laid down his arms and applied for mercy. All other leaders of the conspiracy were executed for high treason; Rákóczi, due to his mother's intervention, and for a ransom of 300,000 forints and several castles, was pardoned.\n\nIssue\nFrancis I had three children:\n\nGyörgy (1667)\nJulianna Borbála (1672–1717), married Count Ferdinand Gobert von Aspremont-Lynden (1643-1708)\nFrancis Rákóczi II (1676–1735)Francis II was born only three months before his father's death. He led a rebellion against Austrian rule (Rákóczi's War of Independence) and died in exile.\nPassage 3:\nMary Fiennes (lady-in-waiting)\nMary Fiennes (1495–1531) was an English courtier. She was the wife of Henry Norris. Norris was executed for treason as one of the alleged lovers of her cousin, Anne Boleyn, the second wife of King Henry VIII of England. Mary lived for six years at the French court as a Maid of Honour to queens consort Mary Tudor, wife of Louis XII; and Claude of France, wife of Francis I.\n\nFamily and early years\nMary was born at Herstmonceux Castle in Sussex in 1495, the only daughter of Thomas Fiennes, 8th Baron Dacre and Anne Bourchier. By both her father and mother she was descended from Edward III. She had two younger brothers, Sir Thomas and John. Her mother was an elder half-sister of Elizabeth Howard and Lord Edmund Howard, making queen consorts Anne Boleyn and Catherine Howard a cousin of Mary. 
Her paternal grandmother, Alice FitzHugh, was sister to Elizabeth FitzHugh, grandmother of Catherine Parr, making her cousin to yet another queen consort of Henry VIII.\nIn 1514, Mary was appointed a Maid of Honour to Princess Mary Tudor and accompanied her to France when the latter married King Louis XII of France; afterwards she served in the capacity to Queen Mary's successor, Queen Claude, consort of the new king Francis I of France. Among her fellow Maids of Honour were her cousins, Mary (a mistress of Henry VIII) and Anne Boleyn.\n\nMarriage and issue\nIn 1520 upon her return to England, she married the courtier, Henry Norreys (1491 – 17 May 1536) of Yattendon in Berkshire, whom she had met that same year at the Field of the Cloth of Gold in France.\nNorris served King Henry VIII of England as a Gentleman of the Bedchamber, and was held in high favour by the King. He was later appointed Groom of the Stool and continued to enjoy the King's favour. According to biographer Eric Ives, Norris was \"perhaps the nearest thing Henry had to a friend.\" Norris had control of King Henry's Privy chamber.\nHenry and Mary had three children:\nEdward Norris (died 1529)\nHenry Norris, 1st Baron Norreys (c. 1525 – 1601), married Margaret Williams of Rycote, by whom he had issue.\nMary Norris, married firstly Sir George Carew, and secondly Sir Arthur Champernowne, by whom she had issue.\n\nDeath\nMary died in 1531, a year after her mother. Five years later her husband was attainted and executed for treason as one of the five alleged lovers of her cousin Queen Anne Boleyn, who herself was beheaded at the Tower of London on 19 May 1536.\nTheir four orphaned children were raised by Norris's brother Sir John Norris.\n\nAncestry\nPassage 4:\nAgatha (wife of Samuel of Bulgaria)\nAgatha (Bulgarian: Агата, Greek: Άγάθη; fl. late 10th century) was the wife of Emperor Samuel of Bulgaria.\n\nBiography\nAccording to a later addition to the history of the late-11th-century Byzantine historian John Skylitzes, Agatha was a captive from Larissa, and the daughter of the magnate of Dyrrhachium, John Chryselios. Skylitzes explicitly refers to her as the mother of Samuel's heir Gavril Radomir, which means that she was probably Samuel's wife. On the other hand, Skylitzes later mentions that Gavril Radomir himself also took a beautiful captive, named Irene, from Larissa as his wife. According to the editors of the Prosopographie der mittelbyzantinischen Zeit, this may have been a source of confusion for a later copyist, and Agatha's real origin was not Larissa, but Dyrrhachium. According to the same work, it is likely that she had died by ca. 998, when her father surrendered Dyrrhachium to the Byzantine emperor Basil II.Only two of Samuel's and Agatha's children are definitely known by name: Gavril Radomir and Miroslava. Two further, unnamed, daughters are mentioned in 1018, while Samuel is also recorded as having had a bastard son.Agatha is one of the central characters in Dimitar Talev's novel Samuil.\nPassage 5:\nEmpress Shōken\nEmpress Dowager Shōken (昭憲皇太后, Shōken-kōtaigō, 9 May 1849 – 9 April 1914), born Masako Ichijō (一条勝子, Ichijō Masako), was the wife of Emperor Meiji of Japan. She is also known under the technically incorrect name Empress Shōken (昭憲皇后, Shōken-kōgō). She was one of the founders of the Japanese Red Cross Society, whose charity work was known throughout the First Sino-Japanese War.\n\nEarly life\nLady Masako Ichijō was born on 9 May 1849, in Heian-kyō, Japan. 
She was the third daughter of Tadayoshi Ichijō, former Minister of the Left and head of the Fujiwara clan's Ichijō branch. Her adoptive mother was one of Prince Fushimi Kuniie's daughters, but her biological mother was Tamiko Niihata, the daughter of a doctor from the Ichijō family. Unusually for the time, she had been vaccinated against smallpox. As a child, Masako was somewhat of a prodigy: she was able to read poetry from the Kokin Wakashū by the age of 4 and had composed some waka verses of her own by the age of 5. By age seven, she was able to read some texts in classical Chinese with some assistance and was studying Japanese calligraphy. By the age of 12, she had studied the koto and was fond of Noh drama. She excelled in the studies of finances, ikebana and Japanese tea ceremony.The major obstacle to Lady Masako's eligibility to become empress consort was the fact that she was 3 years older than Emperor Meiji, but this issue was resolved by changing her official birth date from 1849 to 1850. They became engaged on 2 September 1867, when she adopted the given name Haruko (美子), which was intended to reflect her \nserene beauty and diminutive size.\nThe Tokugawa Bakufu promised 15,000 ryō in gold for the wedding and assigned her an annual income of 500 koku, but as the Meiji Restoration occurred before the wedding could be completed, the promised amounts were never delivered. The wedding was delayed partly due to periods of mourning for Emperor Kōmei, for her brother Saneyoshi, and the political disturbances around Kyoto between 1867 and 1868.\n\nEmpress of Japan\nLady Haruko and Emperor Meiji's wedding was finally officially celebrated on 11 January 1869. She was the first imperial consort to receive the title of both nyōgō and of kōgō (literally, the emperor's wife, translated as \"empress consort\"), in several hundred years. However, it soon became clear that she was unable to bear children. Emperor Meiji already had 12 children by 5 concubines, though: as custom in Japanese monarchy, Empress Haruko adopted Yoshihito, her husband's eldest son by Lady Yanagihara Naruko, who became Crown Prince. On 8 November 1869, the Imperial House departed from Kyoto for the new capital of Tokyo. In a break from tradition, Emperor Meiji insisted that the Empress and the senior ladies-in-waiting should attend the educational lectures given to the Emperor on a regular basis about national conditions and developments in foreign nations.\n\nInfluence\nOn 30 July 1886, Empress Haruko attended the Peeresses School's graduation ceremony in Western clothing. On 10 August, the imperial couple received foreign guests in Western clothing for the first time when hosting a Western Music concert.From this point onward, the Empress' entourage wore only Western-style clothes in public, to the point that in January 1887 \nEmpress Haruko issued a memorandum on the subject: traditional Japanese dress was not only unsuited to modern life, but Western-style dress was closer than the kimono to clothes worn by Japanese women in ancient times.In the diplomatic field, Empress Haruko hosted the wife of former US President Ulysses S. Grant during his visit to Japan. She was also present for her husband's meetings with Hawaiian King Kalākaua in 1881. 
Later that same year, she helped host the visit of the sons of future British King Edward VII: Princes Albert Victor and George (future George V), who presented her with a pair of pet wallabies from Australia.On 26 November 1886, Empress Haruko accompanied her husband to Yokosuka, Kanagawa to observe the new Imperial Japanese Navy cruisers Naniwa and Takachiho firing torpedoes and performing other maneuvers. From 1887, the Empress was often at the Emperor's side in official visits to army maneuvers. When Emperor Meiji fell ill in 1888, Empress Haruko took his place in welcoming envoys from Siam, launching warships and visiting Tokyo Imperial University. In 1889, Empress Haruko accompanied Emperor Meiji on his official visit to Nagoya and Kyoto. While he continued on to visit naval bases at Kure and Sasebo, she went to Nara to worship at the principal Shinto shrines.Known throughout her tenure for her support of charity work and women's education during the First Sino-Japanese War (1894–95), Empress Haruko worked for the establishment of the Japanese Red Cross Society. She participated in the organization's administration, especially in their peacetime activities in which she created a money fund for the International Red Cross. Renamed \"The Empress Shōken Fund\", it is presently used for international welfare activities. After Emperor Meiji moved his military headquarters from Tokyo to Hiroshima to be closer to the lines of communications with his troops, Empress Haruko joined her husband in March 1895. While in Hiroshima, she insisted on visiting hospitals full of wounded soldiers every other day of her stay.\n\nDeath\nAfter Emperor Meiji's death in 1912, Empress Haruko was granted the title Empress Dowager (皇太后, Kōtaigō) by her adoptive son, Emperor Taishō. She died in 1914 at the Imperial Villa in Numazu, Shizuoka and was buried in the East Mound of the Fushimi Momoyama Ryo in Fushimi, Kyoto, next to her husband. Her soul was enshrined in Meiji Shrine in Tokyo. On 9 May 1914, she received the posthumous name Shōken Kōtaigō (昭憲皇太后). Her railway-carriage can be seen today in the Meiji Mura Museum, in Inuyama, Aichi prefecture.\n\nHonours\nNational\nGrand Cordon of the Order of the Precious Crown, 1 November 1888\n\nForeign\nShe received the following orders and decorations:\n Russian Empire: Grand Cross of the Order of St. Catherine, 13 December 1887\n Spain: Dame of the Order of Queen Maria Luisa, 29 November 1889\n Siam: Dame of the Order of the Royal House of Chakri, 12 October 1899\n German Empire: Dame of the Order of Louise, 1st Class, 19 May 1903\n Kingdom of Bavaria: Dame of Honour of the Order of Theresa, 29 February 1904\n Korean Empire: Grand Cordon of the Order of the Auspicious Phoenix, 27 July 1908\n\nAncestry\nSee also\nEmpress of Japan\nŌmiya Palace\n\nNotes\nPassage 6:\nEunoë (wife of Bogudes)\nEunoë Maura was the wife of Bogudes, King of Western Mauretania. Her name has also been spelled Euries or Euryes or Eunoa.\n\nBiography\nEarly life\nEunoë Maura was thought to be descended from Berbers, but her name is Greek so it appears she might have been from there or had Greek ancestry. She was likely of very high status, as she is mentioned by historian Suetonius in the same context as Cleopatra.\n\nMarriage\nAt an unspecified early date in her marriage to her husband Bogud he mounted an expedition along the Atlantic coast, seemingly venturing into the tropics. 
When he returned he presented his wife Eunoë with gigantic reeds and asparagus he had found on the journey.She is believed to have been a mistress of Julius Caesar. She may have replaced Cleopatra in Caesar's affections, when he arrived in North Africa prior to the Battle of Thapsus on 6 April 46 BC, the two were among several queens courted by Caesar. It is also possible that they first met in Spain if she accompanied her husband there on a campaign. Only a brief romance for the Roman, both Eunoe and Bogudes profited through gifts bestowed on them by Caesar. Caesar departed from Africa in June 46 BC, five and a half months after he landed.\n\nCultural depictions\nEunoë and Caesar's affair is greatly exaggerated and expanded on in the Medieval French prose work Faits des Romains. Jeanette Beer in her book A Medieval Caesar states that the Roman general is \"transformed into Caesar, the medieval chevalier\" in the text, and that the author is more interested in Caesar's sexual dominance over the queen than the political dominance he held over her husband Bogud. The text describes her; \"Eunoe was the most beautiful woman in four kingdoms — nevertheless, she was Moorish\", which Beer further analysed as being indicative of the fact that it was unimaginable to audiences of the time to believe that a lover of Caesar could be ugly, but that Moors still represented everything that was ugly to them.Eunoë has also been depicted in several novels about Caesar, as well as serialized stories in The Cornhill Magazine. In such fiction her character often serves as a foil for the relationship between Caesar and another woman, mostly Cleopatra, such as in The Memoirs of Cleopatra, The Bloodied Toga and When We Were Gods. In Song of the Nile she also plays a posthumous role as a person of interest for Cleopatra's daughter Selene II who became queen of Mauritania after her.Eunoe has also been depicted in a numismatic drawing by Italian artist and polymath Jacopo Strada, who lived in the 16th century. There is however no archaeological evidence of a coin that bears her name or picture.\n\nSee also\nWomen in ancient Rome\nPassage 7:\nCatherine Exley\nCatherine Exley (1779–1857) was an English diarist. She was the wife of a soldier who accompanied her husband when he served in Portugal, Spain, and Ireland during the Napoleonic Wars. Exley is best known as the author of a diary that gives an account of military life in that era from the viewpoint of the wife of a common soldier.\n\nBackground\nCatherine Whitaker was born at Leeds in 1779 and married Joshua Exley there in 1806. Between 1805 and 1815, Joshua served in the Second Battalion of the 34th Regiment of Foot, initially as a private and then for a little over two years, as a corporal. Exley accompanied her husband for a substantial portion of this time and in due course wrote an account that is probably unique in that it records and reflects on life in the British Army from the perspective of the wife of a soldier who did not reach the rank of an officer.\n\nThe diary\nCatherine's diary was first published as a booklet issued shortly after her death. A single copy of the booklet is known to exist, it was also reprinted in The Dewsbury Reporter during August 1923. 
The text of the diary is included in full in a more recently issued book, edited by Professor Rebecca Probert, along with essays on its military and religious context, the treatment of prisoners of war and the role of women in the British, French and Spanish armed forces during the Peninsular War.\nThe diary unfolds the hardships that both Catherine and her husband suffered during his military service, including one period when they both wrongly thought that the other had died. There are detailed accounts of the births and deaths of children, the cold, hunger and filthy conditions of military life and the horror of the aftermaths of battles. Details of the author's religious experiences which led her to membership of the Methodist church also appear. Exley wrote the diary during the last 20 years before her death, which took place in 1857 at Batley, Yorkshire.\nPassage 8:\nIlona Zrínyi\nCountess Ilona Zrínyi (Croatian: Jelena Zrinska, Hungarian: Zrínyi Ilona) (1643, Ozalj – 18 February 1703, Izmit) was a noblewoman and heroine. She was one of the last surviving members of the Croatian-Hungarian Zrinski/Zrínyi noble family. She was the daughter of Petar Zrinski, Ban (viceroy) of Croatia, the niece of both Miklós Zrínyi and Fran Krsto Frankopan and the wife of Francis Rákóczi I and Imre Thököly, as well as the mother of Francis Rákóczi II. She is remembered in history for her Defense of Palanok Castle against the Imperial army in 1685-1688, an act for which she was regarded a heroine in Hungary.\n\nLife\nEarly years and family\nIlona was born Ilona Zrínyi in Ozalj, present day Croatia. She was the eldest child of Croatian Ban, Peter Zrinyi, and his wife Katarina Zrinyi née Frankopan, a Croatian poet. Later her parents had two daughters, Judita Petronila (1652-1699), and Aurora Veronika (1658-1735), as well as a son, Ivan Antun (1651-1703). Ilona and her siblings were the last generation of descendants of the once-powerful Zrinski family.\nFrom her childhood, she was known for her beauty and good education. There is little information on her schooling; it is known though that she acquired a high level of knowledge within her family, not only from her father and mother, Croatian writers and erudite persons but from her uncle Nikola VII Zrinski as well.\n\nMarriages\nOn 1 March 1666, she married Francis Rákóczi, with whom she had three children: György, born in 1667, who died in infancy; Julianna, born in 1672; and Ferenc (commonly known as Francis Rákóczi II), born in 1676. On June 8, 1676, not long after Francis II's birth, the elder Francis died. The widowed Ilona requested guardianship of her children and was granted it, against the advice of Emperor Leopold I's advisers and against Francis I's will. In this way she also retained control over the vast Rákóczi estates, which included among them the castles of Regéc, Sárospatak, Makovica, and Munkács. In 1682 she married Imre Thököly and became an active partner in her second husband's Kuruc uprising against the Habsburgs.\n\nDefense of Munkács (Palanok) Castle\nAfter their defeat at the 1683 Battle of Vienna, both the Ottoman forces and Thököly's allied Kuruc fighters had no choice but to retreat, and Thököly quickly lost one Rákóczi castle after another. At the end of 1685, the Imperial army surrounded the last remaining stronghold, Munkacs Castle in today's Ukraine. 
Ilona Zrínyi alone defended the castle for three years (1685–1688) against the forces of General Antonio Caraffa.\n\nInternment, exile and death\nAfter the recapture of Buda, the situation became untenable, and on 17 January 1688, Ilona had no choice but to surrender the castle, with the understanding that the defenders would receive amnesty from the Emperor, and that the Rákóczi estates would remain in her children's name. Under this agreement, she and her children traveled immediately to Vienna, where in violation of the pact the children were taken from her. Ilona lived until 1691 in the convent of the Ursulines, where her daughter Julianna was also raised. Her son Francis was immediately taken to the Jesuit school in Neuhaus.\nAt the time, her husband, Thököly, was still fighting with his Kuruc rebels against the Habsburg army in Upper Hungary. When Habsburg General Heisler was captured by Thököly, a prisoner exchange was arranged, and Ilona joined her husband in Transylvania. In 1699, however, after the Treaty of Karlowitz was signed, both spouses, having found themselves on the losing side, had to go into exile in the Ottoman Empire. The countess lived in Galata, district of Constantinople, and later in Izmit, where she died on 18 February 1703. She was buried in the French church of Saint Benoit in Galata.\n\nDescendants\nFrom her first marriage with Francis Rákóczi, Ilona had three children:\n\nGyörgy (1667–1667)\nJulianna Borbála (September 1672 – 1717); married Count Ferdinand Gobert von Aspremont-Lynden (1643-1708)\nFrancis II (27 March 1676 – 8 April 1735)From her second marriage with Imre Thököly, Ilona had three children, all of whom died at a young age (including one she was pregnant with during the siege of Munkács).\n\nLegacy\nIlona Zrínyi is celebrated in Croatia and Hungary as one of the greatest national heroines, patriots and fighters for freedom, who opposed, although unsuccessfully, the autocracy and absolutism aspirations of the Habsburgs. Her even more famous son Francis II Rákóczi continued the struggle for the independence of Hungary (1703–1711).\nIn October 1906 the remains of the Croatian countess were reinterred with her son's in the St Elisabeth Cathedral in present-day Košice, Slovakia.\n\nHonors\nPostage stamp issued by Hungary on 28 September 1952.\n\nSee also\nHouse of Zrinski\nZrinski family tree\nZrinski–Frankopan conspiracy\nKuruc\nRákóczi's War for Independence\nWesselényi conspiracy\nPassage 9:\nArtaynte\nArtaynte (f. 478 BC), was the wife of the Crown Prince Darius.\n\nLife\nDaughter of an unnamed woman and Prince Masistes, a marshall of the armies during the invasion of Greece in 480-479 BC, and the brother of King Xerxes I.\nDuring the Greek campaign Xerxes developed a passionate desire for the wife of Masistes, but she would constantly resist and would not bend to his will. Upon his return to Sardis, the king endeavoured to bring about the marriage of his son Daris to Artaynte, the daughter of this woman the wife of Masistes, supposing that by doing so he could obtain her more easily.\nAfter moving to Susa he brought Artaynte to the royal house with him for his son Daris, but fell in love with her himself, and after obtaining her they became lovers. \nAt the behest of Xerxes, Artaynte committed adultery with him (Xerxes). When queen Amestris found out, she did not seek revenge against Artaynte, but against her mother, Masistes' wife, as Amestris thought that it was her connivance. 
On Xerxes' birthday, Amestris sent for his guards and mutilated Masistes' wife by cutting off her breasts and threw them to dogs, and her nose and ears and lips also, and cutting out her tongue as well. On seeing this, Masistes fled to Bactria to start a revolt, but was intercepted by Xerxes' army who killed him and his sons.\nPassage 10:\nHafsa Hatun\nHafsa Hatun (Ottoman Turkish: حفصه خاتون, \"young lioness\") was a Turkish princess, and a consort of Bayezid I, Sultan of the Ottoman Empire.\n\nLife\nHafsa Hatun was the daughter of Isa Bey, the ruler of the Aydinids. She was married to Bayezid in 1390, upon his conquest of the Aydinids. Her father had surrendered without a fight, and a marriage was arranged between her and Bayezid. Thereafter, Isa was sent into exile in Iznik, shorn of his power, where he subsequently died. Her marriage strengthened the bonds between the two families.\n\nCharities\nHafsa Hatun's public works are located within her father's territory and may have been built before she married Bayezid. She commissioned a fountain in Tire city and a Hermitage in Bademiye, and a mosque known as \"Hafsa Hatun Mosque\" between 1390 and 1392 from the money she received in her dowry.\n\nSee also\nOttoman dynasty\nOttoman Empire", + "answers": ["Ozalj"], "dataset": "2wikimqa"} + + systemPrompt = """ + You are a helpful assistant. + Please read the following Passages and answer the Question below. + """ + passages = re.findall(r"Passage\s+(\d+):(.*?)(?=Passage\s+\d+:|$)", dataset_row['context'], re.S) + chunks = [f"Passage {num}:{text}" for num, text in passages] + # build the question prompt + question = ( + '\n Question: ' + + dataset_row['input'] + + ' Answer within 5 words.' + ) + origin_sys_prompt_ids = tokenizer.encode(systemPrompt) + padded_sys_prompt_ids = pad_rag_chunks(origin_sys_prompt_ids, block_size, chunk_pad_token_id, chunk_end_token_id) + # 1. sys prompt warm up + print(f"---------------1. sys prompt: warm up---------------") + get_output(llm, TokensPrompt(prompt_token_ids=padded_sys_prompt_ids), prefill_sampling_params) + time.sleep(0.5) + + padded_contexts_ids = [] + origin_prompt_ids = origin_sys_prompt_ids + padded_prompt_ids = padded_sys_prompt_ids + for text_chunk in chunks: + un_pad_ids = tokenizer.encode(text_chunk, add_special_tokens=False) + padded_ids = pad_rag_chunks(un_pad_ids, block_size, chunk_pad_token_id, chunk_end_token_id) + padded_prompt_ids = padded_prompt_ids + padded_ids + origin_prompt_ids = origin_prompt_ids + un_pad_ids + padded_contexts_ids.append(padded_ids) + + question_ids = tokenizer.encode(question, add_special_tokens=False) + padded_prompt_ids = padded_prompt_ids + question_ids + origin_prompt_ids = origin_prompt_ids + question_ids + + print(f"--------------- baseline with no cache blend ---------------") + # for the NFS store, you should clean up DATA_DIR (/home/data/kv_cache) first to get the full prefill compute time + baseline_time, baseline_gen_text = get_output(llm, TokensPrompt(prompt_token_ids=padded_prompt_ids), sampling_params) + time.sleep(0.5) +
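+ # The next three steps exercise the cache-blend path (see ucm/sparse/blend for the sparse logic):
+ #   1) generate each padded chunk on its own, so UCMStore dumps context-independent KV blocks per chunk;
+ #   2) run one concatenated prompt once to warm up the blend code path;
+ #   3) rerun the original prompt, which should now reuse the chunk KV and recompute only the
+ #      highest-KV-deviation tokens selected by the per-layer "ratio" in ucm_sparse_config.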
+ print(f"--------------- cache rag chunks ---------------") + llm.generate([TokensPrompt(prompt_token_ids=ids) for ids in padded_contexts_ids], sampling_params) + time.sleep(0.5) + + print(f"--------------- warm up blend code ---------------") + warm_up_blend_prompt_ids = padded_sys_prompt_ids + for ids in reversed(padded_contexts_ids): + warm_up_blend_prompt_ids = warm_up_blend_prompt_ids + ids + warm_up_blend_prompt_ids = warm_up_blend_prompt_ids + question_ids + llm.generate(TokensPrompt(prompt_token_ids=warm_up_blend_prompt_ids), sampling_params) + time.sleep(0.5) + + print(f"--------------- rag chunks cache blend ---------------") + blend_time, blend_gen_text = get_output(llm, TokensPrompt(prompt_token_ids=padded_prompt_ids), sampling_params) + + print(f"Baseline generated text: {baseline_gen_text!r}") + print(f"Baseline generated cost time: {baseline_time:.2f} seconds") + print(f"Blend generated text: {blend_gen_text!r}") + print(f"Blend generated cost time: {blend_time:.2f} seconds") + print(f"Question: {dataset_row['input']}") + print(f"Golden answer: {dataset_row['answers']}") + +if __name__ == "__main__": + main() diff --git a/ucm/integration/vllm/patch/patch_funcs/v092/vllm_adapt.py b/ucm/integration/vllm/patch/patch_funcs/v092/vllm_adapt.py index 2e84e52f..420e5a87 100644 --- a/ucm/integration/vllm/patch/patch_funcs/v092/vllm_adapt.py +++ b/ucm/integration/vllm/patch/patch_funcs/v092/vllm_adapt.py @@ -55,6 +55,8 @@ def _apply_adapt_patches() -> None: _patch_request_succeed_dumped_blocks() _patch_multi_connector() _patch_multiproc_executor() + _patch_llama_model() + _patch_qwen_model() except Exception as e: logger.error(f"Failed to apply aggre patch: {e}", exc_info=True) @@ -807,25 +809,110 @@ def _patch_attention_layer() -> None: from ucm.sparse.state import get_ucm_sparse, has_ucm_sparse + def attn_forward( + self, + query: torch.Tensor, + key: torch.Tensor, + value: torch.Tensor, + # For some alternate attention backends like MLA the attention output + # shape does not match the query shape, so we optionally let the model + # definition specify the output tensor shape. + output_shape: Optional[torch.Size] = None, + ) -> torch.Tensor: + """ + The KV cache is stored inside this class and is accessed via + `self.kv_cache`. + + Attention metadata (`attn_metadata`) is set using a context manager in + the model runner's `execute_model` method. 
It is accessed via forward + context using + `vllm.forward_context.get_forward_context().attn_metadata`. + """ + if self.calculate_kv_scales: + attn_metadata = get_forward_context().attn_metadata + if attn_metadata.enable_kv_scales_calculation: + self.calc_kv_scales(query, key, value) + if self.use_output: + output_shape = (output_shape + if output_shape is not None else query.shape) + output = torch.zeros(output_shape, + dtype=query.dtype, + device=query.device) + hidden_size = output_shape[-1] + # We skip reshaping query, key and value tensors for the MLA + # backend since these tensors have different semantics and are + # processed differently. + if not self.use_mla: + # Reshape the query, key, and value tensors. + # NOTE(woosuk): We do this outside the custom op to minimize the + # CPU overheads from the non-CUDA-graph regions. + query = query.view(-1, self.num_heads, self.head_size) + output = output.view(-1, self.num_heads, self.head_size) + if key is not None: + key = key.view(-1, self.num_kv_heads, self.head_size) + if value is not None: + value = value.view(-1, self.num_kv_heads, self.head_size) + if self.use_direct_call: + forward_context: ForwardContext = get_forward_context() + attn_metadata = forward_context.attn_metadata + if isinstance(attn_metadata, dict): + attn_metadata = attn_metadata[self.layer_name] + self_kv_cache = self.kv_cache[forward_context.virtual_engine] + self.impl.forward(self, + query, + key, + value, + self_kv_cache, + attn_metadata, + output=output) + else: + torch.ops.vllm.unified_attention_with_output( + query, key, value, output, self.layer_name) + ###################### + ### UCM PATCH START### + output = maybe_execute_sparse_attention_end(output, self.layer_name) + ### UCM PATCH END ### + ###################### + return output.view(-1, hidden_size) + else: + if self.use_direct_call: + forward_context = get_forward_context() + attn_metadata = forward_context.attn_metadata + if isinstance(attn_metadata, dict): + attn_metadata = attn_metadata[self.layer_name] + self_kv_cache = self.kv_cache[forward_context.virtual_engine] + return self.impl.forward(self, query, key, value, + self_kv_cache, attn_metadata) + else: + return torch.ops.vllm.unified_attention( + query, key, value, self.layer_name) + + def maybe_execute_sparse_attention_end(output: torch.Tensor, layer_name: str): + if not has_ucm_sparse(): + return output + ucm_sparse = get_ucm_sparse() + return ucm_sparse.attention_end(output, layer_name) + def maybe_execute_sparse_attention_begin( query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, layer_name: str, forward_context: ForwardContext, + output: Optional[torch.Tensor] = None, phase: Optional[str] = None, ): if not has_ucm_sparse(): - return + return query, key, value, output ucm_sparse = get_ucm_sparse() attn_metadata = forward_context.attn_metadata if attn_metadata is None: - return + return query, key, value, output - ucm_sparse.attention_begin( - query, key, value, layer_name, forward_context, phase + return ucm_sparse.attention_begin( + query, key, value, layer_name, forward_context, output, phase ) def maybe_execute_sparse_attention_finished( @@ -881,7 +968,7 @@ def unified_attention_impl( attn_metadata = attn_metadata[layer_name] self = forward_context.no_compile_layers[layer_name] kv_cache = self.kv_cache[forward_context.virtual_engine] - maybe_execute_sparse_attention_begin( + query, key, value, _ = maybe_execute_sparse_attention_begin( query, key, value, layer_name, forward_context ) output = self.impl.forward(self, query, key, 
value, kv_cache, attn_metadata) @@ -907,8 +994,8 @@ def unified_attention_with_output_impl( self = forward_context.no_compile_layers[layer_name] kv_cache = self.kv_cache[forward_context.virtual_engine] if not self.use_mla: - maybe_execute_sparse_attention_begin( - query, key, value, layer_name, forward_context + query, key, value, output = maybe_execute_sparse_attention_begin( + query, key, value, layer_name, forward_context, output ) self.impl.forward( self, @@ -941,6 +1028,8 @@ def unified_attention_with_output_impl( layer.maybe_execute_sparse_attention_finished = ( maybe_execute_sparse_attention_finished ) + layer.maybe_execute_sparse_attention_end = maybe_execute_sparse_attention_end + layer.Attention.forward = attn_forward layer.unified_attention = unified_attention_impl layer.unified_attention_with_output = unified_attention_with_output_impl @@ -2303,17 +2392,23 @@ def maybe_execute_ucm_sparse_begin( ): if not has_ucm_sparse(): return + + if has_kv_transfer_group(): + uc_connector = get_kv_transfer_group() + # maybe should move to "maybe_setup_kv_connector" + uc_connector.setup_model(self.model) + ucm_sparse = get_ucm_sparse() ucm_sparse.build_sparse_meta( scheduler_output, self.requests, self.input_batch, attn_metadata ) ucm_sparse.execute_begin(scheduler_output) - def maybe_execute_ucm_sparse_finished(self): + def maybe_execute_ucm_sparse_finished(self, logits_indices): if not has_ucm_sparse(): - return + return logits_indices ucm_sparse = get_ucm_sparse() - ucm_sparse.execute_finished() + return ucm_sparse.execute_finished(logits_indices) def ucm_sparse_request_finished_in_worker(self, request_id: str | int): if not has_ucm_sparse(): @@ -2918,7 +3013,7 @@ def execute_model( ) finished_dumping = self.maybe_wait_for_kv_save() - self.maybe_execute_ucm_sparse_finished() + logits_indices = self.maybe_execute_ucm_sparse_finished(logits_indices) finished_sending, finished_recving = self.get_finished_kv_transfers( scheduler_output @@ -3162,3 +3257,202 @@ def patched_init_worker_distributed_environment( ) except ImportError: logger.warning("Could not patch gpu worker - module not found") + +# ==================== vllm/model_executor/models/llama.py ==================== +def _patch_llama_model() -> None: + """Patch gpu worker to add UCM sparse support.""" + try: + from typing import Optional, Union + from vllm.distributed import get_pp_group + from vllm.sequence import IntermediateTensors + from vllm.config import VllmConfig + from vllm.model_executor.models.llama import LlamaDecoderLayer, LlamaModel + import torch + + from ucm.sparse.state import has_ucm_sparse, get_ucm_sparse + def maybe_execute_sparse_layer_finished(positions: torch.Tensor, hidden_states: torch.Tensor): + if not has_ucm_sparse(): + return positions + ucm_spare = get_ucm_sparse() + # after sparse, n_tokens of source tensor is larger than target + return ucm_spare.layer_finished(positions, hidden_states) + + def maybe_execute_sparse_self_attention_finished(residual: torch.Tensor, hidden_states: torch.Tensor): + if not has_ucm_sparse(): + return residual + ucm_spare = get_ucm_sparse() + # after sparse, n_tokens of source tensor is larger than target + return ucm_spare.self_attention_finished(residual, hidden_states) + + def llamaDecoderLayer_forward( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + residual: Optional[torch.Tensor], + ) -> tuple[torch.Tensor, torch.Tensor]: + # Self Attention + if residual is None: + residual = hidden_states + hidden_states = self.input_layernorm(hidden_states) + else: + 
hidden_states, residual = self.input_layernorm( + hidden_states, residual) + hidden_states = self.self_attn(positions=positions, + hidden_states=hidden_states) + ###################### + ### UCM PATCH START### + residual = maybe_execute_sparse_self_attention_finished(residual, hidden_states) + ### UCM PATCH END ### + ###################### + # Fully Connected + hidden_states, residual = self.post_attention_layernorm( + hidden_states, residual) + hidden_states = self.mlp(hidden_states) + return hidden_states, residual + + LlamaDecoderLayer.forward = llamaDecoderLayer_forward + + def llamaModel_forward( + self, + input_ids: Optional[torch.Tensor], + positions: torch.Tensor, + intermediate_tensors: Optional[IntermediateTensors], + inputs_embeds: Optional[torch.Tensor] = None, + ) -> Union[torch.Tensor, IntermediateTensors, tuple[torch.Tensor, + list[torch.Tensor]]]: + if get_pp_group().is_first_rank: + if inputs_embeds is not None: + hidden_states = inputs_embeds + else: + hidden_states = self.get_input_embeddings(input_ids) + residual = None + else: + assert intermediate_tensors is not None + hidden_states = intermediate_tensors["hidden_states"] + residual = intermediate_tensors["residual"] + + aux_hidden_states = [] + for idx, layer in enumerate( + self.layers[self.start_layer:self.end_layer]): + if idx in self.aux_hidden_state_layers: + aux_hidden_states.append(hidden_states + residual) + hidden_states, residual = layer(positions, hidden_states, residual) + ###################### + ### UCM PATCH START### + positions = maybe_execute_sparse_layer_finished(positions, hidden_states) + ### UCM PATCH END ### + ###################### + + if not get_pp_group().is_last_rank: + return IntermediateTensors({ + "hidden_states": hidden_states, + "residual": residual + }) + + hidden_states, _ = self.norm(hidden_states, residual) + + if len(aux_hidden_states) > 0: + return hidden_states, aux_hidden_states + return hidden_states + + LlamaModel.forward = llamaModel_forward + + except ImportError: + logger.warning("Could not patch llama modelr - module not found") + +# ==================== vllm/model_executor/models/qwen2.py ==================== +def _patch_qwen_model() -> None: + """Patch gpu worker to add UCM sparse support.""" + try: + from typing import Optional, Union + from vllm.distributed import get_pp_group + from vllm.sequence import IntermediateTensors + from vllm.config import VllmConfig + from vllm.model_executor.models.qwen2 import Qwen2DecoderLayer, Qwen2Model + import torch + + from ucm.sparse.state import has_ucm_sparse, get_ucm_sparse + def maybe_execute_sparse_layer_finished(positions: torch.Tensor, hidden_states: torch.Tensor): + if not has_ucm_sparse(): + return positions + ucm_spare = get_ucm_sparse() + # after sparse, n_tokens of source tensor is larger than target + return ucm_spare.layer_finished(positions, hidden_states) + + def maybe_execute_sparse_self_attention_finished(residual: torch.Tensor, hidden_states: torch.Tensor): + if not has_ucm_sparse(): + return residual + ucm_spare = get_ucm_sparse() + # after sparse, n_tokens of source tensor is larger than target + return ucm_spare.self_attention_finished(residual, hidden_states) + + def qwen2DecoderLayer_forward( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor, + residual: Optional[torch.Tensor], + ) -> tuple[torch.Tensor, torch.Tensor]: + # Self Attention + if residual is None: + residual = hidden_states + hidden_states = self.input_layernorm(hidden_states) + else: + hidden_states, residual = 
self.input_layernorm( + hidden_states, residual) + hidden_states = self.self_attn( + positions=positions, + hidden_states=hidden_states, + ) + ###################### + ### UCM PATCH START### + residual = maybe_execute_sparse_self_attention_finished(residual, hidden_states) + ### UCM PATCH END ### + ###################### + # Fully Connected + hidden_states, residual = self.post_attention_layernorm( + hidden_states, residual) + hidden_states = self.mlp(hidden_states) + return hidden_states, residual + + Qwen2DecoderLayer.forward = qwen2DecoderLayer_forward + + def qwen2Model_forward( + self, + input_ids: torch.Tensor, + positions: torch.Tensor, + intermediate_tensors: Optional[IntermediateTensors] = None, + inputs_embeds: Optional[torch.Tensor] = None, + ) -> Union[torch.Tensor, IntermediateTensors]: + if get_pp_group().is_first_rank: + if inputs_embeds is not None: + hidden_states = inputs_embeds + else: + hidden_states = self.get_input_embeddings(input_ids) + residual = None + else: + assert intermediate_tensors is not None + hidden_states = intermediate_tensors["hidden_states"] + residual = intermediate_tensors["residual"] + for layer in self.layers[self.start_layer:self.end_layer]: + hidden_states, residual = layer( + positions, + hidden_states, + residual, + ) + ###################### + ### UCM PATCH START### + positions = maybe_execute_sparse_layer_finished(positions, hidden_states) + ### UCM PATCH END ### + ###################### + if not get_pp_group().is_last_rank: + return IntermediateTensors({ + "hidden_states": hidden_states, + "residual": residual + }) + hidden_states, _ = self.norm(hidden_states, residual) + return hidden_states + + Qwen2Model.forward = qwen2Model_forward + + except ImportError: + logger.warning("Could not patch llama modelr - module not found") diff --git a/ucm/integration/vllm/uc_connector.py b/ucm/integration/vllm/uc_connector.py index 2271e5e2..b7f066ef 100644 --- a/ucm/integration/vllm/uc_connector.py +++ b/ucm/integration/vllm/uc_connector.py @@ -44,6 +44,7 @@ from ucm.logger import init_logger from ucm.store.factory import UcmConnectorFactory from ucm.store.ucmstore import Task +from ucm.sparse.blend.chunk_processor import ChunkProcessor, ChunkMetaData, hash_token_ids if TYPE_CHECKING: from vllm.attention.backends.abstract import AttentionMetadata @@ -78,6 +79,8 @@ class ReqMeta: dump_blocks: list[tuple[str, int]] = field(default_factory=list) # Whether use load_async load_async: bool = False + # blend rag cache + chunks_load_meta: list[ChunkMetaData] = field(default_factory=list) @dataclass @@ -160,6 +163,18 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole): ] ) + self.enable_blend = False + self.req2rag_load_chunks: dict[str, list[ChunkMetaData]] = {} + end_token_id = 0 + if (("ucm_sparse_config" in self._vllm_config.kv_transfer_config.kv_connector_extra_config) + and "Blend" in self._vllm_config.kv_transfer_config.kv_connector_extra_config["ucm_sparse_config"]): + ucm_blend_config = self._vllm_config.kv_transfer_config.kv_connector_extra_config[ "ucm_sparse_config" ]["Blend"] + self.enable_blend = True + end_token_id = ucm_blend_config["chunk_end_token_id"] + self.chunk_processor = ChunkProcessor( + config={'chunk_end_token_id': end_token_id, 'block_size': self.block_size} + ) + def _init_kv_caches_from_forward_context(self, forward_context: "ForwardContext"): for layer_name in forward_context.no_compile_layers: attn_layer = forward_context.no_compile_layers[layer_name] @@ -303,6 +318,10 @@ def start_load_kv(self, 
forward_context: "ForwardContext", **kwargs) -> None: self._load_failed_reqs.add(req_id) logger.error(f"Failed to load blocks for req {req_id}") + def setup_model(self, model) -> None: + # get cos_sin_embedding cache for kv cache load post process (block-wise delta rope) + self.chunk_processor.setup_rotary_emb(model) + def wait_for_layer_load(self, layer_name: str) -> None: """ Block until the KV for a specific layer is loaded into vLLM's @@ -333,6 +352,18 @@ def wait_for_layer_load(self, layer_name: str) -> None: ) continue logger.debug(f"Load tasks for {request_id} on layer {layer_name} finished.") + # prepare rerope for rag chunk k cache + k_cache = self.kv_caches[layer_name][0] + all_hits_vllm_ids = [] + positions = [] + for reqMeta in self._connector_metadata.requests: + for meta in reqMeta.chunks_load_meta: + all_hits_vllm_ids.extend(meta.hits_vllm_blk_ids) + positions.extend([meta.position_offset]*len(meta.hits_vllm_blk_ids)) + if all_hits_vllm_ids: + vllm_ids = torch.tensor(all_hits_vllm_ids, device=k_cache.device) + positions = torch.tensor(positions, device=k_cache.device) + self.chunk_processor.process_chunk_cache(k_cache, vllm_ids, positions) def save_kv_layer( self, @@ -548,8 +579,8 @@ def hash_request_tokens( hash_value = hash_function( (parent_block_hash_value, block_token_ids_tuple) ) - parent_block_hash_value = hash_value - ret.append(str(hash_value)) + parent_block_hash_value = str(hash_value) + ret.append(parent_block_hash_value) return ret @@ -562,7 +593,10 @@ def hash_request_tokens( # Calculate start position (exclude blocks already in HBM) start_position = num_computed_tokens // self.block_size - block_operations = [BlockOperation.NONE] * len(block_hashes) + # state machine + # default to dump all blks, when blk allocation fail, turn to be NONE, when cache hit, turn to be LOAD + block_operations = ([BlockOperation.NONE] * start_position + + [BlockOperation.DUMP] * (len(block_hashes) - start_position)) remain_hashes = block_hashes[start_position:] if not remain_hashes: @@ -580,10 +614,57 @@ def hash_request_tokens( else: # TODO we will fix hole match later break + + num_rag_lookup_hits = 0 + if self.enable_blend: + # for unmatched blocks, further match the rag chunk + rag_start_blk_idx = start_position + num_lookup_hits + rag_chunks_meta, is_build_cache = self.chunk_processor.process_request(request, md5, rag_start_blk_idx) + + if not is_build_cache: + # blend stage + final_rag_chunks_meta = [] + old_lookup_results = [False] + old_chunk_meta = None + for chunk_meta in rag_chunks_meta: + lookup_results = self.connector.lookup(chunk_meta.chunk_blks_hash) + chunk_meta.store_hits = lookup_results + final_rag_chunks_meta.append(chunk_meta) + if sum(lookup_results) == 0 and old_lookup_results[-1]: + # current whole chunk is miss and last chunk's last blk hit, try to merge chunk + merge_tokens = request.prompt_token_ids[chunk_meta.start_idx_in_req:chunk_meta.end_idx_in_req] + merge_chunk_blks_hash = hash_token_ids(md5, self.block_size, merge_tokens, old_chunk_meta.chunk_blks_hash[-1]) + merge_lookup_results = self.connector.lookup(merge_chunk_blks_hash) + if merge_lookup_results[0]: + # current chunk meta need to merge into old chunk meta + chunk_meta.store_hits = merge_lookup_results + chunk_meta.chunk_blks_hash = merge_chunk_blks_hash + self.chunk_processor.merge_chunks(old_chunk_meta, chunk_meta) + final_rag_chunks_meta.pop() + for i, hit in enumerate(chunk_meta.store_hits): + # replace the origin pc hash with chunk pc hash + # maybe we should also invalid the block hash 
in HBM's block manager, because after cache blend, + # the kv cache of the rag chunk in HBM is recomputed and depends on all chunks in this req. + block_hashes[rag_start_blk_idx] = chunk_meta.chunk_blks_hash[i] + if hit: + num_rag_lookup_hits += 1 + block_operations[rag_start_blk_idx] = BlockOperation.LOAD + else: + # cache blend can recompute the missing hole, but this cache is no longer context independent + block_operations[rag_start_blk_idx] = BlockOperation.NONE + pass + rag_start_blk_idx += 1 + old_chunk_meta = final_rag_chunks_meta[-1] + old_lookup_results = old_chunk_meta.store_hits + + if num_rag_lookup_hits: + self.req2rag_load_chunks[request.request_id] = final_rag_chunks_meta + logger.info( f"num_total_blocks: {len(block_hashes)}, " f"num_lookup_hits on hbm: {start_position}, " - f"num_lookup_hits on storage except hbm: {num_lookup_hits}" + f"num_lookup_hits on storage except hbm: {num_lookup_hits}, " + f"num_lookup_hits on rag chunk: {num_rag_lookup_hits}" ) # Load async when Decode instance need to load @@ -643,17 +724,20 @@ def update_state_after_alloc( block_operations = request_block_info.block_operations block_hashes = request_block_info.block_hashes start_create_pos = start_position + num_external_tokens // self.block_size - remaining_hashes = block_hashes[start_create_pos:] - if remaining_hashes: - create_results = self.connector.create(remaining_hashes) + need_dump_blks = [] + need_dump_blks_idx = [] + for idx in range(start_create_pos, len(block_hashes)): + # for chunk cache hit, no need to save + if block_operations[idx] == BlockOperation.DUMP: + need_dump_blks.append(block_hashes[idx]) + need_dump_blks_idx.append(idx) + if need_dump_blks: + create_results = self.connector.create(need_dump_blks) if any(ret != 0 for ret in create_results): logger.warning(f"\ncreate_results on storage: {create_results}\n") - for j, ret in enumerate(create_results): - idx = start_create_pos + j - block_operations[idx] = ( - BlockOperation.DUMP if ret == 0 else BlockOperation.NONE - ) - # set start_position to 0, so that we can process from the beginning + for i, ret in enumerate(create_results): + block_operations[need_dump_blks_idx[i]] = BlockOperation.DUMP if ret == 0 else BlockOperation.NONE + # set start_position to 0, so that we can process from the beginning request_block_info.start_position = 0 def build_connector_meta( @@ -694,11 +778,16 @@ def build_connector_meta( vllm_block_ids, block_info ) if load_blocks or dump_blocks: + chunks_load_meta = self.req2rag_load_chunks.pop(req_id, []) + for chunk_meta in chunks_load_meta: + chunk_meta.vllm_blk_ids = vllm_block_ids[chunk_meta.start_idx_in_req_blks: chunk_meta.end_idx_in_req_blks] + meta.requests.append( ReqMeta( request_id=req_id, load_blocks=load_blocks, dump_blocks=dump_blocks, + chunks_load_meta=chunks_load_meta, ) ) diff --git a/ucm/sparse/base.py b/ucm/sparse/base.py index ed62ab30..dae8fc65 100644 --- a/ucm/sparse/base.py +++ b/ucm/sparse/base.py @@ -23,7 +23,7 @@ import enum from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, List, Optional, Union +from typing import TYPE_CHECKING, List, Optional, Union, Tuple if TYPE_CHECKING: from vllm.v1.core.sched.output import SchedulerOutput @@ -117,11 +117,11 @@ def execute_begin(self, scheduler_output: SchedulerOutput): """ pass - def execute_finished(self): + def execute_finished(self, logits_indices: torch.Tensor) -> torch.Tensor: """ This is called at the end of "ModelRunner->execute_model" function. """ - pass + return logits_indices def attention_begin( self, @@ -130,8 +131,9 @@ def attention_begin( value: torch.Tensor, layer_name: str, forward_context: ForwardContext, + output: Optional[torch.Tensor] = None, phase: Optional[str] = None, - ) -> None: + ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: """ This is called at the beginning of "unified_attention". Sparse attention algorithm can modify forward_context.attn_metadata if necessary. @@ -154,6 +155,38 @@ def attention_finished( """ pass + def attention_end( + self, + attn_output: torch.Tensor, + layer_name: str + ) -> torch.Tensor: + """ + This is called at the end of Attention Forward. + For Blend, we "sparse" the prefill cached tokens. + """ + return attn_output + + def self_attention_finished( + self, + residual: torch.Tensor, + hidden_states: torch.Tensor + ) -> torch.Tensor: + """ + This is called at the end of Self Attention Forward for each DecoderLayer. + """ + return residual + + def layer_finished( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor + ) -> torch.Tensor: + """ + This is called at the end of each DecoderLayer forward. + """ + return positions + + def request_finished_in_worker(self, request_id: Union[int, str]): """ This function releases the resources of finished requests at worker-side. diff --git a/ucm/sparse/cache_blend/README.md b/ucm/sparse/blend/README.md similarity index 100% rename from ucm/sparse/cache_blend/README.md rename to ucm/sparse/blend/README.md diff --git a/ucm/sparse/blend/__init__.py b/ucm/sparse/blend/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ucm/sparse/blend/blend.py b/ucm/sparse/blend/blend.py new file mode 100644 index 00000000..3b201617 --- /dev/null +++ b/ucm/sparse/blend/blend.py @@ -0,0 +1,282 @@ +from typing import TYPE_CHECKING, Dict, List, Optional, Union, Tuple + +import torch +import time +from torch import Tensor + +from ucm.logger import init_logger +from dataclasses import dataclass, field + +from ucm.store.dramstore.dramstore_connector import device + +logger = init_logger(__name__) + +from vllm.forward_context import ForwardContext +from vllm.v1.core.sched.output import SchedulerOutput +from vllm.v1.request import Request +from vllm.config import VllmConfig +from ucm.sparse.base import ( + INVALID_SLOT, + UcmSparseBase, + UcmSparseMetadata, + UcmSparseRole, +) +from ucm.sparse.blend.chunk_processor import ChunkMetaData +from ucm.sparse.utils import round_up + +def get_num_blks(num_tokens, block_size): + return (num_tokens + block_size - 1) // block_size + +@dataclass +class ReqMeta: + req_idx: int = 0 + need_blend: bool = False + + prefix_len: int = 0 + prefix_blk_len: int = 0 + + chunks_len: int = 0 + chunks_blk_len: int = 0 + + suffix_len: int = 0 + suffix_blk_len: int = 0 + + chunk_hit_mask: List[bool] = field(default_factory=list) + + chunk_hit_blk_len: int = 0 + + +@dataclass +class BlendMetaData(UcmSparseMetadata): + requests: list[ReqMeta] = field(default_factory=list) + compute_mask: Tensor = None + chunk_blks_hit_mask: Tensor = None + query_lens: Tensor = None + blend_start_req_idx: int = 0 + need_re_index: bool = False + + def add_request( + self, + idx: int, + chunks_load_meta: list[ChunkMetaData], + seq_lens: Tensor, + block_size: int + ) -> None: + if chunks_load_meta: + hit_mask = [] + req_idx = idx + self.blend_start_req_idx + for meta in chunks_load_meta: + hit_mask.extend(meta.store_hits) + reqMeta = ReqMeta( + req_idx=req_idx, + 
prefix_len=chunks_load_meta[0].start_idx_in_req, + prefix_blk_len=get_num_blks(chunks_load_meta[0].start_idx_in_req, block_size), + chunks_len=len(hit_mask) * block_size, + chunks_blk_len=len(hit_mask), + chunk_hit_mask=hit_mask, + chunk_hit_blk_len=sum(hit_mask) + ) + reqMeta.need_blend = reqMeta.chunk_hit_blk_len > 0 + reqMeta.suffix_len = seq_lens[req_idx].item() - reqMeta.prefix_len - reqMeta.chunks_len + reqMeta.suffix_blk_len = get_num_blks(reqMeta.suffix_len, block_size) + + self.requests.append(reqMeta) + + def reset_compute_mask(self) -> None: + self.compute_mask.fill_(False) + # for decode req in the front of the batch + self.compute_mask[:self.blend_start_req_idx] = True + + def update_req_compute_mask(self,req_query_start, req_chunk_end, req_query_end, chunk_hit_mask, top_k_indices): + chunks = self.compute_mask[req_query_start:req_chunk_end] + chunks = chunks.reshape(len(chunk_hit_mask), -1) + + # for chunk block cache miss part, just recompute + chunks.masked_fill_(~chunk_hit_mask.unsqueeze(1), True) + + flat = chunks.view(-1) + # for chunk block cache hit part, just recompute HKVD(highest KV deviation) tokens + flat[top_k_indices] = True + + # for question part, default + self.compute_mask[req_chunk_end:req_query_end].fill_(True) + + +class Blend(UcmSparseBase): + def __init__(self, vllm_config: VllmConfig, role: UcmSparseRole): + super().__init__(vllm_config, role) + self.blend_config = vllm_config.kv_transfer_config.kv_connector_extra_config[ + "ucm_sparse_config" + ]["Blend"] + + max_model_len = vllm_config.model_config.max_model_len + self.block_size = vllm_config.cache_config.block_size + + self.device = vllm_config.device_config.device + self.forward_mask = torch.zeros(max_model_len, device=self.device).bool() + self.mask_idx = torch.arange(round_up(max_model_len, self.block_size), device=self.device) + self.mask_idx = self.mask_idx.reshape(-1, self.block_size) + + # for multi batch, ignore the decode-stage req at the beginning + self.blend_start_req_idx = 0 + + self.compute_meta = self.blend_config['compute_meta'] + self.blend_req_metas: BlendMetaData = BlendMetaData( + need_re_index=False, + chunk_blks_hit_mask = torch.zeros(round_up(max_model_len, self.block_size), device=self.device).bool() + ) + self.attn_metadata = None + + def reset_blend_meta(self): + self.blend_req_metas.need_re_index = False + self.blend_req_metas.requests=[] + + + def build_sparse_meta( + self, scheduler_output, requests, input_batch, attn_metadata + ) -> UcmSparseMetadata: + + if isinstance(attn_metadata, dict): + attn_metadata = next(iter(attn_metadata.values())) + self.attn_metadata = attn_metadata + + # current not support chunk prefill + # for multi req in one batch, we should discard the decode req + self.reset_blend_meta() + self.blend_req_metas.compute_mask = self.forward_mask[:attn_metadata.query_start_loc[-1]] + + self.blend_req_metas.query_lens = attn_metadata.query_start_loc[1:] - attn_metadata.query_start_loc[:-1] + self.blend_req_metas.blend_start_req_idx = len(scheduler_output.scheduled_cached_reqs.req_ids) + + for idx, req in enumerate(scheduler_output.kv_connector_metadata.requests): + self.blend_req_metas.add_request(idx, req.rag_chunks_load_meta, attn_metadata.seq_lens, self.block_size) + + return self.blend_req_metas + + def update_attn_metadata(self): + # update attn_metadata, cause we sparse the prefill tokens + self.attn_metadata.slot_mapping = self.attn_metadata.slot_mapping[self.blend_req_metas.compute_mask] + self.attn_metadata.query_start_loc[1:] = 
torch.cumsum(self.blend_req_metas.query_lens, dim=0) + self.attn_metadata.max_query_len = self.blend_req_metas.query_lens.max().item() + self.attn_metadata.num_actual_tokens = self.blend_req_metas.query_lens.sum().item() + + def estimate_num_slots_sparsed(self, request: Request) -> int: + """ + This is called by "Scheduler->schedule" function to estimate the number of required blocks. + """ + return INVALID_SLOT + + def request_begin(self, request_id: Union[int, str], prompt_token_ids: List[int]): + pass + + def attention_begin( + self, + query: torch.Tensor, + key: torch.Tensor, + value: torch.Tensor, + layer_name: str, + forward_context: ForwardContext, + output: Optional[torch.Tensor] = None, + phase: Optional[str] = None, + ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: + attn = forward_context.no_compile_layers[layer_name] + kv_cache = attn.kv_cache[forward_context.virtual_engine] + start_time = time.perf_counter() + if layer_name in self.compute_meta.keys(): + need_update = False + self.blend_req_metas.reset_compute_mask() + + # maybe we can use triton kernel + for req_meta in self.blend_req_metas.requests: + req_idx = req_meta.req_idx + req_query_start = self.attn_metadata.query_start_loc[req_idx].item() + req_query_end = self.attn_metadata.query_start_loc[req_idx + 1].item() + + if not req_meta.need_blend: + self.blend_req_metas.compute_mask[req_query_start:req_query_end].fill_(True) + continue + req_chunk_end = req_query_start + req_meta.chunks_len + + # HBM prefix cache is not supported now + # UC store prefix cache can be fully reused + his_vllm_blk_ids = self.attn_metadata.block_table[req_idx][req_meta.prefix_blk_len: + req_meta.prefix_blk_len + req_meta.chunks_blk_len] + # only compute topk of chunk's hits block + chunk_hit_mask = self.blend_req_metas.chunk_blks_hit_mask[:len(req_meta.chunk_hit_mask)] + src = torch.as_tensor(req_meta.chunk_hit_mask, dtype=chunk_hit_mask.dtype, device=chunk_hit_mask.device) + chunk_hit_mask.copy_(src) + + his_vllm_blk_ids = his_vllm_blk_ids[chunk_hit_mask] + his_k = kv_cache[0, his_vllm_blk_ids] + candidate_len = req_meta.chunk_hit_blk_len * self.block_size + his_k = his_k.reshape(candidate_len, -1) + + req_key = key[req_query_start:req_chunk_end] + + # req_key does not contain prefix cache + golden_k = req_key.reshape(req_meta.chunks_blk_len, self.block_size, -1)[chunk_hit_mask] + golden_k = golden_k.reshape(candidate_len, -1) + + diff_k = torch.sum((his_k - golden_k).abs(), dim=[1]) + topK_num = int(candidate_len * self.compute_meta[layer_name]["ratio"]) + + topK_indices = torch.topk(diff_k, k=topK_num).indices + + # get origin idx in req_key + topK_indices = self.mask_idx[:req_meta.chunks_blk_len][chunk_hit_mask].reshape(-1)[topK_indices] + + # update compute_mask + self.blend_req_metas.update_req_compute_mask(req_query_start, req_chunk_end, req_query_end, + chunk_hit_mask, topK_indices) + + self.blend_req_metas.query_lens[req_idx] -= (candidate_len - topK_num) + need_update = True + + if need_update: + logger.info(f"[blend] compute_mask time: {(time.perf_counter() - start_time) * 1000}ms") + self.blend_req_metas.need_re_index = True + self.update_attn_metadata() + + indexed_query = query[self.blend_req_metas.compute_mask] + indexed_key = key[self.blend_req_metas.compute_mask] + indexed_value = value[self.blend_req_metas.compute_mask] + indexed_output = None + if output is not None: + indexed_output = output[:self.blend_req_metas.compute_mask.sum()] + logger.info(f"[blend] compute_mask time + index time: 
{(time.perf_counter() - start_time) * 1000}ms") + return indexed_query, indexed_key, indexed_value, indexed_output + return query, key, value, output + + def attention_end(self, output: torch.Tensor, layer_name: str): + if layer_name in self.compute_meta.keys() and self.blend_req_metas.need_re_index: + return output[:self.attn_metadata.num_actual_tokens] + return output + + def self_attention_finished( + self, + residual: torch.Tensor, + hidden_states: torch.Tensor + ) -> torch.Tensor: + if len(residual) != len(hidden_states): + return self.index_tensor(residual) + return residual + + def layer_finished( + self, + positions: torch.Tensor, + hidden_states: torch.Tensor + ) -> torch.Tensor: + if len(positions) != len(hidden_states): + return self.index_tensor(positions) + return positions + + def execute_finished(self, logits_indices :torch.Tensor): + if self.blend_req_metas.need_re_index: + return self.attn_metadata.query_start_loc[1:] - 1 + return logits_indices + + def index_tensor(self, tensor: torch.Tensor): + if self.blend_req_metas.need_re_index: + return tensor[self.blend_req_metas.compute_mask] + return tensor diff --git a/ucm/sparse/blend/blockwise_rope.py b/ucm/sparse/blend/blockwise_rope.py new file mode 100644 index 00000000..ef6370bc --- /dev/null +++ b/ucm/sparse/blend/blockwise_rope.py @@ -0,0 +1,135 @@ +import torch +import triton +import triton.language as tl + + +@triton.jit +def _triton_rope_blockwise_kernel( + k_ptr, # (total_blocks, seq_len, n_kv_head, hd) + vllm_ids, # (bs,) block id for each batch + positions, # (bs,) delta angle for each batch + cos_sin_cache, # (1, seq_len, hd) + k_row_stride, + k_head_stride, + cos_sin_row_stride, + sl, + bs: tl.constexpr, + n_kh: tl.constexpr, + hd: tl.constexpr, + pad_hd: tl.constexpr, +): + """ + each program/batch process a single head of kcache (batch_idx, seq_idx, head_idx) + """ + pid = tl.program_id(0) + + heads_per_seq = n_kh + tokens_per_batch = sl * n_kh + batch_idx = pid // tokens_per_batch + seq_head_idx = pid % tokens_per_batch + seq_idx = seq_head_idx // n_kh + head_idx = seq_head_idx % n_kh + + # block id & position + block_id = tl.load(vllm_ids + batch_idx) + pos_idx = tl.load(positions + batch_idx) + + # k offset + k_offset = block_id * k_row_stride + seq_idx * (n_kh * hd) + head_idx * hd + k_ptr = k_ptr + k_offset + + # fetch cos sin from cos_sin_cache + cos_base = pos_idx * cos_sin_row_stride + sin_base = cos_base + hd // 2 # sin just behind cos + + offs = tl.arange(0, pad_hd // 2) + mask = offs < hd // 2 + + cos_row = tl.load(cos_sin_cache + cos_base + offs, mask=mask, other=0) + sin_row = tl.load(cos_sin_cache + sin_base + offs, mask=mask, other=0) + + k_tile_1 = tl.load(k_ptr + offs, mask=mask, other=0).to(cos_row.dtype) + k_tile_2 = tl.load(k_ptr + offs + hd // 2, mask=mask, other=0).to(cos_row.dtype) + + new_k_tile_1 = k_tile_1 * cos_row - k_tile_2 * sin_row + new_k_tile_2 = k_tile_2 * cos_row + k_tile_1 * sin_row + + tl.store(k_ptr + offs, new_k_tile_1, mask=mask) + tl.store(k_ptr + offs + hd // 2, new_k_tile_2, mask=mask) + + +def block_wise_rope_forward(k_cache, vllm_ids, positions, cos_sin_cache): + """ + Args: + k_cache: torch.Tensor (total_blocks, seq_len, n_kv_heads, hd), vllm owned. 
+ vllm_ids: torch.LongTensor (batch_size,), vllm block id + positions: torch.LongTensor (batch_size,), delta angle of each block for rope + cos_sin_cache: torch.Tensor (1, seq_len, hd),same as the tensor in rotary_emb + """ + total_blocks, seq_len, n_kv_head, head_dim = k_cache.shape + batch_size = vllm_ids.shape[0] + pad_hd = triton.next_power_of_2(head_dim) + + k_cache = k_cache.contiguous() + vllm_ids = vllm_ids.contiguous() + positions = positions.contiguous() + cos_sin_cache = cos_sin_cache.contiguous() + + n_row = batch_size * seq_len * n_kv_head + + _triton_rope_blockwise_kernel[(n_row,)]( + k_cache, + vllm_ids, + positions, + cos_sin_cache, + k_cache.stride(0), + k_cache.stride(-2), + cos_sin_cache.stride(-2), + seq_len, + batch_size, + n_kv_head, + head_dim, + pad_hd, + ) + + return k_cache + +if __name__ == "__main__": + # just prepare the dumped_tensors from real setting + import time + + base_dir = "/vllm-workspace/dumped_tensors/kvcache" + kcacche = torch.load(f"{base_dir}/kcache.pt") + cos_sin_cache = torch.load(f"{base_dir}/cos_sin_cache.pt") + positions = torch.load(f"{base_dir}/positions.pt") + vllm_ids = torch.load(f"{base_dir}/vllm_ids.pt") + baseline_rope_kcache = torch.load(f"{base_dir}/baseline_rope_kcache.pt") + device = "cuda" + torch.manual_seed(42) + + block_wise_rope_forward(kcacche, vllm_ids, positions, cos_sin_cache) + # precision compare + diff = (kcacche - baseline_rope_kcache).abs() + max_err = diff.max().item() + mean_err = diff.mean().item() + print(f"MAE : {mean_err:.6f}, max diff: {max_err:.6f}") + + def bench(fn, n_iter=50): + torch.cuda.synchronize() + t0 = time.time() + for _ in range(n_iter): + fn() + torch.cuda.synchronize() + dt = (time.time() - t0) / n_iter + return dt * 1e3 # ms + + ms = bench(lambda: block_wise_rope_forward(kcacche, vllm_ids, positions, cos_sin_cache)) + print(f"kernel avg latency: {ms:.3f} ms") + bs = vllm_ids.shape[0] + _, sl, nh, hd = kcacche.shape + # load K,load cos,sin -> dump K + bytes_total =( bs * sl * nh * hd * 2 * kcacche.dtype.itemsize # K load dump + + bs * positions.dtype.itemsize # positions load + + bs * hd * cos_sin_cache.dtype.itemsize) # cos_sin_cache load + bw = bytes_total / (ms / 1e3) / (1024**3) + print(f"Estimated memory BW: {bw:.1f} GiB/s") \ No newline at end of file diff --git a/ucm/sparse/blend/chunk_processor.py b/ucm/sparse/blend/chunk_processor.py new file mode 100644 index 00000000..e07f45c2 --- /dev/null +++ b/ucm/sparse/blend/chunk_processor.py @@ -0,0 +1,205 @@ +from dataclasses import dataclass, field + + +from ucm.logger import init_logger +from typing import List, Optional, Tuple, Any + +from ucm.sparse.blend.blockwise_rope import block_wise_rope_forward +from ucm.sparse.blend.utils import get_rotary_emb_ops + +logger = init_logger(__name__) + +import itertools + +@dataclass +class ChunkMetaData: + # [start, start + len) + start_idx_in_req: int + chunk_tokens_len: int + + start_idx_in_req_blks: int + chunk_blks_len: int + + cached_start_position: int + + vllm_blk_ids: List[int] = field(default_factory=list) + chunk_blks_hash : List[str] = field(default_factory=list) + store_hits: List[bool] = field(default_factory=list) + + @property + def end_idx_in_req(self) -> int: + return self.start_idx_in_req + self.chunk_tokens_len + + @property + def end_idx_in_req_blks(self) -> int: + return self.start_idx_in_req_blks + self.chunk_blks_len + + @property + def cached_end_position(self) -> int: + return self.cached_start_position + self.chunk_tokens_len + + @property + def position_offset(self) -> 
int: + return self.start_idx_in_req - self.cached_start_position + + @property + def hits_vllm_blk_ids(self) -> List[int]: + return list(itertools.compress(self.vllm_blk_ids, self.store_hits)) + + @property + def hits_chunk_blks_hash(self) -> List[str]: + return list(itertools.compress(self.chunk_blks_hash, self.store_hits)) + + + +def hash_token_ids( + hash_function: Any, block_size: int, token_ids: list[int], parent_block_hash_value = None + ) -> List[str]: + """ + process token_ids into prefix blk hashes with parent_block_hash_value + """ + ret = [] + + if not parent_block_hash_value: + parent_block_hash_value = hash_function("UCMHASHSEED") + + for start in range(0, len(token_ids), block_size): + end = start + block_size + block_token_ids = token_ids[start:end] + # Do not hash the block if it is not full. + if len(block_token_ids) < block_size: + break + + block_token_ids_tuple = tuple(block_token_ids) + hash_value = hash_function( + (parent_block_hash_value, block_token_ids_tuple) + ) + parent_block_hash_value = str(hash_value) + ret.append(parent_block_hash_value) + + return ret + + +class ChunkProcessor(): + """ + """ + + def __init__(self, config): + self.block_size = config['block_size'] + + self.chunk_end_token_id = config['chunk_end_token_id'] + self.hash_function = None + self.rotary_emb: Optional[callable] = None + + def update_meta4_partial_pc(self, rag_chunk_meta: ChunkMetaData, num_pc_part_blks: int) -> None: + rag_chunk_meta.start_idx_in_req += num_pc_part_blks * self.block_size + rag_chunk_meta.chunk_tokens_len -= num_pc_part_blks * self.block_size + + rag_chunk_meta.start_idx_in_req_blks += num_pc_part_blks + rag_chunk_meta.chunk_blks_len -= num_pc_part_blks + + rag_chunk_meta.chunk_blks_hash = rag_chunk_meta.chunk_blks_hash[num_pc_part_blks:] + rag_chunk_meta.cached_start_position += num_pc_part_blks * self.block_size + + def get_stage(self,all_prefill_tokens): + build_cache = True + if all_prefill_tokens[-1] == self.chunk_end_token_id and len(all_prefill_tokens) % self.block_size == 0: + return build_cache + return not build_cache + + def process_request( + self, + request, + hash_function, + rag_start_blk_idx, + ) -> Tuple[List[ChunkMetaData], bool] : + """ + Process the request to split prompt tokens into RAG chunks and construct metadata. + + Args: + request: Request object containing prompt_token_ids. + hash_function: Function used to compute prefix hash in each rag chunk, should be in line with kv store. + rag_start_blk_idx: Start idx of vllm blocks where prefix cache matches end. + + Returns: + Tuple of: + - rag_chunks_meta: List of ChunkMetaData parsed from req. + - is_build_cache: whether current req is in build cache stage . 
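+
+        Illustrative example (hypothetical sizes): with block_size = 64,
+        rag_start_blk_idx = 0, and a prompt made of two padded rag chunks of
+        128 tokens each (each ending in chunk_end_token_id) followed by a
+        question, the blend stage returns two ChunkMetaData entries with
+        (start_idx_in_req, chunk_blks_len) = (0, 2) and (128, 2). A prompt that
+        itself ends in chunk_end_token_id on a block boundary is treated as the
+        build-cache stage and yields a single ChunkMetaData covering the whole
+        prompt.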
+        """
+        if self.hash_function is None:
+            self.hash_function = hash_function
+        all_prefill_tokens = request.prompt_token_ids
+        rag_chunks_meta: List[ChunkMetaData] = []
+
+        # first decide whether this request is in the build-chunk-cache stage or the use-chunk-cache stage
+        # future work: these two stages should not be exposed to the user
+        is_build_cache = self.get_stage(all_prefill_tokens)
+        if is_build_cache:
+            chunk_blks_hash = hash_token_ids(hash_function, self.block_size, all_prefill_tokens)
+            chunk_tokens_len = len(all_prefill_tokens)
+            chunk_blks_len = len(all_prefill_tokens) // self.block_size
+
+            rag_chunk_meta = ChunkMetaData(
+                start_idx_in_req=0,
+                chunk_tokens_len=chunk_tokens_len,
+                start_idx_in_req_blks=0,
+                chunk_blks_len=chunk_blks_len,
+                chunk_blks_hash=chunk_blks_hash,
+                cached_start_position=0)
+            return [rag_chunk_meta], is_build_cache
+
+        start_blk_idx = 0
+        start_token_idx = 0
+        for end_blk_idx, end_token_idx in enumerate(range(self.block_size - 1, len(all_prefill_tokens), self.block_size)):
+            # only check the last token id in each block
+            if all_prefill_tokens[end_token_idx] == self.chunk_end_token_id:
+                chunk_token_ids = all_prefill_tokens[start_token_idx: end_token_idx + 1]
+                chunk_blks_hash = hash_token_ids(hash_function, self.block_size, chunk_token_ids)
+                chunk_blks_len = end_blk_idx - start_blk_idx + 1
+                chunk_tokens_len = chunk_blks_len * self.block_size
+
+                rag_chunk_meta = ChunkMetaData(
+                    start_idx_in_req=start_token_idx,
+                    chunk_tokens_len=chunk_tokens_len,
+                    start_idx_in_req_blks=start_blk_idx,
+                    chunk_blks_len=chunk_blks_len,
+                    chunk_blks_hash=chunk_blks_hash,
+                    cached_start_position=0)
+
+                # update for the next rag chunk
+                start_blk_idx = end_blk_idx + 1
+                start_token_idx = end_token_idx + 1
+
+                if rag_chunk_meta.end_idx_in_req_blks <= rag_start_blk_idx:
+                    # the current rag chunk is fully covered by prefix cache, nothing to process
+                    continue
+
+                # rag chunk is partly covered by prefix cache
+                num_pc_part_blks = 0
+                if rag_chunk_meta.start_idx_in_req_blks < rag_start_blk_idx:
+                    num_pc_part_blks = rag_start_blk_idx - rag_chunk_meta.start_idx_in_req_blks
+                    # blend stage: the prefix-cached part of the rag chunk does not need to be recomputed
+                    self.update_meta4_partial_pc(rag_chunk_meta, num_pc_part_blks)
+                rag_chunks_meta.append(rag_chunk_meta)
+
+        return rag_chunks_meta, is_build_cache
+
+    def merge_chunks(self, old_chunk_meta: ChunkMetaData, chunk_meta: ChunkMetaData):
+        # currently a fixed pattern (a chunk ends with a fixed token id) is used to recognize text chunks
+        # in some cases one text chunk may be split into several chunks, so they are merged back into one
+        old_chunk_meta.chunk_tokens_len += chunk_meta.chunk_tokens_len
+        old_chunk_meta.chunk_blks_len += chunk_meta.chunk_blks_len
+        old_chunk_meta.chunk_blks_hash += chunk_meta.chunk_blks_hash
+        old_chunk_meta.store_hits += chunk_meta.store_hits
+
+    def setup_rotary_emb(self, model):
+        self.rotary_emb = get_rotary_emb_ops(model)
+
+    def process_chunk_cache(self, k_cache, vllm_ids, positions):
+        """
+        Post-process the loaded chunk k cache.
+        """
+        if self.rotary_emb is None:
+            raise RuntimeError("Please call setup_rotary_emb first.")
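+        # Cached chunk keys were RoPE-rotated for positions starting at
+        # cached_start_position; reusing them at start_idx_in_req only requires
+        # one extra rotation by delta = start_idx_in_req - cached_start_position,
+        # since RoPE rotation angles add up. Per head-dim pair (k1, k2), using
+        # the cos/sin row of cos_sin_cache at index delta:
+        #   k1' = k1 * cos - k2 * sin
+        #   k2' = k2 * cos + k1 * sin
+        # `positions` carries one such delta per loaded block.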
+        # triton kernel for block-wise delta rope
+        block_wise_rope_forward(k_cache, vllm_ids, positions, self.rotary_emb.cos_sin_cache)
\ No newline at end of file
diff --git a/ucm/sparse/blend/utils.py b/ucm/sparse/blend/utils.py
new file mode 100644
index 00000000..3f411f9b
--- /dev/null
+++ b/ucm/sparse/blend/utils.py
@@ -0,0 +1,28 @@
+from vllm.model_executor.models.qwen2 import Qwen2ForCausalLM
+from vllm.model_executor.models.llama import LlamaForCausalLM
+
+
+def get_rotary_emb_ops(model):
+    if isinstance(model, Qwen2ForCausalLM):
+        return model.model.layers[0].self_attn.rotary_emb
+    if isinstance(model, LlamaForCausalLM):
+        return model.model.layers[0].self_attn.rotary_emb
+    else:
+        raise NotImplementedError("get model rotary emb failed! not implemented for this model type")
+
+
+import time
+import functools
+from ucm.logger import init_logger
+
+logger = init_logger(__name__)
+
+def timeit(func):
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        start = time.perf_counter()
+        result = func(*args, **kwargs)
+        elapsed = time.perf_counter() - start
+        logger.info(f"{func.__name__} exec time: {elapsed:.6f}s")
+        return result
+    return wrapper
diff --git a/ucm/sparse/esa/esa.py b/ucm/sparse/esa/esa.py
index d8316cc6..0bbfee86 100644
--- a/ucm/sparse/esa/esa.py
+++ b/ucm/sparse/esa/esa.py
@@ -4,7 +4,7 @@
 from collections import defaultdict
 from dataclasses import dataclass
 from functools import cache
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union, Tuple
 
 import numpy as np
 import torch
@@ -564,8 +564,9 @@ def attention_begin(
         value: torch.Tensor,
         layer_name: str,
         forward_context: ForwardContext,
+        output: Optional[torch.Tensor] = None,
         phase: Optional[str] = None,
-    ) -> None:
+    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
         if not self.is_mla:
             for req_meta in self._sparse_metadata.requests:
                 self.create_req_state_attention_begin(
@@ -583,6 +584,8 @@ def attention_begin(
                 req_meta, layer_name, query, key, value, forward_context
             )
 
+        return query, key, value, output
+
     def update_req_state_attention_end(
         self, req_meta, layer_name, query, key, value, attn_output, forward_context
     ):
diff --git a/ucm/sparse/factory.py b/ucm/sparse/factory.py
index cb1b43ae..aafffc26 100644
--- a/ucm/sparse/factory.py
+++ b/ucm/sparse/factory.py
@@ -50,3 +50,5 @@ def create_sparse_method(
 UcmSparseFactory.register_sparse_method(
     "KVStarMultiStep", "ucm.sparse.kvstar.multistep", "KVStarMultiStep"
 )
+UcmSparseFactory.register_sparse_method("Blend", "ucm.sparse.blend.blend", "Blend")
+
diff --git a/ucm/sparse/gsa/gsa.py b/ucm/sparse/gsa/gsa.py
index a0b430f0..7f370fb4 100644
--- a/ucm/sparse/gsa/gsa.py
+++ b/ucm/sparse/gsa/gsa.py
@@ -6,7 +6,7 @@
 from dataclasses import dataclass
 from functools import cache, wraps
 from itertools import accumulate
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union, Tuple
 
 import torch
 from vllm.config import VllmConfig
@@ -586,8 +586,9 @@ def attention_begin(
         value: torch.Tensor,
         layer_name: str,
         forward_context: ForwardContext,
+        output: Optional[torch.Tensor] = None,
         phase: Optional[str] = None,
-    ) -> None:
+    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
         current_layer_id = int(layer_name.split(".")[2])
         if self.prefetch_engine.atb_gsa_enable and self.prefetch_engine.is_topk_cal:
             if not self.use_mla:
@@ -638,6 +639,8 @@ def attention_begin(
             ]
         )
 
+        return query, key, value, output
+
     def attention_finished(
         self,
         query: torch.Tensor,
@@ -856,7 +859,7 @@ def 
execute_begin(self, scheduler_output: SchedulerOutput): self.gsa_stats = self.gsa_metadata.gsa_stats self._start_topk_cal() - def execute_finished(self): + def execute_finished(self, logits_indices :torch.Tensor): kv_caches = [None] * self.layer_num forward_context = get_forward_context() attn = forward_context.no_compile_layers @@ -873,6 +876,8 @@ def execute_finished(self): else: self.prefetch_engine.deal_async_prefetch(self.gsa_metadata, kv_caches, None) + return logits_indices + def build_sparse_meta( self, scheduler_output: SchedulerOutput, requests, input_batch, attn_metadata ) -> None: diff --git a/ucm/sparse/kvstar/multistep.py b/ucm/sparse/kvstar/multistep.py index 39e52fba..15f6dbfb 100644 --- a/ucm/sparse/kvstar/multistep.py +++ b/ucm/sparse/kvstar/multistep.py @@ -1,7 +1,7 @@ import enum import math from dataclasses import dataclass, field -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional, Union, Tuple import torch from vllm.config import VllmConfig @@ -726,7 +726,9 @@ def attention_begin( value: torch.Tensor, layer_name: str, forward_context: ForwardContext, - ) -> None: + output: Optional[torch.Tensor] = None, + phase: Optional[str] = None, + ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: """ This is called at the beginning of "unified_attention". Sparse attention algorithm can modify forward_context.attn_metadata if necessary. @@ -740,6 +742,8 @@ def attention_begin( req_layerwise_state.update_meta(req_meta, forward_context) req_layerwise_state.attention_begin(query, key, value, forward_context) + return query, key, value, output + def attention_finished( self, query: torch.Tensor,